open-im-server/pkg/common/db/cache/conversation.go
Gordon 186952ab46
refactor: change project module name. (#1038)
* fix: to start im or chat, ZooKeeper must be started first.

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: go mod update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: get all userID

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: msggateway add online status call

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* chore: network mode change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* feat: add api of get server time

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: remove go work sum

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: pull message add isRead field

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: check msg-transfer script

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: start don't kill old process

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: check component

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: pull message set isRead only message come from single.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: add ex field to update group info.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

* cicd: robot automated Change

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: change project module name.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

---------

Signed-off-by: Gordon <1432970085@qq.com>
Signed-off-by: withchao <993506633@qq.com>
Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: withchao <993506633@qq.com>
Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com>
Co-authored-by: FGadvancer <FGadvancer@users.noreply.github.com>
2023-09-07 11:04:36 +00:00

435 lines
13 KiB
Go

// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"context"
"errors"
"math/big"
"strings"
"time"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
const (
conversationKey = "CONVERSATION:"
conversationIDsKey = "CONVERSATION_IDS:"
conversationIDsHashKey = "CONVERSATION_IDS_HASH:"
conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
recvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
conversationExpireTime = time.Second * 60 * 60 * 12
)
// arg fn will exec when no data in msgCache.
type ConversationCache interface {
metaCache
NewCache() ConversationCache
// get user's conversationIDs from msgCache
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
DelConversationIDs(userIDs ...string) ConversationCache
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache
// get one conversation from msgCache
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationtb.ConversationModel, error)
DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache
DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache
// get one conversation from msgCache
GetConversations(
ctx context.Context,
ownerUserID string,
conversationIDs []string,
) ([]*relationtb.ConversationModel, error)
// get one user's all conversations from msgCache
GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error)
// get user conversation recv msg from msgCache
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache
// get one super group recv msg but do not notification userID list
GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error)
DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache
// get one super group recv msg but do not notification userID list hash
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache
GetConversationsByConversationID(
ctx context.Context,
conversationIDs []string,
) ([]*relationtb.ConversationModel, error)
DelConversationByConversationID(conversationIDs ...string) ConversationCache
}
func NewConversationRedis(
rdb redis.UniversalClient,
opts rockscache.Options,
db relationtb.ConversationModelInterface,
) ConversationCache {
rcClient := rockscache.NewClient(rdb, opts)
return &ConversationRedisCache{
rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient),
conversationDB: db,
expireTime: conversationExpireTime,
}
}
type ConversationRedisCache struct {
metaCache
rcClient *rockscache.Client
conversationDB relationtb.ConversationModelInterface
expireTime time.Duration
}
func NewNewConversationRedis(
rdb redis.UniversalClient,
conversationDB *relation.ConversationGorm,
options rockscache.Options,
) ConversationCache {
rcClient := rockscache.NewClient(rdb, options)
return &ConversationRedisCache{
rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient),
conversationDB: conversationDB,
expireTime: conversationExpireTime,
}
}
func (c *ConversationRedisCache) NewCache() ConversationCache {
return &ConversationRedisCache{
rcClient: c.rcClient,
metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...),
conversationDB: c.conversationDB,
expireTime: c.expireTime,
}
}
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
return conversationKey + ownerUserID + ":" + conversationID
}
func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string {
return conversationIDsKey + ownerUserID
}
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsKey + groupID
}
func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string {
return recvMsgOptKey + ownerUserID + ":" + conversationID
}
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID
}
func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string {
return conversationHasReadSeqKey + ownerUserID + ":" + conversationID
}
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache(
ctx,
c.rcClient,
c.getConversationIDsKey(ownerUserID),
c.expireTime,
func(ctx context.Context) ([]string, error) {
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
},
)
}
func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache {
var keys []string
for _, userID := range userIDs {
keys = append(keys, c.getConversationIDsKey(userID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string {
return conversationIDsHashKey + ownerUserID
}
func (c *ConversationRedisCache) GetUserConversationIDsHash(
ctx context.Context,
ownerUserID string,
) (hash uint64, err error) {
return getCache(
ctx,
c.rcClient,
c.getUserConversationIDsHashKey(ownerUserID),
c.expireTime,
func(ctx context.Context) (uint64, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return 0, err
}
utils.Sort(conversationIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(conversationIDs, ";"))[0:8], 16)
return bi.Uint64(), nil
},
)
}
func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache {
var keys []string
for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) GetConversation(
ctx context.Context,
ownerUserID, conversationID string,
) (*relationtb.ConversationModel, error) {
return getCache(
ctx,
c.rcClient,
c.getConversationKey(ownerUserID, conversationID),
c.expireTime,
func(ctx context.Context) (*relationtb.ConversationModel, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID)
},
)
}
func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache {
var keys []string
for _, conversationID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) getConversationIndex(
convsation *relationtb.ConversationModel,
keys []string,
) (int, error) {
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
for _i, _key := range keys {
if _key == key {
return _i, nil
}
}
return 0, errors.New("not found key:" + key + " in keys")
}
func (c *ConversationRedisCache) GetConversations(
ctx context.Context,
ownerUserID string,
conversationIDs []string,
) ([]*relationtb.ConversationModel, error) {
var keys []string
for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
}
return batchGetCache(
ctx,
c.rcClient,
keys,
c.expireTime,
c.getConversationIndex,
func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
},
)
}
func (c *ConversationRedisCache) GetUserAllConversations(
ctx context.Context,
ownerUserID string,
) ([]*relationtb.ConversationModel, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
var keys []string
for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
}
return batchGetCache(
ctx,
c.rcClient,
keys,
c.expireTime,
c.getConversationIndex,
func(ctx context.Context) ([]*relationtb.ConversationModel, error) {
return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID)
},
)
}
func (c *ConversationRedisCache) GetUserRecvMsgOpt(
ctx context.Context,
ownerUserID, conversationID string,
) (opt int, err error) {
return getCache(
ctx,
c.rcClient,
c.getRecvMsgOptKey(ownerUserID, conversationID),
c.expireTime,
func(ctx context.Context) (opt int, err error) {
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
},
)
}
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(
ctx context.Context,
groupID string,
) (userIDs []string, err error) {
return getCache(
ctx,
c.rcClient,
c.getSuperGroupRecvNotNotifyUserIDsKey(groupID),
c.expireTime,
func(ctx context.Context) (userIDs []string, err error) {
return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
},
)
}
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache {
var keys []string
for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache {
cache := c.NewCache()
cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID))
return cache
}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache {
cache := c.NewCache()
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID))
return cache
}
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(
ctx context.Context,
groupID string,
) (hash uint64, err error) {
return getCache(
ctx,
c.rcClient,
c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID),
c.expireTime,
func(ctx context.Context) (hash uint64, err error) {
userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
return 0, err
}
utils.Sort(userIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
return bi.Uint64(), nil
},
)
}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache {
cache := c.NewCache()
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID))
return cache
}
func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(
conversationID string,
conversationIDs []string,
) (int, error) {
for _i, _conversationID := range conversationIDs {
if _conversationID == conversationID {
return _i, nil
}
}
return 0, errors.New("not found key:" + conversationID + " in keys")
}
func (c *ConversationRedisCache) GetUserAllHasReadSeqs(
ctx context.Context,
ownerUserID string,
) (map[string]int64, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
var keys []string
for _, conversarionID := range conversationIDs {
keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
}
return batchGetCacheMap(
ctx,
c.rcClient,
keys,
conversationIDs,
c.expireTime,
c.getUserAllHasReadSeqsIndex,
func(ctx context.Context) (map[string]int64, error) {
return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
},
)
}
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(
ownerUserID string,
conversationIDs ...string,
) ConversationCache {
cache := c.NewCache()
for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID))
}
return cache
}
func (c *ConversationRedisCache) GetConversationsByConversationID(
ctx context.Context,
conversationIDs []string,
) ([]*relationtb.ConversationModel, error) {
panic("implement me")
}
func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) ConversationCache {
panic("implement me")
}