From 1dad0b06d1010c5ac703d9cfe9e2ed110ca41312 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 18 May 2023 16:22:59 +0800 Subject: [PATCH] mongo --- internal/rpc/msg/server.go | 2 +- internal/tools/cron_task.go | 1 - internal/tools/msg.go | 179 +++++++------------------------- internal/tools/msg_test.go | 22 ++-- pkg/common/db/controller/msg.go | 5 + pkg/utils/utils.go | 2 +- 6 files changed, 56 insertions(+), 155 deletions(-) diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 69eb2c7fb..5866ff0b9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -136,7 +136,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd return nil, err } for _, conversationID := range conversationIDs { - conversationIDs = append(conversationIDs, utils.GetNotificationConvetstionID(conversationID)) + conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) } log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs) maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 8b2719a51..4cffe9990 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -9,7 +9,6 @@ import ( "github.com/robfig/cron/v3" ) -const cronTaskOperationID = "cronTaskOperationID-" const moduleName = "cron" func StartCronTask() error { diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 2333558fa..83c4b9c90 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -7,7 +7,6 @@ import ( "math" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" @@ -16,7 +15,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "github.com/go-redis/redis/v8" ) type MsgTool struct { @@ -65,169 +63,68 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() { log.ZError(ctx, "GetAllConversationIDs failed", err) return } - c.ClearSuperGroupMsg(ctx, conversationIDs) + for _, conversationID := range conversationIDs { + conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) + } + c.ClearConversationsMsg(ctx, conversationIDs) log.ZInfo(ctx, "============================ start del cron finished ============================") } -func (c *MsgTool) ClearUsersMsg(ctx context.Context, userIDs []string) { - for _, userID := range userIDs { - if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { - log.ZError(ctx, "DeleteUserMsgsAndSetMinSeq failed", err, "userID", userID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) +func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) { + for _, conversationID := range conversationIDs { + if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { + log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) } - maxSeqCache, maxSeqMongo, err := c.GetAndFixUserSeqs(ctx, userID) - if err != nil { - continue + if err := c.fixAndCheckSeq(ctx, conversationID); err != nil { + log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID) } - c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo) + } } -func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string) { - for _, groupID := range superGroupIDs { - userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID) - if err != nil { - log.ZError(ctx, "ClearSuperGroupMsg failed", err, "groupID", groupID) - continue - } - if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { - log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "groupID", groupID, "userID", userIDs, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) - } - if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil { - log.ZError(ctx, "fixGroupSeq failed", err, "groupID", groupID, "userID", userIDs) - } - } -} - -func (c *MsgTool) FixGroupSeq(ctx context.Context, groupID string) error { - userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID) +func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error { + maxSeqMongo, _, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID) if err != nil { return err } - return c.fixGroupSeq(ctx, groupID, userIDs) -} - -func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []string) error { - _, maxSeqMongo, _, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, groupID) - if err != nil { - if err == unrelation.ErrMsgNotFound { - return nil - } - return err - } - for _, userID := range userIDs { - if _, err := c.GetAndFixGroupUserSeq(ctx, userID, groupID, maxSeqCache); err != nil { - continue - } - } - if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo); err != nil { - log.ZWarn(ctx, "cache max seq and mongo max seq is diff > 10", err, "groupID", groupID, "maxSeqCache", maxSeqCache, "maxSeqMongo", maxSeqMongo, "constant.WriteDiffusion", constant.WriteDiffusion) - } - return nil -} - -func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqCache, maxSeqMongo int64, err error) { - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, userID) - if err != nil { - if err != unrelation.ErrMsgNotFound { - log.ZError(ctx, "GetUserMinMaxSeqInMongoAndCache failed", err, "userID", userID) - } - return 0, 0, err - } - log.ZDebug(ctx, "userID", userID, "minSeqMongo", minSeqMongo, "maxSeqMongo", maxSeqMongo, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) - if minSeqCache > maxSeqCache { - if err := c.msgDatabase.SetMinSeq(ctx, userID, maxSeqCache); err != nil { - log.ZError(ctx, "SetUserMinSeq failed", err, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) - } else { - log.ZInfo(ctx, "SetUserMinSeq success", "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) - } - } - return maxSeqCache, maxSeqMongo, nil -} - -func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, groupID string, maxSeqCache int64) (minSeqCache int64, err error) { - minSeqCache, err = c.msgDatabase.GetMinSeq(ctx, groupID) - if err != nil { - log.ZError(ctx, "GetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID) - return 0, err - } - if minSeqCache > maxSeqCache { - if err := c.msgDatabase.SetConversationUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil { - log.ZError(ctx, "SetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) - } else { - log.ZInfo(ctx, "SetGroupUserMinSeq success", "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) - } - } - return minSeqCache, nil -} - -func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache, maxSeqMongo int64) error { if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { return errSeq } return nil } -func (c *MsgTool) ShowUserSeqs(ctx context.Context, userID string) { - -} - -func (c *MsgTool) ShowSuperGroupSeqs(ctx context.Context, groupID string) { - -} - -func (c *MsgTool) ShowSuperGroupUserSeqs(ctx context.Context, groupID, userID string) { - +func (c *MsgTool) fixAndCheckSeq(ctx context.Context, conversationID string) error { + maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID) + if err != nil { + return err + } + minSeq, err := c.msgDatabase.GetMinSeq(ctx, conversationID) + if err != nil { + return err + } + if minSeq > maxSeq { + if err = c.msgDatabase.SetMinSeq(ctx, conversationID, maxSeq); err != nil { + return err + } + } + if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil { + return err + } + return nil } func (c *MsgTool) FixAllSeq(ctx context.Context) error { - userIDs, err := c.userDatabase.GetAllUserID(ctx) + conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) if err != nil { + log.ZError(ctx, "GetAllConversationIDs failed", err) return err } - for _, userID := range userIDs { - userCurrentMinSeq, err := c.msgDatabase.GetMinSeq(ctx, userID) - if err != nil && err != redis.Nil { - continue - } - userCurrentMaxSeq, err := c.msgDatabase.GetMaxSeq(ctx, userID) - if err != nil && err != redis.Nil { - continue - } - if userCurrentMinSeq > userCurrentMaxSeq { - if err = c.msgDatabase.SetMinSeq(ctx, userID, userCurrentMaxSeq); err != nil { - fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq) - } - fmt.Println("fix", userID, userCurrentMaxSeq) - } + for _, conversationID := range conversationIDs { + conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) } - fmt.Println("fix users seq success") - groupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup) - if err != nil { - return err - } - for _, groupID := range groupIDs { - maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, groupID) - if err != nil { - fmt.Println("GetGroupMaxSeq failed", groupID) - continue - } - userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID) - if err != nil { - fmt.Println("get groupID", groupID, "failed, try again later") - continue - } - for _, userID := range userIDs { - userMinSeq, err := c.msgDatabase.GetMinSeq(ctx, groupID) - if err != nil && err != redis.Nil { - fmt.Println("GetGroupUserMinSeq failed", groupID, userID) - continue - } - if userMinSeq > maxSeq { - if err = c.msgDatabase.SetMinSeq(ctx, groupID, maxSeq); err != nil { - fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq) - } - fmt.Println("fix", groupID, userID, maxSeq, userMinSeq) - } + for _, conversationID := range conversationIDs { + if err := c.fixAndCheckSeq(ctx, conversationID); err != nil { + log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID) } } fmt.Println("fix all seq finished") diff --git a/internal/tools/msg_test.go b/internal/tools/msg_test.go index cfc4e7ad9..0e61ee7f8 100644 --- a/internal/tools/msg_test.go +++ b/internal/tools/msg_test.go @@ -90,13 +90,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("init failed") return } - msgTools.ClearUsersMsg(ctx, []string{conversationID}) + msgTools.ClearConversationsMsg(ctx, []string{conversationID}) minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + if maxSeqCache != maxSeqMongo { t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { @@ -133,13 +133,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("InsertOne failed", testUID1) } - msgTools.ClearUsersMsg(ctx, []string{conversationID}) + msgTools.ClearConversationsMsg(ctx, []string{conversationID}) minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + if maxSeqCache != maxSeqMongo { t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { @@ -165,13 +165,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("InsertOne failed", conversationID) } - msgTools.ClearUsersMsg(ctx, []string{conversationID}) + msgTools.ClearConversationsMsg(ctx, []string{conversationID}) minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + if maxSeqCache != maxSeqMongo { t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { @@ -211,7 +211,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("InsertOne failed", conversationID) } - msgTools.ClearUsersMsg(ctx, []string{conversationID}) + msgTools.ClearConversationsMsg(ctx, []string{conversationID}) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return @@ -221,7 +221,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + if maxSeqCache != maxSeqMongo { t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { @@ -252,7 +252,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("InsertOne failed", conversationID) } - msgTools.ClearUsersMsg(ctx, []string{conversationID}) + msgTools.ClearConversationsMsg(ctx, []string{conversationID}) if err != nil { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return @@ -262,7 +262,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + if maxSeqCache != maxSeqMongo { t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { @@ -312,7 +312,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") return } - if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil { + if maxSeqCache != maxSeqMongo { t.Error("checkMaxSeqWithMongo failed", conversationID) } if minSeqMongo != minSeqCache { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 762e4f19e..9d095def3 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -56,6 +56,7 @@ type CommonMsgDatabase interface { SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) + GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) @@ -678,6 +679,10 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context return } +func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) { + return db.GetMinMaxSeqMongo(ctx, conversationID) +} + func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) if err != nil { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index a7ce4cf3c..54a7b946d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -276,7 +276,7 @@ func IsNotification(conversationID string) bool { return strings.HasPrefix(conversationID, "n_") } -func GetNotificationConvetstionID(conversationID string) string { +func GetNotificationConversationIDByConversationID(conversationID string) string { l := strings.Split(conversationID, "_") if len(l) > 1 { l[0] = "n"