mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release
This commit is contained in:
		
						commit
						923a035e6e
					
				
							
								
								
									
										11
									
								
								cmd/cron_task/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								cmd/cron_task/main.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,11 @@ | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"Open_IM/internal/cron_task" | ||||
| 	"fmt" | ||||
| ) | ||||
| 
 | ||||
| func main() { | ||||
| 	fmt.Println("start cronTask") | ||||
| 	cronTask.StartCronTask() | ||||
| } | ||||
| @ -32,6 +32,7 @@ mongo: | ||||
|   dbPassword:  #mongo密码,建议先不设置 | ||||
|   dbMaxPoolSize: 100 | ||||
|   dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 | ||||
|   chatRecordsClearTime: "* * * * *" # 每天凌晨3点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * | ||||
| 
 | ||||
| redis: | ||||
|   dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时,填写一个地址即可,使用redis集群时候,填写集群中多个节点地址(主从地址都可以填写,增加容灾能力),默认即可 | ||||
|  | ||||
| @ -79,11 +79,11 @@ func DelSuperGroupMsg(c *gin.Context) { | ||||
| 		resp api.DelSuperGroupMsgResp | ||||
| 	) | ||||
| 	rpcReq := &rpc.DelSuperGroupMsgReq{} | ||||
| 	utils.CopyStructFields(rpcReq, &req) | ||||
| 	if err := c.BindJSON(&req); err != nil { | ||||
| 		c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) | ||||
| 		return | ||||
| 	} | ||||
| 	utils.CopyStructFields(rpcReq, &req) | ||||
| 	log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req:", req) | ||||
| 	var ok bool | ||||
| 	var errInfo string | ||||
|  | ||||
							
								
								
									
										113
									
								
								internal/cron_task/clear_msg.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										113
									
								
								internal/cron_task/clear_msg.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,113 @@ | ||||
| package cronTask | ||||
| 
 | ||||
| import ( | ||||
| 	"Open_IM/pkg/common/config" | ||||
| 	"Open_IM/pkg/common/db" | ||||
| 	"Open_IM/pkg/common/log" | ||||
| 	server_api_params "Open_IM/pkg/proto/sdk_ws" | ||||
| 	"Open_IM/pkg/utils" | ||||
| 	"github.com/golang/protobuf/proto" | ||||
| ) | ||||
| 
 | ||||
| const oldestList = 0 | ||||
| const newestList = -1 | ||||
| 
 | ||||
| func ResetUserGroupMinSeq(operationID, groupID string, userIDList []string) error { | ||||
| 	var delMsgIDList [][2]interface{} | ||||
| 	minSeq, err := deleteMongoMsg(operationID, groupID, oldestList, &delMsgIDList) | ||||
| 	if err != nil { | ||||
| 		log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMongoMsg failed") | ||||
| 		return utils.Wrap(err, "") | ||||
| 	} | ||||
| 	for _, userID := range userIDList { | ||||
| 		userMinSeq, err := db.DB.GetGroupUserMinSeq(groupID, userID) | ||||
| 		if err != nil { | ||||
| 			log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) | ||||
| 			continue | ||||
| 		} | ||||
| 		if userMinSeq > uint64(minSeq) { | ||||
| 			err = db.DB.SetGroupUserMinSeq(groupID, userID, userMinSeq) | ||||
| 		} else { | ||||
| 			err = db.DB.SetGroupUserMinSeq(groupID, userID, uint64(minSeq)) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func DeleteMongoMsgAndResetRedisSeq(operationID, userID string) error { | ||||
| 	var delMsgIDList [][2]interface{} | ||||
| 	minSeq, err := deleteMongoMsg(operationID, userID, oldestList, &delMsgIDList) | ||||
| 	if err != nil { | ||||
| 		return utils.Wrap(err, "") | ||||
| 	} | ||||
| 	log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDMap: ", userID, delMsgIDList) | ||||
| 	err = db.DB.SetUserMinSeq(userID, minSeq) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // recursion | ||||
| func deleteMongoMsg(operationID string, ID string, index int64, delMsgIDList *[][2]interface{}) (uint32, error) { | ||||
| 	// 从最旧的列表开始找 | ||||
| 	msgs, err := db.DB.GetUserMsgListByIndex(ID, index) | ||||
| 	if err != nil { | ||||
| 		return 0, utils.Wrap(err, "GetUserMsgListByIndex failed") | ||||
| 	} | ||||
| 	if len(msgs.Msg) > db.GetSingleGocMsgNum() { | ||||
| 		log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) | ||||
| 	} | ||||
| 	log.NewDebug(operationID, utils.GetSelfFuncName(), "get msgs: ", msgs.UID) | ||||
| 	for i, msg := range msgs.Msg { | ||||
| 		// 找到列表中不需要删除的消息了 | ||||
| 		if utils.GetCurrentTimestampByMill() < msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000 { | ||||
| 			if len(*delMsgIDList) > 0 { | ||||
| 				var IDList []string | ||||
| 				for _, v := range *delMsgIDList { | ||||
| 					IDList = append(IDList, v[0].(string)) | ||||
| 				} | ||||
| 				err := db.DB.DelMongoMsgs(IDList) | ||||
| 				if err != nil { | ||||
| 					return 0, utils.Wrap(err, "DelMongoMsgs failed") | ||||
| 				} | ||||
| 			} | ||||
| 			minSeq := getDelMaxSeqByIDList(*delMsgIDList) | ||||
| 			if i > 0 { | ||||
| 				msgPb := &server_api_params.MsgData{} | ||||
| 				err = proto.Unmarshal(msg.Msg, msgPb) | ||||
| 				if err != nil { | ||||
| 					log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), ID, index) | ||||
| 				} else { | ||||
| 					err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) | ||||
| 					if err != nil { | ||||
| 						log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) | ||||
| 						return minSeq, nil | ||||
| 					} | ||||
| 					minSeq = msgPb.Seq - 1 | ||||
| 				} | ||||
| 			} | ||||
| 			return minSeq, nil | ||||
| 		} | ||||
| 	} | ||||
| 	msgPb := &server_api_params.MsgData{} | ||||
| 	err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, msgPb) | ||||
| 	if err != nil { | ||||
| 		log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) | ||||
| 		return 0, utils.Wrap(err, "proto.Unmarshal failed") | ||||
| 	} | ||||
| 	*delMsgIDList = append(*delMsgIDList, [2]interface{}{msgs.UID, msgPb.Seq}) | ||||
| 	// 没有找到 代表需要全部删除掉 继续递归查找下一个比较旧的列表 | ||||
| 	seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index+1, delMsgIDList) | ||||
| 	if err != nil { | ||||
| 		return 0, utils.Wrap(err, "deleteMongoMsg failed") | ||||
| 	} | ||||
| 	return seq, nil | ||||
| } | ||||
| 
 | ||||
| func getDelMaxSeqByIDList(delMsgIDList [][2]interface{}) uint32 { | ||||
| 	if len(delMsgIDList) == 0 { | ||||
| 		return 0 | ||||
| 	} | ||||
| 	return delMsgIDList[len(delMsgIDList)-1][1].(uint32) | ||||
| } | ||||
							
								
								
									
										65
									
								
								internal/cron_task/cron_task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								internal/cron_task/cron_task.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,65 @@ | ||||
| package cronTask | ||||
| 
 | ||||
| import ( | ||||
| 	"Open_IM/pkg/common/constant" | ||||
| 	"Open_IM/pkg/common/db/mysql_model/im_mysql_model" | ||||
| 	rocksCache "Open_IM/pkg/common/db/rocks_cache" | ||||
| 	"Open_IM/pkg/common/log" | ||||
| 	"Open_IM/pkg/utils" | ||||
| 	"fmt" | ||||
| 	"github.com/robfig/cron/v3" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| const cronTaskOperationID = "cronTaskOperationID-" | ||||
| 
 | ||||
| func StartCronTask() { | ||||
| 	log.NewInfo(utils.OperationIDGenerator(), "start cron task") | ||||
| 	c := cron.New() | ||||
| 	_, err := c.AddFunc("30 3-6,20-23 * * *", func() { | ||||
| 		operationID := getCronTaskOperationID() | ||||
| 		userIDList, err := im_mysql_model.SelectAllUserID() | ||||
| 		if err == nil { | ||||
| 			log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) | ||||
| 			for _, userID := range userIDList { | ||||
| 				if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { | ||||
| 					log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID) | ||||
| 				} | ||||
| 			} | ||||
| 		} else { | ||||
| 			log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) | ||||
| 		} | ||||
| 
 | ||||
| 		workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) | ||||
| 		if err == nil { | ||||
| 			for _, groupID := range workingGroupIDList { | ||||
| 				userIDList, err = rocksCache.GetGroupMemberIDListFromCache(groupID) | ||||
| 				if err != nil { | ||||
| 					log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) | ||||
| 					continue | ||||
| 				} | ||||
| 				log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", userIDList) | ||||
| 				if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil { | ||||
| 					log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) | ||||
| 				} | ||||
| 
 | ||||
| 			} | ||||
| 		} else { | ||||
| 			log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) | ||||
| 			return | ||||
| 		} | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("start cron failed", err.Error()) | ||||
| 		panic(err) | ||||
| 	} | ||||
| 	c.Start() | ||||
| 	fmt.Println("start cron task success") | ||||
| 	for { | ||||
| 		time.Sleep(time.Second) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func getCronTaskOperationID() string { | ||||
| 	return cronTaskOperationID + utils.OperationIDGenerator() | ||||
| } | ||||
| @ -568,7 +568,7 @@ func (s *groupServer) getGroupUserLevel(groupID, userID string) (int, error) { | ||||
| 
 | ||||
| func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGroupMemberReq) (*pbGroup.KickGroupMemberResp, error) { | ||||
| 	log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String()) | ||||
| 	groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID) | ||||
| 	groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID) | ||||
| 	if err != nil { | ||||
| 		log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupInfoByGroupID", req.GroupID, err.Error()) | ||||
| 		return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil | ||||
| @ -578,7 +578,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou | ||||
| 	if groupInfo.GroupType != constant.SuperGroup { | ||||
| 		opFlag := 0 | ||||
| 		if !token_verify.IsManagerUserID(req.OpUserID) { | ||||
| 			opInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, req.OpUserID) | ||||
| 			opInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, req.OpUserID) | ||||
| 			if err != nil { | ||||
| 				errMsg := req.OperationID + " GetGroupMemberInfoByGroupIDAndUserID  failed " + err.Error() + req.GroupID + req.OpUserID | ||||
| 				log.Error(req.OperationID, errMsg) | ||||
| @ -605,7 +605,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou | ||||
| 
 | ||||
| 		//remove | ||||
| 		for _, v := range req.KickedUserIDList { | ||||
| 			kickedInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, v) | ||||
| 			kickedInfo, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, v) | ||||
| 			if err != nil { | ||||
| 				log.NewError(req.OperationID, " GetGroupMemberInfoByGroupIDAndUserID failed ", req.GroupID, v, err.Error()) | ||||
| 				resp.Id2ResultList = append(resp.Id2ResultList, &pbGroup.Id2Result{UserID: v, Result: -1}) | ||||
|  | ||||
| @ -334,7 +334,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S | ||||
| 		} | ||||
| 		m := make(map[string][]string, 2) | ||||
| 		m[constant.OnlineStatus] = memberUserIDList | ||||
| 		log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) | ||||
| 		log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, pb) | ||||
| 		newTime = db.GetCurrentTimestampByMill() | ||||
| 
 | ||||
| 		//split  parallel send | ||||
| @ -476,6 +476,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S | ||||
| 			log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) | ||||
| 			return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) | ||||
| 		} | ||||
| 		// callback | ||||
| 		callbackResp = callbackAfterSendGroupMsg(pb) | ||||
| 		if callbackResp.ErrCode != 0 { | ||||
| 			log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp) | ||||
| 		} | ||||
| 		return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) | ||||
| 
 | ||||
| 	default: | ||||
| @ -967,8 +972,17 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s | ||||
| 
 | ||||
| func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { | ||||
| 	msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} | ||||
| 	tempOptions := make(map[string]bool, 1) | ||||
| 	for k, v := range groupPB.MsgData.Options { | ||||
| 		tempOptions[k] = v | ||||
| 	} | ||||
| 	for _, v := range list { | ||||
| 		groupPB.MsgData.RecvID = v | ||||
| 		options := make(map[string]bool, 1) | ||||
| 		for k, v := range tempOptions { | ||||
| 			options[k] = v | ||||
| 		} | ||||
| 		groupPB.MsgData.Options = options | ||||
| 		isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) | ||||
| 		if isSend { | ||||
| 			if v == "" || groupPB.MsgData.SendID == "" { | ||||
|  | ||||
| @ -1,89 +0,0 @@ | ||||
| package timedTask | ||||
| 
 | ||||
| import ( | ||||
| 	"Open_IM/pkg/common/config" | ||||
| 	"Open_IM/pkg/common/constant" | ||||
| 	"Open_IM/pkg/common/db" | ||||
| 	"Open_IM/pkg/common/log" | ||||
| 	server_api_params "Open_IM/pkg/proto/sdk_ws" | ||||
| 	"Open_IM/pkg/utils" | ||||
| 	"github.com/golang/protobuf/proto" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| ) | ||||
| 
 | ||||
| const oldestList = 0 | ||||
| const newestList = -1 | ||||
| 
 | ||||
| func DeleteMongoMsgAndResetRedisSeq(operationID, ID string, diffusionType int) error { | ||||
| 	// -1 表示从当前最早的一个开始 | ||||
| 	var delMsgIDList []string | ||||
| 	minSeq, err := deleteMongoMsg(operationID, ID, oldestList, &delMsgIDList) | ||||
| 	if err != nil { | ||||
| 		return utils.Wrap(err, "") | ||||
| 	} | ||||
| 	log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList: ", delMsgIDList) | ||||
| 	if diffusionType == constant.WriteDiffusion { | ||||
| 		err = db.DB.SetUserMinSeq(ID, minSeq) | ||||
| 	} else if diffusionType == constant.ReadDiffusion { | ||||
| 		err = db.DB.SetGroupMinSeq(ID, minSeq) | ||||
| 	} | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // recursion | ||||
| func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string) (uint32, error) { | ||||
| 	// 从最旧的列表开始找 | ||||
| 	msgs, err := db.DB.GetUserMsgListByIndex(ID, index) | ||||
| 	if err != nil { | ||||
| 		return 0, utils.Wrap(err, "GetUserMsgListByIndex failed") | ||||
| 	} | ||||
| 	log.NewDebug(operationID, utils.GetSelfFuncName(), "get msgs: ", msgs.UID) | ||||
| 	for i, msg := range msgs.Msg { | ||||
| 		// 找到列表中不需要删除的消息了 | ||||
| 		if msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords) > utils.GetCurrentTimestampByMill() { | ||||
| 			if len(*IDList) > 0 { | ||||
| 				err := db.DB.DelMongoMsgs(*IDList) | ||||
| 				if err != nil { | ||||
| 					return 0, utils.Wrap(err, "DelMongoMsgs failed") | ||||
| 				} | ||||
| 			} | ||||
| 			minSeq := getDelMaxSeqByIDList(*IDList) | ||||
| 			if i > 0 { | ||||
| 				msgPb := &server_api_params.MsgData{} | ||||
| 				err = proto.Unmarshal(msg.Msg, msgPb) | ||||
| 				if err != nil { | ||||
| 					log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), ID, index) | ||||
| 				} else { | ||||
| 					err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) | ||||
| 					if err != nil { | ||||
| 						log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) | ||||
| 						return minSeq, nil | ||||
| 					} | ||||
| 					minSeq = msgPb.Seq - 1 | ||||
| 				} | ||||
| 			} | ||||
| 			return minSeq, nil | ||||
| 		} | ||||
| 	} | ||||
| 	*IDList = append(*IDList, msgs.UID) | ||||
| 	// 没有找到 代表需要全部删除掉 继续查找下一个比较旧的列表 | ||||
| 	seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index-1, IDList) | ||||
| 	if err != nil { | ||||
| 		return 0, utils.Wrap(err, "deleteMongoMsg failed") | ||||
| 	} | ||||
| 	return seq, nil | ||||
| } | ||||
| 
 | ||||
| func getDelMaxSeqByIDList(IDList []string) uint32 { | ||||
| 	if len(IDList) == 0 { | ||||
| 		return 0 | ||||
| 	} | ||||
| 	l := strings.Split(IDList[len(IDList)-1], ":") | ||||
| 	index, _ := strconv.Atoi(l[len(l)-1]) | ||||
| 	if index == 0 { | ||||
| 		// 4999 | ||||
| 		return uint32(db.GetSingleGocMsgNum()) - 1 | ||||
| 	} // 5000 | ||||
| 	return (uint32(db.GetSingleGocMsgNum()) - 1) + uint32(index*db.GetSingleGocMsgNum()) | ||||
| } | ||||
| @ -1 +0,0 @@ | ||||
| package timedTask | ||||
| @ -1,23 +0,0 @@ | ||||
| package timedTask | ||||
| 
 | ||||
| import ( | ||||
| 	"Open_IM/pkg/common/constant" | ||||
| 	"Open_IM/pkg/common/log" | ||||
| 	"Open_IM/pkg/utils" | ||||
| 	"github.com/robfig/cron/v3" | ||||
| ) | ||||
| 
 | ||||
| func main() { | ||||
| 	log.NewInfo(utils.OperationIDGenerator(), "start cron task") | ||||
| 	c := cron.New() | ||||
| 	_, err := c.AddFunc("30 3-6,20-23 * * *", func() { | ||||
| 		operationID := utils.OperationIDGenerator() | ||||
| 		if err := DeleteMongoMsgAndResetRedisSeq(operationID, "", constant.ReadDiffusion); err != nil { | ||||
| 			log.NewError(operationID) | ||||
| 		} | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 	c.Start() | ||||
| } | ||||
| @ -92,16 +92,17 @@ type config struct { | ||||
| 		DBMaxLifeTime  int      `yaml:"dbMaxLifeTime"` | ||||
| 	} | ||||
| 	Mongo struct { | ||||
| 		DBUri               string `yaml:"dbUri"` | ||||
| 		DBAddress           string `yaml:"dbAddress"` | ||||
| 		DBDirect            bool   `yaml:"dbDirect"` | ||||
| 		DBTimeout           int    `yaml:"dbTimeout"` | ||||
| 		DBDatabase          string `yaml:"dbDatabase"` | ||||
| 		DBSource            string `yaml:"dbSource"` | ||||
| 		DBUserName          string `yaml:"dbUserName"` | ||||
| 		DBPassword          string `yaml:"dbPassword"` | ||||
| 		DBMaxPoolSize       int    `yaml:"dbMaxPoolSize"` | ||||
| 		DBRetainChatRecords int    `yaml:"dbRetainChatRecords"` | ||||
| 		DBUri                string `yaml:"dbUri"` | ||||
| 		DBAddress            string `yaml:"dbAddress"` | ||||
| 		DBDirect             bool   `yaml:"dbDirect"` | ||||
| 		DBTimeout            int    `yaml:"dbTimeout"` | ||||
| 		DBDatabase           string `yaml:"dbDatabase"` | ||||
| 		DBSource             string `yaml:"dbSource"` | ||||
| 		DBUserName           string `yaml:"dbUserName"` | ||||
| 		DBPassword           string `yaml:"dbPassword"` | ||||
| 		DBMaxPoolSize        int    `yaml:"dbMaxPoolSize"` | ||||
| 		DBRetainChatRecords  int    `yaml:"dbRetainChatRecords"` | ||||
| 		ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` | ||||
| 	} | ||||
| 	Redis struct { | ||||
| 		DBAddress     []string `yaml:"dbAddress"` | ||||
|  | ||||
| @ -227,12 +227,6 @@ const ( | ||||
| 	WorkMomentAtUserNotification  = 2 | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	// diffusionType | ||||
| 	WriteDiffusion = 0 | ||||
| 	ReadDiffusion  = 1 | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	AtAllString       = "AtAllTag" | ||||
| 	AtNormal          = 0 | ||||
|  | ||||
| @ -360,10 +360,18 @@ func (d *DataBases) DelUserSignalList(userID string) error { | ||||
| func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) { | ||||
| 	for _, seq := range seqList { | ||||
| 		key := messageCache + uid + "_" + strconv.Itoa(int(seq)) | ||||
| 		result := d.RDB.Get(context.Background(), key).String() | ||||
| 		result, err := d.RDB.Get(context.Background(), key).Result() | ||||
| 		if err != nil { | ||||
| 			if err == go_redis.Nil { | ||||
| 				log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil") | ||||
| 			} else { | ||||
| 				log2.NewError(operationID, utils.GetSelfFuncName(), err.Error(), key) | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 		var msg pbCommon.MsgData | ||||
| 		if err := utils.String2Pb(result, &msg); err != nil { | ||||
| 			log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, err.Error()) | ||||
| 			log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, result, key, err.Error()) | ||||
| 			continue | ||||
| 		} | ||||
| 		msg.Status = constant.MsgDeleted | ||||
|  | ||||
| @ -184,3 +184,11 @@ func GetAllGroupIDList() ([]string, error) { | ||||
| 	err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Pluck("group_id", &groupIDList).Error | ||||
| 	return groupIDList, err | ||||
| } | ||||
| 
 | ||||
| func GetGroupIDListByGroupType(groupType int) ([]string, error) { | ||||
| 	var groupIDList []string | ||||
| 	if err := db.DB.MysqlDB.DefaultGormDB().Table("groups").Where("group_type = ? ", groupType).Pluck("group_id", &groupIDList).Error; err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return groupIDList, nil | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user