mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
98e22cc699
2
go.mod
2
go.mod
@ -9,7 +9,7 @@ require (
|
||||
github.com/Shopify/sarama v1.32.0
|
||||
github.com/antonfisher/nested-logrus-formatter v1.3.1
|
||||
github.com/bwmarrin/snowflake v0.3.0
|
||||
github.com/dtm-labs/rockscache v0.0.11
|
||||
github.com/dtm-labs/rockscache v0.1.0
|
||||
github.com/gin-gonic/gin v1.8.2
|
||||
github.com/go-playground/validator/v10 v10.11.1
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
|
4
go.sum
4
go.sum
@ -424,8 +424,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/dtm-labs/rockscache v0.0.11 h1:V6M+KH9fFRFDXgB+Uux1d6zwhZt1O34sgPwM0wjud9Y=
|
||||
github.com/dtm-labs/rockscache v0.0.11/go.mod h1:vJmJJmuBNxcio03abYk1QPLmmQo/Kg92jB+28QmLcgY=
|
||||
github.com/dtm-labs/rockscache v0.1.0 h1:tjJuruAUo/3wzZgQBkdQ+Pgz7GhmQ6qt0BrHHyVy3eY=
|
||||
github.com/dtm-labs/rockscache v0.1.0/go.mod h1:vJmJJmuBNxcio03abYk1QPLmmQo/Kg92jB+28QmLcgY=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
|
||||
|
@ -2,14 +2,16 @@ package msgtransfer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type MsgTransfer struct {
|
||||
@ -38,9 +40,9 @@ func StartTransfer(prometheusPort int) error {
|
||||
cacheModel := cache.NewCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
|
||||
|
||||
extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
|
||||
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel)
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
|
||||
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
|
||||
|
||||
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase)
|
||||
|
@ -8,6 +8,7 @@ package msgtransfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
@ -65,7 +66,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs
|
||||
}
|
||||
if tag {
|
||||
log.NewInfo(operationID, "msg_transfer msg persisting", string(msg))
|
||||
if err = pc.chatLogDatabase.CreateChatLog(msgFromMQ); err != nil {
|
||||
if err = pc.chatLogDatabase.CreateChatLog(&msgFromMQ); err != nil {
|
||||
log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
tableRelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
|
||||
@ -35,9 +36,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conversationDB := relation.NewConversationGorm(db)
|
||||
pbConversation.RegisterConversationServer(server, &conversationServer{
|
||||
groupChecker: check.NewGroupChecker(client),
|
||||
ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, cache.GetDefaultOpt()), tx.NewGorm(db)),
|
||||
ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@ -54,7 +56,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
return nil, nil
|
||||
return nil, errs.ErrRecordNotFound.Wrap("conversation not found")
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
|
||||
@ -70,11 +72,11 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbCon
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) {
|
||||
resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
|
||||
conversations, err := c.ConversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
|
||||
if err := utils.CopyStructFields(&resp.Conversations, conversations); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -82,7 +84,6 @@ func (c *conversationServer) GetConversations(ctx context.Context, req *pbConver
|
||||
}
|
||||
|
||||
func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) {
|
||||
resp := &pbConversation.BatchSetConversationsResp{}
|
||||
var conversations []*tableRelation.ConversationModel
|
||||
if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil {
|
||||
return nil, err
|
||||
@ -92,15 +93,30 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC
|
||||
return nil, err
|
||||
}
|
||||
c.notify.ConversationChangeNotification(ctx, req.OwnerUserID)
|
||||
resp := &pbConversation.BatchSetConversationsResp{}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) SetConversation(ctx context.Context, req *pbConversation.SetConversationReq) (*pbConversation.SetConversationResp, error) {
|
||||
panic("implement me")
|
||||
var conversation tableRelation.ConversationModel
|
||||
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err := c.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tableRelation.ConversationModel{&conversation})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.notify.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID)
|
||||
resp := &pbConversation.SetConversationResp{}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) SetRecvMsgOpt(ctx context.Context, req *pbConversation.SetRecvMsgOptReq) (*pbConversation.SetRecvMsgOptResp, error) {
|
||||
panic("implement me")
|
||||
conversation := tableRelation.ConversationModel{OwnerUserID: req.OwnerUserID, ConversationID: req.ConversationID, RecvMsgOpt: req.RecvMsgOpt}
|
||||
if err := c.SetUsersConversationFiledTx(ctx, []string{req.OwnerUserID}, &conversation, map[string]interface{}{"recv_msg_opt": req.RecvMsgOpt}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbConversation.SetRecvMsgOptResp{}, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
tablerelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
@ -35,9 +36,15 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
if err := db.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil {
|
||||
return err
|
||||
}
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blackDB := relation.NewBlackGorm(db)
|
||||
friendDB := relation.NewFriendGorm(db)
|
||||
pbfriend.RegisterFriendServer(server, &friendServer{
|
||||
FriendDatabase: controller.NewFriendDatabase(relation.NewFriendGorm(db), relation.NewFriendRequestGorm(db), tx.NewGorm(db)),
|
||||
BlackDatabase: controller.NewBlackDatabase(relation.NewBlackGorm(db)),
|
||||
FriendDatabase: controller.NewFriendDatabase(friendDB, relation.NewFriendRequestGorm(db), cache.NewFriendCacheRedis(rdb, friendDB, cache.GetDefaultOpt()), tx.NewGorm(db)),
|
||||
BlackDatabase: controller.NewBlackDatabase(blackDB, cache.NewBlackCacheRedis(rdb, blackDB, cache.GetDefaultOpt())),
|
||||
notification: notification.NewCheck(client),
|
||||
userCheck: check.NewUserCheck(client),
|
||||
RegisterCenter: client,
|
||||
|
@ -1,9 +1,10 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
pbGroup "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
|
||||
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"time"
|
||||
|
||||
pbGroup "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
)
|
||||
|
||||
func UpdateGroupInfoMap(group *sdkws.GroupInfoForSet) map[string]any {
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
@ -58,8 +59,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
cacheModel := cache.NewCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
|
||||
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel)
|
||||
extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCacheModel, tx.NewMongo(mongo.GetClient()))
|
||||
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
|
||||
|
||||
s := &msgServer{
|
||||
|
39
pkg/common/db/cache/black.go
vendored
39
pkg/common/db/cache/black.go
vendored
@ -2,10 +2,11 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
"time"
|
||||
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -16,21 +17,35 @@ const (
|
||||
// args fn will exec when no data in cache
|
||||
type BlackCache interface {
|
||||
//get blackIDs from cache
|
||||
GetBlackIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) (blackIDs []string, err error)
|
||||
metaCache
|
||||
NewCache() BlackCache
|
||||
GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error)
|
||||
//del user's blackIDs cache, exec when a user's black list changed
|
||||
DelBlackIDs(ctx context.Context, userID string) (err error)
|
||||
DelBlackIDs(ctx context.Context, userID string) BlackCache
|
||||
}
|
||||
|
||||
type BlackCacheRedis struct {
|
||||
metaCache
|
||||
expireTime time.Duration
|
||||
rcClient *rockscache.Client
|
||||
black *relation.BlackGorm
|
||||
blackDB relationTb.BlackModelInterface
|
||||
}
|
||||
|
||||
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB BlackCache, options rockscache.Options) *BlackCacheRedis {
|
||||
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB relationTb.BlackModelInterface, options rockscache.Options) BlackCache {
|
||||
rcClient := rockscache.NewClient(rdb, options)
|
||||
return &BlackCacheRedis{
|
||||
expireTime: blackExpireTime,
|
||||
rcClient: rockscache.NewClient(rdb, options),
|
||||
rcClient: rcClient,
|
||||
metaCache: NewMetaCacheRedis(rcClient),
|
||||
blackDB: blackDB,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BlackCacheRedis) NewCache() BlackCache {
|
||||
return &BlackCacheRedis{
|
||||
expireTime: b.expireTime,
|
||||
rcClient: b.rcClient,
|
||||
blackDB: b.blackDB,
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,11 +54,13 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
|
||||
}
|
||||
|
||||
func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) {
|
||||
return GetCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return b.black.FindBlackUserIDs(ctx, userID)
|
||||
return getCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return b.blackDB.FindBlackUserIDs(ctx, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) (err error) {
|
||||
return b.rcClient.TagAsDeleted(b.getBlackIDsKey(userID))
|
||||
func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache {
|
||||
cache := b.NewCache()
|
||||
cache.AddKeys(userID)
|
||||
return cache
|
||||
}
|
||||
|
392
pkg/common/db/cache/conversation.go
vendored
392
pkg/common/db/cache/conversation.go
vendored
@ -2,282 +2,216 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/big"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
conversationKey = "CONVERSATION:"
|
||||
conversationIDsKey = "CONVERSATION_IDS:"
|
||||
recvMsgOptKey = "RECV_MSG_OPT:"
|
||||
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
||||
conversationExpireTime = time.Second * 60 * 60 * 12
|
||||
)
|
||||
conversationKey = "CONVERSATION:"
|
||||
conversationIDsKey = "CONVERSATION_IDS:"
|
||||
recvMsgOptKey = "RECV_MSG_OPT:"
|
||||
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
||||
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
|
||||
|
||||
type FuncDB func() (string, error)
|
||||
conversationExpireTime = time.Second * 60 * 60 * 12
|
||||
)
|
||||
|
||||
// arg fn will exec when no data in cache
|
||||
type ConversationCache interface {
|
||||
metaCache
|
||||
NewCache() ConversationCache
|
||||
// get user's conversationIDs from cache
|
||||
GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error)
|
||||
// del user's conversationIDs from cache, call when a user add or reduce a conversation
|
||||
DelUserConversationIDs(ctx context.Context, userID string) error
|
||||
DelUsersConversationIDs(ctx context.Context, userIDList []string) error
|
||||
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
|
||||
DelConversationIDs(userIDs []string) ConversationCache
|
||||
// get one conversation from cache
|
||||
GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error)
|
||||
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
|
||||
DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache
|
||||
DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache
|
||||
// get one conversation from cache
|
||||
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error)
|
||||
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
|
||||
// get one user's all conversations from cache
|
||||
GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error)
|
||||
// del one conversation from cache, call when one user's conversation Info changed
|
||||
DelConversation(ctx context.Context, ownerUserID, conversationID string) error
|
||||
DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error
|
||||
DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error
|
||||
GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error)
|
||||
// get user conversation recv msg from cache
|
||||
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)) (opt int, err error)
|
||||
// del user recv msg opt from cache, call when user's conversation recv msg opt changed
|
||||
DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error
|
||||
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
|
||||
DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache
|
||||
// get one super group recv msg but do not notification userID list
|
||||
GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error)
|
||||
// del one super group recv msg but do not notification userID list, call it when this list changed
|
||||
DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) error
|
||||
//GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error)
|
||||
//DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string)
|
||||
GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error)
|
||||
// get one super group recv msg but do not notification userID list hash
|
||||
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
|
||||
}
|
||||
|
||||
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options) ConversationCache {
|
||||
return &ConversationRedis{rcClient: rockscache.NewClient(rdb, opts)}
|
||||
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationTb.ConversationModelInterface) ConversationCache {
|
||||
rcClient := rockscache.NewClient(rdb, opts)
|
||||
return &ConversationRedisCache{rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), conversationDB: db, expireTime: conversationExpireTime}
|
||||
}
|
||||
|
||||
type ConversationRedis struct {
|
||||
rcClient *rockscache.Client
|
||||
expireTime time.Duration
|
||||
type ConversationRedisCache struct {
|
||||
metaCache
|
||||
rcClient *rockscache.Client
|
||||
conversationDB relationTb.ConversationModelInterface
|
||||
expireTime time.Duration
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error) {
|
||||
return nil, nil
|
||||
func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) ConversationCache {
|
||||
rcClient := rockscache.NewClient(rdb, options)
|
||||
return &ConversationRedisCache{rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), conversationDB: conversationDB, expireTime: conversationExpireTime}
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (c *ConversationRedisCache) NewCache() ConversationCache {
|
||||
return &ConversationRedisCache{rcClient: c.rcClient, metaCache: c.metaCache, conversationDB: c.conversationDB, expireTime: c.expireTime}
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (opt int, err error)) (opt int, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationRedis {
|
||||
return &ConversationRedis{rcClient: rockscache.NewClient(rdb, options)}
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) getConversationKey(ownerUserID, conversationID string) string {
|
||||
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
|
||||
return conversationKey + ownerUserID + ":" + conversationID
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) getConversationIDsKey(ownerUserID string) string {
|
||||
func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string {
|
||||
return conversationIDsKey + ownerUserID
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) getRecvMsgOptKey(ownerUserID, conversationID string) string {
|
||||
return recvMsgOptKey + ownerUserID + ":" + conversationID
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
|
||||
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
|
||||
return superGroupRecvMsgNotNotifyUserIDsKey + groupID
|
||||
}
|
||||
|
||||
//func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
|
||||
// //getConversationIDs := func() (string, error) {
|
||||
// // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID)
|
||||
// // if err != nil {
|
||||
// // return "", err
|
||||
// // }
|
||||
// // bytes, err := json.Marshal(conversationIDs)
|
||||
// // if err != nil {
|
||||
// // return "", utils.Wrap(err, "")
|
||||
// // }
|
||||
// // return string(bytes), nil
|
||||
// //}
|
||||
// //defer func() {
|
||||
// // mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs)
|
||||
// //}()
|
||||
// //conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs)
|
||||
// //err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs)
|
||||
// //if err != nil {
|
||||
// // return nil, utils.Wrap(err, "")
|
||||
// //}
|
||||
// //return conversationIDs, nil
|
||||
// return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) {
|
||||
// panic("implement me")
|
||||
// })
|
||||
//}
|
||||
func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string {
|
||||
return recvMsgOptKey + ownerUserID + ":" + conversationID
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
|
||||
//getConversationIDs := func() (string, error) {
|
||||
// conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID)
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
// bytes, err := json.Marshal(conversationIDs)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "")
|
||||
// }
|
||||
// return string(bytes), nil
|
||||
//}
|
||||
//defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs)
|
||||
//}()
|
||||
//conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs)
|
||||
//err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs)
|
||||
//if err != nil {
|
||||
// return nil, utils.Wrap(err, "")
|
||||
//}
|
||||
//return conversationIDs, nil
|
||||
//return GetCache1[[]string](c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, fn)
|
||||
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
|
||||
return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID
|
||||
}
|
||||
|
||||
return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) {
|
||||
panic("")
|
||||
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
|
||||
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
|
||||
})
|
||||
}
|
||||
|
||||
//func GetCache1[T any](rcClient *rockscache.Client, key string, expire time.Duration, fn func() (any, error)) (T, error) {
|
||||
// v, err := rcClient.Fetch(key, expire, func() (string, error) {
|
||||
// v, err := fn()
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
// bs, err := json.Marshal(v)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "")
|
||||
// }
|
||||
// return string(bs), nil
|
||||
// })
|
||||
// var t T
|
||||
// if err != nil {
|
||||
// return t, err
|
||||
// }
|
||||
// err = json.Unmarshal([]byte(v), &t)
|
||||
// if err != nil {
|
||||
// return t, utils.Wrap(err, "")
|
||||
// }
|
||||
// return t, nil
|
||||
//}
|
||||
|
||||
func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) {
|
||||
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err")
|
||||
func (c *ConversationRedisCache) DelConversationIDs(userIDs []string) ConversationCache {
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
keys = append(keys, c.getConversationIDsKey(userID))
|
||||
}
|
||||
cache := c.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
||||
//func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) {
|
||||
// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) {
|
||||
// panic("implement me")
|
||||
// })
|
||||
//}
|
||||
|
||||
func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) {
|
||||
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err")
|
||||
func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) {
|
||||
return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.Take(ctx, ownerUserID, conversationID)
|
||||
})
|
||||
}
|
||||
|
||||
//func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) {
|
||||
// defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations)
|
||||
// }()
|
||||
// for _, conversationID := range conversationIDs {
|
||||
// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// conversations = append(conversations, *conversation)
|
||||
// }
|
||||
// return conversations, nil
|
||||
//}
|
||||
|
||||
//func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) {
|
||||
// defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations)
|
||||
// }()
|
||||
// IDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// var conversationIDs []relationTb.ConversationModel
|
||||
// for _, conversationID := range IDs {
|
||||
// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// conversationIDs = append(conversationIDs, *conversation)
|
||||
// }
|
||||
// return conversationIDs, nil
|
||||
//}
|
||||
|
||||
//func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
|
||||
// //getConversation := func() (string, error) {
|
||||
// // conversation, err := relation.GetConversation(ownerUserID, conversationID)
|
||||
// // if err != nil {
|
||||
// // return "", err
|
||||
// // }
|
||||
// // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil
|
||||
// //}
|
||||
// //defer func() {
|
||||
// // mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt)
|
||||
// //}()
|
||||
// //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation)
|
||||
// //if err != nil {
|
||||
// // return 0, err
|
||||
// //}
|
||||
// //return strconv.Atoi(optStr)
|
||||
// // panic("implement me")
|
||||
// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) {
|
||||
// panic("implement me")
|
||||
// })
|
||||
//}
|
||||
|
||||
func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error {
|
||||
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed")
|
||||
func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs []string) ConversationCache {
|
||||
var keys []string
|
||||
for _, conversationID := range convsersationIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
|
||||
}
|
||||
cache := c.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) {
|
||||
func (c *ConversationRedisCache) GetConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) {
|
||||
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
|
||||
for _i, _key := range keys {
|
||||
if _key == key {
|
||||
return _i, nil
|
||||
}
|
||||
}
|
||||
return 0, errors.New("not found key:" + key + " in keys")
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
|
||||
var keys []string
|
||||
for _, conversarionID := range conversationIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
|
||||
}
|
||||
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
|
||||
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var keys []string
|
||||
for _, conversarionID := range conversationIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversarionID))
|
||||
}
|
||||
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) {
|
||||
return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ConversationCache {
|
||||
var keys []string
|
||||
for _, conversationID := range conversationIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
|
||||
}
|
||||
cache := c.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
|
||||
return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) {
|
||||
return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
|
||||
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) {
|
||||
return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache {
|
||||
var keys []string
|
||||
for _, ownerUserID := range ownerUserIDs {
|
||||
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
|
||||
}
|
||||
cache := c.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache {
|
||||
cache := c.NewCache()
|
||||
cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID))
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ConversationCache {
|
||||
cache := c.NewCache()
|
||||
cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID))
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) {
|
||||
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) {
|
||||
userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
utils.Sort(userIDs, true)
|
||||
bi := big.NewInt(0)
|
||||
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
|
||||
return bi.Uint64(), nil
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
78
pkg/common/db/cache/extend_msg_set.go
vendored
78
pkg/common/db/cache/extend_msg_set.go
vendored
@ -2,10 +2,11 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -13,44 +14,51 @@ const (
|
||||
extendMsgCache = "EXTEND_MSG_CACHE:"
|
||||
)
|
||||
|
||||
type ExtendMsgSetCache struct {
|
||||
expireTime time.Duration
|
||||
rcClient *rockscache.Client
|
||||
type ExtendMsgSetCache interface {
|
||||
metaCache
|
||||
NewCache() ExtendMsgSetCache
|
||||
GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error)
|
||||
DelExtendMsg(clientMsgID string) ExtendMsgSetCache
|
||||
}
|
||||
|
||||
func (e *ExtendMsgSetCache) getKey(clientMsgID string) string {
|
||||
type ExtendMsgSetCacheRedis struct {
|
||||
metaCache
|
||||
expireTime time.Duration
|
||||
rcClient *rockscache.Client
|
||||
extendMsgSetDB unrelation.ExtendMsgSetModelInterface
|
||||
}
|
||||
|
||||
func NewExtendMsgSetCacheRedis(rdb redis.UniversalClient, extendMsgSetDB unrelation.ExtendMsgSetModelInterface, options rockscache.Options) ExtendMsgSetCache {
|
||||
rcClient := rockscache.NewClient(rdb, options)
|
||||
return &ExtendMsgSetCacheRedis{
|
||||
metaCache: NewMetaCacheRedis(rcClient),
|
||||
expireTime: time.Second * 30 * 60,
|
||||
extendMsgSetDB: extendMsgSetDB,
|
||||
rcClient: rcClient,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ExtendMsgSetCacheRedis) NewCache() ExtendMsgSetCache {
|
||||
return &ExtendMsgSetCacheRedis{
|
||||
metaCache: e.metaCache,
|
||||
expireTime: e.expireTime,
|
||||
extendMsgSetDB: e.extendMsgSetDB,
|
||||
rcClient: e.rcClient,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ExtendMsgSetCacheRedis) getKey(clientMsgID string) string {
|
||||
return extendMsgCache + clientMsgID
|
||||
}
|
||||
|
||||
func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) {
|
||||
//getExtendMsg := func() (string, error) {
|
||||
// extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "GetExtendMsgList failed")
|
||||
// }
|
||||
// bytes, err := json.Marshal(extendMsg)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "Marshal failed")
|
||||
// }
|
||||
// return string(bytes), nil
|
||||
//}
|
||||
//defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType",
|
||||
// sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg)
|
||||
//}()
|
||||
//extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg)
|
||||
//if err != nil {
|
||||
// return nil, utils.Wrap(err, "Fetch failed")
|
||||
//}
|
||||
//extendMsg = &mongoDB.ExtendMsg{}
|
||||
//err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
|
||||
//return extendMsg, utils.Wrap(err, "Unmarshal failed")
|
||||
return GetCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) {
|
||||
panic("")
|
||||
func (e *ExtendMsgSetCacheRedis) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) {
|
||||
return getCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) {
|
||||
return e.extendMsgSetDB.TakeExtendMsg(ctx, sourceID, sessionType, clientMsgID, firstModifyTime)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (e *ExtendMsgSetCache) DelExtendMsg(ctx context.Context, clientMsgID string) (err error) {
|
||||
return utils.Wrap(e.rcClient.TagAsDeleted(e.getKey(clientMsgID)), "DelExtendMsg err")
|
||||
func (e *ExtendMsgSetCacheRedis) DelExtendMsg(clientMsgID string) ExtendMsgSetCache {
|
||||
new := e.NewCache()
|
||||
new.AddKeys(e.getKey(clientMsgID))
|
||||
return new
|
||||
}
|
||||
|
76
pkg/common/db/cache/friend.go
vendored
76
pkg/common/db/cache/friend.go
vendored
@ -2,13 +2,12 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
"time"
|
||||
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -20,29 +19,38 @@ const (
|
||||
|
||||
// args fn will exec when no data in cache
|
||||
type FriendCache interface {
|
||||
GetFriendIDs(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserID string) (friendIDs []string, err error)) (friendIDs []string, err error)
|
||||
metaCache
|
||||
NewCache() FriendCache
|
||||
GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error)
|
||||
// call when friendID List changed
|
||||
DelFriendIDs(ctx context.Context, ownerUserID string) (err error)
|
||||
DelFriendIDs(ownerUserID ...string) FriendCache
|
||||
// get single friendInfo from cache
|
||||
GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error)
|
||||
GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)
|
||||
// del friend when friend info changed
|
||||
DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error)
|
||||
DelFriend(ownerUserID, friendUserID string) FriendCache
|
||||
}
|
||||
|
||||
type FriendCacheRedis struct {
|
||||
friendDB *relation.FriendGorm
|
||||
metaCache
|
||||
friendDB relationTb.FriendModelInterface
|
||||
expireTime time.Duration
|
||||
rcClient *rockscache.Client
|
||||
}
|
||||
|
||||
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB *relation.FriendGorm, options rockscache.Options) *FriendCacheRedis {
|
||||
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationTb.FriendModelInterface, options rockscache.Options) FriendCache {
|
||||
rcClient := rockscache.NewClient(rdb, options)
|
||||
return &FriendCacheRedis{
|
||||
metaCache: NewMetaCacheRedis(rcClient),
|
||||
friendDB: friendDB,
|
||||
expireTime: friendExpireTime,
|
||||
rcClient: rockscache.NewClient(rdb, options),
|
||||
rcClient: rcClient,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *FriendCacheRedis) NewCache() FriendCache {
|
||||
return &FriendCacheRedis{rcClient: c.rcClient, metaCache: c.metaCache, friendDB: c.friendDB, expireTime: c.expireTime}
|
||||
}
|
||||
|
||||
func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
|
||||
return friendIDsKey + ownerUserID
|
||||
}
|
||||
@ -56,15 +64,22 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string
|
||||
}
|
||||
|
||||
func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
|
||||
return GetCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
|
||||
})
|
||||
}
|
||||
|
||||
func (f *FriendCacheRedis) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) {
|
||||
return f.rcClient.TagAsDeleted(f.getFriendIDsKey(ownerUserID))
|
||||
func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache {
|
||||
new := f.NewCache()
|
||||
var keys []string
|
||||
for _, userID := range ownerUserID {
|
||||
keys = append(keys, f.getFriendIDsKey(userID))
|
||||
}
|
||||
new.AddKeys(keys...)
|
||||
return new
|
||||
}
|
||||
|
||||
// todo
|
||||
func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) {
|
||||
friendIDs, err := f.GetFriendIDs(ctx, ownerUserID)
|
||||
if err != nil {
|
||||
@ -82,31 +97,20 @@ func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID s
|
||||
return twoWayFriendIDs, nil
|
||||
}
|
||||
|
||||
func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) (err error) {
|
||||
return f.rcClient.TagAsDeleted(f.getTwoWayFriendsIDsKey(ownerUserID))
|
||||
func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache {
|
||||
new := f.NewCache()
|
||||
new.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID))
|
||||
return new
|
||||
}
|
||||
|
||||
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error) {
|
||||
getFriend := func() (string, error) {
|
||||
friend, err = f.friendDB.Take(ctx, ownerUserID, friendUserID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
bytes, err := json.Marshal(friend)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
friendStr, err := f.rcClient.Fetch(f.getFriendKey(ownerUserID, friendUserID), f.expireTime, getFriend)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
friend = &relationTb.FriendModel{}
|
||||
err = json.Unmarshal([]byte(friendStr), friend)
|
||||
return friend, utils.Wrap(err, "")
|
||||
func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error) {
|
||||
return getCache(ctx, f.rcClient, f.getFriendKey(ownerUserID, friendUserID), f.expireTime, func(ctx context.Context) (*relationTb.FriendModel, error) {
|
||||
return f.friendDB.Take(ctx, ownerUserID, friendUserID)
|
||||
})
|
||||
}
|
||||
|
||||
func (f *FriendCacheRedis) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) {
|
||||
return f.rcClient.TagAsDeleted(f.getFriendKey(ownerUserID, friendUserID))
|
||||
func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache {
|
||||
new := f.NewCache()
|
||||
new.AddKeys(f.getFriendKey(ownerUserID, friendUserID))
|
||||
return new
|
||||
}
|
||||
|
359
pkg/common/db/cache/group.go
vendored
359
pkg/common/db/cache/group.go
vendored
@ -2,14 +2,15 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
unrelation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"math/big"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
unrelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -24,40 +25,54 @@ const (
|
||||
)
|
||||
|
||||
type GroupCache interface {
|
||||
metaCache
|
||||
NewCache() GroupCache
|
||||
GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)
|
||||
GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
|
||||
BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error)
|
||||
DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error)
|
||||
GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error)
|
||||
GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error)
|
||||
GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error)
|
||||
DelGroupMembersHash(ctx context.Context, groupID string) (err error)
|
||||
DelJoinedSuperGroupIDs(userIDs ...string) GroupCache
|
||||
|
||||
GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error)
|
||||
GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error)
|
||||
DelGroupMembersHash(groupID string) GroupCache
|
||||
|
||||
GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error)
|
||||
DelGroupMemberIDs(ctx context.Context, groupID string) (err error)
|
||||
DelJoinedGroupID(ctx context.Context, userID string) (err error)
|
||||
GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (groupMemberIDs map[string][]string, err error)
|
||||
|
||||
DelGroupMemberIDs(groupID string) GroupCache
|
||||
DelJoinedGroupID(userID ...string) GroupCache
|
||||
|
||||
GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error)
|
||||
DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error)
|
||||
DelGroupMemberNum(ctx context.Context, groupID string) (err error)
|
||||
DelGroupInfo(ctx context.Context, groupID string) (err error)
|
||||
DelGroupsInfo(ctx context.Context, groupIDs []string) error
|
||||
GetGroupMembersInfo(ctx context.Context, groupID string, userID []string, roleLevel []int32) (groupMembers []*relationTb.GroupMemberModel, err error)
|
||||
DelGroupMembersInfo(groupID string, userID ...string) GroupCache
|
||||
|
||||
GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error)
|
||||
DelGroupsMemberNum(groupID ...string) GroupCache
|
||||
DelGroupsInfo(groupIDs ...string) GroupCache
|
||||
}
|
||||
|
||||
type GroupCacheRedis struct {
|
||||
group relationTb.GroupModelInterface
|
||||
groupMember relationTb.GroupMemberModelInterface
|
||||
groupRequest relationTb.GroupRequestModelInterface
|
||||
mongoDB unrelation2.SuperGroupModelInterface
|
||||
expireTime time.Duration
|
||||
rcClient *rockscache.Client
|
||||
metaCache
|
||||
groupDB relationTb.GroupModelInterface
|
||||
groupMemberDB relationTb.GroupMemberModelInterface
|
||||
groupRequestDB relationTb.GroupRequestModelInterface
|
||||
mongoDB unrelationTb.SuperGroupModelInterface
|
||||
expireTime time.Duration
|
||||
rcClient *rockscache.Client
|
||||
}
|
||||
|
||||
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelation2.SuperGroupModelInterface, opts rockscache.Options) GroupCache {
|
||||
return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
|
||||
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB,
|
||||
mongoDB: mongoClient,
|
||||
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelationTb.SuperGroupModelInterface, opts rockscache.Options) GroupCache {
|
||||
rcClient := rockscache.NewClient(rdb, opts)
|
||||
return &GroupCacheRedis{rcClient: rcClient, expireTime: groupExpireTime,
|
||||
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
|
||||
mongoDB: mongoClient, metaCache: NewMetaCacheRedis(rcClient),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) NewCache() GroupCache {
|
||||
return &GroupCacheRedis{rcClient: g.rcClient, expireTime: g.expireTime, groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, mongoDB: g.mongoDB, metaCache: g.metaCache}
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
|
||||
return groupInfoKey + groupID
|
||||
}
|
||||
@ -86,35 +101,66 @@ func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
|
||||
return groupMemberNumKey + groupID
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupIndex(group *relationTb.GroupModel, keys []string) (int, error) {
|
||||
key := g.getGroupInfoKey(group.GroupID)
|
||||
for i, _key := range keys {
|
||||
if _key == key {
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
return 0, errIndex
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationTb.GroupMemberModel, keys []string) (int, error) {
|
||||
key := g.getGroupMemberInfoKey(groupMember.GroupID, groupMember.UserID)
|
||||
for i, _key := range keys {
|
||||
if _key == key {
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
return 0, errIndex
|
||||
}
|
||||
|
||||
// / groupInfo
|
||||
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
|
||||
return GetCacheFor(ctx, groupIDs, func(ctx context.Context, groupID string) (*relationTb.GroupModel, error) {
|
||||
return g.GetGroupInfo(ctx, groupID)
|
||||
var keys []string
|
||||
for _, group := range groupIDs {
|
||||
keys = append(keys, g.getGroupInfoKey(group))
|
||||
}
|
||||
return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupIndex, func(ctx context.Context) ([]*relationTb.GroupModel, error) {
|
||||
return g.groupDB.Find(ctx, groupIDs)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
|
||||
return GetCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) {
|
||||
return g.group.Take(ctx, groupID)
|
||||
return getCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) {
|
||||
return g.groupDB.Take(ctx, groupID)
|
||||
})
|
||||
}
|
||||
|
||||
// userJoinSuperGroup
|
||||
func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
||||
for _, userID := range userIDs {
|
||||
if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
|
||||
return err
|
||||
}
|
||||
func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache {
|
||||
new := g.NewCache()
|
||||
var keys []string
|
||||
for _, groupID := range groupIDs {
|
||||
keys = append(keys, g.getGroupInfoKey(groupID))
|
||||
}
|
||||
return nil
|
||||
new.AddKeys(keys...)
|
||||
return new
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) {
|
||||
return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID))
|
||||
// userJoinSuperGroup
|
||||
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache {
|
||||
new := g.NewCache()
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
keys = append(keys, g.getJoinedSuperGroupsIDKey(userID))
|
||||
}
|
||||
new.AddKeys(keys...)
|
||||
return new
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
|
||||
return GetCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -124,8 +170,8 @@ func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID str
|
||||
}
|
||||
|
||||
// groupMembersHash
|
||||
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
|
||||
return GetCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
|
||||
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) {
|
||||
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
|
||||
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -137,174 +183,125 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
|
||||
// todo
|
||||
mapGroupUserIDs, err := g.groupMember.FindJoinUserID(ctx, groupIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
|
||||
res := make(map[string]*relationTb.GroupSimpleUserID)
|
||||
for _, groupID := range groupIDs {
|
||||
userIDs := mapGroupUserIDs[groupID]
|
||||
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
users := &relationTb.GroupSimpleUserID{}
|
||||
if len(userIDs) > 0 {
|
||||
utils.Sort(userIDs, true)
|
||||
bi := big.NewInt(0)
|
||||
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
|
||||
users.Hash = bi.Uint64()
|
||||
users.MemberNum = uint32(len(userIDs))
|
||||
}
|
||||
res[groupID] = users
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
|
||||
return g.rcClient.TagAsDeleted(g.getGroupMembersHashKey(groupID))
|
||||
func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache {
|
||||
cache := g.NewCache()
|
||||
cache.AddKeys(g.getGroupMembersHashKey(groupID))
|
||||
return cache
|
||||
}
|
||||
|
||||
// groupMemberIDs
|
||||
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
|
||||
return GetCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return g.groupMember.FindMemberUserID(ctx, groupID)
|
||||
return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return g.groupMemberDB.FindMemberUserID(ctx, groupID)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
|
||||
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
|
||||
func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) {
|
||||
m := make(map[string][]string)
|
||||
for _, groupID := range groupIDs {
|
||||
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m[groupID] = userIDs
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
//// JoinedGroups
|
||||
//func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
|
||||
// getJoinedGroupIDList := func() (string, error) {
|
||||
// joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
// bytes, err := json.Marshal(joinedGroupList)
|
||||
// if err != nil {
|
||||
// return "", utils.Wrap(err, "")
|
||||
// }
|
||||
// return string(bytes), nil
|
||||
// }
|
||||
// defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
|
||||
// }()
|
||||
// joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
|
||||
// return joinedGroupIDs, utils.Wrap(err, "")
|
||||
//}
|
||||
|
||||
func (g *GroupCacheRedis) DelJoinedGroupID(ctx context.Context, userID string) (err error) {
|
||||
return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID))
|
||||
func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache {
|
||||
cache := g.NewCache()
|
||||
cache.AddKeys(g.getGroupMemberIDsKey(groupID))
|
||||
return cache
|
||||
}
|
||||
|
||||
//func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
||||
// defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
|
||||
// }()
|
||||
// for _, userID := range userIDs {
|
||||
// if err := g.DelJoinedGroupID(ctx, userID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
//}
|
||||
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
|
||||
return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache {
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
keys = append(keys, g.getJoinedGroupsKey(userID))
|
||||
}
|
||||
cache := g.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
|
||||
return GetCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) {
|
||||
return g.groupMember.Take(ctx, groupID, userID)
|
||||
return getCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) {
|
||||
return g.groupMemberDB.Take(ctx, groupID, userID)
|
||||
})
|
||||
}
|
||||
|
||||
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) {
|
||||
//
|
||||
// return nil, err
|
||||
//}
|
||||
|
||||
//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
|
||||
// defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
|
||||
// }()
|
||||
// groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// if count < 0 || offset < 0 {
|
||||
// return nil, nil
|
||||
// }
|
||||
// var groupMemberList []*relation.GroupMember
|
||||
// var start, stop int32
|
||||
// start = offset
|
||||
// stop = offset + count
|
||||
// l := int32(len(groupMemberIDList))
|
||||
// if start > stop {
|
||||
// return nil, nil
|
||||
// }
|
||||
// if start >= l {
|
||||
// return nil, nil
|
||||
// }
|
||||
// if count != 0 {
|
||||
// if stop >= l {
|
||||
// stop = l
|
||||
// }
|
||||
// groupMemberIDList = groupMemberIDList[start:stop]
|
||||
// } else {
|
||||
// if l < 1000 {
|
||||
// stop = l
|
||||
// } else {
|
||||
// stop = 1000
|
||||
// }
|
||||
// groupMemberIDList = groupMemberIDList[start:stop]
|
||||
// }
|
||||
// for _, userID := range groupMemberIDList {
|
||||
// groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID)
|
||||
// if err != nil {
|
||||
// return
|
||||
// }
|
||||
// groupMembers = append(groupMembers, groupMember)
|
||||
// }
|
||||
// return groupMemberList, nil
|
||||
//}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
|
||||
return g.rcClient.TagAsDeleted(g.getGroupMemberInfoKey(groupID, userID))
|
||||
}
|
||||
|
||||
// groupMemberNum
|
||||
//func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
|
||||
// getGroupMemberNum := func() (string, error) {
|
||||
// num, err := relation.GetGroupMemberNumByGroupID(groupID)
|
||||
// if err != nil {
|
||||
// return "", err
|
||||
// }
|
||||
// return strconv.Itoa(int(num)), nil
|
||||
// }
|
||||
// defer func() {
|
||||
// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
|
||||
// }()
|
||||
// groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
|
||||
// if err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
// return strconv.Atoi(groupMember)
|
||||
//}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
|
||||
return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID))
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) {
|
||||
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
|
||||
for _, groupID := range groupIDs {
|
||||
if err := g.DelGroupInfo(ctx, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string, roleLevel []int32) ([]*relationTb.GroupMemberModel, error) {
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
|
||||
}
|
||||
return nil
|
||||
return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationTb.GroupMemberModel, error) {
|
||||
return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, roleLevel)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*relationTb.GroupMemberModel, error) {
|
||||
groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var keys []string
|
||||
for _, groupMemberID := range groupMemberIDs {
|
||||
keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID))
|
||||
}
|
||||
return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationTb.GroupMemberModel, error) {
|
||||
return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache {
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
keys = append(keys, g.getGroupMemberInfoKey(groupID, userID))
|
||||
}
|
||||
cache := g.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) {
|
||||
return getCache(ctx, g.rcClient, g.getGroupMemberNumKey(groupID), g.expireTime, func(ctx context.Context) (int64, error) {
|
||||
return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache {
|
||||
var keys []string
|
||||
for _, groupID := range groupID {
|
||||
keys = append(keys, g.getGroupMemberNumKey(groupID))
|
||||
}
|
||||
cache := g.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
3
pkg/common/db/cache/init_redis.go
vendored
3
pkg/common/db/cache/init_redis.go
vendored
@ -3,11 +3,12 @@ package cache
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw/specialerror"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewRedis() (redis.UniversalClient, error) {
|
||||
|
8
pkg/common/db/cache/redis.go
vendored
8
pkg/common/db/cache/redis.go
vendored
@ -4,17 +4,17 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
@ -251,7 +251,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []
|
||||
}
|
||||
}
|
||||
if len(failedMsgs) != 0 {
|
||||
return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedMsgs, mcontext.GetOperationID(ctx)))
|
||||
return len(failedMsgs), fmt.Errorf("set msg to cache failed, failed lists: %v, %s", failedMsgs, userID)
|
||||
}
|
||||
_, err := pipe.Exec(ctx)
|
||||
return 0, err
|
||||
|
81
pkg/common/db/cache/rockscache.go
vendored
81
pkg/common/db/cache/rockscache.go
vendored
@ -3,13 +3,53 @@ package cache
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"time"
|
||||
)
|
||||
|
||||
const scanCount = 3000
|
||||
|
||||
var errIndex = errors.New("err index")
|
||||
|
||||
type metaCache interface {
|
||||
ExecDel(ctx context.Context) error
|
||||
// delete key rapid
|
||||
DeleteKey(ctx context.Context, key string) error
|
||||
AddKeys(keys ...string)
|
||||
GetPreDeleteKeys() []string
|
||||
}
|
||||
|
||||
func NewMetaCacheRedis(rcClient *rockscache.Client) metaCache {
|
||||
return &metaCacheRedis{rcClient: rcClient}
|
||||
}
|
||||
|
||||
type metaCacheRedis struct {
|
||||
rcClient *rockscache.Client
|
||||
keys []string
|
||||
}
|
||||
|
||||
func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
|
||||
if len(m.keys) > 0 {
|
||||
return m.rcClient.TagAsDeletedBatch2(ctx, m.keys)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *metaCacheRedis) DeleteKey(ctx context.Context, key string) error {
|
||||
return m.rcClient.TagAsDeleted2(ctx, key)
|
||||
}
|
||||
|
||||
func (m *metaCacheRedis) AddKeys(keys ...string) {
|
||||
m.keys = append(m.keys, keys...)
|
||||
}
|
||||
|
||||
func (m *metaCacheRedis) GetPreDeleteKeys() []string {
|
||||
return m.keys
|
||||
}
|
||||
|
||||
func GetDefaultOpt() rockscache.Options {
|
||||
opts := rockscache.NewDefaultOptions()
|
||||
opts.StrongConsistency = true
|
||||
@ -17,10 +57,10 @@ func GetDefaultOpt() rockscache.Options {
|
||||
return opts
|
||||
}
|
||||
|
||||
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
|
||||
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
|
||||
var t T
|
||||
var write bool
|
||||
v, err := rcClient.Fetch(key, expire, func() (s string, err error) {
|
||||
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
|
||||
t, err = fn(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -45,14 +85,37 @@ func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) {
|
||||
rs := make([]T, 0, len(list))
|
||||
for _, e := range list {
|
||||
r, err := fn(ctx, e)
|
||||
func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) {
|
||||
var tArrays []T
|
||||
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
|
||||
values := make(map[int]string)
|
||||
tArrays, err = fn(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs = append(rs, r)
|
||||
for _, v := range tArrays {
|
||||
index, err := keyIndexFn(v, keys)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
bs, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "marshal failed")
|
||||
}
|
||||
values[index] = string(bs)
|
||||
}
|
||||
return values, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rs, nil
|
||||
for _, v := range batchMap {
|
||||
var t T
|
||||
err = json.Unmarshal([]byte(v), &t)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "unmarshal failed")
|
||||
}
|
||||
tArrays = append(tArrays, t)
|
||||
}
|
||||
return tArrays, nil
|
||||
}
|
||||
|
124
pkg/common/db/cache/user.go
vendored
124
pkg/common/db/cache/user.go
vendored
@ -2,14 +2,11 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
"time"
|
||||
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -19,21 +16,38 @@ const (
|
||||
)
|
||||
|
||||
type UserCache interface {
|
||||
metaCache
|
||||
NewCache() UserCache
|
||||
GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error)
|
||||
GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationTb.UserModel, error)
|
||||
DelUsersInfo(userIDs []string) UserCache
|
||||
GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error)
|
||||
DelUsersGlobalRecvMsgOpt(userIDs []string) UserCache
|
||||
}
|
||||
|
||||
type UserCacheRedis struct {
|
||||
userDB *relation.UserGorm
|
||||
|
||||
metaCache
|
||||
userDB relationTb.UserModelInterface
|
||||
expireTime time.Duration
|
||||
|
||||
rcClient *rockscache.Client
|
||||
rcClient *rockscache.Client
|
||||
}
|
||||
|
||||
func NewUserCacheRedis(rdb redis.UniversalClient, userDB *relation.UserGorm, options rockscache.Options) *UserCacheRedis {
|
||||
func NewUserCacheRedis(rdb redis.UniversalClient, userDB relationTb.UserModelInterface, options rockscache.Options) UserCache {
|
||||
rcClient := rockscache.NewClient(rdb, options)
|
||||
return &UserCacheRedis{
|
||||
metaCache: NewMetaCacheRedis(rcClient),
|
||||
userDB: userDB,
|
||||
expireTime: userExpireTime,
|
||||
rcClient: rockscache.NewClient(rdb, options),
|
||||
rcClient: rcClient,
|
||||
}
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) NewCache() UserCache {
|
||||
return &UserCacheRedis{
|
||||
metaCache: u.metaCache,
|
||||
userDB: u.userDB,
|
||||
expireTime: u.expireTime,
|
||||
rcClient: u.rcClient,
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,66 +60,50 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string {
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error) {
|
||||
getUserInfo := func() (string, error) {
|
||||
userInfo, err := u.userDB.Take(ctx, userID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
bytes, err := json.Marshal(userInfo)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
userInfo = &relationTb.UserModel{}
|
||||
err = json.Unmarshal([]byte(userInfoStr), userInfo)
|
||||
return userInfo, utils.Wrap(err, "")
|
||||
return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*relationTb.UserModel, error) {
|
||||
return u.userDB.Take(ctx, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationTb.UserModel, error) {
|
||||
var users []*relationTb.UserModel
|
||||
//for _, userID := range userIDs {
|
||||
// user, err := GetUserInfoFromCache(ctx, userID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// users = append(users, user)
|
||||
//}
|
||||
return users, nil
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) DelUserInfo(ctx context.Context, userID string) (err error) {
|
||||
return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID))
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) DelUsersInfo(ctx context.Context, userIDs []string) (err error) {
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
if err := u.DelUserInfo(ctx, userID); err != nil {
|
||||
return err
|
||||
}
|
||||
keys = append(keys, u.getUserInfoKey(userID))
|
||||
}
|
||||
return nil
|
||||
return batchGetCache(ctx, u.rcClient, keys, u.expireTime, func(user *relationTb.UserModel, keys []string) (int, error) {
|
||||
for i, key := range keys {
|
||||
if key == u.getUserInfoKey(user.UserID) {
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
return 0, errIndex
|
||||
}, func(ctx context.Context) ([]*relationTb.UserModel, error) {
|
||||
return u.userDB.Find(ctx, userIDs)
|
||||
})
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) DelUsersInfo(userIDs []string) UserCache {
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
keys = append(keys, u.getUserInfoKey(userID))
|
||||
}
|
||||
cache := u.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
|
||||
getUserGlobalRecvMsgOpt := func() (string, error) {
|
||||
userInfo, err := u.userDB.Take(ctx, userID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return strconv.Itoa(int(userInfo.GlobalRecvMsgOpt)), nil
|
||||
}
|
||||
optStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserGlobalRecvMsgOpt)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return strconv.Atoi(optStr)
|
||||
return getCache(ctx, u.rcClient, u.getUserGlobalRecvMsgOptKey(userID), u.expireTime, func(ctx context.Context) (int, error) {
|
||||
return u.userDB.GetUserGlobalRecvMsgOpt(ctx, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (u *UserCacheRedis) DelUserGlobalRecvMsgOpt(ctx context.Context, userID string) (err error) {
|
||||
return u.rcClient.TagAsDeleted(u.getUserGlobalRecvMsgOptKey(userID))
|
||||
func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs []string) UserCache {
|
||||
var keys []string
|
||||
for _, userID := range userIDs {
|
||||
keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID))
|
||||
}
|
||||
cache := u.NewCache()
|
||||
cache.AddKeys(keys...)
|
||||
return cache
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
|
@ -3,6 +3,8 @@ package controller
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"gorm.io/gorm"
|
||||
@ -15,16 +17,18 @@ type BlackDatabase interface {
|
||||
Delete(ctx context.Context, blacks []*relation.BlackModel) (err error)
|
||||
// FindOwnerBlacks 获取黑名单列表
|
||||
FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*relation.BlackModel, total int64, err error)
|
||||
FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error)
|
||||
// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true)
|
||||
CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error)
|
||||
}
|
||||
|
||||
type blackDatabase struct {
|
||||
black relation.BlackModelInterface
|
||||
cache cache.BlackCache
|
||||
}
|
||||
|
||||
func NewBlackDatabase(black relation.BlackModelInterface) BlackDatabase {
|
||||
return &blackDatabase{black}
|
||||
func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache) BlackDatabase {
|
||||
return &blackDatabase{black, cache}
|
||||
}
|
||||
|
||||
// Create 增加黑名单
|
||||
@ -66,3 +70,7 @@ func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (i
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) {
|
||||
return b.cache.GetBlackIDs(ctx, ownerUserID)
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ import (
|
||||
)
|
||||
|
||||
type ChatLogDatabase interface {
|
||||
CreateChatLog(msg pbMsg.MsgDataToMQ) error
|
||||
CreateChatLog(msg *pbMsg.MsgDataToMQ) error
|
||||
GetChatLog(chatLog *relationTb.ChatLogModel, pageNumber, showNumber int32, contentTypes []int32) (int64, []relationTb.ChatLogModel, error)
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@ type chatLogDatabase struct {
|
||||
chatLogModel relationTb.ChatLogModelInterface
|
||||
}
|
||||
|
||||
func (c *chatLogDatabase) CreateChatLog(msg pbMsg.MsgDataToMQ) error {
|
||||
func (c *chatLogDatabase) CreateChatLog(msg *pbMsg.MsgDataToMQ) error {
|
||||
return c.chatLogModel.Create(msg)
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,6 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
@ -12,10 +11,8 @@ import (
|
||||
)
|
||||
|
||||
type ConversationDatabase interface {
|
||||
//GetUserIDExistConversation 获取拥有该会话的的用户ID列表
|
||||
GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
|
||||
//UpdateUserConversationFiled 更新用户该会话的属性信息
|
||||
UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error
|
||||
UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error
|
||||
//CreateConversation 创建一批新的会话
|
||||
CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error
|
||||
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
|
||||
@ -29,7 +26,7 @@ type ConversationDatabase interface {
|
||||
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
|
||||
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error
|
||||
//SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作
|
||||
SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
|
||||
SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
|
||||
}
|
||||
|
||||
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||
@ -46,22 +43,22 @@ type ConversationDataBase struct {
|
||||
tx tx.Tx
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
|
||||
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
|
||||
return c.tx.Transaction(func(tx any) error {
|
||||
conversationTx := c.conversationDB.NewTx(tx)
|
||||
haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID)
|
||||
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, conversation.ConversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(haveUserID) > 0 {
|
||||
err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap)
|
||||
if len(haveUserIDs) > 0 {
|
||||
err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
NotUserID := utils.DifferenceString(haveUserID, userIDList)
|
||||
NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs)
|
||||
var cList []*relationTb.ConversationModel
|
||||
for _, v := range NotUserID {
|
||||
for _, v := range NotUserIDs {
|
||||
temp := new(relationTb.ConversationModel)
|
||||
if err := utils.CopyStructFields(temp, conversation); err != nil {
|
||||
return err
|
||||
@ -73,26 +70,17 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(NotUserID) > 0 {
|
||||
err = c.cache.DelUsersConversationIDs(ctx, NotUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = c.cache.DelUsersConversation(ctx, haveUserID, conversation.ConversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
// clear cache
|
||||
return c.cache.DelConversationIDs(NotUserIDs).DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
|
||||
panic("implement me")
|
||||
func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
|
||||
err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.cache.DelUsersConversation(userIDs, conversationID).ExecDel(ctx)
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
|
||||
@ -100,7 +88,6 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat
|
||||
if err := c.conversationDB.NewTx(tx).Create(ctx, conversations); err != nil {
|
||||
return err
|
||||
}
|
||||
// clear cache
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@ -109,20 +96,20 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con
|
||||
return c.tx.Transaction(func(tx any) error {
|
||||
userIDList := []string{conversation.OwnerUserID, conversation.UserID}
|
||||
conversationTx := c.conversationDB.NewTx(tx)
|
||||
haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID)
|
||||
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filedMap := map[string]interface{}{"is_private_chat": conversation.IsPrivateChat}
|
||||
if len(haveUserID) > 0 {
|
||||
err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap)
|
||||
if len(haveUserIDs) > 0 {
|
||||
err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
NotUserID := utils.DifferenceString(haveUserID, userIDList)
|
||||
NotUserIDs := utils.DifferenceString(haveUserIDs, userIDList)
|
||||
var cList []*relationTb.ConversationModel
|
||||
for _, v := range NotUserID {
|
||||
for _, v := range NotUserIDs {
|
||||
temp := new(relationTb.ConversationModel)
|
||||
if v == conversation.UserID {
|
||||
temp.OwnerUserID = conversation.UserID
|
||||
@ -138,128 +125,71 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con
|
||||
}
|
||||
cList = append(cList, temp)
|
||||
}
|
||||
if len(NotUserID) > 0 {
|
||||
if len(NotUserIDs) > 0 {
|
||||
err = c.conversationDB.Create(ctx, cList)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = c.cache.DelUsersConversationIDs(ctx, NotUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.cache.DelUsersConversation(ctx, haveUserID, conversation.ConversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
// clear cache
|
||||
return c.cache.DelConversationIDs(NotUserIDs).DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
|
||||
getConversation := func() (string, error) {
|
||||
conversationList, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "get failed")
|
||||
}
|
||||
bytes, err := json.Marshal(conversationList)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "Marshal failed")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
return c.cache.GetConversations(ctx, ownerUserID, conversationIDs, getConversation)
|
||||
return c.cache.GetConversations(ctx, ownerUserID, conversationIDs)
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) {
|
||||
getConversation := func() (string, error) {
|
||||
conversationList, err := c.conversationDB.Take(ctx, ownerUserID, conversationID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "get failed")
|
||||
}
|
||||
bytes, err := json.Marshal(conversationList)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "Marshal failed")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
return c.cache.GetConversation(ctx, ownerUserID, conversationID, getConversation)
|
||||
return c.cache.GetConversation(ctx, ownerUserID, conversationID)
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
|
||||
getConversationIDs := func() (string, error) {
|
||||
conversationIDs, err := c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "getConversationIDList failed")
|
||||
}
|
||||
bytes, err := json.Marshal(conversationIDs)
|
||||
if err != nil {
|
||||
return "", utils.Wrap(err, "")
|
||||
}
|
||||
return string(bytes), nil
|
||||
}
|
||||
conversationIDList, err := c.cache.GetUserConversationIDs(ctx, ownerUserID, getConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var conversations []*relationTb.ConversationModel
|
||||
for _, conversationID := range conversationIDList {
|
||||
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "GetConversation failed")
|
||||
}
|
||||
conversations = append(conversations, conversation)
|
||||
}
|
||||
return conversations, nil
|
||||
return c.cache.GetUserAllConversations(ctx, ownerUserID)
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
|
||||
return c.tx.Transaction(func(tx any) error {
|
||||
var conversationIDList []string
|
||||
var conversationIDs []string
|
||||
for _, conversation := range conversations {
|
||||
conversationIDList = append(conversationIDList, conversation.ConversationID)
|
||||
conversationIDs = append(conversationIDs, conversation.ConversationID)
|
||||
}
|
||||
conversationTx := c.conversationDB.NewTx(tx)
|
||||
haveConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDList)
|
||||
existConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(haveConversations) > 0 {
|
||||
if len(existConversations) > 0 {
|
||||
err = conversationTx.Update(ctx, conversations)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var haveConversationID []string
|
||||
for _, conversation := range haveConversations {
|
||||
haveConversationID = append(haveConversationID, conversation.ConversationID)
|
||||
var existConversationIDs []string
|
||||
for _, conversation := range existConversations {
|
||||
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
|
||||
}
|
||||
|
||||
NotConversationID := utils.DifferenceString(haveConversationID, conversationIDList)
|
||||
var NotConversations []*relationTb.ConversationModel
|
||||
var notExistConversations []*relationTb.ConversationModel
|
||||
for _, conversation := range conversations {
|
||||
if !utils.IsContain(conversation.ConversationID, haveConversationID) {
|
||||
NotConversations = append(NotConversations, conversation)
|
||||
if !utils.IsContain(conversation.ConversationID, existConversationIDs) {
|
||||
notExistConversations = append(notExistConversations, conversation)
|
||||
}
|
||||
}
|
||||
if len(NotConversations) > 0 {
|
||||
err = c.conversationDB.Create(ctx, NotConversations)
|
||||
if len(notExistConversations) > 0 {
|
||||
err = c.conversationDB.Create(ctx, notExistConversations)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = c.cache.DelUsersConversationIDs(ctx, NotConversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
cache := c.cache.NewCache()
|
||||
if len(notExistConversations) > 0 {
|
||||
cache = cache.DelConversationIDs([]string{ownerUserID})
|
||||
}
|
||||
err = c.cache.DelUserConversations(ctx, ownerUserID, haveConversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
|
||||
return c.conversationDB.FindRecvMsgNotNotifyUserIDs(ctx, groupID)
|
||||
return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
|
||||
}
|
||||
|
@ -2,7 +2,10 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
)
|
||||
|
||||
// for mongoDB
|
||||
@ -18,10 +21,12 @@ type ExtendMsgDatabase interface {
|
||||
|
||||
type extendMsgDatabase struct {
|
||||
database unRelationTb.ExtendMsgSetModelInterface
|
||||
cache cache.ExtendMsgSetCache
|
||||
ctxTx tx.CtxTx
|
||||
}
|
||||
|
||||
func NewExtendMsgDatabase(extendMsgModel unRelationTb.ExtendMsgSetModelInterface) ExtendMsgDatabase {
|
||||
return &extendMsgDatabase{database: extendMsgModel}
|
||||
func NewExtendMsgDatabase(extendMsgModel unRelationTb.ExtendMsgSetModelInterface, cache cache.ExtendMsgSetCache, ctxTx tx.CtxTx) ExtendMsgDatabase {
|
||||
return &extendMsgDatabase{database: extendMsgModel, cache: cache, ctxTx: ctxTx}
|
||||
}
|
||||
|
||||
func (e *extendMsgDatabase) CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error {
|
||||
@ -41,12 +46,14 @@ func (e *extendMsgDatabase) InsertExtendMsg(ctx context.Context, sourceID string
|
||||
}
|
||||
|
||||
func (e *extendMsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*unRelationTb.KeyValueModel) error {
|
||||
e.cache.DelExtendMsg(clientMsgID).ExecDel(ctx)
|
||||
return e.database.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList)
|
||||
}
|
||||
|
||||
func (e *extendMsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*unRelationTb.KeyValueModel) error {
|
||||
return e.database.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList)
|
||||
}
|
||||
|
||||
func (e *extendMsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *unRelationTb.ExtendMsgModel, err error) {
|
||||
return e.database.TakeExtendMsg(ctx, sourceID, sessionType, clientMsgID, maxMsgUpdateTime)
|
||||
return e.cache.GetExtendMsg(ctx, sourceID, sessionType, clientMsgID, maxMsgUpdateTime)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
@ -45,10 +46,11 @@ type friendDatabase struct {
|
||||
friend relation.FriendModelInterface
|
||||
friendRequest relation.FriendRequestModelInterface
|
||||
tx tx.Tx
|
||||
cache cache.FriendCache
|
||||
}
|
||||
|
||||
func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relation.FriendRequestModelInterface, tx tx.Tx) FriendDatabase {
|
||||
return &friendDatabase{friend: friend, friendRequest: friendRequest, tx: tx}
|
||||
func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relation.FriendRequestModelInterface, cache cache.FriendCache, tx tx.Tx) FriendDatabase {
|
||||
return &friendDatabase{friend: friend, friendRequest: friendRequest, cache: cache, tx: tx}
|
||||
}
|
||||
|
||||
// ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true)
|
||||
@ -116,13 +118,14 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fs2, err := f.friend.NewTx(tx).FindReversalFriends(ctx, ownerUserID, friendUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var newFriendIDs []string
|
||||
for _, v := range friendUserIDs {
|
||||
fs2 = append(fs2, &relation.FriendModel{OwnerUserID: v, FriendUserID: ownerUserID, AddSource: addSource, OperatorUserID: opUserID})
|
||||
newFriendIDs = append(newFriendIDs, v)
|
||||
}
|
||||
fs22 := utils.DistinctAny(fs2, func(e *relation.FriendModel) string {
|
||||
return e.OwnerUserID
|
||||
@ -131,7 +134,8 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
newFriendIDs = append(newFriendIDs, ownerUserID)
|
||||
return f.cache.DelFriendIDs(newFriendIDs...).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
@ -200,18 +204,30 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return f.cache.DelFriendIDs(ownerUserID, friendRequest.ToUserID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// 删除好友 外部判断是否好友关系
|
||||
func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) {
|
||||
return f.friend.Delete(ctx, ownerUserID, friendUserIDs)
|
||||
return f.tx.Transaction(func(tx any) error {
|
||||
if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// 更新好友备注 零值也支持
|
||||
func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) {
|
||||
return f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark)
|
||||
return f.tx.Transaction(func(tx any) error {
|
||||
err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// 获取ownerUserID的好友列表 无结果不返回错误
|
||||
@ -247,5 +263,5 @@ func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID s
|
||||
}
|
||||
|
||||
func (f *friendDatabase) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) {
|
||||
return f.friend.FindFriendUserIDs(ctx, ownerUserID)
|
||||
return f.cache.GetFriendIDs(ctx, ownerUserID)
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package controller
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
@ -104,26 +105,26 @@ func (g *groupDatabase) GetGroupIDsByGroupType(ctx context.Context, groupType in
|
||||
return g.groupDB.GetGroupIDsByGroupType(ctx, groupType)
|
||||
}
|
||||
|
||||
func (g *groupDatabase) delGroupMemberCache(ctx context.Context, groupID string, userIDs []string) error {
|
||||
for _, userID := range userIDs {
|
||||
if err := g.cache.DelJoinedGroupID(ctx, userID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.cache.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := g.cache.DelGroupMemberIDs(ctx, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.cache.DelGroupMemberNum(ctx, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.cache.DelGroupMembersHash(ctx, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// func (g *groupDatabase) delGroupMemberCache(ctx context.Context, groupID string, userIDs []string) error {
|
||||
// for _, userID := range userIDs {
|
||||
// if err := g.cache.DelJoinedGroupID(ctx, userID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if err := g.cache.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// if err := g.cache.DelGroupMemberIDs(ctx, groupID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if err := g.cache.DelGroupMemberNum(ctx, groupID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if err := g.cache.DelGroupMembersHash(ctx, groupID); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return nil
|
||||
// }
|
||||
|
||||
func (g *groupDatabase) FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error) {
|
||||
return g.cache.GetGroupMemberIDs(ctx, groupID)
|
||||
@ -162,10 +163,7 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma
|
||||
if err := g.groupDB.NewTx(tx).UpdateMap(ctx, groupID, data); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.cache.DelGroupInfo(ctx, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
@ -181,10 +179,7 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.delGroupMemberCache(ctx, groupID, userIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return g.cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
@ -197,7 +192,7 @@ func (g *groupDatabase) TakeGroupOwner(ctx context.Context, groupID string) (*re
|
||||
}
|
||||
|
||||
func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error) {
|
||||
return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels) // todo cache group find
|
||||
return g.cache.GetGroupMembersInfo(ctx, groupIDs[0], userIDs, roleLevels) // todo cache group find
|
||||
}
|
||||
|
||||
func (g *groupDatabase) PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) {
|
||||
@ -217,9 +212,7 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string,
|
||||
if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.delGroupMemberCache(ctx, groupID, []string{userID}); err != nil {
|
||||
return err
|
||||
}
|
||||
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@ -230,15 +223,12 @@ func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, u
|
||||
if err := g.groupMemberDB.NewTx(tx).Delete(ctx, groupID, userIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.delGroupMemberCache(ctx, groupID, userIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
|
||||
return g.cache.GetGroupMemberHash1(ctx, groupIDs)
|
||||
return g.cache.GetGroupMemberHashMap(ctx, groupIDs)
|
||||
}
|
||||
|
||||
func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error) {
|
||||
@ -261,10 +251,7 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string,
|
||||
if rowsAffected != 1 {
|
||||
return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "")
|
||||
}
|
||||
if err := g.delGroupMemberCache(ctx, groupID, []string{oldOwnerUserID, newOwnerUserID}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
@ -273,24 +260,20 @@ func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, u
|
||||
if err := g.groupMemberDB.NewTx(tx).Update(ctx, groupID, userID, data); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.cache.DelGroupMemberInfo(ctx, groupID, userID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error {
|
||||
return g.tx.Transaction(func(tx any) error {
|
||||
var cache = g.cache.NewCache()
|
||||
for _, item := range data {
|
||||
if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.cache.DelGroupMemberInfo(ctx, item.GroupID, item.UserID); err != nil {
|
||||
return err
|
||||
}
|
||||
cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID)
|
||||
}
|
||||
return nil
|
||||
return cache.ExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package relation
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
@ -21,7 +22,7 @@ func NewChatLogGorm(db *gorm.DB) relation.ChatLogModelInterface {
|
||||
return &ChatLogGorm{NewMetaDB(db, &relation.ChatLogModel{})}
|
||||
}
|
||||
|
||||
func (c *ChatLogGorm) Create(msg pbMsg.MsgDataToMQ) error {
|
||||
func (c *ChatLogGorm) Create(msg *pbMsg.MsgDataToMQ) error {
|
||||
chatLog := new(relation.ChatLogModel)
|
||||
copier.Copy(chatLog, msg.MsgData)
|
||||
switch msg.MsgData.SessionType {
|
||||
|
@ -2,6 +2,7 @@ package relation
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
@ -21,43 +22,56 @@ func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface {
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) {
|
||||
return utils.Wrap(c.DB.Create(&conversations).Error, "")
|
||||
return utils.Wrap(c.db(ctx).Create(&conversations).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) {
|
||||
return utils.Wrap(c.DB.Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "")
|
||||
return utils.Wrap(c.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) {
|
||||
return utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "")
|
||||
return utils.Wrap(c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel) (err error) {
|
||||
return utils.Wrap(c.DB.Updates(&conversations).Error, "")
|
||||
return utils.Wrap(c.db(ctx).Updates(&conversations).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) {
|
||||
err = utils.Wrap(c.DB.Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "")
|
||||
err = utils.Wrap(c.db(ctx).Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "")
|
||||
return conversations, err
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) {
|
||||
cc := &relation.ConversationModel{}
|
||||
return cc, utils.Wrap(c.DB.Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "")
|
||||
return cc, utils.Wrap(c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string) (existUserID []string, err error) {
|
||||
return existUserID, utils.Wrap(c.DB.Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "")
|
||||
return existUserID, utils.Wrap(c.db(ctx).Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) {
|
||||
return existConversationID, utils.Wrap(c.DB.Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "")
|
||||
return existConversationID, utils.Wrap(c.db(ctx).Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) FindUserIDAllConversationID(ctx context.Context, userID string) (conversationIDList []string, err error) {
|
||||
return conversationIDList, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "")
|
||||
return conversationIDList, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*relation.ConversationModel, err error) {
|
||||
return conversations, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Find(&conversations).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
|
||||
return userIDs, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "")
|
||||
return userIDs, utils.Wrap(c.db(ctx).Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
|
||||
return userIDs, utils.Wrap(c.db(ctx).Where("group_id = ? and recv_msg_opt = ? and conversation_type = ?", groupID, constant.ReceiveNotNotifyMessage, constant.SuperGroupChatType).Pluck("user_id", &userIDs).Error, "")
|
||||
}
|
||||
|
||||
func (c *ConversationGorm) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
|
||||
var conversation relation.ConversationModel
|
||||
return int(conversation.RecvMsgOpt), utils.Wrap(c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID).Select("recv_msg_opt").Find(&conversation).Error, "")
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package relation
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/ormutil"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
@ -46,7 +47,7 @@ func (g *GroupMemberGorm) UpdateRoleLevel(ctx context.Context, groupID string, u
|
||||
return db.RowsAffected, utils.Wrap(db.Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMemberGorm) Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupList []*relation.GroupMemberModel, err error) {
|
||||
func (g *GroupMemberGorm) Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupMembers []*relation.GroupMemberModel, err error) {
|
||||
db := g.DB
|
||||
if len(groupIDs) > 0 {
|
||||
db = db.Where("group_id in (?)", groupIDs)
|
||||
@ -57,7 +58,7 @@ func (g *GroupMemberGorm) Find(ctx context.Context, groupIDs []string, userIDs [
|
||||
if len(roleLevels) > 0 {
|
||||
db = db.Where("role_level in (?)", roleLevels)
|
||||
}
|
||||
return groupList, utils.Wrap(db.Find(&groupList).Error, "")
|
||||
return groupMembers, utils.Wrap(db.Find(&groupMembers).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMemberGorm) Take(ctx context.Context, groupID string, userID string) (groupMember *relation.GroupMemberModel, err error) {
|
||||
@ -100,3 +101,11 @@ func (g *GroupMemberGorm) FindJoinUserID(ctx context.Context, groupIDs []string)
|
||||
func (g *GroupMemberGorm) FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) {
|
||||
return userIDs, utils.Wrap(g.DB.Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Pluck("user_id", &userIDs).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMemberGorm) FindUserJoinedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) {
|
||||
return groupIDs, utils.Wrap(g.DB.Model(&relation.GroupMemberModel{}).Where("user_id = ?", userID).Pluck("group_id", &groupIDs).Error, "")
|
||||
}
|
||||
|
||||
func (g *GroupMemberGorm) TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error) {
|
||||
return count, utils.Wrap(g.DB.Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Count(&count).Error, "")
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package relation
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"gorm.io/gorm"
|
||||
@ -58,3 +59,8 @@ func (u *UserGorm) GetAllUserID(ctx context.Context) (userIDs []string, err erro
|
||||
err = u.db(ctx).Pluck("user_id", &userIDs).Error
|
||||
return userIDs, err
|
||||
}
|
||||
|
||||
func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
|
||||
err = u.db(ctx).Model(&relation.UserModel{}).Where("user_id = ?", userID).Pluck("global_recv_msg_opt", &opt).Error
|
||||
return opt, err
|
||||
}
|
||||
|
@ -33,6 +33,6 @@ func (ChatLogModel) TableName() string {
|
||||
}
|
||||
|
||||
type ChatLogModelInterface interface {
|
||||
Create(msg pbMsg.MsgDataToMQ) error
|
||||
Create(msg *pbMsg.MsgDataToMQ) error
|
||||
GetChatLog(chatLog *ChatLogModel, pageNumber, showNumber int32, contentTypes []int32) (int64, []ChatLogModel, error)
|
||||
}
|
||||
|
@ -32,13 +32,16 @@ func (ConversationModel) TableName() string {
|
||||
type ConversationModelInterface interface {
|
||||
Create(ctx context.Context, conversations []*ConversationModel) (err error)
|
||||
Delete(ctx context.Context, groupIDs []string) (err error)
|
||||
UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error)
|
||||
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) (err error)
|
||||
Update(ctx context.Context, conversations []*ConversationModel) (err error)
|
||||
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*ConversationModel, err error)
|
||||
FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
|
||||
FindUserID(ctx context.Context, userIDs []string, conversationID string) ([]string, error)
|
||||
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
|
||||
Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error)
|
||||
FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error)
|
||||
FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error)
|
||||
FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error)
|
||||
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
|
||||
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
|
||||
FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
|
||||
NewTx(tx any) ConversationModelInterface
|
||||
}
|
||||
|
@ -29,16 +29,18 @@ func (GroupMemberModel) TableName() string {
|
||||
|
||||
type GroupMemberModelInterface interface {
|
||||
NewTx(tx any) GroupMemberModelInterface
|
||||
Create(ctx context.Context, groupMemberList []*GroupMemberModel) (err error)
|
||||
Create(ctx context.Context, groupMembers []*GroupMemberModel) (err error)
|
||||
Delete(ctx context.Context, groupID string, userIDs []string) (err error)
|
||||
DeleteGroup(ctx context.Context, groupIDs []string) (err error)
|
||||
Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error)
|
||||
UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) (rowsAffected int64, err error)
|
||||
Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupList []*GroupMemberModel, err error)
|
||||
Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupMembers []*GroupMemberModel, err error)
|
||||
FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error)
|
||||
Take(ctx context.Context, groupID string, userID string) (groupMember *GroupMemberModel, err error)
|
||||
TakeOwner(ctx context.Context, groupID string) (groupMember *GroupMemberModel, err error)
|
||||
SearchMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (total uint32, groupList []*GroupMemberModel, err error)
|
||||
MapGroupMemberNum(ctx context.Context, groupIDs []string) (count map[string]uint32, err error)
|
||||
FindJoinUserID(ctx context.Context, groupIDs []string) (groupUsers map[string][]string, err error)
|
||||
FindUserJoinedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
|
||||
TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error)
|
||||
}
|
||||
|
@ -39,4 +39,5 @@ type UserModelInterface interface {
|
||||
// 获取用户信息 不存在,不返回错误
|
||||
Page(ctx context.Context, pageNumber, showNumber int32) (users []*UserModel, count int64, err error)
|
||||
GetAllUserID(ctx context.Context) (userIDs []string, err error)
|
||||
GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user