From 569d5d2d8986c5994fefa82af8e2231757c91bd1 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 9 Aug 2022 15:12:13 +0800 Subject: [PATCH 01/11] super group add after callback --- internal/rpc/msg/send_msg.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index c9b17544d..d25e17fe2 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -476,6 +476,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } + // callback + callbackResp = callbackAfterSendGroupMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp) + } return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) default: From db8c4f369cae282095cfb329fffe4006a8044be8 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 9 Aug 2022 15:41:16 +0800 Subject: [PATCH 02/11] send options bug fix --- internal/rpc/msg/send_msg.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index d25e17fe2..747f401fa 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -974,6 +974,11 @@ func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.Se msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} for _, v := range list { groupPB.MsgData.RecvID = v + options := make(map[string]bool, 1) + for k, v := range groupPB.MsgData.Options { + options[k] = v + } + groupPB.MsgData.Options = options isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) if isSend { if v == "" || groupPB.MsgData.SendID == "" { From 7bd7edfec9f98b98060959ac1f1b52da45f5fa9a Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 9 Aug 2022 15:54:16 +0800 Subject: [PATCH 03/11] send options bug fix --- internal/rpc/msg/send_msg.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 747f401fa..0b171a5eb 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -972,10 +972,14 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} + tempOptions := make(map[string]bool, 1) + for k, v := range groupPB.MsgData.Options { + tempOptions[k] = v + } for _, v := range list { groupPB.MsgData.RecvID = v options := make(map[string]bool, 1) - for k, v := range groupPB.MsgData.Options { + for k, v := range tempOptions { options[k] = v } groupPB.MsgData.Options = options From 2c0a2d7877d738c7c3b0b7f78946740051c019bb Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 9 Aug 2022 16:38:33 +0800 Subject: [PATCH 04/11] getui debug --- cmd/cron_task/main.go | 11 ++++ .../{timed_task => cron_task}/clear_msg.go | 4 +- internal/cron_task/cron_task.go | 50 +++++++++++++++++++ internal/rpc/group/group.go | 6 +-- internal/rpc/msg/send_msg.go | 2 +- internal/timed_task/init.go | 1 - internal/timed_task/timed_task.go | 23 --------- .../mysql_model/im_mysql_model/group_model.go | 8 +++ 8 files changed, 75 insertions(+), 30 deletions(-) create mode 100644 cmd/cron_task/main.go rename internal/{timed_task => cron_task}/clear_msg.go (96%) create mode 100644 internal/cron_task/cron_task.go delete mode 100644 internal/timed_task/init.go delete mode 100644 internal/timed_task/timed_task.go diff --git a/cmd/cron_task/main.go b/cmd/cron_task/main.go new file mode 100644 index 000000000..db62e1172 --- /dev/null +++ b/cmd/cron_task/main.go @@ -0,0 +1,11 @@ +package cron_task + +import ( + "Open_IM/internal/cron_task" + "fmt" +) + +func main() { + fmt.Println("start cronTask") + cronTask.StartCronTask() +} diff --git a/internal/timed_task/clear_msg.go b/internal/cron_task/clear_msg.go similarity index 96% rename from internal/timed_task/clear_msg.go rename to internal/cron_task/clear_msg.go index 6cb3cba79..2b4877d2e 100644 --- a/internal/timed_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -1,4 +1,4 @@ -package timedTask +package cronTask import ( "Open_IM/pkg/common/config" @@ -67,7 +67,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string } } *IDList = append(*IDList, msgs.UID) - // 没有找到 代表需要全部删除掉 继续查找下一个比较旧的列表 + // 没有找到 代表需要全部删除掉 继续递归查找下一个比较旧的列表 seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index-1, IDList) if err != nil { return 0, utils.Wrap(err, "deleteMongoMsg failed") diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go new file mode 100644 index 000000000..ff861885a --- /dev/null +++ b/internal/cron_task/cron_task.go @@ -0,0 +1,50 @@ +package cronTask + +import ( + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + "Open_IM/pkg/common/log" + "Open_IM/pkg/utils" + "github.com/robfig/cron/v3" +) + +const cronTaskOperationID = "cronTaskOperationID-" + +func StartCronTask() { + log.NewInfo(utils.OperationIDGenerator(), "start cron task") + c := cron.New() + _, err := c.AddFunc("30 3-6,20-23 * * *", func() { + operationID := getCronTaskOperationID() + userIDList, err := im_mysql_model.SelectAllUserID() + if err == nil { + log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) + for _, userID := range userIDList { + if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID, constant.WriteDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID, constant.WriteDiffusion) + } + } + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + } + + workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) + if err == nil { + for _, groupID := range workingGroupIDList { + if err := DeleteMongoMsgAndResetRedisSeq(operationID, groupID, constant.ReadDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), operationID, groupID, constant.ReadDiffusion, err.Error()) + } + } + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + return + } + }) + if err != nil { + panic(err) + } + c.Start() +} + +func getCronTaskOperationID() string { + return cronTaskOperationID + utils.OperationIDGenerator() +} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 31fa0ff72..894c859c1 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -568,7 +568,7 @@ func (s *groupServer) getGroupUserLevel(groupID, userID string) (int, error) { func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGroupMemberReq) (*pbGroup.KickGroupMemberResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String()) - groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID) + groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupInfoByGroupID", req.GroupID, err.Error()) return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil @@ -578,7 +578,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou if groupInfo.GroupType != constant.SuperGroup { opFlag := 0 if !token_verify.IsManagerUserID(req.OpUserID) { - opInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, req.OpUserID) + opInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, req.OpUserID) if err != nil { errMsg := req.OperationID + " GetGroupMemberInfoByGroupIDAndUserID failed " + err.Error() + req.GroupID + req.OpUserID log.Error(req.OperationID, errMsg) @@ -605,7 +605,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou //remove for _, v := range req.KickedUserIDList { - kickedInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, v) + kickedInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, v) if err != nil { log.NewError(req.OperationID, " GetGroupMemberInfoByGroupIDAndUserID failed ", req.GroupID, v, err.Error()) resp.Id2ResultList = append(resp.Id2ResultList, &pbGroup.Id2Result{UserID: v, Result: -1}) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 0b171a5eb..91d8ca1d3 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -334,7 +334,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } m := make(map[string][]string, 2) m[constant.OnlineStatus] = memberUserIDList - log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, pb) newTime = db.GetCurrentTimestampByMill() //split parallel send diff --git a/internal/timed_task/init.go b/internal/timed_task/init.go deleted file mode 100644 index fe4e214b0..000000000 --- a/internal/timed_task/init.go +++ /dev/null @@ -1 +0,0 @@ -package timedTask diff --git a/internal/timed_task/timed_task.go b/internal/timed_task/timed_task.go deleted file mode 100644 index 2498c730b..000000000 --- a/internal/timed_task/timed_task.go +++ /dev/null @@ -1,23 +0,0 @@ -package timedTask - -import ( - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/log" - "Open_IM/pkg/utils" - "github.com/robfig/cron/v3" -) - -func main() { - log.NewInfo(utils.OperationIDGenerator(), "start cron task") - c := cron.New() - _, err := c.AddFunc("30 3-6,20-23 * * *", func() { - operationID := utils.OperationIDGenerator() - if err := DeleteMongoMsgAndResetRedisSeq(operationID, "", constant.ReadDiffusion); err != nil { - log.NewError(operationID) - } - }) - if err != nil { - panic(err) - } - c.Start() -} diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_model.go index 19c2b1164..34d420b52 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_model.go @@ -184,3 +184,11 @@ func GetAllGroupIDList() ([]string, error) { err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Pluck("group_id", &groupIDList).Error return groupIDList, err } + +func GetGroupIDListByGroupType(groupType int) ([]string, error) { + var groupIDList []string + if err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Where("group_type = ? ", groupType).Pluck("group_id", &groupIDList).Error; err != nil { + return nil, err + } + return groupIDList, nil +} From bcc0c7c2df4cc104088238ec95aabfa6a7484fdf Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 9 Aug 2022 17:29:41 +0800 Subject: [PATCH 05/11] delete conversation update --- internal/api/msg/del_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/api/msg/del_msg.go b/internal/api/msg/del_msg.go index f52bf83ea..3fefd0c11 100644 --- a/internal/api/msg/del_msg.go +++ b/internal/api/msg/del_msg.go @@ -79,11 +79,11 @@ func DelSuperGroupMsg(c *gin.Context) { resp api.DelSuperGroupMsgResp ) rpcReq := &rpc.DelSuperGroupMsgReq{} - utils.CopyStructFields(rpcReq, &req) if err := c.BindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } + utils.CopyStructFields(rpcReq, &req) log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req:", req) var ok bool var errInfo string From 706f6c6747af71b48e95199df9a87652ea936ab8 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 9 Aug 2022 18:48:11 +0800 Subject: [PATCH 06/11] getui debug --- internal/cron_task/clear_msg.go | 15 +++++++-------- internal/cron_task/cron_task.go | 17 +++++++++++++---- pkg/common/constant/constant.go | 6 ------ pkg/common/db/RedisModel.go | 2 +- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 2b4877d2e..5e0851712 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -2,7 +2,6 @@ package cronTask import ( "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" server_api_params "Open_IM/pkg/proto/sdk_ws" @@ -15,19 +14,19 @@ import ( const oldestList = 0 const newestList = -1 -func DeleteMongoMsgAndResetRedisSeq(operationID, ID string, diffusionType int) error { +func ResetUserGroupMinSeq(operationID, groupID, userID string) error { + return nil +} + +func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { // -1 表示从当前最早的一个开始 var delMsgIDList []string - minSeq, err := deleteMongoMsg(operationID, ID, oldestList, &delMsgIDList) + minSeq, err := deleteMongoMsg(operationID, userID, oldestList, &delMsgIDList) if err != nil { return utils.Wrap(err, "") } log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList: ", delMsgIDList) - if diffusionType == constant.WriteDiffusion { - err = db.DB.SetUserMinSeq(ID, minSeq) - } else if diffusionType == constant.ReadDiffusion { - err = db.DB.SetGroupMinSeq(ID, minSeq) - } + err = db.DB.SetUserMinSeq(userID, minSeq) return err } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index ff861885a..d103786e1 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -3,6 +3,7 @@ package cronTask import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "github.com/robfig/cron/v3" @@ -19,8 +20,8 @@ func StartCronTask() { if err == nil { log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) for _, userID := range userIDList { - if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID, constant.WriteDiffusion); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID, constant.WriteDiffusion) + if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID) } } } else { @@ -30,8 +31,16 @@ func StartCronTask() { workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) if err == nil { for _, groupID := range workingGroupIDList { - if err := DeleteMongoMsgAndResetRedisSeq(operationID, groupID, constant.ReadDiffusion); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), operationID, groupID, constant.ReadDiffusion, err.Error()) + userIDList, err = rocksCache.GetGroupMemberIDListFromCache(groupID) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) + continue + } + log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", userIDList) + for _, userID := range userIDList { + if err := ResetUserGroupMinSeq(operationID, groupID, userID); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), operationID, groupID, userID, err.Error()) + } } } } else { diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 2ba3d1774..fecad1d32 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -227,12 +227,6 @@ const ( WorkMomentAtUserNotification = 2 ) -const ( - // diffusionType - WriteDiffusion = 0 - ReadDiffusion = 1 -) - const ( AtAllString = "AtAllTag" AtNormal = 0 diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index 4bbd66039..f21c61bc1 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -363,7 +363,7 @@ func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID st result := d.RDB.Get(context.Background(), key).String() var msg pbCommon.MsgData if err := utils.String2Pb(result, &msg); err != nil { - log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, err.Error()) + log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, result, key, err.Error()) continue } msg.Status = constant.MsgDeleted From 6c0922a6d83ac9eb1676185cded2542735746a3a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 9 Aug 2022 19:44:27 +0800 Subject: [PATCH 07/11] fix delete --- pkg/common/db/RedisModel.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index f21c61bc1..535acd2e8 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -360,7 +360,15 @@ func (d *DataBases) DelUserSignalList(userID string) error { func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) { for _, seq := range seqList { key := messageCache + uid + "_" + strconv.Itoa(int(seq)) - result := d.RDB.Get(context.Background(), key).String() + result, err := d.RDB.Get(context.Background(), key).Result() + if err != nil { + if err == go_redis.Nil { + log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil") + } else { + log2.NewError(operationID, utils.GetSelfFuncName(), err.Error(), key) + } + continue + } var msg pbCommon.MsgData if err := utils.String2Pb(result, &msg); err != nil { log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, result, key, err.Error()) From a74168d1e150df9253549bdd95170cd9efc3098f Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 Aug 2022 12:02:50 +0800 Subject: [PATCH 08/11] fix delete --- config/config.yaml | 1 + internal/cron_task/clear_msg.go | 69 ++++++++++++++++++++++----------- internal/cron_task/cron_task.go | 9 +++-- pkg/common/config/config.go | 21 +++++----- 4 files changed, 64 insertions(+), 36 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 6ffb11f92..91df1a62f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -32,6 +32,7 @@ mongo: dbPassword: #mongo密码,建议先不设置 dbMaxPoolSize: 100 dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 + chatRecordsClearTime: "* * * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * redis: dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时,填写一个地址即可,使用redis集群时候,填写集群中多个节点地址(主从地址都可以填写,增加容灾能力),默认即可 diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 5e0851712..d7f530a2c 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -7,47 +7,72 @@ import ( server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "github.com/golang/protobuf/proto" - "strconv" - "strings" ) const oldestList = 0 const newestList = -1 -func ResetUserGroupMinSeq(operationID, groupID, userID string) error { +func ResetUserGroupMinSeq(operationID, groupID string, userIDList []string) error { + var delMsgIDList [][2]interface{} + minSeq, err := deleteMongoMsg(operationID, groupID, oldestList, &delMsgIDList) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMongoMsg failed") + return utils.Wrap(err, "") + } + for _, userID := range userIDList { + userMinSeq, err := db.DB.GetGroupUserMinSeq(groupID, userID) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) + continue + } + if userMinSeq > uint64(minSeq) { + err = db.DB.SetGroupUserMinSeq(groupID, userID, userMinSeq) + } else { + err = db.DB.SetGroupUserMinSeq(groupID, userID, uint64(minSeq)) + } + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) + } + } return nil } func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { - // -1 表示从当前最早的一个开始 - var delMsgIDList []string + var delMsgIDList [][2]interface{} minSeq, err := deleteMongoMsg(operationID, userID, oldestList, &delMsgIDList) if err != nil { return utils.Wrap(err, "") } - log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList: ", delMsgIDList) + log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", userID, delMsgIDList) err = db.DB.SetUserMinSeq(userID, minSeq) return err } // recursion -func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string) (uint32, error) { +func deleteMongoMsg(operationID string, ID string, index int64, delMsgIDList *[][2]interface{}) (uint32, error) { // 从最旧的列表开始找 msgs, err := db.DB.GetUserMsgListByIndex(ID, index) if err != nil { return 0, utils.Wrap(err, "GetUserMsgListByIndex failed") } + if len(msgs.Msg) > db.GetSingleGocMsgNum() { + log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) + } log.NewDebug(operationID, utils.GetSelfFuncName(), "get msgs: ", msgs.UID) for i, msg := range msgs.Msg { // 找到列表中不需要删除的消息了 - if msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords) > utils.GetCurrentTimestampByMill() { - if len(*IDList) > 0 { - err := db.DB.DelMongoMsgs(*IDList) + if utils.GetCurrentTimestampByMill() < msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000 { + if len(*delMsgIDList) > 0 { + var IDList []string + for _, v := range *delMsgIDList { + IDList = append(IDList, v[0].(string)) + } + err := db.DB.DelMongoMsgs(IDList) if err != nil { return 0, utils.Wrap(err, "DelMongoMsgs failed") } } - minSeq := getDelMaxSeqByIDList(*IDList) + minSeq := getDelMaxSeqByIDList(*delMsgIDList) if i > 0 { msgPb := &server_api_params.MsgData{} err = proto.Unmarshal(msg.Msg, msgPb) @@ -65,24 +90,24 @@ func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string return minSeq, nil } } - *IDList = append(*IDList, msgs.UID) + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, msgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + *delMsgIDList = append(*delMsgIDList, [2]interface{}{msgs.UID, msgPb.Seq}) // 没有找到 代表需要全部删除掉 继续递归查找下一个比较旧的列表 - seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index-1, IDList) + seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index+1, delMsgIDList) if err != nil { return 0, utils.Wrap(err, "deleteMongoMsg failed") } return seq, nil } -func getDelMaxSeqByIDList(IDList []string) uint32 { - if len(IDList) == 0 { +func getDelMaxSeqByIDList(delMsgIDList [][2]interface{}) uint32 { + if len(delMsgIDList) == 0 { return 0 } - l := strings.Split(IDList[len(IDList)-1], ":") - index, _ := strconv.Atoi(l[len(l)-1]) - if index == 0 { - // 4999 - return uint32(db.GetSingleGocMsgNum()) - 1 - } // 5000 - return (uint32(db.GetSingleGocMsgNum()) - 1) + uint32(index*db.GetSingleGocMsgNum()) + return delMsgIDList[len(delMsgIDList)-1][1].(uint32) } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index d103786e1..03ee96603 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -6,6 +6,7 @@ import ( rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/utils" + "fmt" "github.com/robfig/cron/v3" ) @@ -37,11 +38,10 @@ func StartCronTask() { continue } log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", userIDList) - for _, userID := range userIDList { - if err := ResetUserGroupMinSeq(operationID, groupID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), operationID, groupID, userID, err.Error()) - } + if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) } + } } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) @@ -49,6 +49,7 @@ func StartCronTask() { } }) if err != nil { + fmt.Println("start cron failed", err.Error()) panic(err) } c.Start() diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 74c03ca4a..29779116e 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -92,16 +92,17 @@ type config struct { DBMaxLifeTime int `yaml:"dbMaxLifeTime"` } Mongo struct { - DBUri string `yaml:"dbUri"` - DBAddress string `yaml:"dbAddress"` - DBDirect bool `yaml:"dbDirect"` - DBTimeout int `yaml:"dbTimeout"` - DBDatabase string `yaml:"dbDatabase"` - DBSource string `yaml:"dbSource"` - DBUserName string `yaml:"dbUserName"` - DBPassword string `yaml:"dbPassword"` - DBMaxPoolSize int `yaml:"dbMaxPoolSize"` - DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + DBUri string `yaml:"dbUri"` + DBAddress string `yaml:"dbAddress"` + DBDirect bool `yaml:"dbDirect"` + DBTimeout int `yaml:"dbTimeout"` + DBDatabase string `yaml:"dbDatabase"` + DBSource string `yaml:"dbSource"` + DBUserName string `yaml:"dbUserName"` + DBPassword string `yaml:"dbPassword"` + DBMaxPoolSize int `yaml:"dbMaxPoolSize"` + DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` } Redis struct { DBAddress []string `yaml:"dbAddress"` From 4c101649701ade5b0413d5278af7c3fbea9d140a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 Aug 2022 12:07:32 +0800 Subject: [PATCH 09/11] fix delete --- cmd/cron_task/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/cron_task/main.go b/cmd/cron_task/main.go index db62e1172..1e7396d61 100644 --- a/cmd/cron_task/main.go +++ b/cmd/cron_task/main.go @@ -1,4 +1,4 @@ -package cron_task +package main import ( "Open_IM/internal/cron_task" From cfe218b8a440c041a44a4c752cbb4bc883ca0405 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 Aug 2022 12:08:28 +0800 Subject: [PATCH 10/11] fix delete --- internal/cron_task/cron_task.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 03ee96603..7617f81a0 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -53,6 +53,7 @@ func StartCronTask() { panic(err) } c.Start() + fmt.Println("start cron task success") } func getCronTaskOperationID() string { From 2ee094311566f92e9ccbb073209a7202b3a2e0a4 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 Aug 2022 12:09:28 +0800 Subject: [PATCH 11/11] fix delete --- internal/cron_task/cron_task.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 7617f81a0..22a7a3901 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/utils" "fmt" "github.com/robfig/cron/v3" + "time" ) const cronTaskOperationID = "cronTaskOperationID-" @@ -54,6 +55,9 @@ func StartCronTask() { } c.Start() fmt.Println("start cron task success") + for { + time.Sleep(time.Second) + } } func getCronTaskOperationID() string {