mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	feat: implement scheduled destruct msgs feature in cron task.
This commit is contained in:
		
							parent
							
								
									ee428e6395
								
							
						
					
					
						commit
						bead363df1
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							@ -12,7 +12,7 @@ require (
 | 
			
		||||
	github.com/gorilla/websocket v1.5.1
 | 
			
		||||
	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 | 
			
		||||
	github.com/mitchellh/mapstructure v1.5.0
 | 
			
		||||
	github.com/openimsdk/protocol v0.0.69-alpha.47
 | 
			
		||||
	github.com/openimsdk/protocol v0.0.69-alpha.50
 | 
			
		||||
	github.com/openimsdk/tools v0.0.49-alpha.55
 | 
			
		||||
	github.com/pkg/errors v0.9.1 // indirect
 | 
			
		||||
	github.com/prometheus/client_golang v1.18.0
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 | 
			
		||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.47 h1:WEpU7dHSzcpiyPoUkgSt1mC9HfQ6xSDNNZf4KWbZiFI=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.50 h1:4r6vY9LsjFrR8AAwORFhijOGmq2vzDH3XTX4wBiw+2M=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.50/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
 | 
			
		||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
			
		||||
 | 
			
		||||
@ -16,13 +16,16 @@ package conversation
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
 | 
			
		||||
	tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/localcache"
 | 
			
		||||
	"github.com/openimsdk/tools/db/redisutil"
 | 
			
		||||
	"sort"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 | 
			
		||||
@ -44,6 +47,7 @@ type conversationServer struct {
 | 
			
		||||
	user                 *rpcclient.UserRpcClient
 | 
			
		||||
	groupRpcClient       *rpcclient.GroupRpcClient
 | 
			
		||||
	conversationDatabase controller.ConversationDatabase
 | 
			
		||||
 | 
			
		||||
	conversationNotificationSender *ConversationNotificationSender
 | 
			
		||||
	config                         *Config
 | 
			
		||||
}
 | 
			
		||||
@ -204,11 +208,11 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
 | 
			
		||||
	var conversation tablerelation.Conversation
 | 
			
		||||
	var conversation dbModel.Conversation
 | 
			
		||||
	if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation})
 | 
			
		||||
	err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*dbModel.Conversation{&conversation})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
@ -232,7 +236,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	var unequal int
 | 
			
		||||
	var conv tablerelation.Conversation
 | 
			
		||||
	var conv dbModel.Conversation
 | 
			
		||||
	if len(req.UserIDs) == 1 {
 | 
			
		||||
		cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@ -243,7 +247,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
 | 
			
		||||
		}
 | 
			
		||||
		conv = *cs[0]
 | 
			
		||||
	}
 | 
			
		||||
	var conversation tablerelation.Conversation
 | 
			
		||||
	var conversation dbModel.Conversation
 | 
			
		||||
	conversation.ConversationID = req.Conversation.ConversationID
 | 
			
		||||
	conversation.ConversationType = req.Conversation.ConversationType
 | 
			
		||||
	conversation.UserID = req.Conversation.UserID
 | 
			
		||||
@ -292,7 +296,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
 | 
			
		||||
		var conversations []*tablerelation.Conversation
 | 
			
		||||
		var conversations []*dbModel.Conversation
 | 
			
		||||
		for _, ownerUserID := range req.UserIDs {
 | 
			
		||||
			conversation2 := conversation
 | 
			
		||||
			conversation2.OwnerUserID = ownerUserID
 | 
			
		||||
@ -340,12 +344,12 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
 | 
			
		||||
) (*pbconversation.CreateSingleChatConversationsResp, error) {
 | 
			
		||||
	switch req.ConversationType {
 | 
			
		||||
	case constant.SingleChatType:
 | 
			
		||||
		var conversation tablerelation.Conversation
 | 
			
		||||
		var conversation dbModel.Conversation
 | 
			
		||||
		conversation.ConversationID = req.ConversationID
 | 
			
		||||
		conversation.ConversationType = req.ConversationType
 | 
			
		||||
		conversation.OwnerUserID = req.SendID
 | 
			
		||||
		conversation.UserID = req.RecvID
 | 
			
		||||
		err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
 | 
			
		||||
		err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
 | 
			
		||||
		}
 | 
			
		||||
@ -353,17 +357,17 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
 | 
			
		||||
		conversation2 := conversation
 | 
			
		||||
		conversation2.OwnerUserID = req.RecvID
 | 
			
		||||
		conversation2.UserID = req.SendID
 | 
			
		||||
		err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2})
 | 
			
		||||
		err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
 | 
			
		||||
		}
 | 
			
		||||
	case constant.NotificationChatType:
 | 
			
		||||
		var conversation tablerelation.Conversation
 | 
			
		||||
		var conversation dbModel.Conversation
 | 
			
		||||
		conversation.ConversationID = req.ConversationID
 | 
			
		||||
		conversation.ConversationType = req.ConversationType
 | 
			
		||||
		conversation.OwnerUserID = req.RecvID
 | 
			
		||||
		conversation.UserID = req.SendID
 | 
			
		||||
		err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
 | 
			
		||||
		err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
 | 
			
		||||
		}
 | 
			
		||||
@ -584,6 +588,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
 | 
			
		||||
	if req.MaxSeq != nil {
 | 
			
		||||
		m["max_seq"] = req.MaxSeq.Value
 | 
			
		||||
	}
 | 
			
		||||
	if req.LatestMsgDestructTime != nil {
 | 
			
		||||
		m["latest_msg_destruct_time"] = time.UnixMilli(req.LatestMsgDestructTime.Value)
 | 
			
		||||
	}
 | 
			
		||||
	if len(m) > 0 {
 | 
			
		||||
		if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
@ -602,3 +609,54 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
 | 
			
		||||
		Conversations: convert.ConversationsDB2Pb(conversations),
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) {
 | 
			
		||||
	log.ZInfo(ctx, "ConversationDestructMsgs cron start")
 | 
			
		||||
	num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	const batchNum = 100
 | 
			
		||||
 | 
			
		||||
	if num == 0 {
 | 
			
		||||
		return nil, errs.New("Need Destruct Msg is nil").Wrap()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	maxPage := (num + batchNum - 1) / batchNum
 | 
			
		||||
 | 
			
		||||
	temp := make([]*model.Conversation, 0, maxPage*batchNum)
 | 
			
		||||
 | 
			
		||||
	for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
 | 
			
		||||
		pagination := &sdkws.RequestPagination{
 | 
			
		||||
			PageNumber: int32(pageNumber),
 | 
			
		||||
			ShowNumber: batchNum,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
 | 
			
		||||
		if len(conversationIDs) == 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for _, conversation := range conversations {
 | 
			
		||||
			if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
 | 
			
		||||
				conversation.LatestMsgDestructTime.IsZero()) {
 | 
			
		||||
				temp = append(temp, conversation)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -2,16 +2,25 @@ package msg
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/protocol/conversation"
 | 
			
		||||
	pbconversation "github.com/openimsdk/protocol/conversation"
 | 
			
		||||
	"github.com/openimsdk/protocol/msg"
 | 
			
		||||
	"github.com/openimsdk/protocol/wrapperspb"
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/log"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
	"github.com/openimsdk/tools/mcontext"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/idutil"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/stringutil"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// hard delete in Database.
 | 
			
		||||
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
 | 
			
		||||
	if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
@ -26,17 +35,20 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
 | 
			
		||||
	)
 | 
			
		||||
	clearMsg := func(ctx context.Context) (bool, error) {
 | 
			
		||||
		conversationSeqs := make(map[string]struct{})
 | 
			
		||||
 | 
			
		||||
		// update latest msg destruct time in conversation DB.
 | 
			
		||||
		defer func() {
 | 
			
		||||
			req := &conversation.UpdateConversationReq{
 | 
			
		||||
				MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
 | 
			
		||||
				LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
 | 
			
		||||
			}
 | 
			
		||||
			for conversationID := range conversationSeqs {
 | 
			
		||||
				req.ConversationID = conversationID
 | 
			
		||||
				if err := m.Conversation.UpdateConversations(ctx, req); err != nil {
 | 
			
		||||
				if err := m.Conversation.UpdateConversation(ctx, req); err != nil {
 | 
			
		||||
					log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
		msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
@ -61,6 +73,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		keep, err := clearMsg(ctx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@ -75,3 +88,54 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
 | 
			
		||||
	}
 | 
			
		||||
	return &msg.ClearMsgResp{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// soft delete for self
 | 
			
		||||
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
 | 
			
		||||
	temp := convert.ConversationsPb2DB(req.Conversations)
 | 
			
		||||
 | 
			
		||||
	batchNum := 100
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	wg.Add((len(temp) + batchNum - 1) / batchNum)
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < len(temp); i += batchNum {
 | 
			
		||||
		batch := temp[i:min(i+batchNum, len(temp))]
 | 
			
		||||
 | 
			
		||||
		go func(batch []*model.Conversation) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
			for _, conversation := range temp {
 | 
			
		||||
				handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
 | 
			
		||||
				log.ZDebug(handleCtx, "User MsgsDestruct",
 | 
			
		||||
					"conversationID", conversation.ConversationID,
 | 
			
		||||
					"ownerUserID", conversation.OwnerUserID,
 | 
			
		||||
					"msgDestructTime", conversation.MsgDestructTime,
 | 
			
		||||
					"lastMsgDestructTime", conversation.LatestMsgDestructTime)
 | 
			
		||||
 | 
			
		||||
				seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				if len(seqs) > 0 {
 | 
			
		||||
					if err := m.Conversation.UpdateConversation(handleCtx,
 | 
			
		||||
						&pbconversation.UpdateConversationReq{
 | 
			
		||||
							UserIDs:               []string{conversation.OwnerUserID},
 | 
			
		||||
							ConversationID:        conversation.ConversationID,
 | 
			
		||||
							LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil {
 | 
			
		||||
						log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}(batch)
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@ package msg
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
 | 
			
		||||
@ -50,6 +51,7 @@ type (
 | 
			
		||||
		ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
 | 
			
		||||
		Handlers               MessageInterceptorChain          // Chain of handlers for processing messages.
 | 
			
		||||
		notificationSender     *rpcclient.NotificationSender    // RPC client for sending notifications.
 | 
			
		||||
		msgNotificationSender  *MsgNotificationSender           // RPC client for sending msg notifications.
 | 
			
		||||
		config                 *Config                          // Global configuration settings.
 | 
			
		||||
		webhookClient          *webhook.Client
 | 
			
		||||
	}
 | 
			
		||||
@ -117,7 +119,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
 | 
			
		||||
	s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))
 | 
			
		||||
 | 
			
		||||
	msg.RegisterMsgServer(server, s)
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,16 +17,19 @@ package tools
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
 | 
			
		||||
	pbconversation "github.com/openimsdk/protocol/conversation"
 | 
			
		||||
	"github.com/openimsdk/protocol/msg"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/protocol/third"
 | 
			
		||||
	"github.com/openimsdk/tools/mcontext"
 | 
			
		||||
	"github.com/openimsdk/tools/mw"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/credentials/insecure"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/log"
 | 
			
		||||
@ -50,18 +53,34 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
			
		||||
	}
 | 
			
		||||
	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | 
			
		||||
	ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
 | 
			
		||||
	conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
 | 
			
		||||
 | 
			
		||||
	msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	cli := msg.NewMsgClient(conn)
 | 
			
		||||
 | 
			
		||||
	thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	msgClient := msg.NewMsgClient(msgConn)
 | 
			
		||||
	conversationClient := pbconversation.NewConversationClient(conversationConn)
 | 
			
		||||
	thirdClient := third.NewThirdClient(thirdConn)
 | 
			
		||||
 | 
			
		||||
	crontab := cron.New()
 | 
			
		||||
 | 
			
		||||
	clearFunc := func() {
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
 | 
			
		||||
		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
 | 
			
		||||
		log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
 | 
			
		||||
		if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
 | 
			
		||||
		if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
 | 
			
		||||
			log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
@ -71,11 +90,27 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
 | 
			
		||||
	msgDestructFunc := func() {
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
 | 
			
		||||
		log.ZInfo(ctx, "msg destruct ", "now", now)
 | 
			
		||||
 | 
			
		||||
		conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
			log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
 | 
			
		||||
			return
 | 
			
		||||
		} else {
 | 
			
		||||
			_, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.ZError(ctx, "Destruct Msgs failed.", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now))
 | 
			
		||||
	}
 | 
			
		||||
	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	thirdClient := third.NewThirdClient(tConn)
 | 
			
		||||
 | 
			
		||||
	deleteFunc := func() {
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
@ -91,6 +126,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
			
		||||
	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
 | 
			
		||||
	crontab.Start()
 | 
			
		||||
	<-ctx.Done()
 | 
			
		||||
 | 
			
		||||
@ -22,7 +22,7 @@ import (
 | 
			
		||||
 | 
			
		||||
func ConversationDB2Pb(conversationDB *model.Conversation) *conversation.Conversation {
 | 
			
		||||
	conversationPB := &conversation.Conversation{}
 | 
			
		||||
	conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
 | 
			
		||||
	conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli()
 | 
			
		||||
	if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
@ -35,7 +35,7 @@ func ConversationsDB2Pb(conversationsDB []*model.Conversation) (conversationsPB
 | 
			
		||||
		if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
 | 
			
		||||
		conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli()
 | 
			
		||||
		conversationsPB = append(conversationsPB, conversationPB)
 | 
			
		||||
	}
 | 
			
		||||
	return conversationsPB
 | 
			
		||||
 | 
			
		||||
@ -82,7 +82,7 @@ func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConversationRpcClient) UpdateConversations(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
 | 
			
		||||
func (c *ConversationRpcClient) UpdateConversation(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
 | 
			
		||||
	_, err := c.Client.UpdateConversation(ctx, conversation)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
@ -146,3 +146,11 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont
 | 
			
		||||
	}
 | 
			
		||||
	return resp.UserIDs, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) {
 | 
			
		||||
	resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return resp.Conversations, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user