diff --git a/cmd/cron_task/main.go b/cmd/cron_task/main.go new file mode 100644 index 000000000..db62e1172 --- /dev/null +++ b/cmd/cron_task/main.go @@ -0,0 +1,11 @@ +package cron_task + +import ( + "Open_IM/internal/cron_task" + "fmt" +) + +func main() { + fmt.Println("start cronTask") + cronTask.StartCronTask() +} diff --git a/internal/timed_task/clear_msg.go b/internal/cron_task/clear_msg.go similarity index 96% rename from internal/timed_task/clear_msg.go rename to internal/cron_task/clear_msg.go index 6cb3cba79..2b4877d2e 100644 --- a/internal/timed_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -1,4 +1,4 @@ -package timedTask +package cronTask import ( "Open_IM/pkg/common/config" @@ -67,7 +67,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string } } *IDList = append(*IDList, msgs.UID) - // 没有找到 代表需要全部删除掉 继续查找下一个比较旧的列表 + // 没有找到 代表需要全部删除掉 继续递归查找下一个比较旧的列表 seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index-1, IDList) if err != nil { return 0, utils.Wrap(err, "deleteMongoMsg failed") diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go new file mode 100644 index 000000000..ff861885a --- /dev/null +++ b/internal/cron_task/cron_task.go @@ -0,0 +1,50 @@ +package cronTask + +import ( + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + "Open_IM/pkg/common/log" + "Open_IM/pkg/utils" + "github.com/robfig/cron/v3" +) + +const cronTaskOperationID = "cronTaskOperationID-" + +func StartCronTask() { + log.NewInfo(utils.OperationIDGenerator(), "start cron task") + c := cron.New() + _, err := c.AddFunc("30 3-6,20-23 * * *", func() { + operationID := getCronTaskOperationID() + userIDList, err := im_mysql_model.SelectAllUserID() + if err == nil { + log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) + for _, userID := range userIDList { + if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID, constant.WriteDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID, constant.WriteDiffusion) + } + } + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + } + + workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) + if err == nil { + for _, groupID := range workingGroupIDList { + if err := DeleteMongoMsgAndResetRedisSeq(operationID, groupID, constant.ReadDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), operationID, groupID, constant.ReadDiffusion, err.Error()) + } + } + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + return + } + }) + if err != nil { + panic(err) + } + c.Start() +} + +func getCronTaskOperationID() string { + return cronTaskOperationID + utils.OperationIDGenerator() +} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 31fa0ff72..894c859c1 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -568,7 +568,7 @@ func (s *groupServer) getGroupUserLevel(groupID, userID string) (int, error) { func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGroupMemberReq) (*pbGroup.KickGroupMemberResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String()) - groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID) + groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupInfoByGroupID", req.GroupID, err.Error()) return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil @@ -578,7 +578,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou if groupInfo.GroupType != constant.SuperGroup { opFlag := 0 if !token_verify.IsManagerUserID(req.OpUserID) { - opInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, req.OpUserID) + opInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, req.OpUserID) if err != nil { errMsg := req.OperationID + " GetGroupMemberInfoByGroupIDAndUserID failed " + err.Error() + req.GroupID + req.OpUserID log.Error(req.OperationID, errMsg) @@ -605,7 +605,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou //remove for _, v := range req.KickedUserIDList { - kickedInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, v) + kickedInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, v) if err != nil { log.NewError(req.OperationID, " GetGroupMemberInfoByGroupIDAndUserID failed ", req.GroupID, v, err.Error()) resp.Id2ResultList = append(resp.Id2ResultList, &pbGroup.Id2Result{UserID: v, Result: -1}) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 0b171a5eb..91d8ca1d3 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -334,7 +334,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } m := make(map[string][]string, 2) m[constant.OnlineStatus] = memberUserIDList - log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, pb) newTime = db.GetCurrentTimestampByMill() //split parallel send diff --git a/internal/timed_task/init.go b/internal/timed_task/init.go deleted file mode 100644 index fe4e214b0..000000000 --- a/internal/timed_task/init.go +++ /dev/null @@ -1 +0,0 @@ -package timedTask diff --git a/internal/timed_task/timed_task.go b/internal/timed_task/timed_task.go deleted file mode 100644 index 2498c730b..000000000 --- a/internal/timed_task/timed_task.go +++ /dev/null @@ -1,23 +0,0 @@ -package timedTask - -import ( - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/log" - "Open_IM/pkg/utils" - "github.com/robfig/cron/v3" -) - -func main() { - log.NewInfo(utils.OperationIDGenerator(), "start cron task") - c := cron.New() - _, err := c.AddFunc("30 3-6,20-23 * * *", func() { - operationID := utils.OperationIDGenerator() - if err := DeleteMongoMsgAndResetRedisSeq(operationID, "", constant.ReadDiffusion); err != nil { - log.NewError(operationID) - } - }) - if err != nil { - panic(err) - } - c.Start() -} diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_model.go index 19c2b1164..34d420b52 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_model.go @@ -184,3 +184,11 @@ func GetAllGroupIDList() ([]string, error) { err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Pluck("group_id", &groupIDList).Error return groupIDList, err } + +func GetGroupIDListByGroupType(groupType int) ([]string, error) { + var groupIDList []string + if err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Where("group_type = ? ", groupType).Pluck("group_id", &groupIDList).Error; err != nil { + return nil, err + } + return groupIDList, nil +}