From a23ecb1710337e802edf9c035bd6bf79283f414c Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 12 Jul 2023 11:33:08 +0800 Subject: [PATCH 1/3] cron add log and fix cron --- internal/tools/conversation.go | 3 +++ internal/tools/cron_task.go | 3 ++- internal/tools/msg.go | 12 +++++++----- pkg/common/db/controller/msg.go | 4 ++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/tools/conversation.go b/internal/tools/conversation.go index 3eb47d1ae..a517ece46 100644 --- a/internal/tools/conversation.go +++ b/internal/tools/conversation.go @@ -1,6 +1,7 @@ package tools import ( + "context" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" @@ -9,12 +10,14 @@ import ( ) func (c *MsgTool) ConversationsDestructMsgs() { + log.ZInfo(context.Background(), "start msg destruct cron task") ctx := mcontext.NewCtx(utils.GetSelfFuncName()) conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx) if err != nil { log.ZError(ctx, "get conversation id need destruct failed", err) return } + log.ZDebug(context.Background(), "nums conversations need destruct", len(conversations)) for _, conversation := range conversations { log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime) seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 81588746d..3c0f2e00e 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -11,7 +11,6 @@ import ( ) func StartCronTask() error { - log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.ChatRecordsClearTime) fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) msgTool, err := InitMsgTool() if err != nil { @@ -20,11 +19,13 @@ func StartCronTask() error { c := cron.New() var wg sync.WaitGroup wg.Add(1) + log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime) _, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) if err != nil { fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime) panic(err) } + log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.ChatRecordsClearTime) _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) if err != nil { fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime) diff --git a/internal/tools/msg.go b/internal/tools/msg.go index ef2175bf0..7247d32eb 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -2,7 +2,6 @@ package tools import ( "context" - "errors" "fmt" "math" "time" @@ -17,9 +16,11 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -32,8 +33,6 @@ type MsgTool struct { msgNotificationSender *notification.MsgNotificationSender } -var errSeq = errors.New("cache max seq and mongo max seq is diff > 10") - func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender) *MsgTool { return &MsgTool{ @@ -103,12 +102,12 @@ func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []s } func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error { - maxSeqMongo, _, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID) + minSeqMongo, maxSeqMongo, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID) if err != nil { return err } if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { - return errSeq + log.ZError(ctx, "cache max seq and mongo max seq is diff > 10", nil, "maxSeqMongo", maxSeqMongo, "minSeqMongo", minSeqMongo, "maxSeqCache", maxSeqCache, "conversationID", conversationID) } return nil } @@ -116,6 +115,9 @@ func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID strin func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error { maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID) if err != nil { + if errs.Unwrap(err) == redis.Nil { + return nil + } return err } if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index ea7cf4fb7..e233ce2b8 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -74,7 +74,7 @@ type CommonMsgDatabase interface { GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error - GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) + GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) @@ -843,7 +843,7 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context return } -func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) { +func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { return db.GetMinMaxSeqMongo(ctx, conversationID) } From 52fcf7073969cc8ff9059bbcd5a1523148dfad0d Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 12 Jul 2023 11:41:33 +0800 Subject: [PATCH 2/3] add log --- internal/tools/conversation.go | 2 +- internal/tools/cron_task.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/tools/conversation.go b/internal/tools/conversation.go index a517ece46..1cad58248 100644 --- a/internal/tools/conversation.go +++ b/internal/tools/conversation.go @@ -17,7 +17,7 @@ func (c *MsgTool) ConversationsDestructMsgs() { log.ZError(ctx, "get conversation id need destruct failed", err) return } - log.ZDebug(context.Background(), "nums conversations need destruct", len(conversations)) + log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations)) for _, conversation := range conversations { log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime) seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 3c0f2e00e..f51724253 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -25,7 +25,7 @@ func StartCronTask() error { fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime) panic(err) } - log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.ChatRecordsClearTime) + log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) if err != nil { fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime) From 20bb77f0f75dc41a49f004c7f62c091471c0c17a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 12 Jul 2023 11:49:25 +0800 Subject: [PATCH 3/3] cron --- pkg/common/db/relation/conversation_model.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index c197f8582..04300f704 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -82,10 +82,7 @@ func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversat } func (c *ConversationGorm) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hasReadSeqs map[string]int64, err error) { - var conversations []*relation.ConversationModel - err = utils.Wrap(c.db(ctx).Where("owner_user_id = ?", ownerUserID).Select("conversation_id", "has_read_seq").Find(&conversations).Error, "") - hasReadSeqs = make(map[string]int64, len(conversations)) - return hasReadSeqs, err + return nil, nil } func (c *ConversationGorm) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) (conversations []*relation.ConversationModel, err error) { @@ -93,5 +90,5 @@ func (c *ConversationGorm) GetConversationsByConversationID(ctx context.Context, } func (c *ConversationGorm) GetConversationIDsNeedDestruct(ctx context.Context) (conversations []*relation.ConversationModel, err error) { - return conversations, utils.Wrap(c.db(ctx).Where("is_msg_destruct = 1 && UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0").Error, "") + return conversations, utils.Wrap(c.db(ctx).Where("is_msg_destruct = 1 && UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0").Find(&conversations).Error, "") }