mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	Merge branch 'openimsdk:main' into main
This commit is contained in:
		
						commit
						1a4a0daead
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -14,7 +14,7 @@ require ( | |||||||
| 	github.com/gorilla/websocket v1.5.1 | 	github.com/gorilla/websocket v1.5.1 | ||||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||||
| 	github.com/mitchellh/mapstructure v1.5.0 | 	github.com/mitchellh/mapstructure v1.5.0 | ||||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.57 | 	github.com/openimsdk/protocol v0.0.72-alpha.59 | ||||||
| 	github.com/openimsdk/tools v0.0.50-alpha.38 | 	github.com/openimsdk/tools v0.0.50-alpha.38 | ||||||
| 	github.com/pkg/errors v0.9.1 // indirect | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
| 	github.com/prometheus/client_golang v1.18.0 | 	github.com/prometheus/client_golang v1.18.0 | ||||||
|  | |||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | |||||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||||
| github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= | github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= | ||||||
| github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.57 h1:oAVg0SJkDK15L8yDrL0KPG32f3iB/vjEpfpX577p5n4= | github.com/openimsdk/protocol v0.0.72-alpha.59 h1:+ycb2+68mLKPIo7VrxF0id/GXP6OqZ2/nBM1YZQr7qY= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.57/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= | github.com/openimsdk/protocol v0.0.72-alpha.59/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.38 h1:AU6/cvDfN4ciIOwAj8IWEwze3DeEp2cHYPgW3y0OlbU= | github.com/openimsdk/tools v0.0.50-alpha.38 h1:AU6/cvDfN4ciIOwAj8IWEwze3DeEp2cHYPgW3y0OlbU= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.38/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk= | github.com/openimsdk/tools v0.0.50-alpha.38/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk= | ||||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||||
|  | |||||||
| @ -414,6 +414,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	 | ||||||
| 	return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) | 	return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -441,6 +441,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc | |||||||
| 		map[string]any{"max_seq": req.MaxSeq}); err != nil { | 		map[string]any{"max_seq": req.MaxSeq}); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return &pbconversation.SetConversationMaxSeqResp{}, nil | 	return &pbconversation.SetConversationMaxSeqResp{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -670,7 +671,7 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco | |||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) { | func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) { | ||||||
| 	num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) | 	num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.ZError(ctx, "GetAllConversationIDsNumber failed", err) | 		log.ZError(ctx, "GetAllConversationIDsNumber failed", err) | ||||||
| @ -694,7 +695,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex | |||||||
| 
 | 
 | ||||||
| 		conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) | 		conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			// log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) | 			log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| @ -717,7 +718,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil | 	return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { | func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { | ||||||
|  | |||||||
| @ -964,6 +964,7 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) | 	return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -12,13 +12,14 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/mcontext" | 	"github.com/openimsdk/tools/mcontext" | ||||||
|  | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"github.com/openimsdk/tools/utils/idutil" | 	"github.com/openimsdk/tools/utils/idutil" | ||||||
| 	"github.com/openimsdk/tools/utils/stringutil" | 	"github.com/openimsdk/tools/utils/stringutil" | ||||||
| 	"golang.org/x/sync/errgroup" | 	"golang.org/x/sync/errgroup" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // hard delete in Database. | // hard delete in Database. | ||||||
| func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { | func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { | ||||||
| 	if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { | 	if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -29,15 +30,16 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. | |||||||
| 		docNum   int | 		docNum   int | ||||||
| 		msgNum   int | 		msgNum   int | ||||||
| 		start    = time.Now() | 		start    = time.Now() | ||||||
|  | 		getLimit = 5000 | ||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	clearMsg := func(ctx context.Context) (bool, error) { | 	destructMsg := func(ctx context.Context) (bool, error) { | ||||||
| 		docIDs, err := m.MsgDatabase.GetDocIDs(ctx) | 		docIDs, err := m.MsgDatabase.GetDocIDs(ctx) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return false, err | 			return false, err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000) | 		msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return false, err | 			return false, err | ||||||
| 		} | 		} | ||||||
| @ -61,7 +63,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. | |||||||
| 		return true, nil | 		return true, nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	_, err = clearMsg(ctx) | 	_, err = destructMsg(ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) | 		log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @ -69,11 +71,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. | |||||||
| 
 | 
 | ||||||
| 	log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) | 	log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) | ||||||
| 
 | 
 | ||||||
| 	return &msg.ClearMsgResp{}, nil | 	return &msg.DestructMsgsResp{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // soft delete for self | // soft delete for user self | ||||||
| func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { | func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { | ||||||
| 	temp := convert.ConversationsPb2DB(req.Conversations) | 	temp := convert.ConversationsPb2DB(req.Conversations) | ||||||
| 
 | 
 | ||||||
| 	batchNum := 100 | 	batchNum := 100 | ||||||
| @ -93,22 +95,31 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) | |||||||
| 					"msgDestructTime", conversation.MsgDestructTime, | 					"msgDestructTime", conversation.MsgDestructTime, | ||||||
| 					"lastMsgDestructTime", conversation.LatestMsgDestructTime) | 					"lastMsgDestructTime", conversation.LatestMsgDestructTime) | ||||||
| 
 | 
 | ||||||
| 				seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) | 				seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) | 					log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				if len(seqs) > 0 { | 				if len(seqs) > 0 { | ||||||
|  | 					minseq := datautil.Max(seqs...) | ||||||
|  | 
 | ||||||
|  | 					// update | ||||||
| 					if err := m.Conversation.UpdateConversation(handleCtx, | 					if err := m.Conversation.UpdateConversation(handleCtx, | ||||||
| 						&pbconversation.UpdateConversationReq{ | 						&pbconversation.UpdateConversationReq{ | ||||||
| 							UserIDs:               []string{conversation.OwnerUserID}, | 							UserIDs:               []string{conversation.OwnerUserID}, | ||||||
| 							ConversationID:        conversation.ConversationID, | 							ConversationID:        conversation.ConversationID, | ||||||
| 							LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil { | 							LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), | ||||||
|  | 							MinSeq:                wrapperspb.Int64(minseq), | ||||||
|  | 						}); err != nil { | ||||||
| 						log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) | 						log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) | ||||||
| 						continue | 						continue | ||||||
| 					} | 					} | ||||||
| 
 | 
 | ||||||
|  | 					if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil { | ||||||
|  | 						return err | ||||||
|  | 					} | ||||||
|  | 
 | ||||||
| 					// if you need Notify SDK client userseq is update. | 					// if you need Notify SDK client userseq is update. | ||||||
| 					// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) | 					// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) | ||||||
| 				} | 				} | ||||||
|  | |||||||
| @ -76,42 +76,43 @@ func Start(ctx context.Context, config *CronTaskConfig) error { | |||||||
| 	crontab := cron.New() | 	crontab := cron.New() | ||||||
| 
 | 
 | ||||||
| 	// scheduled hard delete outdated Msgs in specific time. | 	// scheduled hard delete outdated Msgs in specific time. | ||||||
| 	clearMsgFunc := func() { | 	destructMsgsFunc := func() { | ||||||
| 		now := time.Now() | 		now := time.Now() | ||||||
| 		deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) | 		deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) | ||||||
| 		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) | 		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) | ||||||
| 		log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) | 		log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) | ||||||
| 
 | 
 | ||||||
| 		if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { | 		if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil { | ||||||
| 			log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) | 			log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now)) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| 		log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) | 		log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now)) | ||||||
| 	} | 	} | ||||||
| 	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { | 	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil { | ||||||
| 		return errs.Wrap(err) | 		return errs.Wrap(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. | 	// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. | ||||||
| 	msgDestructFunc := func() { | 	clearMsgFunc := func() { | ||||||
| 		now := time.Now() | 		now := time.Now() | ||||||
| 		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) | 		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) | ||||||
| 		log.ZDebug(ctx, "msg destruct cron start", "now", now) | 		log.ZDebug(ctx, "clear msg cron start", "now", now) | ||||||
| 
 | 
 | ||||||
| 		conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) | 		conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) | 			log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) | ||||||
| 			return | 			return | ||||||
| 		} else { | 		} | ||||||
| 			_, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations}) | 
 | ||||||
|  | 		_, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations}) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 				log.ZError(ctx, "Destruct Msgs failed.", err) | 			log.ZError(ctx, "Clear Msg failed.", err) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 		log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now)) | ||||||
| 	} | 	} | ||||||
| 		log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now)) | 	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { | ||||||
| 	} |  | ||||||
| 	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil { |  | ||||||
| 		return errs.Wrap(err) | 		return errs.Wrap(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -179,7 +179,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat | |||||||
| 		if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { | 		if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { | ||||||
| 			notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) | 			notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) | ||||||
| 		} | 		} | ||||||
| 		if conversation.IsPinned == true { | 		if conversation.IsPinned { | ||||||
| 			pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) | 			pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | |||||||
| @ -57,8 +57,8 @@ type CommonMsgDatabase interface { | |||||||
| 	// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis | 	// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis | ||||||
| 	// cache). | 	// cache). | ||||||
| 	DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error | 	DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error | ||||||
| 	// UserMsgsDestruct marks messages for deletion based on destruct time and returns a list of sequence numbers for marked messages. | 	// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. | ||||||
| 	UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) | 	ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) | ||||||
| 	// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. | 	// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. | ||||||
| 	DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error | 	DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error | ||||||
| 	// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. | 	// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. | ||||||
| @ -92,7 +92,7 @@ type CommonMsgDatabase interface { | |||||||
| 	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) | 	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) | ||||||
| 	ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) | 	ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) | ||||||
| 
 | 
 | ||||||
| 	// clear msg | 	// get Msg when destruct msg before | ||||||
| 	GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) | 	GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) | ||||||
| 	DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) | 	DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) | ||||||
| 
 | 
 | ||||||
| @ -528,10 +528,10 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont | |||||||
| 	return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) | 	return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { | func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) { | ||||||
| 	var index int64 | 	var index int64 | ||||||
| 	for { | 	for { | ||||||
| 		// from oldest 2 newest | 		// from oldest 2 newest, ASC | ||||||
| 		msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) | 		msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) | ||||||
| 		if err != nil || msgDocModel.DocID == "" { | 		if err != nil || msgDocModel.DocID == "" { | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| @ -544,15 +544,19 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string | |||||||
| 			// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion | 			// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
| 		index++ | 		index++ | ||||||
| 		// && msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli() | 
 | ||||||
|  | 		// && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli() | ||||||
| 		if len(msgDocModel.Msg) > 0 { | 		if len(msgDocModel.Msg) > 0 { | ||||||
| 			i := 0 | 			i := 0 | ||||||
| 			var over bool | 			var over bool | ||||||
| 			for _, msg := range msgDocModel.Msg { | 			for _, msg := range msgDocModel.Msg { | ||||||
| 				i++ | 				i++ | ||||||
| 				if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { | 				// over clear time, need to clear | ||||||
| 					if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { | 				if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() { | ||||||
|  | 					// if msg is not in del list, add to del list | ||||||
|  | 					if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { | ||||||
| 						seqs = append(seqs, msg.Msg.Seq) | 						seqs = append(seqs, msg.Msg.Seq) | ||||||
| 					} | 					} | ||||||
| 				} else { | 				} else { | ||||||
| @ -567,13 +571,18 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) | 	log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs) | ||||||
|  | 
 | ||||||
|  | 	// have msg need to destruct | ||||||
| 	if len(seqs) > 0 { | 	if len(seqs) > 0 { | ||||||
| 		userMinSeq := seqs[len(seqs)-1] + 1 | 		// update min seq to clear after | ||||||
| 		currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) | 		userMinSeq := seqs[len(seqs)-1] + 1                                             // user min seq when clear after | ||||||
|  | 		currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 		// if before < after, update min seq | ||||||
| 		if currentUserMinSeq < userMinSeq { | 		if currentUserMinSeq < userMinSeq { | ||||||
| 			if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { | 			if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { | ||||||
| 				return nil, err | 				return nil, err | ||||||
|  | |||||||
| @ -16,6 +16,7 @@ package database | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 	"github.com/openimsdk/tools/db/pagination" | 	"github.com/openimsdk/tools/db/pagination" | ||||||
| ) | ) | ||||||
|  | |||||||
| @ -16,9 +16,10 @@ package mgo | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 	"time" |  | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	"github.com/openimsdk/tools/db/mongoutil" | 	"github.com/openimsdk/tools/db/mongoutil" | ||||||
|  | |||||||
| @ -60,7 +60,7 @@ type GroupMemberMgo struct { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (g *GroupMemberMgo) memberSort() any { | func (g *GroupMemberMgo) memberSort() any { | ||||||
| 	return bson.D{{"role_level", -1}, {"create_time", 1}} | 	return bson.D{{Key: "role_level", Value: -1}, {Key: "create_time", Value: 1}} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) { | func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) { | ||||||
|  | |||||||
| @ -152,8 +152,8 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont | |||||||
| 	return resp.UserIDs, nil | 	return resp.UserIDs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) { | func (c *ConversationRpcClient) GetConversationsNeedClearMsg(ctx context.Context) ([]*pbconversation.Conversation, error) { | ||||||
| 	resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) | 	resp, err := c.Client.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | |||||||
| @ -244,8 +244,8 @@ func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversati | |||||||
| 	return resp.MaxSeq, nil | 	return resp.MaxSeq, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error { | func (m *MessageRpcClient) DestructMsgs(ctx context.Context, ts int64) error { | ||||||
| 	_, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts}) | 	_, err := m.Client.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: ts}) | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user