mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	redis msg cache
This commit is contained in:
		
							parent
							
								
									8cb1772e52
								
							
						
					
					
						commit
						4d1655e32c
					
				| @ -106,10 +106,8 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy | |||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	remainTime := timeutil.GetCurrentTimestampBySecond() - req.Timestamp | 	remainTime := timeutil.GetCurrentTimestampBySecond() - req.Timestamp | ||||||
| 	for _, conversationID := range req.ConversationIDs { | 	if _, err := m.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: remainTime, Limit: 9999}); err != nil { | ||||||
| 		if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, remainTime); err != nil { | 		return nil, err | ||||||
| 			log.ZWarn(ctx, "DeleteConversationMsgsAndSetMinSeq error", err, "conversationID", conversationID, "err", err) |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
| 	return &msg.DeleteMsgPhysicalResp{}, nil | 	return &msg.DeleteMsgPhysicalResp{}, nil | ||||||
| } | } | ||||||
|  | |||||||
							
								
								
									
										35
									
								
								pkg/common/storage/cache/cachekey/msg.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										35
									
								
								pkg/common/storage/cache/cachekey/msg.go
									
									
									
									
										vendored
									
									
								
							| @ -15,50 +15,21 @@ | |||||||
| package cachekey | package cachekey | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"github.com/openimsdk/protocol/constant" |  | ||||||
| 	"strconv" | 	"strconv" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	messageCache      = "MESSAGE_CACHE:" | 	messageCache      = "MESSAGE_CACHE:" | ||||||
| 	messageDelUserList   = "MESSAGE_DEL_USER_LIST:" |  | ||||||
| 	userDelMessagesList  = "USER_DEL_MESSAGES_LIST:" |  | ||||||
| 	sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" | 	sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" | ||||||
| 	exTypeKeyLocker      = "EX_LOCK:" | 	messageCacheV2    = "MESSAGE_CACHE_V2:" | ||||||
| 	reactionExSingle     = "EX_SINGLE_" |  | ||||||
| 	reactionWriteGroup   = "EX_GROUP_" |  | ||||||
| 	reactionReadGroup    = "EX_SUPER_GROUP_" |  | ||||||
| 	reactionNotification = "EX_NOTIFICATION_" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func GetMessageCacheKey(conversationID string, seq int64) string { | func GetMessageCacheKey(conversationID string, seq int64) string { | ||||||
| 	return messageCache + conversationID + "_" + strconv.Itoa(int(seq)) | 	return messageCache + conversationID + "_" + strconv.Itoa(int(seq)) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func GetMessageDelUserListKey(conversationID string, seq int64) string { | func GetMessageCacheKeyV2(conversationID string, seq int64) string { | ||||||
| 	return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq)) | 	return messageCacheV2 + conversationID + "_" + strconv.Itoa(int(seq)) | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func GetUserDelListKey(conversationID, userID string) string { |  | ||||||
| 	return userDelMessagesList + conversationID + ":" + userID |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func GetMessageReactionExKey(clientMsgID string, sessionType int32) string { |  | ||||||
| 	switch sessionType { |  | ||||||
| 	case constant.SingleChatType: |  | ||||||
| 		return reactionExSingle + clientMsgID |  | ||||||
| 	case constant.WriteGroupChatType: |  | ||||||
| 		return reactionWriteGroup + clientMsgID |  | ||||||
| 	case constant.ReadGroupChatType: |  | ||||||
| 		return reactionReadGroup + clientMsgID |  | ||||||
| 	case constant.NotificationChatType: |  | ||||||
| 		return reactionNotification + clientMsgID |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return "" |  | ||||||
| } |  | ||||||
| func GetLockMessageTypeKey(clientMsgID string, TypeKey string) string { |  | ||||||
| 	return exTypeKeyLocker + clientMsgID + "_" + TypeKey |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func GetSendMsgKey(id string) string { | func GetSendMsgKey(id string) string { | ||||||
|  | |||||||
							
								
								
									
										14
									
								
								pkg/common/storage/cache/msg.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										14
									
								
								pkg/common/storage/cache/msg.go
									
									
									
									
										vendored
									
									
								
							| @ -16,8 +16,7 @@ package cache | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"time" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 
 |  | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -27,12 +26,7 @@ type MsgCache interface { | |||||||
| 	DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error | 	DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error | ||||||
| 	SetSendMsgStatus(ctx context.Context, id string, status int32) error | 	SetSendMsgStatus(ctx context.Context, id string, status int32) error | ||||||
| 	GetSendMsgStatus(ctx context.Context, id string) (int32, error) | 	GetSendMsgStatus(ctx context.Context, id string) (int32, error) | ||||||
| 	JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) | 
 | ||||||
| 	GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) | 	GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) | ||||||
| 	DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error | 	DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error | ||||||
| 	SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) |  | ||||||
| 	GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) |  | ||||||
| 	SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error |  | ||||||
| 	LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error |  | ||||||
| 	UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error |  | ||||||
| } | } | ||||||
|  | |||||||
							
								
								
									
										103
									
								
								pkg/common/storage/cache/redis/msg.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										103
									
								
								pkg/common/storage/cache/redis/msg.go
									
									
									
									
										vendored
									
									
								
							| @ -2,8 +2,11 @@ package redis | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"github.com/dtm-labs/rockscache" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" | 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| @ -13,7 +16,7 @@ import ( | |||||||
| ) // | ) // | ||||||
| 
 | 
 | ||||||
| // msgCacheTimeout is  expiration time of message cache, 86400 seconds | // msgCacheTimeout is  expiration time of message cache, 86400 seconds | ||||||
| const msgCacheTimeout = 86400 | const msgCacheTimeout = time.Hour * 24 | ||||||
| 
 | 
 | ||||||
| func NewMsgCache(client redis.UniversalClient) cache.MsgCache { | func NewMsgCache(client redis.UniversalClient) cache.MsgCache { | ||||||
| 	return &msgCache{rdb: client} | 	return &msgCache{rdb: client} | ||||||
| @ -21,31 +24,19 @@ func NewMsgCache(client redis.UniversalClient) cache.MsgCache { | |||||||
| 
 | 
 | ||||||
| type msgCache struct { | type msgCache struct { | ||||||
| 	rdb            redis.UniversalClient | 	rdb            redis.UniversalClient | ||||||
|  | 	rcClient       *rockscache.Client | ||||||
|  | 	msgDocDatabase database.Msg | ||||||
|  | 	msgTable       model.MsgDocModel | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string { | func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string { | ||||||
| 	return cachekey.GetMessageCacheKey(conversationID, seq) | 	return cachekey.GetMessageCacheKey(conversationID, seq) | ||||||
| } | } | ||||||
| func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { |  | ||||||
| 	return cachekey.GetMessageDelUserListKey(conversationID, seq) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) getUserDelList(conversationID, userID string) string { |  | ||||||
| 	return cachekey.GetUserDelListKey(conversationID, userID) |  | ||||||
| } |  | ||||||
| 
 | 
 | ||||||
| func (c *msgCache) getSendMsgKey(id string) string { | func (c *msgCache) getSendMsgKey(id string) string { | ||||||
| 	return cachekey.GetSendMsgKey(id) | 	return cachekey.GetSendMsgKey(id) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *msgCache) getLockMessageTypeKey(clientMsgID string, TypeKey string) string { |  | ||||||
| 	return cachekey.GetLockMessageTypeKey(clientMsgID, TypeKey) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { |  | ||||||
| 	return cachekey.GetMessageReactionExKey(clientMsgID, sessionType) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { | func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { | ||||||
| 	msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string { | 	msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string { | ||||||
| 		return c.getMessageCacheKey(conversationID, msg.Seq) | 		return c.getMessageCacheKey(conversationID, msg.Seq) | ||||||
| @ -64,7 +55,7 @@ func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string | |||||||
| 				values = append(values, s) | 				values = append(values, s) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, msgCacheTimeout) | 		return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, int(msgCacheTimeout/time.Second)) | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return 0, err | 		return 0, err | ||||||
| @ -77,7 +68,6 @@ func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID s | |||||||
| 	for _, seq := range seqs { | 	for _, seq := range seqs { | ||||||
| 		keys = append(keys, c.getMessageCacheKey(conversationID, seq)) | 		keys = append(keys, c.getMessageCacheKey(conversationID, seq)) | ||||||
| 	} | 	} | ||||||
| 
 |  | ||||||
| 	return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { | 	return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { | ||||||
| 		return LuaDeleteBatch(ctx, c.rdb, keys) | 		return LuaDeleteBatch(ctx, c.rdb, keys) | ||||||
| 	}) | 	}) | ||||||
| @ -92,48 +82,6 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro | |||||||
| 	return int32(result), errs.Wrap(err) | 	return int32(result), errs.Wrap(err) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { |  | ||||||
| 	key := c.getLockMessageTypeKey(clientMsgID, TypeKey) |  | ||||||
| 	return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { |  | ||||||
| 	key := c.getLockMessageTypeKey(clientMsgID, TypeKey) |  | ||||||
| 	return errs.Wrap(c.rdb.Del(ctx, key).Err()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { |  | ||||||
| 	n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return false, errs.Wrap(err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return n > 0, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { |  | ||||||
| 	return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { |  | ||||||
| 	val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result() |  | ||||||
| 	return val, errs.Wrap(err) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { |  | ||||||
| 	val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result() |  | ||||||
| 	return val, errs.Wrap(err) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { |  | ||||||
| 	val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() |  | ||||||
| 	return val, errs.Wrap(err) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { |  | ||||||
| 	return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { | func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { | ||||||
| 	var keys []string | 	var keys []string | ||||||
| 	keySeqMap := make(map[string]int64, 10) | 	keySeqMap := make(map[string]int64, 10) | ||||||
| @ -170,3 +118,38 @@ func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, | |||||||
| 	} | 	} | ||||||
| 	return seqMsgs, failedSeqs, nil | 	return seqMsgs, failedSeqs, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { | ||||||
|  | 	if len(seqs) == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	getKey := func(seq int64) string { | ||||||
|  | 		return cachekey.GetMessageCacheKeyV2(conversationID, seq) | ||||||
|  | 	} | ||||||
|  | 	getMsgID := func(msg *model.MsgInfoModel) int64 { | ||||||
|  | 		return msg.Msg.Seq | ||||||
|  | 	} | ||||||
|  | 	find := func(ctx context.Context, seqs []int64) ([]*model.MsgInfoModel, error) { | ||||||
|  | 		return c.msgDocDatabase.FindSeqs(ctx, conversationID, seqs) | ||||||
|  | 	} | ||||||
|  | 	return batchGetCache2(ctx, c.rcClient, msgCacheTimeout, seqs, getKey, getMsgID, find) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error { | ||||||
|  | 	if len(seqs) == 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	keys := datautil.Slice(seqs, func(seq int64) string { | ||||||
|  | 		return cachekey.GetMessageCacheKeyV2(conversationID, seq) | ||||||
|  | 	}) | ||||||
|  | 	slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	for _, keys := range slotKeys { | ||||||
|  | 		if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | |||||||
| @ -18,6 +18,9 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"github.com/openimsdk/tools/utils/jsonutil" | ||||||
|  | 	"strconv" | ||||||
|  | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
| @ -36,7 +39,6 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/mq/kafka" | 	"github.com/openimsdk/tools/mq/kafka" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"github.com/openimsdk/tools/utils/timeutil" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| @ -54,10 +56,8 @@ type CommonMsgDatabase interface { | |||||||
| 	GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) | 	GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) | ||||||
| 	// GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers. | 	// GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers. | ||||||
| 	GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) | 	GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) | ||||||
| 	// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis | 
 | ||||||
| 	// cache). |  | ||||||
| 	GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) | 	GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) | ||||||
| 	DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error |  | ||||||
| 	// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. | 	// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. | ||||||
| 	ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) | 	ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) | ||||||
| 	// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. | 	// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. | ||||||
| @ -80,8 +80,6 @@ type CommonMsgDatabase interface { | |||||||
| 	GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) | 	GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) | ||||||
| 	GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) | 	GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) | ||||||
| 
 | 
 | ||||||
| 	//GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) |  | ||||||
| 	//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) |  | ||||||
| 	SetSendMsgStatus(ctx context.Context, id string, status int32) error | 	SetSendMsgStatus(ctx context.Context, id string, status int32) error | ||||||
| 	GetSendMsgStatus(ctx context.Context, id string) (int32, error) | 	GetSendMsgStatus(ctx context.Context, id string) (int32, error) | ||||||
| 	SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error) | 	SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error) | ||||||
| @ -92,10 +90,6 @@ type CommonMsgDatabase interface { | |||||||
| 
 | 
 | ||||||
| 	RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) | 	RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) | ||||||
| 	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) | 	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) | ||||||
| 	ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) |  | ||||||
| 
 |  | ||||||
| 	// get Msg when destruct msg before |  | ||||||
| 	//DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) |  | ||||||
| 
 | 
 | ||||||
| 	GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) | 	GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) | ||||||
| 
 | 
 | ||||||
| @ -139,7 +133,7 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd | |||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { | func (db *commonMsgDatabase) batchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { | ||||||
| 	if len(fields) == 0 { | 	if len(fields) == 0 { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| @ -237,11 +231,15 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI | |||||||
| 		tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially | 		tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially | ||||||
| 		i += insert - 1   // Skip the inserted data | 		i += insert - 1   // Skip the inserted data | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error { | func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error { | ||||||
| 	return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) | 	if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return db.msg.DelMessageBySeqs(ctx, conversationID, []int64{seq}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { | func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { | ||||||
| @ -256,22 +254,13 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI | |||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return db.msg.DelMessageBySeqs(ctx, conversationID, totalSeqs) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { | func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { | ||||||
| 	for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { | 	return db.GetMessageBySeqs(ctx, conversationID, userID, seqs) | ||||||
| 		// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) |  | ||||||
| 		msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return nil, err |  | ||||||
| 		} |  | ||||||
| 		for _, msg := range msgs { |  | ||||||
| 			totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return totalMsgs, nil |  | ||||||
| } | } | ||||||
|  | 
 | ||||||
| func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) { | func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) { | ||||||
| 	if msg.IsRead { | 	if msg.IsRead { | ||||||
| 		msg.Msg.IsRead = true | 		msg.Msg.IsRead = true | ||||||
| @ -360,9 +349,6 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	msg.Msg.Content = string(data) | 	msg.Msg.Content = string(data) | ||||||
| 	//if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msgTable.GetDocID(conversationID, msg.Msg.Seq), db.msgTable.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { |  | ||||||
| 	//	log.ZError(ctx, "UpdateMsgContent", err) |  | ||||||
| 	//} |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*model.MsgInfoModel, err error) { | func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*model.MsgInfoModel, err error) { | ||||||
| @ -377,24 +363,6 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID | |||||||
| 	return msgs, err | 	return msgs, err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) { |  | ||||||
| 	log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) |  | ||||||
| 	for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { |  | ||||||
| 		log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) |  | ||||||
| 		msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return nil, err |  | ||||||
| 		} |  | ||||||
| 		for _, msg := range msgs { |  | ||||||
| 			if msg.IsRead { |  | ||||||
| 				msg.Msg.IsRead = true |  | ||||||
| 			} |  | ||||||
| 			seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg)) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return seqMsgs, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // GetMsgBySeqsRange In the context of group chat, we have the following parameters: | // GetMsgBySeqsRange In the context of group chat, we have the following parameters: | ||||||
| // | // | ||||||
| // "maxSeq" of a conversation: It represents the maximum value of messages in the group conversation. | // "maxSeq" of a conversation: It represents the maximum value of messages in the group conversation. | ||||||
| @ -463,37 +431,10 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin | |||||||
| 			seqs = append(seqs, i) | 			seqs = append(seqs, i) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 	successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, seqs) | ||||||
| 	if len(seqs) == 0 { |  | ||||||
| 		return 0, 0, nil, nil |  | ||||||
| 	} |  | ||||||
| 	newBegin := seqs[0] |  | ||||||
| 	newEnd := seqs[len(seqs)-1] |  | ||||||
| 	var successMsgs []*sdkws.MsgData |  | ||||||
| 	log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) |  | ||||||
| 	cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) |  | ||||||
| 	if err != nil && !errors.Is(err, redis.Nil) { |  | ||||||
| 		log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) |  | ||||||
| 	} |  | ||||||
| 	successMsgs = append(successMsgs, cachedMsgs...) |  | ||||||
| 	log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs) |  | ||||||
| 	// get from cache or db |  | ||||||
| 
 |  | ||||||
| 	if len(failedSeqs) > 0 { |  | ||||||
| 		log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs) |  | ||||||
| 		mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 
 |  | ||||||
| 		return 0, 0, nil, err | 		return 0, 0, nil, err | ||||||
| 	} | 	} | ||||||
| 		successMsgs = append(mongoMsgs, successMsgs...) |  | ||||||
| 
 |  | ||||||
| 		//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) |  | ||||||
| 		//if err != nil { |  | ||||||
| 		//	return 0, 0, nil, err |  | ||||||
| 		//} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return minSeq, maxSeq, successMsgs, nil | 	return minSeq, maxSeq, successMsgs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -529,32 +470,10 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co | |||||||
| 			newSeqs = append(newSeqs, seq) | 			newSeqs = append(newSeqs, seq) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if len(newSeqs) == 0 { | 	successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs) | ||||||
| 		return minSeq, maxSeq, nil, nil |  | ||||||
| 	} |  | ||||||
| 	successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		if !errors.Is(err, redis.Nil) { |  | ||||||
| 			log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", |  | ||||||
| 		seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) |  | ||||||
| 
 |  | ||||||
| 	if len(failedSeqs) > 0 { |  | ||||||
| 		mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) |  | ||||||
| 		if err != nil { |  | ||||||
| 
 |  | ||||||
| 		return 0, 0, nil, err | 		return 0, 0, nil, err | ||||||
| 	} | 	} | ||||||
| 
 |  | ||||||
| 		successMsgs = append(successMsgs, mongoMsgs...) |  | ||||||
| 
 |  | ||||||
| 		//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) |  | ||||||
| 		//if err != nil { |  | ||||||
| 		//	return 0, 0, nil, err |  | ||||||
| 		//} |  | ||||||
| 	} |  | ||||||
| 	return minSeq, maxSeq, successMsgs, nil | 	return minSeq, maxSeq, successMsgs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -607,46 +526,13 @@ func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, use | |||||||
| 	if len(newSeqs) == 0 { | 	if len(newSeqs) == 0 { | ||||||
| 		return isEnd, endSeq, nil, nil | 		return isEnd, endSeq, nil, nil | ||||||
| 	} | 	} | ||||||
| 	successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) | 	successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		if !errors.Is(err, redis.Nil) { |  | ||||||
| 			log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", |  | ||||||
| 		seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) |  | ||||||
| 
 |  | ||||||
| 	if len(failedSeqs) > 0 { |  | ||||||
| 		mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) |  | ||||||
| 		if err != nil { |  | ||||||
| 
 |  | ||||||
| 		return false, 0, nil, err | 		return false, 0, nil, err | ||||||
| 	} | 	} | ||||||
| 
 |  | ||||||
| 		successMsgs = append(successMsgs, mongoMsgs...) |  | ||||||
| 
 |  | ||||||
| 		//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) |  | ||||||
| 		//if err != nil { |  | ||||||
| 		//	return 0, 0, nil, err |  | ||||||
| 		//} |  | ||||||
| 	} |  | ||||||
| 	return isEnd, endSeq, successMsgs, nil | 	return isEnd, endSeq, successMsgs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error { |  | ||||||
| 	var delStruct delMsgRecursionStruct |  | ||||||
| 	var skip int64 |  | ||||||
| 	minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	log.ZDebug(ctx, "DeleteConversationMsgsAndSetMinSeq", "conversationID", conversationID, "minSeq", minSeq) |  | ||||||
| 	if minSeq == 0 { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 	return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) { | func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) { | ||||||
| 	var index int64 | 	var index int64 | ||||||
| 	for { | 	for { | ||||||
| @ -721,56 +607,6 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { | |||||||
| 	return d.minSeq | 	return d.minSeq | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // index 0....19(del) 20...69 |  | ||||||
| // seq 70 |  | ||||||
| // set minSeq 21 |  | ||||||
| // recursion deletes the list and returns the set minimum seq. |  | ||||||
| func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { |  | ||||||
| 	// find from oldest list |  | ||||||
| 	msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) |  | ||||||
| 	if err != nil || msgDocModel.DocID == "" { |  | ||||||
| 		if err != nil { |  | ||||||
| 			if err == model.ErrMsgListNotExist { |  | ||||||
| 				log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) |  | ||||||
| 			} else { |  | ||||||
| 				log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion |  | ||||||
| 		err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return 0, err |  | ||||||
| 		} |  | ||||||
| 		return delStruct.getSetMinSeq() + 1, nil |  | ||||||
| 	} |  | ||||||
| 	log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) |  | ||||||
| 	if int64(len(msgDocModel.Msg)) > db.msgTable.GetSingleGocMsgNum() { |  | ||||||
| 		log.ZWarn(ctx, "msgs too large", nil, "length", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) |  | ||||||
| 	} |  | ||||||
| 	if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < timeutil.GetCurrentTimestampByMill() { |  | ||||||
| 		log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID) |  | ||||||
| 		delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID) |  | ||||||
| 		delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq |  | ||||||
| 	} else { |  | ||||||
| 		var delMsgIndexs []int |  | ||||||
| 		for i, MsgInfoModel := range msgDocModel.Msg { |  | ||||||
| 			if MsgInfoModel != nil && MsgInfoModel.Msg != nil { |  | ||||||
| 				if timeutil.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) { |  | ||||||
| 					delMsgIndexs = append(delMsgIndexs, i) |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		if len(delMsgIndexs) > 0 { |  | ||||||
| 			if err = db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil { |  | ||||||
| 				log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index) |  | ||||||
| 			} |  | ||||||
| 			delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) |  | ||||||
| 	return seq, err |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { | func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { | ||||||
| 	if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil { | 	if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil { | ||||||
| 		return err | 		return err | ||||||
| @ -784,7 +620,7 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve | |||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return db.msg.DelMessageBySeqs(ctx, conversationID, allSeqs) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { | func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { | ||||||
| @ -798,7 +634,7 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return db.msg.DelMessageBySeqs(ctx, conversationID, seqs) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { | func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { | ||||||
| @ -809,11 +645,6 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin | |||||||
| 	return db.seqConversation.GetMaxSeq(ctx, conversationID) | 	return db.seqConversation.GetMaxSeq(ctx, conversationID) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // |  | ||||||
| //func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { |  | ||||||
| //	return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) |  | ||||||
| //} |  | ||||||
| 
 |  | ||||||
| func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { | func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { | ||||||
| 	return db.seqConversation.SetMinSeqs(ctx, seqs) | 	return db.seqConversation.SetMinSeqs(ctx, seqs) | ||||||
| } | } | ||||||
| @ -888,26 +719,11 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation | |||||||
| 	return | 	return | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) RangeUserSendCount( | func (db *commonMsgDatabase) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { | ||||||
| 	ctx context.Context, |  | ||||||
| 	start time.Time, |  | ||||||
| 	end time.Time, |  | ||||||
| 	group bool, |  | ||||||
| 	ase bool, |  | ||||||
| 	pageNumber int32, |  | ||||||
| 	showNumber int32, |  | ||||||
| ) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { |  | ||||||
| 	return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber) | 	return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) RangeGroupSendCount( | func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) { | ||||||
| 	ctx context.Context, |  | ||||||
| 	start time.Time, |  | ||||||
| 	end time.Time, |  | ||||||
| 	ase bool, |  | ||||||
| 	pageNumber int32, |  | ||||||
| 	showNumber int32, |  | ||||||
| ) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) { |  | ||||||
| 	return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) | 	return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -947,43 +763,10 @@ func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationID | |||||||
| 	return totalMsgs, nil | 	return totalMsgs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { |  | ||||||
| 	db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { | func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { | ||||||
| 	return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit) | 	return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // |  | ||||||
| //func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { |  | ||||||
| //	var notNull int |  | ||||||
| //	index := make([]int, 0, len(doc.Msg)) |  | ||||||
| //	for i, message := range doc.Msg { |  | ||||||
| //		if message.Msg != nil { |  | ||||||
| //			notNull++ |  | ||||||
| //			if message.Msg.SendTime < ts { |  | ||||||
| //				index = append(index, i) |  | ||||||
| //			} |  | ||||||
| //		} |  | ||||||
| //	} |  | ||||||
| //	if len(index) == 0 { |  | ||||||
| //		return index, nil |  | ||||||
| //	} |  | ||||||
| //	maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq |  | ||||||
| //	conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")] |  | ||||||
| //	if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { |  | ||||||
| //		return index, err |  | ||||||
| //	} |  | ||||||
| //	if len(index) == notNull { |  | ||||||
| //		log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) |  | ||||||
| //		return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) |  | ||||||
| //	} else { |  | ||||||
| //		log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) |  | ||||||
| //		return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) |  | ||||||
| //	} |  | ||||||
| //} |  | ||||||
| 
 |  | ||||||
| func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { | func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { | ||||||
| 	dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) | 	dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -998,10 +781,6 @@ func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID strin | |||||||
| 	return db.seqConversation.SetMinSeq(ctx, conversationID, seq) | 	return db.seqConversation.SetMinSeq(ctx, conversationID, seq) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) { |  | ||||||
| 	return db.msgDocDatabase.GetRandDocIDs(ctx, limit) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { | func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { | ||||||
| 	return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs) | 	return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs) | ||||||
| } | } | ||||||
| @ -1016,9 +795,103 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error { | func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error { | ||||||
| 	return db.msgDocDatabase.DeleteDoc(ctx, docID) | 	index := strings.LastIndex(docID, ":") | ||||||
|  | 	if index <= 0 { | ||||||
|  | 		return errs.ErrInternalServer.WrapMsg("docID is invalid", "docID", docID) | ||||||
|  | 	} | ||||||
|  | 	index, err := strconv.Atoi(docID[index+1:]) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return errs.WrapMsg(err, "strconv.Atoi", "docID", docID) | ||||||
|  | 	} | ||||||
|  | 	conversationID := docID[:index] | ||||||
|  | 	seqs := make([]int64, db.msgTable.GetSingleGocMsgNum()) | ||||||
|  | 	minSeq := db.msgTable.GetMinSeq(index) | ||||||
|  | 	for i := range seqs { | ||||||
|  | 		seqs[i] = minSeq + int64(i) | ||||||
|  | 	} | ||||||
|  | 	if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return db.msg.DelMessageBySeqs(ctx, conversationID, seqs) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { | func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { | ||||||
| 	return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time) | 	return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (db *commonMsgDatabase) handlerDeleteAndRevoked(ctx context.Context, userID string, msgs []*model.MsgInfoModel) { | ||||||
|  | 	for i := range msgs { | ||||||
|  | 		msg := msgs[i] | ||||||
|  | 		if msg == nil || msg.Msg == nil { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		msg.Msg.IsRead = msg.IsRead | ||||||
|  | 		if datautil.Contain(userID, msg.DelList...) { | ||||||
|  | 			msg.Msg.Content = "" | ||||||
|  | 			msg.Msg.Status = constant.MsgDeleted | ||||||
|  | 		} | ||||||
|  | 		if msg.Revoke == nil { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		msg.Msg.ContentType = constant.MsgRevokeNotification | ||||||
|  | 		revokeContent := sdkws.MessageRevokedContent{ | ||||||
|  | 			RevokerID:                   msg.Revoke.UserID, | ||||||
|  | 			RevokerRole:                 msg.Revoke.Role, | ||||||
|  | 			ClientMsgID:                 msg.Msg.ClientMsgID, | ||||||
|  | 			RevokerNickname:             msg.Revoke.Nickname, | ||||||
|  | 			RevokeTime:                  msg.Revoke.Time, | ||||||
|  | 			SourceMessageSendTime:       msg.Msg.SendTime, | ||||||
|  | 			SourceMessageSendID:         msg.Msg.SendID, | ||||||
|  | 			SourceMessageSenderNickname: msg.Msg.SenderNickname, | ||||||
|  | 			SessionType:                 msg.Msg.SessionType, | ||||||
|  | 			Seq:                         msg.Msg.Seq, | ||||||
|  | 			Ex:                          msg.Msg.Ex, | ||||||
|  | 		} | ||||||
|  | 		data, err := jsonutil.JsonMarshal(&revokeContent) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal MessageRevokedContent", err, "msg", msg) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		elem := sdkws.NotificationElem{ | ||||||
|  | 			Detail: string(data), | ||||||
|  | 		} | ||||||
|  | 		content, err := jsonutil.JsonMarshal(&elem) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal NotificationElem", err, "msg", msg) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		msg.Msg.Content = string(content) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (db *commonMsgDatabase) handlerQuote(ctx context.Context, userID, conversationID string, msgs []*model.MsgInfoModel) { | ||||||
|  | 	temp := make(map[int64][]*model.MsgInfoModel) | ||||||
|  | 	for i := range msgs { | ||||||
|  | 		db.handlerDBMsg(ctx, temp, userID, conversationID, msgs[i]) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) { | ||||||
|  | 	msgs, err := db.msg.GetMessageBySeqs(ctx, conversationID, seqs) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	db.handlerDeleteAndRevoked(ctx, userID, msgs) | ||||||
|  | 	db.handlerQuote(ctx, userID, conversationID, msgs) | ||||||
|  | 	seqMsgs := make(map[int64]*model.MsgInfoModel) | ||||||
|  | 	for i, msg := range msgs { | ||||||
|  | 		if msg.Msg == nil { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		seqMsgs[msg.Msg.Seq] = msgs[i] | ||||||
|  | 	} | ||||||
|  | 	res := make([]*sdkws.MsgData, 0, len(seqs)) | ||||||
|  | 	for _, seq := range seqs { | ||||||
|  | 		if v, ok := seqMsgs[seq]; ok { | ||||||
|  | 			res = append(res, convert.MsgDB2Pb(v.Msg)) | ||||||
|  | 		} else { | ||||||
|  | 			res = append(res, &sdkws.MsgData{Seq: seq}) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return res, nil | ||||||
|  | } | ||||||
|  | |||||||
| @ -7,15 +7,12 @@ import ( | |||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" |  | ||||||
| 	"golang.org/x/exp/rand" |  | ||||||
| 
 |  | ||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	"github.com/openimsdk/protocol/msg" | 	"github.com/openimsdk/protocol/msg" | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| 	"github.com/openimsdk/tools/db/mongoutil" | 	"github.com/openimsdk/tools/db/mongoutil" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"github.com/openimsdk/tools/utils/jsonutil" | 	"github.com/openimsdk/tools/utils/jsonutil" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/primitive" | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
| @ -279,95 +276,6 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| //func (m *MsgMgo) searchCount(ctx context.Context, filter any) (int64, error) { |  | ||||||
| // |  | ||||||
| //	return nil, nil |  | ||||||
| //} |  | ||||||
| 
 |  | ||||||
| //func (m *MsgMgo) searchMessage(ctx context.Context, filter any, nextID primitive.ObjectID, content bool, limit int) (int64, []*model.MsgInfoModel, primitive.ObjectID, error) { |  | ||||||
| //	var pipeline bson.A |  | ||||||
| //	if !nextID.IsZero() { |  | ||||||
| //		pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}}) |  | ||||||
| //	} |  | ||||||
| //	pipeline = append(pipeline, |  | ||||||
| //		bson.M{"$match": filter}, |  | ||||||
| //		bson.M{"$limit": limit}, |  | ||||||
| //		bson.M{"$unwind": "$msgs"}, |  | ||||||
| //		bson.M{"$match": filter}, |  | ||||||
| //		bson.M{ |  | ||||||
| //			"$group": bson.M{ |  | ||||||
| //				"_id": "$_id", |  | ||||||
| //				"doc_id": bson.M{ |  | ||||||
| //					"$first": "$doc_id", |  | ||||||
| //				}, |  | ||||||
| //				"msgs": bson.M{"$push": "$msgs"}, |  | ||||||
| //			}, |  | ||||||
| //		}, |  | ||||||
| //	) |  | ||||||
| //	if !content { |  | ||||||
| //		pipeline = append(pipeline, |  | ||||||
| //			bson.M{ |  | ||||||
| //				"$project": bson.M{ |  | ||||||
| //					"_id":   1, |  | ||||||
| //					"count": bson.M{"$size": "$msgs"}, |  | ||||||
| //				}, |  | ||||||
| //			}, |  | ||||||
| //		) |  | ||||||
| //		type result struct { |  | ||||||
| //			ID    primitive.ObjectID `bson:"_id"` |  | ||||||
| //			Count int64              `bson:"count"` |  | ||||||
| //		} |  | ||||||
| //		res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline) |  | ||||||
| //		if err != nil { |  | ||||||
| //			return 0, nil, primitive.ObjectID{}, err |  | ||||||
| //		} |  | ||||||
| //		if len(res) == 0 { |  | ||||||
| //			return 0, nil, primitive.ObjectID{}, nil |  | ||||||
| //		} |  | ||||||
| //		var count int64 |  | ||||||
| //		for _, r := range res { |  | ||||||
| //			count += r.Count |  | ||||||
| //		} |  | ||||||
| //		return count, nil, res[len(res)-1].ID, nil |  | ||||||
| //	} |  | ||||||
| //	type result struct { |  | ||||||
| //		ID  primitive.ObjectID    `bson:"_id"` |  | ||||||
| //		Msg []*model.MsgInfoModel `bson:"msgs"` |  | ||||||
| //	} |  | ||||||
| //	res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline) |  | ||||||
| //	if err != nil { |  | ||||||
| //		return 0, nil, primitive.ObjectID{}, err |  | ||||||
| //	} |  | ||||||
| //	if len(res) == 0 { |  | ||||||
| //		return 0, nil, primitive.ObjectID{}, err |  | ||||||
| //	} |  | ||||||
| //	var count int |  | ||||||
| //	for _, r := range res { |  | ||||||
| //		count += len(r.Msg) |  | ||||||
| //	} |  | ||||||
| //	msgs := make([]*model.MsgInfoModel, 0, count) |  | ||||||
| //	for _, r := range res { |  | ||||||
| //		msgs = append(msgs, r.Msg...) |  | ||||||
| //	} |  | ||||||
| //	return int64(count), msgs, res[len(res)-1].ID, nil |  | ||||||
| //} |  | ||||||
| 
 |  | ||||||
| /* |  | ||||||
| 
 |  | ||||||
| db.msg3.aggregate( |  | ||||||
|     [ |  | ||||||
|         { |  | ||||||
|             "$match": { |  | ||||||
|                 "doc_id": "si_7009965934_8710838466:0" |  | ||||||
|             }, |  | ||||||
| 
 |  | ||||||
|         } |  | ||||||
|     ] |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| */ |  | ||||||
| 
 |  | ||||||
| type searchMessageIndex struct { | type searchMessageIndex struct { | ||||||
| 	ID    primitive.ObjectID `bson:"_id"` | 	ID    primitive.ObjectID `bson:"_id"` | ||||||
| 	Index []int64            `bson:"index"` | 	Index []int64            `bson:"index"` | ||||||
| @ -556,143 +464,6 @@ func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) ( | |||||||
| 	return count, msgs, nil | 	return count, msgs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| //func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) { |  | ||||||
| //	where := make(bson.A, 0, 6) |  | ||||||
| //	if req.RecvID != "" { |  | ||||||
| //		if req.SessionType == constant.ReadGroupChatType { |  | ||||||
| //			where = append(where, bson.M{ |  | ||||||
| //				"$or": bson.A{ |  | ||||||
| //					bson.M{"doc_id": "^n_" + req.RecvID + ":"}, |  | ||||||
| //					bson.M{"doc_id": "^sg_" + req.RecvID + ":"}, |  | ||||||
| //				}, |  | ||||||
| //			}) |  | ||||||
| //		} else { |  | ||||||
| //			where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID}) |  | ||||||
| //		} |  | ||||||
| //	} |  | ||||||
| //	if req.SendID != "" { |  | ||||||
| //		where = append(where, bson.M{"msgs.msg.send_id": req.SendID}) |  | ||||||
| //	} |  | ||||||
| //	if req.ContentType != 0 { |  | ||||||
| //		where = append(where, bson.M{"msgs.msg.content_type": req.ContentType}) |  | ||||||
| //	} |  | ||||||
| //	if req.SessionType != 0 { |  | ||||||
| //		where = append(where, bson.M{"msgs.msg.session_type": req.SessionType}) |  | ||||||
| //	} |  | ||||||
| //	if req.SendTime != "" { |  | ||||||
| //		sendTime, err := time.Parse(time.DateOnly, req.SendTime) |  | ||||||
| //		if err != nil { |  | ||||||
| //			return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error()) |  | ||||||
| //		} |  | ||||||
| //		where = append(where, |  | ||||||
| //			bson.M{ |  | ||||||
| //				"msgs.msg.send_time": bson.M{ |  | ||||||
| //					"$gte": sendTime.UnixMilli(), |  | ||||||
| //				}, |  | ||||||
| //			}, |  | ||||||
| //			bson.M{ |  | ||||||
| //				"msgs.msg.send_time": bson.M{ |  | ||||||
| //					"$lt": sendTime.Add(time.Hour * 24).UnixMilli(), |  | ||||||
| //				}, |  | ||||||
| //			}, |  | ||||||
| //		) |  | ||||||
| //	} |  | ||||||
| //	opt := options.Find().SetLimit(100) |  | ||||||
| //	res, err := mongoutil.Find[model.MsgDocModel](ctx, m.coll, bson.M{"$and": where}, opt) |  | ||||||
| //	if err != nil { |  | ||||||
| //		return 0, nil, err |  | ||||||
| //	} |  | ||||||
| //	_ = res |  | ||||||
| //	fmt.Println() |  | ||||||
| // |  | ||||||
| //	return 0, nil, nil |  | ||||||
| //	pipeline := bson.A{ |  | ||||||
| //		bson.M{ |  | ||||||
| //			"$unwind": "$msgs", |  | ||||||
| //		}, |  | ||||||
| //	} |  | ||||||
| //	if len(where) > 0 { |  | ||||||
| //		pipeline = append(pipeline, bson.M{ |  | ||||||
| //			"$match": bson.M{"$and": where}, |  | ||||||
| //		}) |  | ||||||
| //	} |  | ||||||
| //	pipeline = append(pipeline, |  | ||||||
| //		bson.M{ |  | ||||||
| //			"$project": bson.M{ |  | ||||||
| //				"_id": 0, |  | ||||||
| //				"msg": "$msgs.msg", |  | ||||||
| //			}, |  | ||||||
| //		}, |  | ||||||
| //		bson.M{ |  | ||||||
| //			"$count": "count", |  | ||||||
| //		}, |  | ||||||
| //	) |  | ||||||
| //	//count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline) |  | ||||||
| //	//if err != nil { |  | ||||||
| //	//	return 0, nil, err |  | ||||||
| //	//} |  | ||||||
| //	//if len(count) == 0 || count[0] == 0 { |  | ||||||
| //	//	return 0, nil, nil |  | ||||||
| //	//} |  | ||||||
| //	count := []int32{0} |  | ||||||
| //	pipeline = pipeline[:len(pipeline)-1] |  | ||||||
| //	pipeline = append(pipeline, |  | ||||||
| //		bson.M{ |  | ||||||
| //			"$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(), |  | ||||||
| //		}, |  | ||||||
| //		bson.M{ |  | ||||||
| //			"$limit": req.Pagination.GetShowNumber(), |  | ||||||
| //		}, |  | ||||||
| //	) |  | ||||||
| //	msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline) |  | ||||||
| //	if err != nil { |  | ||||||
| //		return 0, nil, err |  | ||||||
| //	} |  | ||||||
| //	for i := range msgs { |  | ||||||
| //		msgInfo := msgs[i] |  | ||||||
| //		if msgInfo == nil || msgInfo.Msg == nil { |  | ||||||
| //			continue |  | ||||||
| //		} |  | ||||||
| //		if msgInfo.Revoke != nil { |  | ||||||
| //			revokeContent := sdkws.MessageRevokedContent{ |  | ||||||
| //				RevokerID:                   msgInfo.Revoke.UserID, |  | ||||||
| //				RevokerRole:                 msgInfo.Revoke.Role, |  | ||||||
| //				ClientMsgID:                 msgInfo.Msg.ClientMsgID, |  | ||||||
| //				RevokerNickname:             msgInfo.Revoke.Nickname, |  | ||||||
| //				RevokeTime:                  msgInfo.Revoke.Time, |  | ||||||
| //				SourceMessageSendTime:       msgInfo.Msg.SendTime, |  | ||||||
| //				SourceMessageSendID:         msgInfo.Msg.SendID, |  | ||||||
| //				SourceMessageSenderNickname: msgInfo.Msg.SenderNickname, |  | ||||||
| //				SessionType:                 msgInfo.Msg.SessionType, |  | ||||||
| //				Seq:                         msgInfo.Msg.Seq, |  | ||||||
| //				Ex:                          msgInfo.Msg.Ex, |  | ||||||
| //			} |  | ||||||
| //			data, err := jsonutil.JsonMarshal(&revokeContent) |  | ||||||
| //			if err != nil { |  | ||||||
| //				return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent") |  | ||||||
| //			} |  | ||||||
| //			elem := sdkws.NotificationElem{Detail: string(data)} |  | ||||||
| //			content, err := jsonutil.JsonMarshal(&elem) |  | ||||||
| //			if err != nil { |  | ||||||
| //				return 0, nil, errs.WrapMsg(err, "json.Marshal elem") |  | ||||||
| //			} |  | ||||||
| //			msgInfo.Msg.ContentType = constant.MsgRevokeNotification |  | ||||||
| //			msgInfo.Msg.Content = string(content) |  | ||||||
| //		} |  | ||||||
| //	} |  | ||||||
| //	//start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber |  | ||||||
| //	//n := int32(len(msgs)) |  | ||||||
| //	//if start >= n { |  | ||||||
| //	//	return n, []*relation.MsgInfoModel{}, nil |  | ||||||
| //	//} |  | ||||||
| //	//if start+req.Pagination.ShowNumber < n { |  | ||||||
| //	//	msgs = msgs[start : start+req.Pagination.ShowNumber] |  | ||||||
| //	//} else { |  | ||||||
| //	//	msgs = msgs[start:] |  | ||||||
| //	//} |  | ||||||
| //	return count[0], msgs, nil |  | ||||||
| //} |  | ||||||
| 
 |  | ||||||
| func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { | func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { | ||||||
| 	var sort int | 	var sort int | ||||||
| 	if ase { | 	if ase { | ||||||
| @ -1178,94 +949,6 @@ func (m *MsgMgo) RangeGroupSendCount(ctx context.Context, start time.Time, end t | |||||||
| 	return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil | 	return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { |  | ||||||
| 	for _, conversationID := range conversationIDs { |  | ||||||
| 		regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)} |  | ||||||
| 		msgDocs, err := mongoutil.Find[*model.MsgDocModel](ctx, m.coll, bson.M{"doc_id": regex}) |  | ||||||
| 		if err != nil { |  | ||||||
| 			log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID) |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		if len(msgDocs) < 1 { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		log.ZDebug(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs)) |  | ||||||
| 		if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) { |  | ||||||
| 			if err := mongoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": regex}); err != nil { |  | ||||||
| 				log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
| 			var newMsgDocs []any |  | ||||||
| 			for _, msgDoc := range msgDocs { |  | ||||||
| 				if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { |  | ||||||
| 					continue |  | ||||||
| 				} |  | ||||||
| 				var index int64 |  | ||||||
| 				for index < int64(len(msgDoc.Msg)) { |  | ||||||
| 					msg := msgDoc.Msg[index] |  | ||||||
| 					if msg != nil && msg.Msg != nil { |  | ||||||
| 						msgDocModel := model.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} |  | ||||||
| 						end := index + m.model.GetSingleGocMsgNum() |  | ||||||
| 						if int(end) >= len(msgDoc.Msg) { |  | ||||||
| 							msgDocModel.Msg = msgDoc.Msg[index:] |  | ||||||
| 						} else { |  | ||||||
| 							msgDocModel.Msg = msgDoc.Msg[index:end] |  | ||||||
| 						} |  | ||||||
| 						newMsgDocs = append(newMsgDocs, msgDocModel) |  | ||||||
| 						index = end |  | ||||||
| 					} else { |  | ||||||
| 						break |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 			if err = mongoutil.InsertMany(ctx, m.coll, newMsgDocs); err != nil { |  | ||||||
| 				log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) |  | ||||||
| 			} else { |  | ||||||
| 				log.ZDebug(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *MsgMgo) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) { |  | ||||||
| 	var skip int |  | ||||||
| 	var docIDs []string |  | ||||||
| 	var offset int |  | ||||||
| 
 |  | ||||||
| 	count, err := m.coll.CountDocuments(ctx, bson.M{}) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if count < int64(limit) { |  | ||||||
| 		skip = 0 |  | ||||||
| 	} else { |  | ||||||
| 		rand.Seed(uint64(time.Now().UnixMilli())) |  | ||||||
| 		skip = rand.Intn(int(count / int64(limit))) |  | ||||||
| 		offset = skip * limit |  | ||||||
| 	} |  | ||||||
| 	log.ZDebug(ctx, "offset", "skip", skip, "offset", offset) |  | ||||||
| 	res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ |  | ||||||
| 		{ |  | ||||||
| 			"$project": bson.M{ |  | ||||||
| 				"doc_id": 1, |  | ||||||
| 			}, |  | ||||||
| 		}, |  | ||||||
| 		{ |  | ||||||
| 			"$skip": offset, |  | ||||||
| 		}, |  | ||||||
| 		{ |  | ||||||
| 			"$limit": limit, |  | ||||||
| 		}, |  | ||||||
| 	}) |  | ||||||
| 
 |  | ||||||
| 	for _, doc := range res { |  | ||||||
| 		docIDs = append(docIDs, doc.DocID) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return docIDs, errs.Wrap(err) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { | func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { | ||||||
| 	return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ | 	return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ | ||||||
| 		{ | 		{ | ||||||
| @ -1364,3 +1047,55 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str | |||||||
| 	} | 	} | ||||||
| 	return seq, nil | 	return seq, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []int64) ([]*model.MsgInfoModel, error) { | ||||||
|  | 	if len(indexes) == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	pipeline := mongo.Pipeline{ | ||||||
|  | 		bson.D{{Key: "$match", Value: bson.D{ | ||||||
|  | 			{Key: "doc_id", Value: docID}, | ||||||
|  | 		}}}, | ||||||
|  | 		bson.D{{Key: "$project", Value: bson.D{ | ||||||
|  | 			{Key: "_id", Value: 0}, | ||||||
|  | 			{Key: "doc_id", Value: 1}, | ||||||
|  | 			{Key: "msgs", Value: bson.D{ | ||||||
|  | 				{Key: "$map", Value: bson.D{ | ||||||
|  | 					{Key: "input", Value: indexes}, | ||||||
|  | 					{Key: "as", Value: "index"}, | ||||||
|  | 					{Key: "in", Value: bson.D{ | ||||||
|  | 						{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}}, | ||||||
|  | 					}}, | ||||||
|  | 				}}, | ||||||
|  | 			}}, | ||||||
|  | 		}}}, | ||||||
|  | 	} | ||||||
|  | 	msgDocModel, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if len(msgDocModel) == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	return msgDocModel[0].Msg, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { | ||||||
|  | 	if len(seqs) == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	result := make([]*model.MsgInfoModel, 0, len(seqs)) | ||||||
|  | 	for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) { | ||||||
|  | 		res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex)) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		for i, re := range res { | ||||||
|  | 			if re == nil || re.Msg == nil { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			result = append(result, res[i]) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return result, nil | ||||||
|  | } | ||||||
|  | |||||||
| @ -41,13 +41,12 @@ type Msg interface { | |||||||
| 	SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) | 	SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) | ||||||
| 	RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) | 	RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) | ||||||
| 	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) | 	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) | ||||||
| 	ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) |  | ||||||
| 
 | 
 | ||||||
| 	DeleteDoc(ctx context.Context, docID string) error | 	DeleteDoc(ctx context.Context, docID string) error | ||||||
| 	DeleteMsgByIndex(ctx context.Context, docID string, index []int) error | 	DeleteMsgByIndex(ctx context.Context, docID string, index []int) error | ||||||
| 	GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) | 	GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) | ||||||
| 
 | 
 | ||||||
| 	GetRandDocIDs(ctx context.Context, limit int) ([]string, error) |  | ||||||
| 
 |  | ||||||
| 	GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) | 	GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) | ||||||
|  | 
 | ||||||
|  | 	FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) | ||||||
| } | } | ||||||
|  | |||||||
| @ -143,3 +143,7 @@ func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdk | |||||||
| 	} | 	} | ||||||
| 	return exceptionMsg | 	return exceptionMsg | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (*MsgDocModel) GetMinSeq(index int) int64 { | ||||||
|  | 	return int64(index*singleGocMsgNum) + 1 | ||||||
|  | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user