diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index f8f207362..4c4cbd907 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -163,11 +163,9 @@ func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, c func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) { och.toPushTopic(ctx, conversationID, notStorageList) if len(storageList) > 0 { - var currentMaxSeq int64 - var err error - if storageList[0].SessionType == constant.SuperGroupChatType { - currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, conversationID) - if err == redis.Nil { + currentMaxSeq, err := och.msgDatabase.GetMaxSeq(ctx, conversationID) + if err == redis.Nil { + if storageList[0].SessionType == constant.SuperGroupChatType { log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID) userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID) if err != nil { @@ -177,11 +175,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, con log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID) } } - } - } else { - currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, conversationID) - if err == redis.Nil { - log.ZInfo(ctx, "single chat first create conversation", "conversationID", conversationID) + } else { if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, storageList[0].SendID); err != nil { log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID) } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 3c0682f4e..d43724dba 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -214,3 +214,11 @@ func (c *conversationServer) DelGroupChatConversations(ctx context.Context, req } return &pbConversation.DelGroupChatConversationsResp{}, nil } + +func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbConversation.GetConversationIDsReq) (*pbConversation.GetConversationIDsResp, error) { + conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + return &pbConversation.GetConversationIDsResp{ConversationIDs: conversationIDs}, nil +} diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index fa648248c..4d6d70e03 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -2,6 +2,7 @@ package msg import ( "context" + cbapi "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" @@ -113,7 +114,6 @@ func CallbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { utils.NotNilReplace(&msg.MsgData.Status, resp.Status) utils.NotNilReplace(&msg.MsgData.Options, resp.Options) utils.NotNilReplace(&msg.MsgData.AtUserIDList, resp.AtUserIDList) - utils.NotNilReplace(&msg.MsgData.MsgDataList, resp.MsgDataList) utils.NotNilReplace(&msg.MsgData.AttachedInfo, resp.AttachedInfo) utils.NotNilReplace(&msg.MsgData.Ex, resp.Ex) return nil diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index bbfda0c85..b7a6e0706 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -20,7 +20,7 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err } - if err := m.MsgDatabase.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil { + if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil { return nil, err } return resp, nil diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 8fc6666cc..b71a33345 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -9,6 +9,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" @@ -16,6 +17,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "google.golang.org/grpc" ) @@ -126,15 +128,23 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { return nil, err } + conversationIDs, err := m.Conversation.GetConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + seqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) + if err != nil { + log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs) + } resp := new(sdkws.GetMaxSeqResp) - + resp.MaxSeqs = seqs return resp, nil } func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { resp := &sdkws.PullMessageBySeqsResp{} for _, seq := range req.SeqRanges { - if !seq.IsNotification { + if !utils.IsNotification(seq.ConversationID) { msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num) if err != nil { return nil, err @@ -158,7 +168,6 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag IsNotification: true, }) } - } return resp, nil } diff --git a/internal/tools/msg.go b/internal/tools/msg.go index f39b112d9..3196d6682 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -78,14 +78,14 @@ func (c *MsgTool) AllUserClearMsgAndFixSeq() { func (c *MsgTool) ClearUsersMsg(ctx context.Context, userIDs []string) { for _, userID := range userIDs { - if err := c.msgDatabase.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { + if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { log.ZError(ctx, "DeleteUserMsgsAndSetMinSeq failed", err, "userID", userID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) } maxSeqCache, maxSeqMongo, err := c.GetAndFixUserSeqs(ctx, userID) if err != nil { continue } - c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion) + c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo) } } @@ -96,7 +96,7 @@ func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string log.ZError(ctx, "ClearSuperGroupMsg failed", err, "groupID", groupID) continue } - if err := c.msgDatabase.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { + if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "groupID", groupID, "userID", userIDs, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) } if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil { @@ -167,7 +167,7 @@ func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, grou return minSeqCache, nil } -func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache, maxSeqMongo int64, diffusionType int) error { +func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache, maxSeqMongo int64) error { if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { return errSeq } diff --git a/internal/tools/msg_test.go b/internal/tools/msg_test.go index 3bb09d90c..cfc4e7ad9 100644 --- a/internal/tools/msg_test.go +++ b/internal/tools/msg_test.go @@ -19,8 +19,8 @@ import ( "time" ) -func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, userID string) *unRelationTb.MsgDocModel { - msgDoc := &unRelationTb.MsgDocModel{DocID: userID + strconv.Itoa(int(index))} +func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, conversationID string) *unRelationTb.MsgDocModel { + msgDoc := &unRelationTb.MsgDocModel{DocID: conversationID + strconv.Itoa(int(index))} for i := startSeq; i <= stopSeq; i++ { msg := sdkws.MsgData{ SendID: "sendID1", @@ -54,7 +54,6 @@ func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, userID string) *unRelatio func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { operationID := "test" - rdb, err := cache.NewRedis() if err != nil { return @@ -65,22 +64,25 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { } cacheModel := cache.NewMsgCacheModel(rdb) mongoClient := mgo.GetDatabase().Collection(unRelationTb.MsgDocModel{}.TableName()) - ctx := context.Background() ctx = mcontext.SetOperationID(ctx, operationID) + testUID1 := "test_del_id1" - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID1 + ":" + strconv.Itoa(0)}) + var conversationID string + conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID1) + _, err = mongoClient.DeleteOne(ctx, bson.M{"doc_id": conversationID + ":" + strconv.Itoa(0)}) if err != nil { t.Error("DeleteOne failed") return } - err = cacheModel.SetUserMaxSeq(ctx, testUID1, 600) + + err = cacheModel.SetMaxSeq(ctx, conversationID, 600) if err != nil { t.Error("SetUserMaxSeq failed") } - msgDoc := GenMsgDoc(1, 600, 200, 0, testUID1) + msgDoc := GenMsgDoc(1, 600, 200, 0, conversationID) if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", testUID1) + t.Error("InsertOne failed", conversationID) } msgTools, err := InitMsgTool() @@ -88,14 +90,14 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("init failed") return } - msgTools.ClearUsersMsg(ctx, []string{testUID1}) - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID1) + msgTools.ClearUsersMsg(ctx, []string{conversationID}) + minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID1, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", testUID1) + if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) @@ -107,21 +109,23 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { /////// uid2 testUID2 := "test_del_id2" - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID2 + ":" + strconv.Itoa(0)}) + conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID2) + + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) if err != nil { t.Error("delete failed") } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID2 + ":" + strconv.Itoa(1)}) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) if err != nil { t.Error("delete failed") } - err = cacheModel.SetUserMaxSeq(ctx, testUID2, 7000) + err = cacheModel.SetMaxSeq(ctx, conversationID, 7000) if err != nil { t.Error("SetUserMaxSeq failed") } - msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID2) - msgDoc2 := GenMsgDoc(5000, 7000, 6000, 1, testUID2) + msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) + msgDoc2 := GenMsgDoc(5000, 7000, 6000, 1, conversationID) if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { t.Error("InsertOne failed", testUID1) } @@ -129,14 +133,14 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("InsertOne failed", testUID1) } - msgTools.ClearUsersMsg(ctx, []string{testUID2}) - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID2) + msgTools.ClearUsersMsg(ctx, []string{conversationID}) + minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID2, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", testUID2) + if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) @@ -147,27 +151,28 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { /////// uid3 testUID3 := "test_del_id3" - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID3 + ":" + strconv.Itoa(0)}) + conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID3) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) if err != nil { t.Error("delete failed") } - err = cacheModel.SetUserMaxSeq(ctx, testUID3, 4999) + err = cacheModel.SetMaxSeq(ctx, conversationID, 4999) if err != nil { t.Error("SetUserMaxSeq failed") } - msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID3) + msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", testUID3) + t.Error("InsertOne failed", conversationID) } - msgTools.ClearUsersMsg(ctx, []string{testUID3}) - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID3) + msgTools.ClearUsersMsg(ctx, []string{conversationID}) + minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID3, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", testUID3) + if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) @@ -178,45 +183,46 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { //// uid4 testUID4 := "test_del_id4" - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(0)}) + conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID4) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) if err != nil { t.Error("delete failed") } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(1)}) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) if err != nil { t.Error("delete failed") } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(2)}) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(2)}) if err != nil { t.Error("delete failed") } - err = cacheModel.SetUserMaxSeq(ctx, testUID4, 12000) - msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID4) - msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID4) - msgDoc3 := GenMsgDoc(10000, 12000, 11000, 2, testUID4) + err = cacheModel.SetMaxSeq(ctx, conversationID, 12000) + msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) + msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, conversationID) + msgDoc3 := GenMsgDoc(10000, 12000, 11000, 2, conversationID) if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", testUID4) + t.Error("InsertOne failed", conversationID) } if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil { - t.Error("InsertOne failed", testUID4) + t.Error("InsertOne failed", conversationID) } if _, err := mongoClient.InsertOne(ctx, msgDoc3); err != nil { - t.Error("InsertOne failed", testUID4) + t.Error("InsertOne failed", conversationID) } - msgTools.ClearUsersMsg(ctx, []string{testUID4}) + msgTools.ClearUsersMsg(ctx, []string{conversationID}) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID4) + minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID4, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", testUID4) + if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) @@ -226,36 +232,38 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { } testUID5 := "test_del_id5" - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID5 + ":" + strconv.Itoa(0)}) + conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID5) + + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) if err != nil { t.Error("delete failed") } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID5 + ":" + strconv.Itoa(1)}) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) if err != nil { t.Error("delete failed") } - err = cacheModel.SetUserMaxSeq(ctx, testUID5, 9999) - msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID5) - msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID5) + err = cacheModel.SetMaxSeq(ctx, conversationID, 9999) + msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) + msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, conversationID) if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", testUID5) + t.Error("InsertOne failed", conversationID) } if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil { - t.Error("InsertOne failed", testUID5) + t.Error("InsertOne failed", conversationID) } - msgTools.ClearUsersMsg(ctx, []string{testUID5}) + msgTools.ClearUsersMsg(ctx, []string{conversationID}) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID5) + minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID5, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", testUID5) + if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) @@ -265,26 +273,28 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { } testUID6 := "test_del_id6" - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(0)}) + conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID6) + + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) if err != nil { t.Error("delete failed") } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(1)}) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) if err != nil { t.Error("delete failed") } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(2)}) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(2)}) if err != nil { t.Error("delete failed") } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(3)}) + _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(3)}) if err != nil { t.Error("delete failed") } - msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID6) - msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID6) - msgDoc3 = GenMsgDoc(10000, 14999, 13000, 2, testUID6) - msgDoc4 := GenMsgDoc(15000, 19999, 0, 3, testUID6) + msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) + msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, conversationID) + msgDoc3 = GenMsgDoc(10000, 14999, 13000, 2, conversationID) + msgDoc4 := GenMsgDoc(15000, 19999, 0, 3, conversationID) if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { t.Error("InsertOne failed", testUID4) } @@ -297,13 +307,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { if _, err := mongoClient.InsertOne(ctx, msgDoc4); err != nil { t.Error("InsertOne failed", testUID4) } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID6) + minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID6, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { - t.Error("checkMaxSeqWithMongo failed", testUID6) + if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 6a6652059..9febd2913 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -22,48 +22,43 @@ import ( ) const ( - userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq - appleDeviceToken = "DEVICE_TOKEN" - userMinSeq = "REDIS_USER_MIN_SEQ:" - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - messageCache = "MESSAGE_CACHE:" - signalCache = "SIGNAL_CACHE:" - signalListCache = "SIGNAL_LIST_CACHE:" - fcmToken = "FCM_TOKEN:" - groupUserMinSeq = "GROUP_USER_MIN_SEQ:" - groupMaxSeq = "GROUP_MAX_SEQ:" - groupMinSeq = "GROUP_MIN_SEQ:" + maxSeq = "MAX_SEQ:" + minSeq = "MIN_SEQ:" + conversationUserMinSeq = "CON_USER_MIN_SEQ:" + + appleDeviceToken = "DEVICE_TOKEN" + getuiToken = "GETUI_TOKEN" + getuiTaskID = "GETUI_TASK_ID" + messageCache = "MESSAGE_CACHE:" + signalCache = "SIGNAL_CACHE:" + signalListCache = "SIGNAL_LIST_CACHE:" + fcmToken = "FCM_TOKEN:" + sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" exTypeKeyLocker = "EX_LOCK:" uidPidToken = "UID_PID_TOKEN_STATUS:" - userNotificationSeq = "USER_NOTIFICATION_SEQ:" - userMinNotificationSeq = "USER_MIN_NOTIFICATION_SEQ:" - groupNotificationSeq = "GROUP_NOTIFICATION_SEQ:" - groupMinNotificationSeq = "GROUP_MIN_NOTIFICATION_SEQ:" ) type MsgModel interface { - IncrUserSeq(ctx context.Context, userID string) (int64, error) - GetUserMaxSeq(ctx context.Context, userID string) (int64, error) - SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error - SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) - GetUserMinSeq(ctx context.Context, userID string) (int64, error) - SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) - GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) - IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error - SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error + SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error + GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error + GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) + SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error + SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) + AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error - GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) - DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error + GetMessagesBySeq(ctx context.Context, conversationID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + SetMessageToCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int, error) + DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error CleanUpOneUserAllMsg(ctx context.Context, userID string) error HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error) @@ -90,20 +85,6 @@ type MsgModel interface { SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - // notificatio - - // IncrUserNotificationSeq(ctx context.Context, userID string) (int64, error) - // GetUserNotificationMaxSeq(ctx context.Context, userID string) (int64, error) - // SetUserNotificationMaxSeq(ctx context.Context, userID string, maxSeq int64) error - // SetUserNotificationMinSeq(ctx context.Context, userID string, minSeq int64) (err error) - // GetUserNotificationMinSeq(ctx context.Context, userID string) (int64, error) - // SetGroupNotificationUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - // GetGroupNotificationUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) - // GetGroupNotificationMaxSeq(ctx context.Context, groupID string) (int64, error) - // GetGroupNotificationMinSeq(ctx context.Context, groupID string) (int64, error) - // IncrGroupNotificationMaxSeq(ctx context.Context, groupID string) (int64, error) - // SetGroupNotificationMaxSeq(ctx context.Context, groupID string, maxSeq int64) error - // SetGroupNotificationMinSeq(ctx context.Context, groupID string, minSeq int64) error } func NewMsgCacheModel(client redis.UniversalClient) MsgModel { @@ -146,58 +127,91 @@ func (c *msgCache) DelKeys() { } } -func (c *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) +func (c *msgCache) getMaxSeqKey(conversationID string) string { + return maxSeq + conversationID } -func (c *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) +func (c *msgCache) getMinSeqKey(conversationID string) string { + return minSeq + conversationID } -func (c *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err()) +func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { + return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) } -func (c *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { - return errs.Wrap(c.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err()) +func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64()) } -func (c *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, userMinSeq+userID).Int64()) +func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { + pipe := c.rdb.Pipeline() + for _, v := range items { + if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { + return nil, err + } + } + result, err := pipe.Exec(ctx) + if err != nil { + return nil, errs.Wrap(err) + } + m = make(map[string]int64, len(items)) + for i, v := range result { + if v.Err() != nil && err != redis.Nil { + return nil, errs.Wrap(v.Err()) + } + m[items[i]] = utils.StringToInt64(v.String()) + } + return m, nil } -func (c *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { - key := groupUserMinSeq + "g:" + groupID + "u:" + userID - return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) +func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { + return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) } -func (c *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { - key := groupUserMinSeq + "g:" + groupID + "u:" + userID - return utils.Wrap2(c.rdb.Get(ctx, key).Int64()) +func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { + return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) } -func (c *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, groupMaxSeq+groupID).Int64()) +func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { + return c.getSeq(ctx, conversationID, c.getMaxSeqKey) +} +func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { + return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) +} +func (c *msgCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { + return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) +} +func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + return c.getSeq(ctx, conversationID, c.getMinSeqKey) } -func (c *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, groupMinSeq+groupID).Int64()) +func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string { + return conversationUserMinSeq + "g:" + conversationID + "u:" + userID } -func (c *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - key := groupMaxSeq + groupID - seq, err := c.rdb.Incr(ctx, key).Uint64() - return int64(seq), errs.Wrap(err) +func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()) } -func (c *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { - key := groupMaxSeq + groupID - return errs.Wrap(c.rdb.Set(ctx, key, maxSeq, 0).Err()) +func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { + return c.getSeqs(ctx, userIDs, func(userID string) string { + return c.getConversationUserMinSeqKey(conversationID, userID) + }) +} +func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { + return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) } -func (c *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { - key := groupMinSeq + groupID - return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) +func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { + pipe := c.rdb.Pipeline() + for userID, minSeq := range seqs { + err = pipe.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err() + if err != nil { + return errs.Wrap(err) + } + } + _, err = pipe.Exec(ctx) + return err } func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { @@ -266,11 +280,11 @@ func (c *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []i return seqMsgs, failedSeqs, err } -func (c *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) { +func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { pipe := c.rdb.Pipeline() var failedMsgs []sdkws.MsgData - for _, msg := range msgList { - key := c.getMessageCacheKey(userID, msg.Seq) + for _, msg := range msgs { + key := c.getMessageCacheKey(conversationID, msg.Seq) s, err := utils.Pb2String(msg) if err != nil { return 0, errs.Wrap(err) @@ -281,7 +295,7 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList } } if len(failedMsgs) != 0 { - return len(failedMsgs), fmt.Errorf("set msg to msgCache failed, failed lists: %v, %s", failedMsgs, userID) + return len(failedMsgs), fmt.Errorf("set msg to msgCache failed, failed lists: %v, %s", failedMsgs, conversationID) } _, err := pipe.Exec(ctx) return 0, err diff --git a/pkg/common/db/controller/common_msg.go b/pkg/common/db/controller/common_msg.go index 71c494803..58f4f0d81 100644 --- a/pkg/common/db/controller/common_msg.go +++ b/pkg/common/db/controller/common_msg.go @@ -6,26 +6,28 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" ) -// sourceID 可以是通知也可以是conversation +// conversationID 可以是通知也可以是conversation type commonMsgDatabase interface { - BatchInsertChat2DB(ctx context.Context, sourceID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error - DeleteMessageFromCache(ctx context.Context, sourceID string, msgs []*sdkws.MsgData) error + BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error + DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error - GetMsgBySeqs(ctx context.Context, sourceID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) - GetMsgBySeqsRange(ctx context.Context, sourceID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) - CleanUpUserMsg(ctx context.Context, sourceID string) error - DelMsgsBySeqs(ctx context.Context, sourceID string, seqs []int64) (totalUnExistSeqs []int64, err error) - DelMsgsAndResetMinSeq(ctx context.Context, sourceID string, userIDs []string, remainTime int64) error + GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) + GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) + CleanUpUserMsg(ctx context.Context, conversationID string) error + DelMsgsBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error) + DelMsgsAndResetMinSeq(ctx context.Context, conversationID string, userIDs []string, remainTime int64) error - GetMinMaxSeqInMongoAndCache(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) + GetMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) - GetMaxSeq(ctx context.Context, sourceID string) (int64, error) - GetMinSeq(ctx context.Context, sourceID string) (int64, error) - SetMaxSeq(ctx context.Context, sourceID string, seq int64) error - SetMinSeq(ctx context.Context, sourceID string, seq int64) error + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, seq int64) error + SetMinSeq(ctx context.Context, conversationID string, seq int64) error + GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - MsgToMQ(ctx context.Context, sourceID string, msg2mq *sdkws.MsgData) error - MsgToModifyMQ(ctx context.Context, sourceID string, messages []*sdkws.MsgData) error - MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *sdkws.MsgData) (int32, int64, error) - MsgToMongoMQ(ctx context.Context, sourceID string, messages []*sdkws.MsgData, lastSeq int64) error + MsgToMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) error + MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error + MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) + MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index b9a78523d..f7a227953 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -29,6 +29,7 @@ type ConversationDatabase interface { //SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作 SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error + GetConversationIDs(ctx context.Context, userID string) ([]string, error) } func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -216,3 +217,7 @@ func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, }) } + +func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { + return c.cache.GetUserConversationIDs(ctx, userID) +} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 292e2d943..9bed3ebc6 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -38,33 +38,26 @@ type MsgDatabase interface { // incrSeq通知seq然后批量插入缓存 NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error) // 删除消息 返回不存在的seqList - DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) - // 获取群ID或者UserID最新一条在mongo里面的消息 + DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error) // 通过seqList获取mongo中写扩散消息 GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) // 通过seqList获取大群在 mongo里面的消息 - GetMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) + GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) // 删除用户所有消息/redis/mongo然后重置seq - CleanUpUserMsg(ctx context.Context, userID string) error + CleanUpConversationMsgs(ctx context.Context, conversationID string) error // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) - DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error - // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) - DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error - // 获取用户 seq mongo和redis - GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) - // 获取群 seq mongo和redis - GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) - // 设置群用户最小seq 直接调用cache - SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) - // 设置用户最小seq 直接调用cache - SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) + // DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error + // 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) + DeleteConversationMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error + + // 获取会话 seq mongo和redis + GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) + + // msg modify JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) - SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error - SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error @@ -72,13 +65,20 @@ type MsgDatabase interface { GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error + // msg send status SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) - GetUserMaxSeq(ctx context.Context, userID string) (int64, error) - GetUserMinSeq(ctx context.Context, userID string) (int64, error) - GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) + SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error + GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error + GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error) + SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error) + + // to mq MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) @@ -213,24 +213,8 @@ func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, conversationID string, } return nil } -func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { - return db.cache.GetUserMaxSeq(ctx, userID) -} - -func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { - return db.cache.GetUserMinSeq(ctx, userID) -} - -func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - return db.cache.GetGroupMaxSeq(ctx, groupID) -} - -func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { - return db.cache.GetGroupMinSeq(ctx, groupID) -} func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { - //newTime := utils.GetCurrentTimestampByMill() if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { return errors.New("too large") } @@ -309,21 +293,19 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID st } prome.Inc(prome.MsgInsertMongoSuccessCounter) } - //log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList)) return nil } -func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*sdkws.MsgData) error { - return db.cache.DeleteMessageFromCache(ctx, userID, msgs) +func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { + return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs) } func (db *msgDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error) { return 0, nil } -func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) (int64, error) { - //newTime := utils.GetCurrentTimestampByMill() - lenList := len(msgList) +func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) (int64, error) { + lenList := len(msgs) if int64(lenList) > db.msg.GetSingleGocMsgNum() { return 0, errors.New("too large") } @@ -332,25 +314,18 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID } // judge sessionType to get seq lastMaxSeq := currentMaxSeq - for _, m := range msgList { + for _, m := range msgs { currentMaxSeq++ m.Seq = currentMaxSeq - //log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", conversationID, "seq: ", currentMaxSeq) } - //log.Debug(operationID, "SetMessageToCache ", conversationID, len(msgList)) - failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgList) + failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs) if err != nil { prome.Add(prome.MsgInsertRedisFailedCounter, failedNum) - //log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), conversationID) + log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) } else { prome.Inc(prome.MsgInsertRedisSuccessCounter) } - //log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, conversationID, len(msgList)) - if msgList[0].SessionType == constant.SuperGroupChatType { - err = db.cache.SetGroupMaxSeq(ctx, conversationID, currentMaxSeq) - } else { - err = db.cache.SetUserMaxSeq(ctx, conversationID, currentMaxSeq) - } + err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) if err != nil { prome.Inc(prome.SeqSetFailedCounter) } else { @@ -359,9 +334,9 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID return lastMaxSeq, utils.Wrap(err, "") } -func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { +func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error) { sortkeys.Int64s(seqs) - docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs) + docIDSeqsMap := db.msg.GetDocIDSeqsMap(conversationID, seqs) lock := sync.Mutex{} var wg sync.WaitGroup wg.Add(len(docIDSeqsMap)) @@ -456,14 +431,14 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, for docID, value := range m { doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) if err != nil { - //log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error()) + log.ZError(ctx, "get message from mongo exception", err, "docID", docID) continue } singleCount = 0 for i := 0; i < len(doc.Msg); i++ { msgPb, err := db.unmarshalMsg(&doc.Msg[i]) if err != nil { - //log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error()) + log.ZError(ctx, "unmarshal message exception", err, "docID", docID, "msg", &doc.Msg[i]) return nil, err } if utils.Contain(msgPb.Seq, value...) { @@ -488,12 +463,23 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, func (db *msgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, seqs []int64, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { m := db.msg.GetDocIDSeqsMap(conversationID, seqs) - for { + for int64(len(seqMsg)) != num { + for docID, value := range m { + beginSeq, endSeq := db.msg.GetSeqsBeginEnd(value) + msgs, seqs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq) + if err != nil { + log.ZError(ctx, "GetMsgBySeqIndexIn1Doc error", err, "docID", docID, "beginSeq", beginSeq, "endSeq", endSeq) + continue + } + var newMsgs []*sdkws.MsgData + for _, msg := range msgs { + if msg.Status != constant.MsgDeleted { + newMsgs = append(newMsgs, msg) + } + } + if int64(len(newMsgs)) != num { - if int64(len(seqMsg)) != num { - - } else { - break + } } } return seqMsg, nil @@ -550,53 +536,52 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, conversationID string, return successMsgs, nil } -func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { - err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0) +func (db *msgDatabase) CleanUpConversationMsgs(ctx context.Context, conversationID string) error { + err := db.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, 0) if err != nil { return err } - err = db.cache.CleanUpOneUserAllMsg(ctx, userID) - return utils.Wrap(err, "") + return db.cache.CleanUpOneUserAllMsg(ctx, conversationID) } -func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { - var delStruct delMsgRecursionStruct - minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime) - if err != nil { - log.ZError(ctx, "deleteMsgRecursion failed", err) - } - if minSeq == 0 { - return nil - } - //log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq) - for _, userID := range userIDs { - userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID) - if err != nil && err != redis.Nil { - //log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) - continue - } - if userMinSeq > minSeq { - err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq) - } else { - err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) - } - if err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) - } - } - return nil -} +// func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { +// var delStruct delMsgRecursionStruct +// minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime) +// if err != nil { +// log.ZError(ctx, "deleteMsgRecursion failed", err) +// } +// if minSeq == 0 { +// return nil +// } +// //log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq) +// for _, userID := range userIDs { +// userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID) +// if err != nil && err != redis.Nil { +// //log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) +// continue +// } +// if userMinSeq > minSeq { +// err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq) +// } else { +// err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) +// } +// if err != nil { +// //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) +// } +// } +// return nil +// } -func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { +func (db *msgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error { var delStruct delMsgRecursionStruct - minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime) + minSeq, err := db.deleteMsgRecursion(ctx, conversationID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { return utils.Wrap(err, "") } if minSeq == 0 { return nil } - return db.cache.SetUserMinSeq(ctx, userID, minSeq) + return db.cache.SetUserMinSeq(ctx, map[string]int64{conversationID: minSeq}) } // this is struct for recursion @@ -678,35 +663,23 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st return seq, utils.Wrap(err, "deleteMsg failed") } -func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { - minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID) +func (db *msgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { + minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID) if err != nil { return 0, 0, 0, 0, err } // from cache - minSeqCache, err = db.cache.GetUserMinSeq(ctx, userID) + minSeqCache, err = db.cache.GetUserMinSeq(ctx, conversationID) if err != nil { return 0, 0, 0, 0, err } - maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, userID) + maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, conversationID) if err != nil { return 0, 0, 0, 0, err } return } -func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { - minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID) - if err != nil { - return 0, 0, 0, err - } - maxSeqCache, err = db.cache.GetGroupMaxSeq(ctx, groupID) - if err != nil { - return 0, 0, 0, err - } - return -} - func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) if err != nil { @@ -729,14 +702,27 @@ func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID str return } -func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { - return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) +func (db *msgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { + return db.cache.SetMaxSeq(ctx, conversationID, maxSeq) } - -func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { - return db.cache.SetUserMinSeq(ctx, userID, minSeq) +func (db *msgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { + return db.cache.GetMaxSeqs(ctx, conversationIDs) } - -func (db *msgDatabase) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { - return db.cache.GetGroupUserMinSeq(ctx, groupID, userID) +func (db *msgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { + return db.cache.GetMaxSeq(ctx, conversationID) +} +func (db *msgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { + return db.cache.SetMinSeq(ctx, conversationID, minSeq) +} +func (db *msgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { + return db.cache.GetMinSeqs(ctx, conversationIDs) +} +func (db *msgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + return db.cache.GetMinSeq(ctx, conversationID) +} +func (db *msgDatabase) GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error) { + return db.cache.GetUserMinSeq(ctx, conversationIDs) +} +func (db *msgDatabase) SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error) { + return db.cache.SetUserMinSeq(ctx, seqs) } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index b6166bddc..03216dfcf 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -31,7 +31,7 @@ type MsgDocModelInterface interface { Create(ctx context.Context, model *MsgDocModel) error UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetMsgBySeqIndexIn1Doc(ctx context.Context, conversationID string, begin, end int64) ([]*sdkws.MsgData, error) + GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) ([]*sdkws.MsgData, []int64, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) Delete(ctx context.Context, docIDs []string) error @@ -76,14 +76,14 @@ func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { return seqUserIDs } -func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string { - seqSuffix := seq / singleGocMsgNum - return m.superGroupIndexGen(groupID, seqSuffix) -} +// func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string { +// seqSuffix := seq / singleGocMsgNum +// return m.superGroupIndexGen(groupID, seqSuffix) +// } -func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string { - return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) -} +// func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string { +// return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) +// } func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 { t := make(map[string][]int64) @@ -99,7 +99,14 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st return t } -func (m MsgDocModel) getMsgIndex(seq int64) int64 { +func (m MsgDocModel) GetSeqsBeginEnd(seqs []int64) (int64, int64) { + if len(seqs) == 0 { + return 0, 0 + } + return seqs[0], seqs[len(seqs)-1] +} + +func (m MsgDocModel) GetMsgIndex(seq int64) int64 { seqSuffix := seq / singleGocMsgNum var index int64 if seqSuffix == 0 { diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 2374b733a..d54f60e25 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -6,6 +6,7 @@ import ( "fmt" table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -134,24 +135,31 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode return err } -func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, begin, end int64) ([]*sdkws.MsgData, error) { - // uid = getSeqUid(uid, seq) - // seqIndex := getMsgIndex(seq) - // m.msg.GetSeqDocIDList() - result, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": docID, "msg": bson.M{"$slice": []int{seqIndex, 1}}}) +func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, seqs []int64, err error) { + beginIndex := m.msg.GetMsgIndex(beginSeq) + num := endSeq - beginSeq + 1 + result, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": docID, "msg": bson.M{"$slice": []int64{beginIndex, num}}}) if err != nil { - return nil, err + return nil, nil, err } var msgInfos []table.MsgInfoModel if err := result.Decode(&msgInfos); err != nil { - return nil, err + return nil, nil, err } if len(msgInfos) < 1 { - return nil, errs.ErrRecordNotFound.Wrap("mongo GetMsgBySeqIndex failed, len is 0") + return nil, nil, errs.ErrRecordNotFound.Wrap("mongo GetMsgBySeqIndex failed, len is 0") } - var msg sdkws.MsgData - if err := proto.Unmarshal(msgInfos[0].Msg, &msg); err != nil { - return nil, err + for _, v := range msgInfos { + var msg sdkws.MsgData + if err := proto.Unmarshal(v.Msg, &msg); err != nil { + return nil, nil, err + } + if msg.Seq >= beginSeq && msg.Seq <= endSeq { + msgs = append(msgs, &msg) + seqs = append(seqs, msg.Seq) + } else { + log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg) + } } - return msg, nil + return msgs, seqs, nil } diff --git a/pkg/proto/conversation/conversation.pb.go b/pkg/proto/conversation/conversation.pb.go index e0c40f236..616bb57fa 100644 --- a/pkg/proto/conversation/conversation.pb.go +++ b/pkg/proto/conversation/conversation.pb.go @@ -1267,6 +1267,100 @@ func (*DelGroupChatConversationsResp) Descriptor() ([]byte, []int) { return file_conversation_conversation_proto_rawDescGZIP(), []int{22} } +type GetConversationIDsReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID"` +} + +func (x *GetConversationIDsReq) Reset() { + *x = GetConversationIDsReq{} + if protoimpl.UnsafeEnabled { + mi := &file_conversation_conversation_proto_msgTypes[23] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetConversationIDsReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetConversationIDsReq) ProtoMessage() {} + +func (x *GetConversationIDsReq) ProtoReflect() protoreflect.Message { + mi := &file_conversation_conversation_proto_msgTypes[23] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetConversationIDsReq.ProtoReflect.Descriptor instead. +func (*GetConversationIDsReq) Descriptor() ([]byte, []int) { + return file_conversation_conversation_proto_rawDescGZIP(), []int{23} +} + +func (x *GetConversationIDsReq) GetUserID() string { + if x != nil { + return x.UserID + } + return "" +} + +type GetConversationIDsResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ConversationIDs []string `protobuf:"bytes,1,rep,name=conversationIDs,proto3" json:"conversationIDs"` +} + +func (x *GetConversationIDsResp) Reset() { + *x = GetConversationIDsResp{} + if protoimpl.UnsafeEnabled { + mi := &file_conversation_conversation_proto_msgTypes[24] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetConversationIDsResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetConversationIDsResp) ProtoMessage() {} + +func (x *GetConversationIDsResp) ProtoReflect() protoreflect.Message { + mi := &file_conversation_conversation_proto_msgTypes[24] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetConversationIDsResp.ProtoReflect.Descriptor instead. +func (*GetConversationIDsResp) Descriptor() ([]byte, []int) { + return file_conversation_conversation_proto_rawDescGZIP(), []int{24} +} + +func (x *GetConversationIDsResp) GetConversationIDs() []string { + if x != nil { + return x.ConversationIDs + } + return nil +} + var File_conversation_conversation_proto protoreflect.FileDescriptor var file_conversation_conversation_proto_rawDesc = []byte{ @@ -1419,99 +1513,114 @@ var file_conversation_conversation_proto_rawDesc = []byte{ 0x44, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x61, 0x78, 0x53, 0x65, 0x71, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6d, 0x61, 0x78, 0x53, 0x65, 0x71, 0x22, 0x1f, 0x0a, 0x1d, 0x44, 0x65, 0x6c, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x32, 0xbd, 0x0b, 0x0a, 0x0c, 0x63, - 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x88, 0x01, 0x0a, 0x17, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x22, 0x2f, 0x0a, 0x15, 0x47, 0x65, + 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x73, + 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x44, 0x22, 0x42, 0x0a, 0x16, 0x47, + 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x28, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, + 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x73, 0x32, + 0xb8, 0x0c, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x88, 0x01, 0x0a, 0x17, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x79, 0x43, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x35, 0x2e, 0x4f, + 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x79, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x69, 0x65, 0x6c, 0x64, + 0x52, 0x65, 0x71, 0x1a, 0x36, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x79, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x35, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x79, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x36, + 0x6f, 0x6e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x70, 0x0a, 0x0f, 0x47, + 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2d, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, - 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x4d, 0x6f, 0x64, 0x69, 0x66, - 0x79, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x69, 0x65, - 0x6c, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x70, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, - 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2d, 0x2e, 0x4f, 0x70, 0x65, 0x6e, - 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x2e, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, - 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x12, 0x7c, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x41, - 0x6c, 0x6c, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, - 0x31, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, - 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x41, - 0x6c, 0x6c, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, - 0x65, 0x71, 0x1a, 0x32, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, - 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x73, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, - 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2e, 0x2e, 0x4f, 0x70, 0x65, - 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x2f, 0x2e, 0x4f, 0x70, 0x65, - 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x82, 0x01, 0x0a, 0x15, - 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x33, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x34, 0x2e, 0x4f, 0x70, 0x65, + 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, + 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x2e, 0x2e, + 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x12, 0x7c, 0x0a, + 0x13, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x31, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x32, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, + 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x73, 0x0a, 0x10, 0x47, + 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, + 0x2e, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, + 0x2f, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x12, 0x82, 0x01, 0x0a, 0x15, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x33, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x53, 0x65, 0x74, 0x43, - 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x12, 0x70, 0x0a, 0x0f, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x2d, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, - 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x71, 0x1a, 0x2e, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, - 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x73, 0x70, 0x12, 0x6a, 0x0a, 0x0d, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, - 0x4f, 0x70, 0x74, 0x12, 0x2b, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, - 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4f, 0x70, 0x74, 0x52, 0x65, 0x71, - 0x1a, 0x2c, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, - 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x65, 0x74, - 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4f, 0x70, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x91, - 0x01, 0x0a, 0x1a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4e, 0x6f, 0x74, - 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x73, 0x65, 0x72, 0x49, 0x44, 0x73, 0x12, 0x38, 0x2e, - 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, - 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x63, - 0x76, 0x4d, 0x73, 0x67, 0x4e, 0x6f, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x73, 0x65, - 0x72, 0x49, 0x44, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x39, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, + 0x34, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x42, 0x61, 0x74, 0x63, + 0x68, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x70, 0x0a, 0x0f, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2d, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, + 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x2e, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4e, 0x6f, - 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x73, 0x65, 0x72, 0x49, 0x44, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x12, 0x9a, 0x01, 0x0a, 0x1d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x69, 0x6e, - 0x67, 0x6c, 0x65, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x3b, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x43, 0x68, 0x61, - 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, - 0x71, 0x1a, 0x3c, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, - 0x65, 0x61, 0x74, 0x65, 0x53, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, - 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, - 0x97, 0x01, 0x0a, 0x1c, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, - 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x3a, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, - 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, - 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x3b, 0x2e, 0x4f, + 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x12, 0x6a, 0x0a, 0x0d, 0x53, 0x65, 0x74, 0x52, 0x65, + 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4f, 0x70, 0x74, 0x12, 0x2b, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, + 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4f, + 0x70, 0x74, 0x52, 0x65, 0x71, 0x1a, 0x2c, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4f, 0x70, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x12, 0x91, 0x01, 0x0a, 0x1a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, + 0x73, 0x67, 0x4e, 0x6f, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x73, 0x65, 0x72, 0x49, + 0x44, 0x73, 0x12, 0x38, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, + 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, 0x4d, 0x73, 0x67, 0x4e, 0x6f, 0x74, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x79, 0x55, 0x73, 0x65, 0x72, 0x49, 0x44, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x39, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, - 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x47, + 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x63, 0x76, + 0x4d, 0x73, 0x67, 0x4e, 0x6f, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x73, 0x65, 0x72, + 0x49, 0x44, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x9a, 0x01, 0x0a, 0x1d, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x53, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x3b, 0x2e, 0x4f, 0x70, 0x65, 0x6e, + 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x69, 0x6e, 0x67, + 0x6c, 0x65, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x3c, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x69, 0x6e, 0x67, 0x6c, 0x65, 0x43, + 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x12, 0x97, 0x01, 0x0a, 0x1c, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x8e, 0x01, 0x0a, 0x19, 0x44, 0x65, - 0x6c, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x37, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x3a, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, + 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x1a, 0x3b, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, 0x8e, + 0x01, 0x0a, 0x19, 0x44, 0x65, 0x6c, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x37, 0x2e, 0x4f, + 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x47, 0x72, 0x6f, 0x75, + 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x38, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, + 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x12, + 0x79, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x49, 0x44, 0x73, 0x12, 0x30, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x49, 0x44, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x31, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, - 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, - 0x1a, 0x38, 0x2e, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, - 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, - 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, + 0x69, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x73, 0x52, 0x65, 0x73, 0x70, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x70, 0x65, 0x6e, 0x49, 0x4d, 0x53, 0x44, 0x4b, 0x2f, 0x4f, 0x70, 0x65, 0x6e, 0x2d, 0x49, 0x4d, 0x2d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x63, 0x6f, 0x6e, 0x76, @@ -1530,7 +1639,7 @@ func file_conversation_conversation_proto_rawDescGZIP() []byte { return file_conversation_conversation_proto_rawDescData } -var file_conversation_conversation_proto_msgTypes = make([]protoimpl.MessageInfo, 23) +var file_conversation_conversation_proto_msgTypes = make([]protoimpl.MessageInfo, 25) var file_conversation_conversation_proto_goTypes = []interface{}{ (*Conversation)(nil), // 0: OpenIMServer.conversation.Conversation (*ModifyConversationFieldReq)(nil), // 1: OpenIMServer.conversation.ModifyConversationFieldReq @@ -1555,6 +1664,8 @@ var file_conversation_conversation_proto_goTypes = []interface{}{ (*CreateGroupChatConversationsResp)(nil), // 20: OpenIMServer.conversation.CreateGroupChatConversationsResp (*DelGroupChatConversationsReq)(nil), // 21: OpenIMServer.conversation.DelGroupChatConversationsReq (*DelGroupChatConversationsResp)(nil), // 22: OpenIMServer.conversation.DelGroupChatConversationsResp + (*GetConversationIDsReq)(nil), // 23: OpenIMServer.conversation.GetConversationIDsReq + (*GetConversationIDsResp)(nil), // 24: OpenIMServer.conversation.GetConversationIDsResp } var file_conversation_conversation_proto_depIdxs = []int32{ 0, // 0: OpenIMServer.conversation.ModifyConversationFieldReq.conversation:type_name -> OpenIMServer.conversation.Conversation @@ -1574,19 +1685,21 @@ var file_conversation_conversation_proto_depIdxs = []int32{ 17, // 14: OpenIMServer.conversation.conversation.CreateSingleChatConversations:input_type -> OpenIMServer.conversation.CreateSingleChatConversationsReq 19, // 15: OpenIMServer.conversation.conversation.CreateGroupChatConversations:input_type -> OpenIMServer.conversation.CreateGroupChatConversationsReq 21, // 16: OpenIMServer.conversation.conversation.DelGroupChatConversations:input_type -> OpenIMServer.conversation.DelGroupChatConversationsReq - 2, // 17: OpenIMServer.conversation.conversation.ModifyConversationField:output_type -> OpenIMServer.conversation.ModifyConversationFieldResp - 8, // 18: OpenIMServer.conversation.conversation.GetConversation:output_type -> OpenIMServer.conversation.GetConversationResp - 12, // 19: OpenIMServer.conversation.conversation.GetAllConversations:output_type -> OpenIMServer.conversation.GetAllConversationsResp - 10, // 20: OpenIMServer.conversation.conversation.GetConversations:output_type -> OpenIMServer.conversation.GetConversationsResp - 14, // 21: OpenIMServer.conversation.conversation.BatchSetConversations:output_type -> OpenIMServer.conversation.BatchSetConversationsResp - 4, // 22: OpenIMServer.conversation.conversation.SetConversation:output_type -> OpenIMServer.conversation.SetConversationResp - 6, // 23: OpenIMServer.conversation.conversation.SetRecvMsgOpt:output_type -> OpenIMServer.conversation.SetRecvMsgOptResp - 16, // 24: OpenIMServer.conversation.conversation.GetRecvMsgNotNotifyUserIDs:output_type -> OpenIMServer.conversation.GetRecvMsgNotNotifyUserIDsResp - 18, // 25: OpenIMServer.conversation.conversation.CreateSingleChatConversations:output_type -> OpenIMServer.conversation.CreateSingleChatConversationsResp - 20, // 26: OpenIMServer.conversation.conversation.CreateGroupChatConversations:output_type -> OpenIMServer.conversation.CreateGroupChatConversationsResp - 22, // 27: OpenIMServer.conversation.conversation.DelGroupChatConversations:output_type -> OpenIMServer.conversation.DelGroupChatConversationsResp - 17, // [17:28] is the sub-list for method output_type - 6, // [6:17] is the sub-list for method input_type + 23, // 17: OpenIMServer.conversation.conversation.GetConversationIDs:input_type -> OpenIMServer.conversation.GetConversationIDsReq + 2, // 18: OpenIMServer.conversation.conversation.ModifyConversationField:output_type -> OpenIMServer.conversation.ModifyConversationFieldResp + 8, // 19: OpenIMServer.conversation.conversation.GetConversation:output_type -> OpenIMServer.conversation.GetConversationResp + 12, // 20: OpenIMServer.conversation.conversation.GetAllConversations:output_type -> OpenIMServer.conversation.GetAllConversationsResp + 10, // 21: OpenIMServer.conversation.conversation.GetConversations:output_type -> OpenIMServer.conversation.GetConversationsResp + 14, // 22: OpenIMServer.conversation.conversation.BatchSetConversations:output_type -> OpenIMServer.conversation.BatchSetConversationsResp + 4, // 23: OpenIMServer.conversation.conversation.SetConversation:output_type -> OpenIMServer.conversation.SetConversationResp + 6, // 24: OpenIMServer.conversation.conversation.SetRecvMsgOpt:output_type -> OpenIMServer.conversation.SetRecvMsgOptResp + 16, // 25: OpenIMServer.conversation.conversation.GetRecvMsgNotNotifyUserIDs:output_type -> OpenIMServer.conversation.GetRecvMsgNotNotifyUserIDsResp + 18, // 26: OpenIMServer.conversation.conversation.CreateSingleChatConversations:output_type -> OpenIMServer.conversation.CreateSingleChatConversationsResp + 20, // 27: OpenIMServer.conversation.conversation.CreateGroupChatConversations:output_type -> OpenIMServer.conversation.CreateGroupChatConversationsResp + 22, // 28: OpenIMServer.conversation.conversation.DelGroupChatConversations:output_type -> OpenIMServer.conversation.DelGroupChatConversationsResp + 24, // 29: OpenIMServer.conversation.conversation.GetConversationIDs:output_type -> OpenIMServer.conversation.GetConversationIDsResp + 18, // [18:30] is the sub-list for method output_type + 6, // [6:18] is the sub-list for method input_type 6, // [6:6] is the sub-list for extension type_name 6, // [6:6] is the sub-list for extension extendee 0, // [0:6] is the sub-list for field type_name @@ -1874,6 +1987,30 @@ func file_conversation_conversation_proto_init() { return nil } } + file_conversation_conversation_proto_msgTypes[23].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetConversationIDsReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_conversation_conversation_proto_msgTypes[24].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetConversationIDsResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1881,7 +2018,7 @@ func file_conversation_conversation_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_conversation_conversation_proto_rawDesc, NumEnums: 0, - NumMessages: 23, + NumMessages: 25, NumExtensions: 0, NumServices: 1, }, @@ -1918,6 +2055,7 @@ type ConversationClient interface { CreateSingleChatConversations(ctx context.Context, in *CreateSingleChatConversationsReq, opts ...grpc.CallOption) (*CreateSingleChatConversationsResp, error) CreateGroupChatConversations(ctx context.Context, in *CreateGroupChatConversationsReq, opts ...grpc.CallOption) (*CreateGroupChatConversationsResp, error) DelGroupChatConversations(ctx context.Context, in *DelGroupChatConversationsReq, opts ...grpc.CallOption) (*DelGroupChatConversationsResp, error) + GetConversationIDs(ctx context.Context, in *GetConversationIDsReq, opts ...grpc.CallOption) (*GetConversationIDsResp, error) } type conversationClient struct { @@ -2027,6 +2165,15 @@ func (c *conversationClient) DelGroupChatConversations(ctx context.Context, in * return out, nil } +func (c *conversationClient) GetConversationIDs(ctx context.Context, in *GetConversationIDsReq, opts ...grpc.CallOption) (*GetConversationIDsResp, error) { + out := new(GetConversationIDsResp) + err := c.cc.Invoke(ctx, "/OpenIMServer.conversation.conversation/GetConversationIDs", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ConversationServer is the server API for Conversation service. type ConversationServer interface { ModifyConversationField(context.Context, *ModifyConversationFieldReq) (*ModifyConversationFieldResp, error) @@ -2040,6 +2187,7 @@ type ConversationServer interface { CreateSingleChatConversations(context.Context, *CreateSingleChatConversationsReq) (*CreateSingleChatConversationsResp, error) CreateGroupChatConversations(context.Context, *CreateGroupChatConversationsReq) (*CreateGroupChatConversationsResp, error) DelGroupChatConversations(context.Context, *DelGroupChatConversationsReq) (*DelGroupChatConversationsResp, error) + GetConversationIDs(context.Context, *GetConversationIDsReq) (*GetConversationIDsResp, error) } // UnimplementedConversationServer can be embedded to have forward compatible implementations. @@ -2079,6 +2227,9 @@ func (*UnimplementedConversationServer) CreateGroupChatConversations(context.Con func (*UnimplementedConversationServer) DelGroupChatConversations(context.Context, *DelGroupChatConversationsReq) (*DelGroupChatConversationsResp, error) { return nil, status.Errorf(codes.Unimplemented, "method DelGroupChatConversations not implemented") } +func (*UnimplementedConversationServer) GetConversationIDs(context.Context, *GetConversationIDsReq) (*GetConversationIDsResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetConversationIDs not implemented") +} func RegisterConversationServer(s *grpc.Server, srv ConversationServer) { s.RegisterService(&_Conversation_serviceDesc, srv) @@ -2282,6 +2433,24 @@ func _Conversation_DelGroupChatConversations_Handler(srv interface{}, ctx contex return interceptor(ctx, in, info, handler) } +func _Conversation_GetConversationIDs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetConversationIDsReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ConversationServer).GetConversationIDs(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/OpenIMServer.conversation.conversation/GetConversationIDs", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ConversationServer).GetConversationIDs(ctx, req.(*GetConversationIDsReq)) + } + return interceptor(ctx, in, info, handler) +} + var _Conversation_serviceDesc = grpc.ServiceDesc{ ServiceName: "OpenIMServer.conversation.conversation", HandlerType: (*ConversationServer)(nil), @@ -2330,6 +2499,10 @@ var _Conversation_serviceDesc = grpc.ServiceDesc{ MethodName: "DelGroupChatConversations", Handler: _Conversation_DelGroupChatConversations_Handler, }, + { + MethodName: "GetConversationIDs", + Handler: _Conversation_GetConversationIDs_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "conversation/conversation.proto", diff --git a/pkg/proto/conversation/conversation.proto b/pkg/proto/conversation/conversation.proto index eac0ba6db..a5409cb0d 100644 --- a/pkg/proto/conversation/conversation.proto +++ b/pkg/proto/conversation/conversation.proto @@ -115,6 +115,14 @@ message DelGroupChatConversationsReq { message DelGroupChatConversationsResp { } +message GetConversationIDsReq { + string userID = 1; +} + +message GetConversationIDsResp { + repeated string conversationIDs = 1; +} + service conversation { rpc ModifyConversationField(ModifyConversationFieldReq)returns(ModifyConversationFieldResp); rpc GetConversation(GetConversationReq)returns(GetConversationResp); @@ -127,4 +135,5 @@ service conversation { rpc CreateSingleChatConversations(CreateSingleChatConversationsReq) returns (CreateSingleChatConversationsResp); rpc CreateGroupChatConversations(CreateGroupChatConversationsReq) returns (CreateGroupChatConversationsResp); rpc DelGroupChatConversations(DelGroupChatConversationsReq) returns(DelGroupChatConversationsResp); + rpc GetConversationIDs(GetConversationIDsReq) returns(GetConversationIDsResp); } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 3a7db04e5..ecd7c0912 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -67,3 +67,12 @@ func (c *ConversationClient) DelGroupChatConversations(ctx context.Context, owne _, err = conversation.NewConversationClient(cc).DelGroupChatConversations(ctx, &pbConversation.DelGroupChatConversationsReq{OwnerUserID: ownerUserIDs, GroupID: groupID, MaxSeq: maxSeq}) return err } + +func (c *ConversationClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { + cc, err := c.getConn() + if err != nil { + return nil, err + } + resp, err := conversation.NewConversationClient(cc).GetConversationIDs(ctx, &pbConversation.GetConversationIDsReq{UserID: ownerUserID}) + return resp.ConversationIDs, err +} diff --git a/pkg/utils/lock.go b/pkg/utils/lock.go deleted file mode 100644 index d0070f56c..000000000 --- a/pkg/utils/lock.go +++ /dev/null @@ -1,6 +0,0 @@ -package utils - -type DistributedLock interface { - Lock() - UnLock() -} diff --git a/pkg/utils/strings.go b/pkg/utils/strings.go index 4a32a6a1c..f8feeaa8d 100644 --- a/pkg/utils/strings.go +++ b/pkg/utils/strings.go @@ -101,16 +101,21 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string { } switch sessionType { case constant.SingleChatType: - return "single_" + strings.Join(ids, "_") + return "si_" + strings.Join(ids, "_") // single chat case constant.GroupChatType: - return "group_" + ids[0] + return "g_" + ids[0] // group chat case constant.SuperGroupChatType: - return "super_group_" + ids[0] + return "sg_" + ids[0] // super group chat case constant.NotificationChatType: - return "notification_" + ids[0] + return "sn_" + ids[0] // server notification chat } return "" } + +func IsNotification(conversationID string) bool { + return strings.HasPrefix(conversationID, "n_") +} + func int64ToString(i int64) string { return strconv.FormatInt(i, 10) } @@ -152,7 +157,3 @@ func IsDuplicateStringSlice(arr []string) bool { } return false } - -func IsDuplicateID(args ...interface{}) bool { - return false -}