From 2892add766ae974fbdb4e27a0dc6477588cad3c6 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 Aug 2022 15:14:51 +0800 Subject: [PATCH] fix delete --- internal/cron_task/clear_msg.go | 48 ++++++++++++++++++++------------- internal/cron_task/cron_task.go | 3 ++- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index d7f530a2c..41c1b18e0 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -48,12 +48,29 @@ func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { return err } +func delMongoMsgs(operationID string, delMsgIDList *[][2]interface{}) error { + if len(*delMsgIDList) > 0 { + var IDList []string + for _, v := range *delMsgIDList { + IDList = append(IDList, v[0].(string)) + } + err := db.DB.DelMongoMsgs(IDList) + if err != nil { + return utils.Wrap(err, "DelMongoMsgs failed") + } + } + return nil +} + // recursion func deleteMongoMsg(operationID string, ID string, index int64, delMsgIDList *[][2]interface{}) (uint32, error) { // 从最旧的列表开始找 msgs, err := db.DB.GetUserMsgListByIndex(ID, index) - if err != nil { - return 0, utils.Wrap(err, "GetUserMsgListByIndex failed") + if err != nil || msgs.UID == "" { + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID) + } + return getDelMaxSeqByIDList(*delMsgIDList), delMongoMsgs(operationID, delMsgIDList) } if len(msgs.Msg) > db.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) @@ -62,15 +79,8 @@ func deleteMongoMsg(operationID string, ID string, index int64, delMsgIDList *[] for i, msg := range msgs.Msg { // 找到列表中不需要删除的消息了 if utils.GetCurrentTimestampByMill() < msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000 { - if len(*delMsgIDList) > 0 { - var IDList []string - for _, v := range *delMsgIDList { - IDList = append(IDList, v[0].(string)) - } - err := db.DB.DelMongoMsgs(IDList) - if err != nil { - return 0, utils.Wrap(err, "DelMongoMsgs failed") - } + if err := delMongoMsgs(operationID, delMsgIDList); err != nil { + return 0, err } minSeq := getDelMaxSeqByIDList(*delMsgIDList) if i > 0 { @@ -84,19 +94,21 @@ func deleteMongoMsg(operationID string, ID string, index int64, delMsgIDList *[] log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) return minSeq, nil } - minSeq = msgPb.Seq - 1 + minSeq = msgPb.Seq } } return minSeq, nil } } - msgPb := &server_api_params.MsgData{} - err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, msgPb) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) - return 0, utils.Wrap(err, "proto.Unmarshal failed") + if len(msgs.Msg) > 0 { + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, msgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + *delMsgIDList = append(*delMsgIDList, [2]interface{}{msgs.UID, msgPb.Seq}) } - *delMsgIDList = append(*delMsgIDList, [2]interface{}{msgs.UID, msgPb.Seq}) // 没有找到 代表需要全部删除掉 继续递归查找下一个比较旧的列表 seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index+1, delMsgIDList) if err != nil { diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 22a7a3901..3aabea599 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -1,6 +1,7 @@ package cronTask import ( + "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" @@ -16,7 +17,7 @@ const cronTaskOperationID = "cronTaskOperationID-" func StartCronTask() { log.NewInfo(utils.OperationIDGenerator(), "start cron task") c := cron.New() - _, err := c.AddFunc("30 3-6,20-23 * * *", func() { + _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, func() { operationID := getCronTaskOperationID() userIDList, err := im_mysql_model.SelectAllUserID() if err == nil {