mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-03 18:52:15 +08:00 
			
		
		
		
	* fix: to start im or chat, ZooKeeper must be started first. * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: go mod update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: get all userID Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: msggateway add online status call Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * chore: network mode change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * feat: add api of get server time Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * feat: remove go work sum Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: pull message add isRead field Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: check msg-transfer script Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: start don't kill old process Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: check component Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: pull message set isRead only message come from single. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: add ex field to update group info. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * cicd: robot automated Change * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: to start im or chat, ZooKeeper must be started first. --------- Signed-off-by: Gordon <1432970085@qq.com> Signed-off-by: withchao <993506633@qq.com> Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: withchao <993506633@qq.com> Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com> Co-authored-by: FGadvancer <FGadvancer@users.noreply.github.com>
		
			
				
	
	
		
			455 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			455 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright © 2023 OpenIM. All rights reserved.
 | 
						|
//
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package cache
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"math/big"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/dtm-labs/rockscache"
 | 
						|
	"github.com/redis/go-redis/v9"
 | 
						|
 | 
						|
	"github.com/OpenIMSDK/tools/utils"
 | 
						|
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
 | 
						|
	relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	conversationKey                          = "CONVERSATION:"
 | 
						|
	conversationIDsKey                       = "CONVERSATION_IDS:"
 | 
						|
	conversationIDsHashKey                   = "CONVERSATION_IDS_HASH:"
 | 
						|
	conversationHasReadSeqKey                = "CONVERSATION_HAS_READ_SEQ:"
 | 
						|
	recvMsgOptKey                            = "RECV_MSG_OPT:"
 | 
						|
	superGroupRecvMsgNotNotifyUserIDsKey     = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
 | 
						|
	superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
 | 
						|
	conversationNotReceiveMessageUserIDsKey  = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
 | 
						|
 | 
						|
	conversationExpireTime = time.Second * 60 * 60 * 12
 | 
						|
)
 | 
						|
 | 
						|
// arg fn will exec when no data in msgCache.
 | 
						|
type ConversationCache interface {
 | 
						|
	metaCache
 | 
						|
	NewCache() ConversationCache
 | 
						|
	// get user's conversationIDs from msgCache
 | 
						|
	GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
 | 
						|
	DelConversationIDs(userIDs ...string) ConversationCache
 | 
						|
 | 
						|
	GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
 | 
						|
	DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache
 | 
						|
 | 
						|
	// get one conversation from msgCache
 | 
						|
	GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationtb.ConversationModel, error)
 | 
						|
	DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache
 | 
						|
	DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache
 | 
						|
	// get one conversation from msgCache
 | 
						|
	GetConversations(ctx context.Context, ownerUserID string,
 | 
						|
		conversationIDs []string) ([]*relationtb.ConversationModel, error)
 | 
						|
	// get one user's all conversations from msgCache
 | 
						|
	GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error)
 | 
						|
	// get user conversation recv msg from msgCache
 | 
						|
	GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
 | 
						|
	DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache
 | 
						|
	// get one super group recv msg but do not notification userID list
 | 
						|
	GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error)
 | 
						|
	DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache
 | 
						|
	// get one super group recv msg but do not notification userID list hash
 | 
						|
	GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
 | 
						|
	DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache
 | 
						|
 | 
						|
	GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
 | 
						|
	DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache
 | 
						|
 | 
						|
	GetConversationsByConversationID(ctx context.Context,
 | 
						|
		conversationIDs []string) ([]*relationtb.ConversationModel, error)
 | 
						|
	DelConversationByConversationID(conversationIDs ...string) ConversationCache
 | 
						|
	GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
 | 
						|
	DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
 | 
						|
}
 | 
						|
 | 
						|
func NewConversationRedis(
 | 
						|
	rdb redis.UniversalClient,
 | 
						|
	opts rockscache.Options,
 | 
						|
	db relationtb.ConversationModelInterface,
 | 
						|
) ConversationCache {
 | 
						|
	rcClient := rockscache.NewClient(rdb, opts)
 | 
						|
	return &ConversationRedisCache{
 | 
						|
		rcClient:       rcClient,
 | 
						|
		metaCache:      NewMetaCacheRedis(rcClient),
 | 
						|
		conversationDB: db,
 | 
						|
		expireTime:     conversationExpireTime,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type ConversationRedisCache struct {
 | 
						|
	metaCache
 | 
						|
	rcClient       *rockscache.Client
 | 
						|
	conversationDB relationtb.ConversationModelInterface
 | 
						|
	expireTime     time.Duration
 | 
						|
}
 | 
						|
 | 
						|
func NewNewConversationRedis(
 | 
						|
	rdb redis.UniversalClient,
 | 
						|
	conversationDB *relation.ConversationGorm,
 | 
						|
	options rockscache.Options,
 | 
						|
) ConversationCache {
 | 
						|
	rcClient := rockscache.NewClient(rdb, options)
 | 
						|
	return &ConversationRedisCache{
 | 
						|
		rcClient:       rcClient,
 | 
						|
		metaCache:      NewMetaCacheRedis(rcClient),
 | 
						|
		conversationDB: conversationDB,
 | 
						|
		expireTime:     conversationExpireTime,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) NewCache() ConversationCache {
 | 
						|
	return &ConversationRedisCache{
 | 
						|
		rcClient:       c.rcClient,
 | 
						|
		metaCache:      NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...),
 | 
						|
		conversationDB: c.conversationDB,
 | 
						|
		expireTime:     c.expireTime,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
 | 
						|
	return conversationKey + ownerUserID + ":" + conversationID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string {
 | 
						|
	return conversationIDsKey + ownerUserID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
 | 
						|
	return superGroupRecvMsgNotNotifyUserIDsKey + groupID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string {
 | 
						|
	return recvMsgOptKey + ownerUserID + ":" + conversationID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
 | 
						|
	return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string {
 | 
						|
	return conversationHasReadSeqKey + ownerUserID + ":" + conversationID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string {
 | 
						|
	return conversationNotReceiveMessageUserIDsKey + conversationID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
 | 
						|
	return getCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		c.getConversationIDsKey(ownerUserID),
 | 
						|
		c.expireTime,
 | 
						|
		func(ctx context.Context) ([]string, error) {
 | 
						|
			return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache {
 | 
						|
	var keys []string
 | 
						|
	for _, userID := range userIDs {
 | 
						|
		keys = append(keys, c.getConversationIDsKey(userID))
 | 
						|
	}
 | 
						|
	cache := c.NewCache()
 | 
						|
	cache.AddKeys(keys...)
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string {
 | 
						|
	return conversationIDsHashKey + ownerUserID
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetUserConversationIDsHash(
 | 
						|
	ctx context.Context,
 | 
						|
	ownerUserID string,
 | 
						|
) (hash uint64, err error) {
 | 
						|
	return getCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		c.getUserConversationIDsHashKey(ownerUserID),
 | 
						|
		c.expireTime,
 | 
						|
		func(ctx context.Context) (uint64, error) {
 | 
						|
			conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
 | 
						|
			if err != nil {
 | 
						|
				return 0, err
 | 
						|
			}
 | 
						|
			utils.Sort(conversationIDs, true)
 | 
						|
			bi := big.NewInt(0)
 | 
						|
			bi.SetString(utils.Md5(strings.Join(conversationIDs, ";"))[0:8], 16)
 | 
						|
			return bi.Uint64(), nil
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache {
 | 
						|
	var keys []string
 | 
						|
	for _, ownerUserID := range ownerUserIDs {
 | 
						|
		keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID))
 | 
						|
	}
 | 
						|
	cache := c.NewCache()
 | 
						|
	cache.AddKeys(keys...)
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetConversation(
 | 
						|
	ctx context.Context,
 | 
						|
	ownerUserID, conversationID string,
 | 
						|
) (*relationtb.ConversationModel, error) {
 | 
						|
	return getCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		c.getConversationKey(ownerUserID, conversationID),
 | 
						|
		c.expireTime,
 | 
						|
		func(ctx context.Context) (*relationtb.ConversationModel, error) {
 | 
						|
			return c.conversationDB.Take(ctx, ownerUserID, conversationID)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache {
 | 
						|
	var keys []string
 | 
						|
	for _, conversationID := range conversationIDs {
 | 
						|
		keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
 | 
						|
	}
 | 
						|
	cache := c.NewCache()
 | 
						|
	cache.AddKeys(keys...)
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getConversationIndex(
 | 
						|
	convsation *relationtb.ConversationModel,
 | 
						|
	keys []string,
 | 
						|
) (int, error) {
 | 
						|
	key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
 | 
						|
	for _i, _key := range keys {
 | 
						|
		if _key == key {
 | 
						|
			return _i, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return 0, errors.New("not found key:" + key + " in keys")
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetConversations(
 | 
						|
	ctx context.Context,
 | 
						|
	ownerUserID string,
 | 
						|
	conversationIDs []string,
 | 
						|
) ([]*relationtb.ConversationModel, error) {
 | 
						|
	var keys []string
 | 
						|
	for _, conversarionID := range conversationIDs {
 | 
						|
		keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
 | 
						|
	}
 | 
						|
	return batchGetCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		keys,
 | 
						|
		c.expireTime,
 | 
						|
		c.getConversationIndex,
 | 
						|
		func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
 | 
						|
			return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetUserAllConversations(
 | 
						|
	ctx context.Context,
 | 
						|
	ownerUserID string,
 | 
						|
) ([]*relationtb.ConversationModel, error) {
 | 
						|
	conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var keys []string
 | 
						|
	for _, conversarionID := range conversationIDs {
 | 
						|
		keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
 | 
						|
	}
 | 
						|
	return batchGetCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		keys,
 | 
						|
		c.expireTime,
 | 
						|
		c.getConversationIndex,
 | 
						|
		func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
 | 
						|
			return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetUserRecvMsgOpt(
 | 
						|
	ctx context.Context,
 | 
						|
	ownerUserID, conversationID string,
 | 
						|
) (opt int, err error) {
 | 
						|
	return getCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		c.getRecvMsgOptKey(ownerUserID, conversationID),
 | 
						|
		c.expireTime,
 | 
						|
		func(ctx context.Context) (opt int, err error) {
 | 
						|
			return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(
 | 
						|
	ctx context.Context,
 | 
						|
	groupID string,
 | 
						|
) (userIDs []string, err error) {
 | 
						|
	return getCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		c.getSuperGroupRecvNotNotifyUserIDsKey(groupID),
 | 
						|
		c.expireTime,
 | 
						|
		func(ctx context.Context) (userIDs []string, err error) {
 | 
						|
			return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache {
 | 
						|
	var keys []string
 | 
						|
	for _, ownerUserID := range ownerUserIDs {
 | 
						|
		keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
 | 
						|
	}
 | 
						|
	cache := c.NewCache()
 | 
						|
	cache.AddKeys(keys...)
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache {
 | 
						|
	cache := c.NewCache()
 | 
						|
	cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID))
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache {
 | 
						|
	cache := c.NewCache()
 | 
						|
	cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID))
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(
 | 
						|
	ctx context.Context,
 | 
						|
	groupID string,
 | 
						|
) (hash uint64, err error) {
 | 
						|
	return getCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID),
 | 
						|
		c.expireTime,
 | 
						|
		func(ctx context.Context) (hash uint64, err error) {
 | 
						|
			userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
 | 
						|
			if err != nil {
 | 
						|
				return 0, err
 | 
						|
			}
 | 
						|
			utils.Sort(userIDs, true)
 | 
						|
			bi := big.NewInt(0)
 | 
						|
			bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
 | 
						|
			return bi.Uint64(), nil
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache {
 | 
						|
	cache := c.NewCache()
 | 
						|
	cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID))
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(
 | 
						|
	conversationID string,
 | 
						|
	conversationIDs []string,
 | 
						|
) (int, error) {
 | 
						|
	for _i, _conversationID := range conversationIDs {
 | 
						|
		if _conversationID == conversationID {
 | 
						|
			return _i, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return 0, errors.New("not found key:" + conversationID + " in keys")
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetUserAllHasReadSeqs(
 | 
						|
	ctx context.Context,
 | 
						|
	ownerUserID string,
 | 
						|
) (map[string]int64, error) {
 | 
						|
	conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var keys []string
 | 
						|
	for _, conversarionID := range conversationIDs {
 | 
						|
		keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
 | 
						|
	}
 | 
						|
	return batchGetCacheMap(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		keys,
 | 
						|
		conversationIDs,
 | 
						|
		c.expireTime,
 | 
						|
		c.getUserAllHasReadSeqsIndex,
 | 
						|
		func(ctx context.Context) (map[string]int64, error) {
 | 
						|
			return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string,
 | 
						|
	conversationIDs ...string) ConversationCache {
 | 
						|
	cache := c.NewCache()
 | 
						|
	for _, conversationID := range conversationIDs {
 | 
						|
		cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID))
 | 
						|
	}
 | 
						|
	return cache
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetConversationsByConversationID(
 | 
						|
	ctx context.Context,
 | 
						|
	conversationIDs []string,
 | 
						|
) ([]*relationtb.ConversationModel, error) {
 | 
						|
	panic("implement me")
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) ConversationCache {
 | 
						|
	panic("implement me")
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
 | 
						|
	return getCache(
 | 
						|
		ctx,
 | 
						|
		c.rcClient,
 | 
						|
		c.getConversationNotReceiveMessageUserIDsKey(conversationID),
 | 
						|
		c.expireTime,
 | 
						|
		func(ctx context.Context) ([]string, error) {
 | 
						|
			return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache {
 | 
						|
	cache := c.NewCache()
 | 
						|
	for _, conversationID := range conversationIDs {
 | 
						|
		cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
 | 
						|
	}
 | 
						|
	return cache
 | 
						|
}
 |