diff --git a/cmd/cron_task/main.go b/cmd/cron_task/main.go new file mode 100644 index 000000000..1e7396d61 --- /dev/null +++ b/cmd/cron_task/main.go @@ -0,0 +1,11 @@ +package main + +import ( + "Open_IM/internal/cron_task" + "fmt" +) + +func main() { + fmt.Println("start cronTask") + cronTask.StartCronTask() +} diff --git a/config/config.yaml b/config/config.yaml index 6ffb11f92..91df1a62f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -32,6 +32,7 @@ mongo: dbPassword: #mongo密码,建议先不设置 dbMaxPoolSize: 100 dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 + chatRecordsClearTime: "* * * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * redis: dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时,填写一个地址即可,使用redis集群时候,填写集群中多个节点地址(主从地址都可以填写,增加容灾能力),默认即可 diff --git a/internal/api/msg/del_msg.go b/internal/api/msg/del_msg.go index f52bf83ea..3fefd0c11 100644 --- a/internal/api/msg/del_msg.go +++ b/internal/api/msg/del_msg.go @@ -79,11 +79,11 @@ func DelSuperGroupMsg(c *gin.Context) { resp api.DelSuperGroupMsgResp ) rpcReq := &rpc.DelSuperGroupMsgReq{} - utils.CopyStructFields(rpcReq, &req) if err := c.BindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } + utils.CopyStructFields(rpcReq, &req) log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req:", req) var ok bool var errInfo string diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go new file mode 100644 index 000000000..d7f530a2c --- /dev/null +++ b/internal/cron_task/clear_msg.go @@ -0,0 +1,113 @@ +package cronTask + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/db" + "Open_IM/pkg/common/log" + server_api_params "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" + "github.com/golang/protobuf/proto" +) + +const oldestList = 0 +const newestList = -1 + +func ResetUserGroupMinSeq(operationID, groupID string, userIDList []string) error { + var delMsgIDList [][2]interface{} + minSeq, err := deleteMongoMsg(operationID, groupID, oldestList, &delMsgIDList) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMongoMsg failed") + return utils.Wrap(err, "") + } + for _, userID := range userIDList { + userMinSeq, err := db.DB.GetGroupUserMinSeq(groupID, userID) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) + continue + } + if userMinSeq > uint64(minSeq) { + err = db.DB.SetGroupUserMinSeq(groupID, userID, userMinSeq) + } else { + err = db.DB.SetGroupUserMinSeq(groupID, userID, uint64(minSeq)) + } + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) + } + } + return nil +} + +func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { + var delMsgIDList [][2]interface{} + minSeq, err := deleteMongoMsg(operationID, userID, oldestList, &delMsgIDList) + if err != nil { + return utils.Wrap(err, "") + } + log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", userID, delMsgIDList) + err = db.DB.SetUserMinSeq(userID, minSeq) + return err +} + +// recursion +func deleteMongoMsg(operationID string, ID string, index int64, delMsgIDList *[][2]interface{}) (uint32, error) { + // 从最旧的列表开始找 + msgs, err := db.DB.GetUserMsgListByIndex(ID, index) + if err != nil { + return 0, utils.Wrap(err, "GetUserMsgListByIndex failed") + } + if len(msgs.Msg) > db.GetSingleGocMsgNum() { + log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) + } + log.NewDebug(operationID, utils.GetSelfFuncName(), "get msgs: ", msgs.UID) + for i, msg := range msgs.Msg { + // 找到列表中不需要删除的消息了 + if utils.GetCurrentTimestampByMill() < msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000 { + if len(*delMsgIDList) > 0 { + var IDList []string + for _, v := range *delMsgIDList { + IDList = append(IDList, v[0].(string)) + } + err := db.DB.DelMongoMsgs(IDList) + if err != nil { + return 0, utils.Wrap(err, "DelMongoMsgs failed") + } + } + minSeq := getDelMaxSeqByIDList(*delMsgIDList) + if i > 0 { + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msg.Msg, msgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), ID, index) + } else { + err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) + return minSeq, nil + } + minSeq = msgPb.Seq - 1 + } + } + return minSeq, nil + } + } + msgPb := &server_api_params.MsgData{} + err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, msgPb) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + return 0, utils.Wrap(err, "proto.Unmarshal failed") + } + *delMsgIDList = append(*delMsgIDList, [2]interface{}{msgs.UID, msgPb.Seq}) + // 没有找到 代表需要全部删除掉 继续递归查找下一个比较旧的列表 + seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index+1, delMsgIDList) + if err != nil { + return 0, utils.Wrap(err, "deleteMongoMsg failed") + } + return seq, nil +} + +func getDelMaxSeqByIDList(delMsgIDList [][2]interface{}) uint32 { + if len(delMsgIDList) == 0 { + return 0 + } + return delMsgIDList[len(delMsgIDList)-1][1].(uint32) +} diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go new file mode 100644 index 000000000..22a7a3901 --- /dev/null +++ b/internal/cron_task/cron_task.go @@ -0,0 +1,65 @@ +package cronTask + +import ( + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + rocksCache "Open_IM/pkg/common/db/rocks_cache" + "Open_IM/pkg/common/log" + "Open_IM/pkg/utils" + "fmt" + "github.com/robfig/cron/v3" + "time" +) + +const cronTaskOperationID = "cronTaskOperationID-" + +func StartCronTask() { + log.NewInfo(utils.OperationIDGenerator(), "start cron task") + c := cron.New() + _, err := c.AddFunc("30 3-6,20-23 * * *", func() { + operationID := getCronTaskOperationID() + userIDList, err := im_mysql_model.SelectAllUserID() + if err == nil { + log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) + for _, userID := range userIDList { + if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID) + } + } + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + } + + workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) + if err == nil { + for _, groupID := range workingGroupIDList { + userIDList, err = rocksCache.GetGroupMemberIDListFromCache(groupID) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) + continue + } + log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", userIDList) + if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) + } + + } + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + return + } + }) + if err != nil { + fmt.Println("start cron failed", err.Error()) + panic(err) + } + c.Start() + fmt.Println("start cron task success") + for { + time.Sleep(time.Second) + } +} + +func getCronTaskOperationID() string { + return cronTaskOperationID + utils.OperationIDGenerator() +} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 31fa0ff72..894c859c1 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -568,7 +568,7 @@ func (s *groupServer) getGroupUserLevel(groupID, userID string) (int, error) { func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGroupMemberReq) (*pbGroup.KickGroupMemberResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String()) - groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID) + groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupInfoByGroupID", req.GroupID, err.Error()) return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil @@ -578,7 +578,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou if groupInfo.GroupType != constant.SuperGroup { opFlag := 0 if !token_verify.IsManagerUserID(req.OpUserID) { - opInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, req.OpUserID) + opInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, req.OpUserID) if err != nil { errMsg := req.OperationID + " GetGroupMemberInfoByGroupIDAndUserID failed " + err.Error() + req.GroupID + req.OpUserID log.Error(req.OperationID, errMsg) @@ -605,7 +605,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou //remove for _, v := range req.KickedUserIDList { - kickedInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, v) + kickedInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, v) if err != nil { log.NewError(req.OperationID, " GetGroupMemberInfoByGroupIDAndUserID failed ", req.GroupID, v, err.Error()) resp.Id2ResultList = append(resp.Id2ResultList, &pbGroup.Id2Result{UserID: v, Result: -1}) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index c9b17544d..91d8ca1d3 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -334,7 +334,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } m := make(map[string][]string, 2) m[constant.OnlineStatus] = memberUserIDList - log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, pb) newTime = db.GetCurrentTimestampByMill() //split parallel send @@ -476,6 +476,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } + // callback + callbackResp = callbackAfterSendGroupMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp) + } return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) default: @@ -967,8 +972,17 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} + tempOptions := make(map[string]bool, 1) + for k, v := range groupPB.MsgData.Options { + tempOptions[k] = v + } for _, v := range list { groupPB.MsgData.RecvID = v + options := make(map[string]bool, 1) + for k, v := range tempOptions { + options[k] = v + } + groupPB.MsgData.Options = options isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) if isSend { if v == "" || groupPB.MsgData.SendID == "" { diff --git a/internal/timed_task/clear_msg.go b/internal/timed_task/clear_msg.go deleted file mode 100644 index 6cb3cba79..000000000 --- a/internal/timed_task/clear_msg.go +++ /dev/null @@ -1,89 +0,0 @@ -package timedTask - -import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" - server_api_params "Open_IM/pkg/proto/sdk_ws" - "Open_IM/pkg/utils" - "github.com/golang/protobuf/proto" - "strconv" - "strings" -) - -const oldestList = 0 -const newestList = -1 - -func DeleteMongoMsgAndResetRedisSeq(operationID, ID string, diffusionType int) error { - // -1 表示从当前最早的一个开始 - var delMsgIDList []string - minSeq, err := deleteMongoMsg(operationID, ID, oldestList, &delMsgIDList) - if err != nil { - return utils.Wrap(err, "") - } - log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList: ", delMsgIDList) - if diffusionType == constant.WriteDiffusion { - err = db.DB.SetUserMinSeq(ID, minSeq) - } else if diffusionType == constant.ReadDiffusion { - err = db.DB.SetGroupMinSeq(ID, minSeq) - } - return err -} - -// recursion -func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string) (uint32, error) { - // 从最旧的列表开始找 - msgs, err := db.DB.GetUserMsgListByIndex(ID, index) - if err != nil { - return 0, utils.Wrap(err, "GetUserMsgListByIndex failed") - } - log.NewDebug(operationID, utils.GetSelfFuncName(), "get msgs: ", msgs.UID) - for i, msg := range msgs.Msg { - // 找到列表中不需要删除的消息了 - if msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords) > utils.GetCurrentTimestampByMill() { - if len(*IDList) > 0 { - err := db.DB.DelMongoMsgs(*IDList) - if err != nil { - return 0, utils.Wrap(err, "DelMongoMsgs failed") - } - } - minSeq := getDelMaxSeqByIDList(*IDList) - if i > 0 { - msgPb := &server_api_params.MsgData{} - err = proto.Unmarshal(msg.Msg, msgPb) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), ID, index) - } else { - err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) - return minSeq, nil - } - minSeq = msgPb.Seq - 1 - } - } - return minSeq, nil - } - } - *IDList = append(*IDList, msgs.UID) - // 没有找到 代表需要全部删除掉 继续查找下一个比较旧的列表 - seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index-1, IDList) - if err != nil { - return 0, utils.Wrap(err, "deleteMongoMsg failed") - } - return seq, nil -} - -func getDelMaxSeqByIDList(IDList []string) uint32 { - if len(IDList) == 0 { - return 0 - } - l := strings.Split(IDList[len(IDList)-1], ":") - index, _ := strconv.Atoi(l[len(l)-1]) - if index == 0 { - // 4999 - return uint32(db.GetSingleGocMsgNum()) - 1 - } // 5000 - return (uint32(db.GetSingleGocMsgNum()) - 1) + uint32(index*db.GetSingleGocMsgNum()) -} diff --git a/internal/timed_task/init.go b/internal/timed_task/init.go deleted file mode 100644 index fe4e214b0..000000000 --- a/internal/timed_task/init.go +++ /dev/null @@ -1 +0,0 @@ -package timedTask diff --git a/internal/timed_task/timed_task.go b/internal/timed_task/timed_task.go deleted file mode 100644 index 2498c730b..000000000 --- a/internal/timed_task/timed_task.go +++ /dev/null @@ -1,23 +0,0 @@ -package timedTask - -import ( - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/log" - "Open_IM/pkg/utils" - "github.com/robfig/cron/v3" -) - -func main() { - log.NewInfo(utils.OperationIDGenerator(), "start cron task") - c := cron.New() - _, err := c.AddFunc("30 3-6,20-23 * * *", func() { - operationID := utils.OperationIDGenerator() - if err := DeleteMongoMsgAndResetRedisSeq(operationID, "", constant.ReadDiffusion); err != nil { - log.NewError(operationID) - } - }) - if err != nil { - panic(err) - } - c.Start() -} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 74c03ca4a..29779116e 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -92,16 +92,17 @@ type config struct { DBMaxLifeTime int `yaml:"dbMaxLifeTime"` } Mongo struct { - DBUri string `yaml:"dbUri"` - DBAddress string `yaml:"dbAddress"` - DBDirect bool `yaml:"dbDirect"` - DBTimeout int `yaml:"dbTimeout"` - DBDatabase string `yaml:"dbDatabase"` - DBSource string `yaml:"dbSource"` - DBUserName string `yaml:"dbUserName"` - DBPassword string `yaml:"dbPassword"` - DBMaxPoolSize int `yaml:"dbMaxPoolSize"` - DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + DBUri string `yaml:"dbUri"` + DBAddress string `yaml:"dbAddress"` + DBDirect bool `yaml:"dbDirect"` + DBTimeout int `yaml:"dbTimeout"` + DBDatabase string `yaml:"dbDatabase"` + DBSource string `yaml:"dbSource"` + DBUserName string `yaml:"dbUserName"` + DBPassword string `yaml:"dbPassword"` + DBMaxPoolSize int `yaml:"dbMaxPoolSize"` + DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` } Redis struct { DBAddress []string `yaml:"dbAddress"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 2ba3d1774..fecad1d32 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -227,12 +227,6 @@ const ( WorkMomentAtUserNotification = 2 ) -const ( - // diffusionType - WriteDiffusion = 0 - ReadDiffusion = 1 -) - const ( AtAllString = "AtAllTag" AtNormal = 0 diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index 4bbd66039..535acd2e8 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -360,10 +360,18 @@ func (d *DataBases) DelUserSignalList(userID string) error { func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) { for _, seq := range seqList { key := messageCache + uid + "_" + strconv.Itoa(int(seq)) - result := d.RDB.Get(context.Background(), key).String() + result, err := d.RDB.Get(context.Background(), key).Result() + if err != nil { + if err == go_redis.Nil { + log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil") + } else { + log2.NewError(operationID, utils.GetSelfFuncName(), err.Error(), key) + } + continue + } var msg pbCommon.MsgData if err := utils.String2Pb(result, &msg); err != nil { - log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, err.Error()) + log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, result, key, err.Error()) continue } msg.Status = constant.MsgDeleted diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_model.go index 19c2b1164..34d420b52 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_model.go @@ -184,3 +184,11 @@ func GetAllGroupIDList() ([]string, error) { err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Pluck("group_id", &groupIDList).Error return groupIDList, err } + +func GetGroupIDListByGroupType(groupType int) ([]string, error) { + var groupIDList []string + if err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Where("group_type = ? ", groupType).Pluck("group_id", &groupIDList).Error; err != nil { + return nil, err + } + return groupIDList, nil +}