mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-30 22:42:29 +08:00
feat: support GetLastMessage
This commit is contained in:
parent
4fe15e9ec9
commit
3059436c6c
2
go.mod
2
go.mod
@ -12,7 +12,7 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.69
|
github.com/openimsdk/protocol v0.0.72-alpha.70
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.63
|
github.com/openimsdk/tools v0.0.50-alpha.63
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
|
4
go.sum
4
go.sum
@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
|
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
|
||||||
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.69 h1:b22oY2XTdBR/BePqA73KsrM3GDF3Vk8YcBEXZU4ArJc=
|
github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.69/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
|
github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY=
|
github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
|
github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||||
|
@ -242,6 +242,8 @@ func (c *Client) handleMessage(message []byte) error {
|
|||||||
resp, messageErr = c.setAppBackgroundStatus(ctx, binaryReq)
|
resp, messageErr = c.setAppBackgroundStatus(ctx, binaryReq)
|
||||||
case WsSubUserOnlineStatus:
|
case WsSubUserOnlineStatus:
|
||||||
resp, messageErr = c.longConnServer.SubUserOnlineStatus(ctx, c, binaryReq)
|
resp, messageErr = c.longConnServer.SubUserOnlineStatus(ctx, c, binaryReq)
|
||||||
|
case WsPullConvLastMessage:
|
||||||
|
resp, messageErr = c.longConnServer.GetLastMessage(ctx, binaryReq)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf(
|
return fmt.Errorf(
|
||||||
"ReqIdentifier failed,sendID:%s,msgIncr:%s,reqIdentifier:%d",
|
"ReqIdentifier failed,sendID:%s,msgIncr:%s,reqIdentifier:%d",
|
||||||
|
@ -52,6 +52,7 @@ const (
|
|||||||
WsLogoutMsg = 2003
|
WsLogoutMsg = 2003
|
||||||
WsSetBackgroundStatus = 2004
|
WsSetBackgroundStatus = 2004
|
||||||
WsSubUserOnlineStatus = 2005
|
WsSubUserOnlineStatus = 2005
|
||||||
|
WsPullConvLastMessage = 2006
|
||||||
WSDataError = 3001
|
WSDataError = 3001
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -108,6 +108,7 @@ type MessageHandler interface {
|
|||||||
GetSeqMessage(ctx context.Context, data *Req) ([]byte, error)
|
GetSeqMessage(ctx context.Context, data *Req) ([]byte, error)
|
||||||
UserLogout(ctx context.Context, data *Req) ([]byte, error)
|
UserLogout(ctx context.Context, data *Req) ([]byte, error)
|
||||||
SetUserDeviceBackground(ctx context.Context, data *Req) ([]byte, bool, error)
|
SetUserDeviceBackground(ctx context.Context, data *Req) ([]byte, bool, error)
|
||||||
|
GetLastMessage(ctx context.Context, data *Req) ([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ MessageHandler = (*GrpcHandler)(nil)
|
var _ MessageHandler = (*GrpcHandler)(nil)
|
||||||
@ -266,3 +267,15 @@ func (g *GrpcHandler) SetUserDeviceBackground(ctx context.Context, data *Req) ([
|
|||||||
}
|
}
|
||||||
return nil, req.IsBackground, nil
|
return nil, req.IsBackground, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GrpcHandler) GetLastMessage(ctx context.Context, data *Req) ([]byte, error) {
|
||||||
|
var req msg.GetLastMessageReq
|
||||||
|
if err := proto.Unmarshal(data.Data, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp, err := g.msgClient.GetLastMessage(ctx, &req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return proto.Marshal(resp)
|
||||||
|
}
|
||||||
|
@ -245,3 +245,11 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
|
|||||||
func (m *msgServer) GetServerTime(ctx context.Context, _ *msg.GetServerTimeReq) (*msg.GetServerTimeResp, error) {
|
func (m *msgServer) GetServerTime(ctx context.Context, _ *msg.GetServerTimeReq) (*msg.GetServerTimeResp, error) {
|
||||||
return &msg.GetServerTimeResp{ServerTime: timeutil.GetCurrentTimestampByMill()}, nil
|
return &msg.GetServerTimeResp{ServerTime: timeutil.GetCurrentTimestampByMill()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *msgServer) GetLastMessage(ctx context.Context, req *msg.GetLastMessageReq) (*msg.GetLastMessageResp, error) {
|
||||||
|
msgs, err := m.MsgDatabase.GetLastMessage(ctx, req.ConversationIDs, req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &msg.GetLastMessageResp{Msgs: msgs}, nil
|
||||||
|
}
|
||||||
|
@ -97,6 +97,8 @@ type CommonMsgDatabase interface {
|
|||||||
DeleteDoc(ctx context.Context, docID string) error
|
DeleteDoc(ctx context.Context, docID string) error
|
||||||
|
|
||||||
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
||||||
|
|
||||||
|
GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
|
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
|
||||||
@ -811,8 +813,29 @@ func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationI
|
|||||||
if v, ok := seqMsgs[seq]; ok {
|
if v, ok := seqMsgs[seq]; ok {
|
||||||
res = append(res, convert.MsgDB2Pb(v.Msg))
|
res = append(res, convert.MsgDB2Pb(v.Msg))
|
||||||
} else {
|
} else {
|
||||||
res = append(res, &sdkws.MsgData{Seq: seq})
|
res = append(res, &sdkws.MsgData{Seq: seq, Status: constant.MsgStatusHasDeleted})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *commonMsgDatabase) GetLastMessage(ctx context.Context, conversationIDs []string, userID string) (map[string]*sdkws.MsgData, error) {
|
||||||
|
res := make(map[string]*sdkws.MsgData)
|
||||||
|
for _, conversationID := range conversationIDs {
|
||||||
|
if _, ok := res[conversationID]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
msg, err := db.msgDocDatabase.GetLastMessage(ctx, conversationID)
|
||||||
|
if err != nil {
|
||||||
|
if errs.Unwrap(err) == mongo.ErrNoDocuments {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tmp := []*model.MsgInfoModel{msg}
|
||||||
|
db.handlerDeleteAndRevoked(ctx, userID, tmp)
|
||||||
|
db.handlerQuote(ctx, userID, conversationID, tmp)
|
||||||
|
res[conversationID] = convert.MsgDB2Pb(msg.Msg)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
@ -997,6 +997,68 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str
|
|||||||
return seq, nil
|
return seq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MsgMgo) GetLastMessage(ctx context.Context, conversationID string) (*model.MsgInfoModel, error) {
|
||||||
|
pipeline := []bson.M{
|
||||||
|
{
|
||||||
|
"$match": bson.M{
|
||||||
|
"doc_id": bson.M{
|
||||||
|
"$regex": fmt.Sprintf("^%s", conversationID),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$match": bson.M{
|
||||||
|
"msgs.msg.status": bson.M{
|
||||||
|
"$lt": constant.MsgStatusHasDeleted,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$sort": bson.M{
|
||||||
|
"_id": -1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$limit": 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$project": bson.M{
|
||||||
|
"_id": 0,
|
||||||
|
"doc_id": 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$unwind": "$msgs",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$match": bson.M{
|
||||||
|
"msgs.msg.status": bson.M{
|
||||||
|
"$lt": constant.MsgStatusHasDeleted,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$sort": bson.M{
|
||||||
|
"msgs.msg.seq": -1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$limit": 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
type Result struct {
|
||||||
|
Msgs *model.MsgInfoModel `bson:"msgs"`
|
||||||
|
}
|
||||||
|
res, err := mongoutil.Aggregate[*Result](ctx, m.coll, pipeline)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(res) == 0 {
|
||||||
|
return nil, errs.Wrap(mongo.ErrNoDocuments)
|
||||||
|
}
|
||||||
|
return res[0].Msgs, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []int64) ([]*model.MsgInfoModel, error) {
|
func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []int64) ([]*model.MsgInfoModel, error) {
|
||||||
if len(indexes) == 0 {
|
if len(indexes) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -39,5 +39,6 @@ type Msg interface {
|
|||||||
DeleteDoc(ctx context.Context, docID string) error
|
DeleteDoc(ctx context.Context, docID string) error
|
||||||
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
|
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
|
||||||
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
||||||
|
GetLastMessage(ctx context.Context, conversationID string) (*model.MsgInfoModel, error)
|
||||||
FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
|
FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user