From 70758c306a0cdd8eaa41f624565776f09c4f875b Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 18 May 2023 15:41:08 +0800 Subject: [PATCH] sg notification --- internal/tools/cron_task.go | 5 ++-- internal/tools/msg.go | 27 ++++++++------------ pkg/common/db/cache/msg.go | 6 ++--- pkg/common/db/controller/conversation.go | 5 ++++ pkg/common/db/relation/conversation_model.go | 4 +++ pkg/common/db/table/relation/conversation.go | 1 + pkg/utils/utils.go | 2 +- 7 files changed, 27 insertions(+), 23 deletions(-) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 9eb63f5ef..8b2719a51 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -2,10 +2,11 @@ package tools import ( "fmt" + "sync" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/robfig/cron/v3" - "sync" ) const cronTaskOperationID = "cronTaskOperationID-" @@ -22,7 +23,7 @@ func StartCronTask() error { c := cron.New() var wg sync.WaitGroup wg.Add(1) - _, err = c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, msgTool.AllUserClearMsgAndFixSeq) + _, err = c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) if err != nil { fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) return err diff --git a/internal/tools/msg.go b/internal/tools/msg.go index c88b887db..2333558fa 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -20,9 +20,10 @@ import ( ) type MsgTool struct { - msgDatabase controller.CommonMsgDatabase - userDatabase controller.UserDatabase - groupDatabase controller.GroupDatabase + msgDatabase controller.CommonMsgDatabase + conversationDatabase controller.ConversationDataBase + userDatabase controller.UserDatabase + groupDatabase controller.GroupDatabase } var errSeq = errors.New("cache max seq and mongo max seq is diff > 10") @@ -56,23 +57,15 @@ func InitMsgTool() (*MsgTool, error) { return msgTool, nil } -func (c *MsgTool) AllUserClearMsgAndFixSeq() { +func (c *MsgTool) AllConversationClearMsgAndFixSeq() { ctx := mcontext.NewCtx(utils.GetSelfFuncName()) log.ZInfo(ctx, "============================ start del cron task ============================") - var err error - userIDs, err := c.userDatabase.GetAllUserID(ctx) - if err == nil { - c.ClearUsersMsg(ctx, userIDs) - } else { - log.ZError(ctx, "ClearUsersMsg failed", err) - } - // working group msg clear - superGroupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup) - if err == nil { - c.ClearSuperGroupMsg(ctx, superGroupIDs) - } else { - log.ZError(ctx, "ClearSuperGroupMsg failed", err) + conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) + if err != nil { + log.ZError(ctx, "GetAllConversationIDs failed", err) + return } + c.ClearSuperGroupMsg(ctx, conversationIDs) log.ZInfo(ctx, "============================ start del cron finished ============================") } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index bd4b2772c..a53e5a5e8 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -137,9 +137,9 @@ func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s st if seq.Err() != nil && seq.Err() != redis.Nil { return nil, errs.Wrap(v.Err()) } - seqInt64 := utils.StringToInt64(seq.Val()) - if seqInt64 != 0 { - m[items[i]] = seqInt64 + val := utils.StringToInt64(seq.Val()) + if val != 0 { + m[items[i]] = val } } return m, nil diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 0865a3f86..cb2dc6f15 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -31,6 +31,7 @@ type ConversationDatabase interface { CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) + GetAllConversationIDs(ctx context.Context) ([]string, error) } func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -245,3 +246,7 @@ func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID st func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { return c.cache.GetUserConversationIDsHash(ctx, ownerUserID) } + +func (c *ConversationDataBase) GetAllConversationIDs(ctx context.Context) ([]string, error) { + return c.conversationDB.GetAllConversationIDs(ctx) +} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index f2963295f..09b7599da 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -76,3 +76,7 @@ func (c *ConversationGorm) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, c var conversation relation.ConversationModel return int(conversation.RecvMsgOpt), utils.Wrap(c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID).Select("recv_msg_opt").Find(&conversation).Error, "") } + +func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) { + return conversationIDs, utils.Wrap(c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, "") +} diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index b137bf12a..0f44d096c 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -44,5 +44,6 @@ type ConversationModelInterface interface { FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) + GetAllConversationIDs(ctx context.Context) ([]string, error) NewTx(tx any) ConversationModelInterface } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 903abc78d..a7ce4cf3c 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -278,7 +278,7 @@ func IsNotification(conversationID string) bool { func GetNotificationConvetstionID(conversationID string) string { l := strings.Split(conversationID, "_") - if len(l) > 2 { + if len(l) > 1 { l[0] = "n" return strings.Join(l, "_") }