From a74168d1e150df9253549bdd95170cd9efc3098f Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 Aug 2022 12:02:50 +0800 Subject: [PATCH] fix delete --- config/config.yaml | 1 + internal/cron_task/clear_msg.go | 69 ++++++++++++++++++++++----------- internal/cron_task/cron_task.go | 9 +++-- pkg/common/config/config.go | 21 +++++----- 4 files changed, 64 insertions(+), 36 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 6ffb11f92..91df1a62f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -32,6 +32,7 @@ mongo: dbPassword: #mongo密码,建议先不设置 dbMaxPoolSize: 100 dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 + chatRecordsClearTime: "* * * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * redis: dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时,填写一个地址即可,使用redis集群时候,填写集群中多个节点地址(主从地址都可以填写,增加容灾能力),默认即可 diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 5e0851712..d7f530a2c 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -7,47 +7,72 @@ import ( server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "github.com/golang/protobuf/proto" - "strconv" - "strings" ) const oldestList = 0 const newestList = -1 -func ResetUserGroupMinSeq(operationID, groupID, userID string) error { +func ResetUserGroupMinSeq(operationID, groupID string, userIDList []string) error { + var delMsgIDList [][2]interface{} + minSeq, err := deleteMongoMsg(operationID, groupID, oldestList, &delMsgIDList) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMongoMsg failed") + return utils.Wrap(err, "") + } + for _, userID := range userIDList { + userMinSeq, err := db.DB.GetGroupUserMinSeq(groupID, userID) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) + continue + } + if userMinSeq > uint64(minSeq) { + err = db.DB.SetGroupUserMinSeq(groupID, userID, userMinSeq) + } else { + err = db.DB.SetGroupUserMinSeq(groupID, userID, uint64(minSeq)) + } + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) + } + } return nil } func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { - // -1 表示从当前最早的一个开始 - var delMsgIDList []string + var delMsgIDList [][2]interface{} minSeq, err := deleteMongoMsg(operationID, userID, oldestList, &delMsgIDList) if err != nil { return utils.Wrap(err, "") } - log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList: ", delMsgIDList) + log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", userID, delMsgIDList) err = db.DB.SetUserMinSeq(userID, minSeq) return err } // recursion -func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string) (uint32, error) { +func deleteMongoMsg(operationID string, ID string, index int64, delMsgIDList *[][2]interface{}) (uint32, error) { // 从最旧的列表开始找 msgs, err := db.DB.GetUserMsgListByIndex(ID, index) if err != nil { return 0, utils.Wrap(err, "GetUserMsgListByIndex failed") } + if len(msgs.Msg) > db.GetSingleGocMsgNum() { + log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) + } log.NewDebug(operationID, utils.GetSelfFuncName(), "get msgs: ", msgs.UID) for i, msg := range msgs.Msg { // 找到列表中不需要删除的消息了 - if msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords) > utils.GetCurrentTimestampByMill() { - if len(*IDList) > 0 { - err := db.DB.DelMongoMsgs(*IDList) + if utils.GetCurrentTimestampByMill() < msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000 { + if len(*delMsgIDList) > 0 { + var IDList []string + for _, v := range *delMsgIDList { + IDList = append(IDList, v[0].(string)) + } + err := db.DB.DelMongoMsgs(IDList) if err != nil { return 0, utils.Wrap(err, "DelMongoMsgs failed") } } - minSeq := getDelMaxSeqByIDList(*IDList) + minSeq := getDelMaxSeqByIDList(*delMsgIDList) if i > 0 { msgPb := &server_api_params.MsgData{} err = proto.Unmarshal(msg.Msg, msgPb) @@ -65,24 +90,24 @@ func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string return minSeq, nil } } - *IDList = append(*IDList, msgs.UID) + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, msgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + *delMsgIDList = append(*delMsgIDList, [2]interface{}{msgs.UID, msgPb.Seq}) // 没有找到 代表需要全部删除掉 继续递归查找下一个比较旧的列表 - seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index-1, IDList) + seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index+1, delMsgIDList) if err != nil { return 0, utils.Wrap(err, "deleteMongoMsg failed") } return seq, nil } -func getDelMaxSeqByIDList(IDList []string) uint32 { - if len(IDList) == 0 { +func getDelMaxSeqByIDList(delMsgIDList [][2]interface{}) uint32 { + if len(delMsgIDList) == 0 { return 0 } - l := strings.Split(IDList[len(IDList)-1], ":") - index, _ := strconv.Atoi(l[len(l)-1]) - if index == 0 { - // 4999 - return uint32(db.GetSingleGocMsgNum()) - 1 - } // 5000 - return (uint32(db.GetSingleGocMsgNum()) - 1) + uint32(index*db.GetSingleGocMsgNum()) + return delMsgIDList[len(delMsgIDList)-1][1].(uint32) } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index d103786e1..03ee96603 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -6,6 +6,7 @@ import ( rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/utils" + "fmt" "github.com/robfig/cron/v3" ) @@ -37,11 +38,10 @@ func StartCronTask() { continue } log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", userIDList) - for _, userID := range userIDList { - if err := ResetUserGroupMinSeq(operationID, groupID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), operationID, groupID, userID, err.Error()) - } + if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) } + } } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) @@ -49,6 +49,7 @@ func StartCronTask() { } }) if err != nil { + fmt.Println("start cron failed", err.Error()) panic(err) } c.Start() diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 74c03ca4a..29779116e 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -92,16 +92,17 @@ type config struct { DBMaxLifeTime int `yaml:"dbMaxLifeTime"` } Mongo struct { - DBUri string `yaml:"dbUri"` - DBAddress string `yaml:"dbAddress"` - DBDirect bool `yaml:"dbDirect"` - DBTimeout int `yaml:"dbTimeout"` - DBDatabase string `yaml:"dbDatabase"` - DBSource string `yaml:"dbSource"` - DBUserName string `yaml:"dbUserName"` - DBPassword string `yaml:"dbPassword"` - DBMaxPoolSize int `yaml:"dbMaxPoolSize"` - DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + DBUri string `yaml:"dbUri"` + DBAddress string `yaml:"dbAddress"` + DBDirect bool `yaml:"dbDirect"` + DBTimeout int `yaml:"dbTimeout"` + DBDatabase string `yaml:"dbDatabase"` + DBSource string `yaml:"dbSource"` + DBUserName string `yaml:"dbUserName"` + DBPassword string `yaml:"dbPassword"` + DBMaxPoolSize int `yaml:"dbMaxPoolSize"` + DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` } Redis struct { DBAddress []string `yaml:"dbAddress"`