diff --git a/go.mod b/go.mod index 9a576660a..39a069027 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/Shopify/sarama v1.32.0 github.com/antonfisher/nested-logrus-formatter v1.3.1 github.com/bwmarrin/snowflake v0.3.0 - github.com/dtm-labs/rockscache v0.0.11 + github.com/dtm-labs/rockscache v0.1.0 github.com/gin-gonic/gin v1.8.2 github.com/go-playground/validator/v10 v10.11.1 github.com/go-redis/redis/v8 v8.11.5 diff --git a/go.sum b/go.sum index 8cba84680..ed508e08f 100644 --- a/go.sum +++ b/go.sum @@ -424,8 +424,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/dtm-labs/rockscache v0.0.11 h1:V6M+KH9fFRFDXgB+Uux1d6zwhZt1O34sgPwM0wjud9Y= -github.com/dtm-labs/rockscache v0.0.11/go.mod h1:vJmJJmuBNxcio03abYk1QPLmmQo/Kg92jB+28QmLcgY= +github.com/dtm-labs/rockscache v0.1.0 h1:tjJuruAUo/3wzZgQBkdQ+Pgz7GhmQ6qt0BrHHyVy3eY= +github.com/dtm-labs/rockscache v0.1.0/go.mod h1:vJmJJmuBNxcio03abYk1QPLmmQo/Kg92jB+28QmLcgY= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 577f797a4..96df14c2b 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -2,14 +2,16 @@ package msgtransfer import ( "fmt" + "sync" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" - "sync" ) type MsgTransfer struct { @@ -38,9 +40,9 @@ func StartTransfer(prometheusPort int) error { cacheModel := cache.NewCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) - + extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt()) chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db)) - extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel) + extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient())) msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel) msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase) diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index ddc264a2b..7508c49cc 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -8,6 +8,7 @@ package msgtransfer import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" @@ -65,7 +66,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs } if tag { log.NewInfo(operationID, "msg_transfer msg persisting", string(msg)) - if err = pc.chatLogDatabase.CreateChatLog(msgFromMQ); err != nil { + if err = pc.chatLogDatabase.CreateChatLog(&msgFromMQ); err != nil { log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String()) return } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 3beafe75d..94034cf81 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -2,6 +2,7 @@ package push import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 2e0679bf3..e2177c895 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -10,6 +10,7 @@ import ( tableRelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" @@ -35,9 +36,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } + conversationDB := relation.NewConversationGorm(db) pbConversation.RegisterConversationServer(server, &conversationServer{ groupChecker: check.NewGroupChecker(client), - ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, cache.GetDefaultOpt()), tx.NewGorm(db)), + ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)), }) return nil } @@ -54,7 +56,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers } return resp, nil } - return nil, nil + return nil, errs.ErrRecordNotFound.Wrap("conversation not found") } func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) { @@ -70,11 +72,11 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbCon } func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) { - resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}} conversations, err := c.ConversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) if err != nil { return nil, err } + resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}} if err := utils.CopyStructFields(&resp.Conversations, conversations); err != nil { return nil, err } @@ -82,7 +84,6 @@ func (c *conversationServer) GetConversations(ctx context.Context, req *pbConver } func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) { - resp := &pbConversation.BatchSetConversationsResp{} var conversations []*tableRelation.ConversationModel if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil { return nil, err @@ -92,15 +93,30 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC return nil, err } c.notify.ConversationChangeNotification(ctx, req.OwnerUserID) + resp := &pbConversation.BatchSetConversationsResp{} return resp, nil } func (c *conversationServer) SetConversation(ctx context.Context, req *pbConversation.SetConversationReq) (*pbConversation.SetConversationResp, error) { - panic("implement me") + var conversation tableRelation.ConversationModel + if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil { + return nil, err + } + err := c.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tableRelation.ConversationModel{&conversation}) + if err != nil { + return nil, err + } + c.notify.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID) + resp := &pbConversation.SetConversationResp{} + return resp, nil } func (c *conversationServer) SetRecvMsgOpt(ctx context.Context, req *pbConversation.SetRecvMsgOptReq) (*pbConversation.SetRecvMsgOptResp, error) { - panic("implement me") + conversation := tableRelation.ConversationModel{OwnerUserID: req.OwnerUserID, ConversationID: req.ConversationID, RecvMsgOpt: req.RecvMsgOpt} + if err := c.SetUsersConversationFiledTx(ctx, []string{req.OwnerUserID}, &conversation, map[string]interface{}{"recv_msg_opt": req.RecvMsgOpt}); err != nil { + return nil, err + } + return &pbConversation.SetRecvMsgOptResp{}, nil } func (c *conversationServer) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) { diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index a55443b16..7c0608bd7 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -4,6 +4,7 @@ import ( "context" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" tablerelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" @@ -35,9 +36,15 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { if err := db.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil { return err } + rdb, err := cache.NewRedis() + if err != nil { + return err + } + blackDB := relation.NewBlackGorm(db) + friendDB := relation.NewFriendGorm(db) pbfriend.RegisterFriendServer(server, &friendServer{ - FriendDatabase: controller.NewFriendDatabase(relation.NewFriendGorm(db), relation.NewFriendRequestGorm(db), tx.NewGorm(db)), - BlackDatabase: controller.NewBlackDatabase(relation.NewBlackGorm(db)), + FriendDatabase: controller.NewFriendDatabase(friendDB, relation.NewFriendRequestGorm(db), cache.NewFriendCacheRedis(rdb, friendDB, cache.GetDefaultOpt()), tx.NewGorm(db)), + BlackDatabase: controller.NewBlackDatabase(blackDB, cache.NewBlackCacheRedis(rdb, blackDB, cache.GetDefaultOpt())), notification: notification.NewCheck(client), userCheck: check.NewUserCheck(client), RegisterCenter: client, diff --git a/internal/rpc/group/db_map.go b/internal/rpc/group/db_map.go index d45186701..903526a82 100644 --- a/internal/rpc/group/db_map.go +++ b/internal/rpc/group/db_map.go @@ -1,9 +1,10 @@ package group import ( - pbGroup "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" - sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "time" + + pbGroup "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" ) func UpdateGroupInfoMap(group *sdkws.GroupInfoForSet) map[string]any { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 29731656f..f786c4de6 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -6,6 +6,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" @@ -58,8 +59,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e cacheModel := cache.NewCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) - - extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel) + extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt()) + extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCacheModel, tx.NewMongo(mongo.GetClient())) msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel) s := &msgServer{ diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index 2a6c60512..ae4e9d680 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -2,10 +2,11 @@ package cache import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" + "time" + + relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" - "time" ) const ( @@ -16,21 +17,35 @@ const ( // args fn will exec when no data in cache type BlackCache interface { //get blackIDs from cache - GetBlackIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) (blackIDs []string, err error) + metaCache + NewCache() BlackCache + GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) //del user's blackIDs cache, exec when a user's black list changed - DelBlackIDs(ctx context.Context, userID string) (err error) + DelBlackIDs(ctx context.Context, userID string) BlackCache } type BlackCacheRedis struct { + metaCache expireTime time.Duration rcClient *rockscache.Client - black *relation.BlackGorm + blackDB relationTb.BlackModelInterface } -func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB BlackCache, options rockscache.Options) *BlackCacheRedis { +func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB relationTb.BlackModelInterface, options rockscache.Options) BlackCache { + rcClient := rockscache.NewClient(rdb, options) return &BlackCacheRedis{ expireTime: blackExpireTime, - rcClient: rockscache.NewClient(rdb, options), + rcClient: rcClient, + metaCache: NewMetaCacheRedis(rcClient), + blackDB: blackDB, + } +} + +func (b *BlackCacheRedis) NewCache() BlackCache { + return &BlackCacheRedis{ + expireTime: b.expireTime, + rcClient: b.rcClient, + blackDB: b.blackDB, } } @@ -39,11 +54,13 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { } func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { - return GetCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) { - return b.black.FindBlackUserIDs(ctx, userID) + return getCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) { + return b.blackDB.FindBlackUserIDs(ctx, userID) }) } -func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) (err error) { - return b.rcClient.TagAsDeleted(b.getBlackIDsKey(userID)) +func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) BlackCache { + cache := b.NewCache() + cache.AddKeys(userID) + return cache } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 8501ce7c2..ba66d2d41 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -2,282 +2,216 @@ package cache import ( "context" + "errors" + "math/big" + "strings" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" - "time" ) const ( - conversationKey = "CONVERSATION:" - conversationIDsKey = "CONVERSATION_IDS:" - recvMsgOptKey = "RECV_MSG_OPT:" - superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" - conversationExpireTime = time.Second * 60 * 60 * 12 -) + conversationKey = "CONVERSATION:" + conversationIDsKey = "CONVERSATION_IDS:" + recvMsgOptKey = "RECV_MSG_OPT:" + superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" -type FuncDB func() (string, error) + conversationExpireTime = time.Second * 60 * 60 * 12 +) // arg fn will exec when no data in cache type ConversationCache interface { + metaCache + NewCache() ConversationCache // get user's conversationIDs from cache - GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error) - // del user's conversationIDs from cache, call when a user add or reduce a conversation - DelUserConversationIDs(ctx context.Context, userID string) error - DelUsersConversationIDs(ctx context.Context, userIDList []string) error + GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) + DelConversationIDs(userIDs []string) ConversationCache // get one conversation from cache - GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error) + GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) + DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache + DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache // get one conversation from cache - GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) + GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) // get one user's all conversations from cache - GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) - // del one conversation from cache, call when one user's conversation Info changed - DelConversation(ctx context.Context, ownerUserID, conversationID string) error - DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error - DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error + GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) // get user conversation recv msg from cache - GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)) (opt int, err error) - // del user recv msg opt from cache, call when user's conversation recv msg opt changed - DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error + GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) + DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache // get one super group recv msg but do not notification userID list - GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) - // del one super group recv msg but do not notification userID list, call it when this list changed - DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) error - //GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) - //DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) + GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) + // get one super group recv msg but do not notification userID list hash + GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) } -func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options) ConversationCache { - return &ConversationRedis{rcClient: rockscache.NewClient(rdb, opts)} +func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationTb.ConversationModelInterface) ConversationCache { + rcClient := rockscache.NewClient(rdb, opts) + return &ConversationRedisCache{rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), conversationDB: db, expireTime: conversationExpireTime} } -type ConversationRedis struct { - rcClient *rockscache.Client - expireTime time.Duration +type ConversationRedisCache struct { + metaCache + rcClient *rockscache.Client + conversationDB relationTb.ConversationModelInterface + expireTime time.Duration } -func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error) { - return nil, nil +func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) ConversationCache { + rcClient := rockscache.NewClient(rdb, options) + return &ConversationRedisCache{rcClient: rcClient, metaCache: NewMetaCacheRedis(rcClient), conversationDB: conversationDB, expireTime: conversationExpireTime} } -func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error) { - //TODO implement me - panic("implement me") +func (c *ConversationRedisCache) NewCache() ConversationCache { + return &ConversationRedisCache{rcClient: c.rcClient, metaCache: c.metaCache, conversationDB: c.conversationDB, expireTime: c.expireTime} } -func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) { - //TODO implement me - panic("implement me") -} - -func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) { - //TODO implement me - panic("implement me") -} - -func (c *ConversationRedis) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error { - //TODO implement me - panic("implement me") -} - -func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (opt int, err error)) (opt int, err error) { - //TODO implement me - panic("implement me") -} - -func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) { - //TODO implement me - panic("implement me") -} - -func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error { - panic("implement me") -} - -func (c *ConversationRedis) DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error { - panic("implement me") -} - -func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationRedis { - return &ConversationRedis{rcClient: rockscache.NewClient(rdb, options)} -} - -func (c *ConversationRedis) getConversationKey(ownerUserID, conversationID string) string { +func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string { return conversationKey + ownerUserID + ":" + conversationID } -func (c *ConversationRedis) getConversationIDsKey(ownerUserID string) string { +func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string { return conversationIDsKey + ownerUserID } -func (c *ConversationRedis) getRecvMsgOptKey(ownerUserID, conversationID string) string { - return recvMsgOptKey + ownerUserID + ":" + conversationID -} - -func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { +func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return superGroupRecvMsgNotNotifyUserIDsKey + groupID } -//func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { -// //getConversationIDs := func() (string, error) { -// // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) -// // if err != nil { -// // return "", err -// // } -// // bytes, err := json.Marshal(conversationIDs) -// // if err != nil { -// // return "", utils.Wrap(err, "") -// // } -// // return string(bytes), nil -// //} -// //defer func() { -// // mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs) -// //}() -// //conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs) -// //err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs) -// //if err != nil { -// // return nil, utils.Wrap(err, "") -// //} -// //return conversationIDs, nil -// return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) { -// panic("implement me") -// }) -//} +func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string { + return recvMsgOptKey + ownerUserID + ":" + conversationID +} -func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { - //getConversationIDs := func() (string, error) { - // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) - // if err != nil { - // return "", err - // } - // bytes, err := json.Marshal(conversationIDs) - // if err != nil { - // return "", utils.Wrap(err, "") - // } - // return string(bytes), nil - //} - //defer func() { - // mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs) - //}() - //conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs) - //err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs) - //if err != nil { - // return nil, utils.Wrap(err, "") - //} - //return conversationIDs, nil - //return GetCache1[[]string](c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, fn) +func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { + return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID +} - return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) { - panic("") +func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { + return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { + return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) }) } -//func GetCache1[T any](rcClient *rockscache.Client, key string, expire time.Duration, fn func() (any, error)) (T, error) { -// v, err := rcClient.Fetch(key, expire, func() (string, error) { -// v, err := fn() -// if err != nil { -// return "", err -// } -// bs, err := json.Marshal(v) -// if err != nil { -// return "", utils.Wrap(err, "") -// } -// return string(bs), nil -// }) -// var t T -// if err != nil { -// return t, err -// } -// err = json.Unmarshal([]byte(v), &t) -// if err != nil { -// return t, utils.Wrap(err, "") -// } -// return t, nil -//} - -func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) { - return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err") +func (c *ConversationRedisCache) DelConversationIDs(userIDs []string) ConversationCache { + var keys []string + for _, userID := range userIDs { + keys = append(keys, c.getConversationIDsKey(userID)) + } + cache := c.NewCache() + cache.AddKeys(keys...) + return cache } -//func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) { -// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) { -// panic("implement me") -// }) -//} - -func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { - return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err") +func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) { + return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) { + return c.conversationDB.Take(ctx, ownerUserID, conversationID) + }) } -//func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) { -// defer func() { -// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations) -// }() -// for _, conversationID := range conversationIDs { -// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) -// if err != nil { -// return nil, err -// } -// conversations = append(conversations, *conversation) -// } -// return conversations, nil -//} - -//func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) { -// defer func() { -// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations) -// }() -// IDs, err := c.GetUserConversationIDs(ctx, ownerUserID) -// if err != nil { -// return nil, err -// } -// var conversationIDs []relationTb.ConversationModel -// for _, conversationID := range IDs { -// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) -// if err != nil { -// return nil, err -// } -// conversationIDs = append(conversationIDs, *conversation) -// } -// return conversationIDs, nil -//} - -//func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { -// //getConversation := func() (string, error) { -// // conversation, err := relation.GetConversation(ownerUserID, conversationID) -// // if err != nil { -// // return "", err -// // } -// // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil -// //} -// //defer func() { -// // mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) -// //}() -// //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) -// //if err != nil { -// // return 0, err -// //} -// //return strconv.Atoi(optStr) -// // panic("implement me") -// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) { -// panic("implement me") -// }) -//} - -func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { - return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed") +func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs []string) ConversationCache { + var keys []string + for _, conversationID := range convsersationIDs { + keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) + } + cache := c.NewCache() + cache.AddKeys(keys...) + return cache } -func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { - panic("implement me") -} - -func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) { - panic("implement me") -} - -func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) { +func (c *ConversationRedisCache) GetConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) { + key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID) + for _i, _key := range keys { + if _key == key { + return _i, nil + } + } + return 0, errors.New("not found key:" + key + " in keys") +} + +func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { + var keys []string + for _, conversarionID := range conversationIDs { + keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) + } + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { + return c.conversationDB.Find(ctx, ownerUserID, conversationIDs) + }) +} + +func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { + conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) + if err != nil { + return nil, err + } + var keys []string + for _, conversarionID := range conversationIDs { + keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) + } + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { + return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID) + }) +} + +func (c *ConversationRedisCache) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ConversationCache { + var keys []string + for _, conversationID := range conversationIDs { + keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) + } + cache := c.NewCache() + cache.AddKeys(keys...) + return cache +} + +func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { + return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) { + return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID) + }) +} + +func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { + return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) { + return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) + }) +} + +func (c *ConversationRedisCache) DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache { + var keys []string + for _, ownerUserID := range ownerUserIDs { + keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) + } + cache := c.NewCache() + cache.AddKeys(keys...) + return cache +} + +func (c *ConversationRedisCache) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache { + cache := c.NewCache() + cache.AddKeys(c.getRecvMsgOptKey(ownerUserID, conversationID)) + return cache +} + +func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ConversationCache { + cache := c.NewCache() + cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsKey(groupID)) + return cache +} + +func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) { + return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) { + userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) + if err != nil { + return 0, err + } + utils.Sort(userIDs, true) + bi := big.NewInt(0) + bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) + return bi.Uint64(), nil + }) +} + +func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) { panic("implement me") } diff --git a/pkg/common/db/cache/extend_msg_set.go b/pkg/common/db/cache/extend_msg_set.go index bbd04fa0c..37ef2cf42 100644 --- a/pkg/common/db/cache/extend_msg_set.go +++ b/pkg/common/db/cache/extend_msg_set.go @@ -2,10 +2,11 @@ package cache import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "github.com/dtm-labs/rockscache" "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" ) const ( @@ -13,44 +14,51 @@ const ( extendMsgCache = "EXTEND_MSG_CACHE:" ) -type ExtendMsgSetCache struct { - expireTime time.Duration - rcClient *rockscache.Client +type ExtendMsgSetCache interface { + metaCache + NewCache() ExtendMsgSetCache + GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) + DelExtendMsg(clientMsgID string) ExtendMsgSetCache } -func (e *ExtendMsgSetCache) getKey(clientMsgID string) string { +type ExtendMsgSetCacheRedis struct { + metaCache + expireTime time.Duration + rcClient *rockscache.Client + extendMsgSetDB unrelation.ExtendMsgSetModelInterface +} + +func NewExtendMsgSetCacheRedis(rdb redis.UniversalClient, extendMsgSetDB unrelation.ExtendMsgSetModelInterface, options rockscache.Options) ExtendMsgSetCache { + rcClient := rockscache.NewClient(rdb, options) + return &ExtendMsgSetCacheRedis{ + metaCache: NewMetaCacheRedis(rcClient), + expireTime: time.Second * 30 * 60, + extendMsgSetDB: extendMsgSetDB, + rcClient: rcClient, + } +} + +func (e *ExtendMsgSetCacheRedis) NewCache() ExtendMsgSetCache { + return &ExtendMsgSetCacheRedis{ + metaCache: e.metaCache, + expireTime: e.expireTime, + extendMsgSetDB: e.extendMsgSetDB, + rcClient: e.rcClient, + } +} + +func (e *ExtendMsgSetCacheRedis) getKey(clientMsgID string) string { return extendMsgCache + clientMsgID } -func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) { - //getExtendMsg := func() (string, error) { - // extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime) - // if err != nil { - // return "", utils.Wrap(err, "GetExtendMsgList failed") - // } - // bytes, err := json.Marshal(extendMsg) - // if err != nil { - // return "", utils.Wrap(err, "Marshal failed") - // } - // return string(bytes), nil - //} - //defer func() { - // mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType", - // sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg) - //}() - //extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg) - //if err != nil { - // return nil, utils.Wrap(err, "Fetch failed") - //} - //extendMsg = &mongoDB.ExtendMsg{} - //err = json.Unmarshal([]byte(extendMsgStr), extendMsg) - //return extendMsg, utils.Wrap(err, "Unmarshal failed") - return GetCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) { - panic("") +func (e *ExtendMsgSetCacheRedis) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) { + return getCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) { + return e.extendMsgSetDB.TakeExtendMsg(ctx, sourceID, sessionType, clientMsgID, firstModifyTime) }) - } -func (e *ExtendMsgSetCache) DelExtendMsg(ctx context.Context, clientMsgID string) (err error) { - return utils.Wrap(e.rcClient.TagAsDeleted(e.getKey(clientMsgID)), "DelExtendMsg err") +func (e *ExtendMsgSetCacheRedis) DelExtendMsg(clientMsgID string) ExtendMsgSetCache { + new := e.NewCache() + new.AddKeys(e.getKey(clientMsgID)) + return new } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 2d1731978..f74df8975 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -2,13 +2,12 @@ package cache import ( "context" - "encoding/json" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" + "time" + relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" - "time" ) const ( @@ -20,29 +19,38 @@ const ( // args fn will exec when no data in cache type FriendCache interface { - GetFriendIDs(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserID string) (friendIDs []string, err error)) (friendIDs []string, err error) + metaCache + NewCache() FriendCache + GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) // call when friendID List changed - DelFriendIDs(ctx context.Context, ownerUserID string) (err error) + DelFriendIDs(ownerUserID ...string) FriendCache // get single friendInfo from cache - GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error) + GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error) // del friend when friend info changed - DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) + DelFriend(ownerUserID, friendUserID string) FriendCache } type FriendCacheRedis struct { - friendDB *relation.FriendGorm + metaCache + friendDB relationTb.FriendModelInterface expireTime time.Duration rcClient *rockscache.Client } -func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB *relation.FriendGorm, options rockscache.Options) *FriendCacheRedis { +func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationTb.FriendModelInterface, options rockscache.Options) FriendCache { + rcClient := rockscache.NewClient(rdb, options) return &FriendCacheRedis{ + metaCache: NewMetaCacheRedis(rcClient), friendDB: friendDB, expireTime: friendExpireTime, - rcClient: rockscache.NewClient(rdb, options), + rcClient: rcClient, } } +func (c *FriendCacheRedis) NewCache() FriendCache { + return &FriendCacheRedis{rcClient: c.rcClient, metaCache: c.metaCache, friendDB: c.friendDB, expireTime: c.expireTime} +} + func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string { return friendIDsKey + ownerUserID } @@ -56,15 +64,22 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string } func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { - return GetCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) { + return getCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) { return f.friendDB.FindFriendUserIDs(ctx, ownerUserID) }) } -func (f *FriendCacheRedis) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) { - return f.rcClient.TagAsDeleted(f.getFriendIDsKey(ownerUserID)) +func (f *FriendCacheRedis) DelFriendIDs(ownerUserID ...string) FriendCache { + new := f.NewCache() + var keys []string + for _, userID := range ownerUserID { + keys = append(keys, f.getFriendIDsKey(userID)) + } + new.AddKeys(keys...) + return new } +// todo func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) { friendIDs, err := f.GetFriendIDs(ctx, ownerUserID) if err != nil { @@ -82,31 +97,20 @@ func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID s return twoWayFriendIDs, nil } -func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) (err error) { - return f.rcClient.TagAsDeleted(f.getTwoWayFriendsIDsKey(ownerUserID)) +func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) FriendCache { + new := f.NewCache() + new.AddKeys(f.getTwoWayFriendsIDsKey(ownerUserID)) + return new } -func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error) { - getFriend := func() (string, error) { - friend, err = f.friendDB.Take(ctx, ownerUserID, friendUserID) - if err != nil { - return "", err - } - bytes, err := json.Marshal(friend) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - friendStr, err := f.rcClient.Fetch(f.getFriendKey(ownerUserID, friendUserID), f.expireTime, getFriend) - if err != nil { - return nil, err - } - friend = &relationTb.FriendModel{} - err = json.Unmarshal([]byte(friendStr), friend) - return friend, utils.Wrap(err, "") +func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error) { + return getCache(ctx, f.rcClient, f.getFriendKey(ownerUserID, friendUserID), f.expireTime, func(ctx context.Context) (*relationTb.FriendModel, error) { + return f.friendDB.Take(ctx, ownerUserID, friendUserID) + }) } -func (f *FriendCacheRedis) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) { - return f.rcClient.TagAsDeleted(f.getFriendKey(ownerUserID, friendUserID)) +func (f *FriendCacheRedis) DelFriend(ownerUserID, friendUserID string) FriendCache { + new := f.NewCache() + new.AddKeys(f.getFriendKey(ownerUserID, friendUserID)) + return new } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index e9d47cb1c..657452f31 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -2,14 +2,15 @@ package cache import ( "context" - relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" - unrelation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "github.com/dtm-labs/rockscache" - "github.com/go-redis/redis/v8" "math/big" "strings" "time" + + relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" + unrelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" ) const ( @@ -24,40 +25,54 @@ const ( ) type GroupCache interface { + metaCache + NewCache() GroupCache GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) - BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) - DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) - GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) - GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) - DelGroupMembersHash(ctx context.Context, groupID string) (err error) + DelJoinedSuperGroupIDs(userIDs ...string) GroupCache + + GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) + GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) + DelGroupMembersHash(groupID string) GroupCache + GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) - DelGroupMemberIDs(ctx context.Context, groupID string) (err error) - DelJoinedGroupID(ctx context.Context, userID string) (err error) + GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (groupMemberIDs map[string][]string, err error) + + DelGroupMemberIDs(groupID string) GroupCache + DelJoinedGroupID(userID ...string) GroupCache + GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error) - DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) - DelGroupMemberNum(ctx context.Context, groupID string) (err error) - DelGroupInfo(ctx context.Context, groupID string) (err error) - DelGroupsInfo(ctx context.Context, groupIDs []string) error + GetGroupMembersInfo(ctx context.Context, groupID string, userID []string, roleLevel []int32) (groupMembers []*relationTb.GroupMemberModel, err error) + DelGroupMembersInfo(groupID string, userID ...string) GroupCache + + GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) + DelGroupsMemberNum(groupID ...string) GroupCache + DelGroupsInfo(groupIDs ...string) GroupCache } type GroupCacheRedis struct { - group relationTb.GroupModelInterface - groupMember relationTb.GroupMemberModelInterface - groupRequest relationTb.GroupRequestModelInterface - mongoDB unrelation2.SuperGroupModelInterface - expireTime time.Duration - rcClient *rockscache.Client + metaCache + groupDB relationTb.GroupModelInterface + groupMemberDB relationTb.GroupMemberModelInterface + groupRequestDB relationTb.GroupRequestModelInterface + mongoDB unrelationTb.SuperGroupModelInterface + expireTime time.Duration + rcClient *rockscache.Client } -func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelation2.SuperGroupModelInterface, opts rockscache.Options) GroupCache { - return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime, - group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, - mongoDB: mongoClient, +func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelationTb.SuperGroupModelInterface, opts rockscache.Options) GroupCache { + rcClient := rockscache.NewClient(rdb, opts) + return &GroupCacheRedis{rcClient: rcClient, expireTime: groupExpireTime, + groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, + mongoDB: mongoClient, metaCache: NewMetaCacheRedis(rcClient), } } +func (g *GroupCacheRedis) NewCache() GroupCache { + return &GroupCacheRedis{rcClient: g.rcClient, expireTime: g.expireTime, groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, mongoDB: g.mongoDB, metaCache: g.metaCache} +} + func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string { return groupInfoKey + groupID } @@ -86,35 +101,66 @@ func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string { return groupMemberNumKey + groupID } +func (g *GroupCacheRedis) GetGroupIndex(group *relationTb.GroupModel, keys []string) (int, error) { + key := g.getGroupInfoKey(group.GroupID) + for i, _key := range keys { + if _key == key { + return i, nil + } + } + return 0, errIndex +} + +func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationTb.GroupMemberModel, keys []string) (int, error) { + key := g.getGroupMemberInfoKey(groupMember.GroupID, groupMember.UserID) + for i, _key := range keys { + if _key == key { + return i, nil + } + } + return 0, errIndex +} + // / groupInfo func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) { - return GetCacheFor(ctx, groupIDs, func(ctx context.Context, groupID string) (*relationTb.GroupModel, error) { - return g.GetGroupInfo(ctx, groupID) + var keys []string + for _, group := range groupIDs { + keys = append(keys, g.getGroupInfoKey(group)) + } + return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupIndex, func(ctx context.Context) ([]*relationTb.GroupModel, error) { + return g.groupDB.Find(ctx, groupIDs) }) } func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) { - return GetCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) { - return g.group.Take(ctx, groupID) + return getCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) { + return g.groupDB.Take(ctx, groupID) }) } -// userJoinSuperGroup -func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) { - for _, userID := range userIDs { - if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil { - return err - } +func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache { + new := g.NewCache() + var keys []string + for _, groupID := range groupIDs { + keys = append(keys, g.getGroupInfoKey(groupID)) } - return nil + new.AddKeys(keys...) + return new } -func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) { - return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID)) +// userJoinSuperGroup +func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache { + new := g.NewCache() + var keys []string + for _, userID := range userIDs { + keys = append(keys, g.getJoinedSuperGroupsIDKey(userID)) + } + new.AddKeys(keys...) + return new } func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) { - return GetCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { + return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID) if err != nil { return nil, err @@ -124,8 +170,8 @@ func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID str } // groupMembersHash -func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) { - return GetCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) { +func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) { + return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) { userIDs, err := g.GetGroupMemberIDs(ctx, groupID) if err != nil { return 0, err @@ -137,174 +183,125 @@ func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID strin }) } -func (g *GroupCacheRedis) GetGroupMemberHash1(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) { - // todo - mapGroupUserIDs, err := g.groupMember.FindJoinUserID(ctx, groupIDs) - if err != nil { - return nil, err - } +func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) { res := make(map[string]*relationTb.GroupSimpleUserID) for _, groupID := range groupIDs { - userIDs := mapGroupUserIDs[groupID] + userIDs, err := g.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } users := &relationTb.GroupSimpleUserID{} if len(userIDs) > 0 { utils.Sort(userIDs, true) bi := big.NewInt(0) bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) users.Hash = bi.Uint64() + users.MemberNum = uint32(len(userIDs)) } res[groupID] = users } return res, nil } -func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) { - return g.rcClient.TagAsDeleted(g.getGroupMembersHashKey(groupID)) +func (g *GroupCacheRedis) DelGroupMembersHash(groupID string) GroupCache { + cache := g.NewCache() + cache.AddKeys(g.getGroupMembersHashKey(groupID)) + return cache } // groupMemberIDs func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { - return GetCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) { - return g.groupMember.FindMemberUserID(ctx, groupID) + return getCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) { + return g.groupMemberDB.FindMemberUserID(ctx, groupID) }) } -func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) { - return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID)) +func (g *GroupCacheRedis) GetGroupsMemberIDs(ctx context.Context, groupIDs []string) (map[string][]string, error) { + m := make(map[string][]string) + for _, groupID := range groupIDs { + userIDs, err := g.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + m[groupID] = userIDs + } + return m, nil } -//// JoinedGroups -//func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { -// getJoinedGroupIDList := func() (string, error) { -// joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID) -// if err != nil { -// return "", err -// } -// bytes, err := json.Marshal(joinedGroupList) -// if err != nil { -// return "", utils.Wrap(err, "") -// } -// return string(bytes), nil -// } -// defer func() { -// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs) -// }() -// joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList) -// if err != nil { -// return nil, err -// } -// err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs) -// return joinedGroupIDs, utils.Wrap(err, "") -//} - -func (g *GroupCacheRedis) DelJoinedGroupID(ctx context.Context, userID string) (err error) { - return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID)) +func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) GroupCache { + cache := g.NewCache() + cache.AddKeys(g.getGroupMemberIDsKey(groupID)) + return cache } -//func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userIDs []string) (err error) { -// defer func() { -// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) -// }() -// for _, userID := range userIDs { -// if err := g.DelJoinedGroupID(ctx, userID); err != nil { -// return err -// } -// } -// return nil -//} +func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { + return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { + return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID) + }) +} + +func (g *GroupCacheRedis) DelJoinedGroupID(userIDs ...string) GroupCache { + var keys []string + for _, userID := range userIDs { + keys = append(keys, g.getJoinedGroupsKey(userID)) + } + cache := g.NewCache() + cache.AddKeys(keys...) + return cache +} func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error) { - return GetCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) { - return g.groupMember.Take(ctx, groupID, userID) + return getCache(ctx, g.rcClient, g.getGroupMemberInfoKey(groupID, userID), g.expireTime, func(ctx context.Context) (*relationTb.GroupMemberModel, error) { + return g.groupMemberDB.Take(ctx, groupID, userID) }) } -//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) { -// -// return nil, err -//} - -//func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) { -// defer func() { -// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers) -// }() -// groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID) -// if err != nil { -// return nil, err -// } -// if count < 0 || offset < 0 { -// return nil, nil -// } -// var groupMemberList []*relation.GroupMember -// var start, stop int32 -// start = offset -// stop = offset + count -// l := int32(len(groupMemberIDList)) -// if start > stop { -// return nil, nil -// } -// if start >= l { -// return nil, nil -// } -// if count != 0 { -// if stop >= l { -// stop = l -// } -// groupMemberIDList = groupMemberIDList[start:stop] -// } else { -// if l < 1000 { -// stop = l -// } else { -// stop = 1000 -// } -// groupMemberIDList = groupMemberIDList[start:stop] -// } -// for _, userID := range groupMemberIDList { -// groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID) -// if err != nil { -// return -// } -// groupMembers = append(groupMembers, groupMember) -// } -// return groupMemberList, nil -//} - -func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) { - return g.rcClient.TagAsDeleted(g.getGroupMemberInfoKey(groupID, userID)) -} - -// groupMemberNum -//func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) { -// getGroupMemberNum := func() (string, error) { -// num, err := relation.GetGroupMemberNumByGroupID(groupID) -// if err != nil { -// return "", err -// } -// return strconv.Itoa(int(num)), nil -// } -// defer func() { -// mcontext.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num) -// }() -// groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum) -// if err != nil { -// return 0, err -// } -// return strconv.Atoi(groupMember) -//} - -func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) (err error) { - return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID)) -} - -func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) { - return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID)) -} - -func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error { - for _, groupID := range groupIDs { - if err := g.DelGroupInfo(ctx, groupID); err != nil { - return err - } +func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string, roleLevel []int32) ([]*relationTb.GroupMemberModel, error) { + var keys []string + for _, userID := range userIDs { + keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) } - return nil + return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationTb.GroupMemberModel, error) { + return g.groupMemberDB.Find(ctx, []string{groupID}, userIDs, roleLevel) + }) +} + +func (g *GroupCacheRedis) GetAllGroupMemberInfo(ctx context.Context, groupID string) ([]*relationTb.GroupMemberModel, error) { + groupMemberIDs, err := g.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + var keys []string + for _, groupMemberID := range groupMemberIDs { + keys = append(keys, g.getGroupMemberInfoKey(groupID, groupMemberID)) + } + return batchGetCache(ctx, g.rcClient, keys, g.expireTime, g.GetGroupMemberIndex, func(ctx context.Context) ([]*relationTb.GroupMemberModel, error) { + return g.groupMemberDB.Find(ctx, []string{groupID}, groupMemberIDs, nil) + }) +} + +func (g *GroupCacheRedis) DelGroupMembersInfo(groupID string, userIDs ...string) GroupCache { + var keys []string + for _, userID := range userIDs { + keys = append(keys, g.getGroupMemberInfoKey(groupID, userID)) + } + cache := g.NewCache() + cache.AddKeys(keys...) + return cache +} + +func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) { + return getCache(ctx, g.rcClient, g.getGroupMemberNumKey(groupID), g.expireTime, func(ctx context.Context) (int64, error) { + return g.groupMemberDB.TakeGroupMemberNum(ctx, groupID) + }) +} + +func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) GroupCache { + var keys []string + for _, groupID := range groupID { + keys = append(keys, g.getGroupMemberNumKey(groupID)) + } + cache := g.NewCache() + cache.AddKeys(keys...) + return cache } diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index d37e6a053..74c472f16 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -3,11 +3,12 @@ package cache import ( "context" "fmt" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw/specialerror" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/go-redis/redis/v8" - "time" ) func NewRedis() (redis.UniversalClient, error) { diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 90fedf869..6a5d01227 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -4,17 +4,17 @@ import ( "context" "errors" "fmt" + "strconv" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" - "strconv" - "time" "github.com/go-redis/redis/v8" ) @@ -251,7 +251,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList [] } } if len(failedMsgs) != 0 { - return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedMsgs, mcontext.GetOperationID(ctx))) + return len(failedMsgs), fmt.Errorf("set msg to cache failed, failed lists: %v, %s", failedMsgs, userID) } _, err := pipe.Exec(ctx) return 0, err diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index f3438d883..55a60fba8 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -3,13 +3,53 @@ package cache import ( "context" "encoding/json" + "errors" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/dtm-labs/rockscache" - "time" ) const scanCount = 3000 +var errIndex = errors.New("err index") + +type metaCache interface { + ExecDel(ctx context.Context) error + // delete key rapid + DeleteKey(ctx context.Context, key string) error + AddKeys(keys ...string) + GetPreDeleteKeys() []string +} + +func NewMetaCacheRedis(rcClient *rockscache.Client) metaCache { + return &metaCacheRedis{rcClient: rcClient} +} + +type metaCacheRedis struct { + rcClient *rockscache.Client + keys []string +} + +func (m *metaCacheRedis) ExecDel(ctx context.Context) error { + if len(m.keys) > 0 { + return m.rcClient.TagAsDeletedBatch2(ctx, m.keys) + } + return nil +} + +func (m *metaCacheRedis) DeleteKey(ctx context.Context, key string) error { + return m.rcClient.TagAsDeleted2(ctx, key) +} + +func (m *metaCacheRedis) AddKeys(keys ...string) { + m.keys = append(m.keys, keys...) +} + +func (m *metaCacheRedis) GetPreDeleteKeys() []string { + return m.keys +} + func GetDefaultOpt() rockscache.Options { opts := rockscache.NewDefaultOptions() opts.StrongConsistency = true @@ -17,10 +57,10 @@ func GetDefaultOpt() rockscache.Options { return opts } -func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { +func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { var t T var write bool - v, err := rcClient.Fetch(key, expire, func() (s string, err error) { + v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { t, err = fn(ctx) if err != nil { return "", err @@ -45,14 +85,37 @@ func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin return t, nil } -func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) { - rs := make([]T, 0, len(list)) - for _, e := range list { - r, err := fn(ctx, e) +func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) { + var tArrays []T + batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { + values := make(map[int]string) + tArrays, err = fn(ctx) if err != nil { return nil, err } - rs = append(rs, r) + for _, v := range tArrays { + index, err := keyIndexFn(v, keys) + if err != nil { + continue + } + bs, err := json.Marshal(v) + if err != nil { + return nil, utils.Wrap(err, "marshal failed") + } + values[index] = string(bs) + } + return values, nil + }) + if err != nil { + return nil, err } - return rs, nil + for _, v := range batchMap { + var t T + err = json.Unmarshal([]byte(v), &t) + if err != nil { + return nil, utils.Wrap(err, "unmarshal failed") + } + tArrays = append(tArrays, t) + } + return tArrays, nil } diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index a76e1c3a8..60ed9d8c9 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -2,14 +2,11 @@ package cache import ( "context" - "encoding/json" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" + "time" + relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" - "strconv" - "time" ) const ( @@ -19,21 +16,38 @@ const ( ) type UserCache interface { + metaCache + NewCache() UserCache + GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error) + GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationTb.UserModel, error) + DelUsersInfo(userIDs []string) UserCache + GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) + DelUsersGlobalRecvMsgOpt(userIDs []string) UserCache } type UserCacheRedis struct { - userDB *relation.UserGorm - + metaCache + userDB relationTb.UserModelInterface expireTime time.Duration - - rcClient *rockscache.Client + rcClient *rockscache.Client } -func NewUserCacheRedis(rdb redis.UniversalClient, userDB *relation.UserGorm, options rockscache.Options) *UserCacheRedis { +func NewUserCacheRedis(rdb redis.UniversalClient, userDB relationTb.UserModelInterface, options rockscache.Options) UserCache { + rcClient := rockscache.NewClient(rdb, options) return &UserCacheRedis{ + metaCache: NewMetaCacheRedis(rcClient), userDB: userDB, expireTime: userExpireTime, - rcClient: rockscache.NewClient(rdb, options), + rcClient: rcClient, + } +} + +func (u *UserCacheRedis) NewCache() UserCache { + return &UserCacheRedis{ + metaCache: u.metaCache, + userDB: u.userDB, + expireTime: u.expireTime, + rcClient: u.rcClient, } } @@ -46,66 +60,50 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string { } func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error) { - getUserInfo := func() (string, error) { - userInfo, err := u.userDB.Take(ctx, userID) - if err != nil { - return "", err - } - bytes, err := json.Marshal(userInfo) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserInfo) - if err != nil { - return nil, err - } - userInfo = &relationTb.UserModel{} - err = json.Unmarshal([]byte(userInfoStr), userInfo) - return userInfo, utils.Wrap(err, "") + return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*relationTb.UserModel, error) { + return u.userDB.Take(ctx, userID) + }) } func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationTb.UserModel, error) { - var users []*relationTb.UserModel - //for _, userID := range userIDs { - // user, err := GetUserInfoFromCache(ctx, userID) - // if err != nil { - // return nil, err - // } - // users = append(users, user) - //} - return users, nil -} - -func (u *UserCacheRedis) DelUserInfo(ctx context.Context, userID string) (err error) { - return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID)) -} - -func (u *UserCacheRedis) DelUsersInfo(ctx context.Context, userIDs []string) (err error) { + var keys []string for _, userID := range userIDs { - if err := u.DelUserInfo(ctx, userID); err != nil { - return err - } + keys = append(keys, u.getUserInfoKey(userID)) } - return nil + return batchGetCache(ctx, u.rcClient, keys, u.expireTime, func(user *relationTb.UserModel, keys []string) (int, error) { + for i, key := range keys { + if key == u.getUserInfoKey(user.UserID) { + return i, nil + } + } + return 0, errIndex + }, func(ctx context.Context) ([]*relationTb.UserModel, error) { + return u.userDB.Find(ctx, userIDs) + }) +} + +func (u *UserCacheRedis) DelUsersInfo(userIDs []string) UserCache { + var keys []string + for _, userID := range userIDs { + keys = append(keys, u.getUserInfoKey(userID)) + } + cache := u.NewCache() + cache.AddKeys(keys...) + return cache } func (u *UserCacheRedis) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) { - getUserGlobalRecvMsgOpt := func() (string, error) { - userInfo, err := u.userDB.Take(ctx, userID) - if err != nil { - return "", err - } - return strconv.Itoa(int(userInfo.GlobalRecvMsgOpt)), nil - } - optStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserGlobalRecvMsgOpt) - if err != nil { - return 0, err - } - return strconv.Atoi(optStr) + return getCache(ctx, u.rcClient, u.getUserGlobalRecvMsgOptKey(userID), u.expireTime, func(ctx context.Context) (int, error) { + return u.userDB.GetUserGlobalRecvMsgOpt(ctx, userID) + }) } -func (u *UserCacheRedis) DelUserGlobalRecvMsgOpt(ctx context.Context, userID string) (err error) { - return u.rcClient.TagAsDeleted(u.getUserGlobalRecvMsgOptKey(userID)) +func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs []string) UserCache { + var keys []string + for _, userID := range userIDs { + keys = append(keys, u.getUserGlobalRecvMsgOptKey(userID)) + } + cache := u.NewCache() + cache.AddKeys(keys...) + return cache } diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index dc4b4e3ec..167ff80d0 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -2,6 +2,7 @@ package controller import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go index 900939973..62c95ac6b 100644 --- a/pkg/common/db/controller/black.go +++ b/pkg/common/db/controller/black.go @@ -3,6 +3,8 @@ package controller import ( "context" "errors" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "gorm.io/gorm" @@ -15,16 +17,18 @@ type BlackDatabase interface { Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) // FindOwnerBlacks 获取黑名单列表 FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*relation.BlackModel, total int64, err error) + FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) // CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) } type blackDatabase struct { black relation.BlackModelInterface + cache cache.BlackCache } -func NewBlackDatabase(black relation.BlackModelInterface) BlackDatabase { - return &blackDatabase{black} +func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache) BlackDatabase { + return &blackDatabase{black, cache} } // Create 增加黑名单 @@ -66,3 +70,7 @@ func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (i } return } + +func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) { + return b.cache.GetBlackIDs(ctx, ownerUserID) +} diff --git a/pkg/common/db/controller/chatlog.go b/pkg/common/db/controller/chatlog.go index bfcbf7143..fc03719f1 100644 --- a/pkg/common/db/controller/chatlog.go +++ b/pkg/common/db/controller/chatlog.go @@ -6,7 +6,7 @@ import ( ) type ChatLogDatabase interface { - CreateChatLog(msg pbMsg.MsgDataToMQ) error + CreateChatLog(msg *pbMsg.MsgDataToMQ) error GetChatLog(chatLog *relationTb.ChatLogModel, pageNumber, showNumber int32, contentTypes []int32) (int64, []relationTb.ChatLogModel, error) } @@ -18,7 +18,7 @@ type chatLogDatabase struct { chatLogModel relationTb.ChatLogModelInterface } -func (c *chatLogDatabase) CreateChatLog(msg pbMsg.MsgDataToMQ) error { +func (c *chatLogDatabase) CreateChatLog(msg *pbMsg.MsgDataToMQ) error { return c.chatLogModel.Create(msg) } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index ba2d048f6..7474c1f54 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -2,7 +2,6 @@ package controller import ( "context" - "encoding/json" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" @@ -12,10 +11,8 @@ import ( ) type ConversationDatabase interface { - //GetUserIDExistConversation 获取拥有该会话的的用户ID列表 - GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) //UpdateUserConversationFiled 更新用户该会话的属性信息 - UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error + UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error //CreateConversation 创建一批新的会话 CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 @@ -29,7 +26,7 @@ type ConversationDatabase interface { //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error //SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作 - SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error + SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error } func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -46,22 +43,22 @@ type ConversationDataBase struct { tx tx.Tx } -func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { +func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { return c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) - haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID) + haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, conversation.ConversationID) if err != nil { return err } - if len(haveUserID) > 0 { - err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap) + if len(haveUserIDs) > 0 { + err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap) if err != nil { return err } } - NotUserID := utils.DifferenceString(haveUserID, userIDList) + NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs) var cList []*relationTb.ConversationModel - for _, v := range NotUserID { + for _, v := range NotUserIDs { temp := new(relationTb.ConversationModel) if err := utils.CopyStructFields(temp, conversation); err != nil { return err @@ -73,26 +70,17 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, if err != nil { return err } - if len(NotUserID) > 0 { - err = c.cache.DelUsersConversationIDs(ctx, NotUserID) - if err != nil { - return err - } - } - err = c.cache.DelUsersConversation(ctx, haveUserID, conversation.ConversationID) - if err != nil { - return err - } - return nil + // clear cache + return c.cache.DelConversationIDs(NotUserIDs).DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx) }) } -func (c *ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { - panic("implement me") -} - -func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { - panic("implement me") +func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error { + err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) + if err != nil { + return err + } + return c.cache.DelUsersConversation(userIDs, conversationID).ExecDel(ctx) } func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { @@ -100,7 +88,6 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat if err := c.conversationDB.NewTx(tx).Create(ctx, conversations); err != nil { return err } - // clear cache return nil }) } @@ -109,20 +96,20 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con return c.tx.Transaction(func(tx any) error { userIDList := []string{conversation.OwnerUserID, conversation.UserID} conversationTx := c.conversationDB.NewTx(tx) - haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID) + haveUserIDs, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID) if err != nil { return err } filedMap := map[string]interface{}{"is_private_chat": conversation.IsPrivateChat} - if len(haveUserID) > 0 { - err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap) + if len(haveUserIDs) > 0 { + err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap) if err != nil { return err } } - NotUserID := utils.DifferenceString(haveUserID, userIDList) + NotUserIDs := utils.DifferenceString(haveUserIDs, userIDList) var cList []*relationTb.ConversationModel - for _, v := range NotUserID { + for _, v := range NotUserIDs { temp := new(relationTb.ConversationModel) if v == conversation.UserID { temp.OwnerUserID = conversation.UserID @@ -138,128 +125,71 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con } cList = append(cList, temp) } - if len(NotUserID) > 0 { + if len(NotUserIDs) > 0 { err = c.conversationDB.Create(ctx, cList) if err != nil { return err } } - err = c.cache.DelUsersConversationIDs(ctx, NotUserID) - if err != nil { - return err - } - err = c.cache.DelUsersConversation(ctx, haveUserID, conversation.ConversationID) - if err != nil { - return err - } - return nil + // clear cache + return c.cache.DelConversationIDs(NotUserIDs).DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx) }) } func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { - getConversation := func() (string, error) { - conversationList, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDs) - if err != nil { - return "", utils.Wrap(err, "get failed") - } - bytes, err := json.Marshal(conversationList) - if err != nil { - return "", utils.Wrap(err, "Marshal failed") - } - return string(bytes), nil - } - return c.cache.GetConversations(ctx, ownerUserID, conversationIDs, getConversation) + return c.cache.GetConversations(ctx, ownerUserID, conversationIDs) } func (c *ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) { - getConversation := func() (string, error) { - conversationList, err := c.conversationDB.Take(ctx, ownerUserID, conversationID) - if err != nil { - return "", utils.Wrap(err, "get failed") - } - bytes, err := json.Marshal(conversationList) - if err != nil { - return "", utils.Wrap(err, "Marshal failed") - } - return string(bytes), nil - } - return c.cache.GetConversation(ctx, ownerUserID, conversationID, getConversation) + return c.cache.GetConversation(ctx, ownerUserID, conversationID) } func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { - getConversationIDs := func() (string, error) { - conversationIDs, err := c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) - if err != nil { - return "", utils.Wrap(err, "getConversationIDList failed") - } - bytes, err := json.Marshal(conversationIDs) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - conversationIDList, err := c.cache.GetUserConversationIDs(ctx, ownerUserID, getConversationIDs) - if err != nil { - return nil, err - } - var conversations []*relationTb.ConversationModel - for _, conversationID := range conversationIDList { - conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) - if err != nil { - return nil, utils.Wrap(err, "GetConversation failed") - } - conversations = append(conversations, conversation) - } - return conversations, nil + return c.cache.GetUserAllConversations(ctx, ownerUserID) } func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { return c.tx.Transaction(func(tx any) error { - var conversationIDList []string + var conversationIDs []string for _, conversation := range conversations { - conversationIDList = append(conversationIDList, conversation.ConversationID) + conversationIDs = append(conversationIDs, conversation.ConversationID) } conversationTx := c.conversationDB.NewTx(tx) - haveConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDList) + existConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDs) if err != nil { return err } - if len(haveConversations) > 0 { + if len(existConversations) > 0 { err = conversationTx.Update(ctx, conversations) if err != nil { return err } } - var haveConversationID []string - for _, conversation := range haveConversations { - haveConversationID = append(haveConversationID, conversation.ConversationID) + var existConversationIDs []string + for _, conversation := range existConversations { + existConversationIDs = append(existConversationIDs, conversation.ConversationID) } - NotConversationID := utils.DifferenceString(haveConversationID, conversationIDList) - var NotConversations []*relationTb.ConversationModel + var notExistConversations []*relationTb.ConversationModel for _, conversation := range conversations { - if !utils.IsContain(conversation.ConversationID, haveConversationID) { - NotConversations = append(NotConversations, conversation) + if !utils.IsContain(conversation.ConversationID, existConversationIDs) { + notExistConversations = append(notExistConversations, conversation) } } - if len(NotConversations) > 0 { - err = c.conversationDB.Create(ctx, NotConversations) + if len(notExistConversations) > 0 { + err = c.conversationDB.Create(ctx, notExistConversations) if err != nil { return err } } - err = c.cache.DelUsersConversationIDs(ctx, NotConversationID) - if err != nil { - return err + cache := c.cache.NewCache() + if len(notExistConversations) > 0 { + cache = cache.DelConversationIDs([]string{ownerUserID}) } - err = c.cache.DelUserConversations(ctx, ownerUserID, haveConversationID) - if err != nil { - return err - } - return nil + return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx) }) } func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - return c.conversationDB.FindRecvMsgNotNotifyUserIDs(ctx, groupID) + return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) } diff --git a/pkg/common/db/controller/extend_msg.go b/pkg/common/db/controller/extend_msg.go index 9931ce0bf..e33e61a38 100644 --- a/pkg/common/db/controller/extend_msg.go +++ b/pkg/common/db/controller/extend_msg.go @@ -2,7 +2,10 @@ package controller import ( "context" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" ) // for mongoDB @@ -18,10 +21,12 @@ type ExtendMsgDatabase interface { type extendMsgDatabase struct { database unRelationTb.ExtendMsgSetModelInterface + cache cache.ExtendMsgSetCache + ctxTx tx.CtxTx } -func NewExtendMsgDatabase(extendMsgModel unRelationTb.ExtendMsgSetModelInterface) ExtendMsgDatabase { - return &extendMsgDatabase{database: extendMsgModel} +func NewExtendMsgDatabase(extendMsgModel unRelationTb.ExtendMsgSetModelInterface, cache cache.ExtendMsgSetCache, ctxTx tx.CtxTx) ExtendMsgDatabase { + return &extendMsgDatabase{database: extendMsgModel, cache: cache, ctxTx: ctxTx} } func (e *extendMsgDatabase) CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error { @@ -41,12 +46,14 @@ func (e *extendMsgDatabase) InsertExtendMsg(ctx context.Context, sourceID string } func (e *extendMsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*unRelationTb.KeyValueModel) error { + e.cache.DelExtendMsg(clientMsgID).ExecDel(ctx) return e.database.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList) } + func (e *extendMsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*unRelationTb.KeyValueModel) error { return e.database.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, reactionExtensionList) } func (e *extendMsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *unRelationTb.ExtendMsgModel, err error) { - return e.database.TakeExtendMsg(ctx, sourceID, sessionType, clientMsgID, maxMsgUpdateTime) + return e.cache.GetExtendMsg(ctx, sourceID, sessionType, clientMsgID, maxMsgUpdateTime) } diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index f36918132..09aaf5451 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -5,6 +5,7 @@ import ( "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" @@ -45,10 +46,11 @@ type friendDatabase struct { friend relation.FriendModelInterface friendRequest relation.FriendRequestModelInterface tx tx.Tx + cache cache.FriendCache } -func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relation.FriendRequestModelInterface, tx tx.Tx) FriendDatabase { - return &friendDatabase{friend: friend, friendRequest: friendRequest, tx: tx} +func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relation.FriendRequestModelInterface, cache cache.FriendCache, tx tx.Tx) FriendDatabase { + return &friendDatabase{friend: friend, friendRequest: friendRequest, cache: cache, tx: tx} } // ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true) @@ -116,13 +118,14 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, if err != nil { return err } - fs2, err := f.friend.NewTx(tx).FindReversalFriends(ctx, ownerUserID, friendUserIDs) if err != nil { return err } + var newFriendIDs []string for _, v := range friendUserIDs { fs2 = append(fs2, &relation.FriendModel{OwnerUserID: v, FriendUserID: ownerUserID, AddSource: addSource, OperatorUserID: opUserID}) + newFriendIDs = append(newFriendIDs, v) } fs22 := utils.DistinctAny(fs2, func(e *relation.FriendModel) string { return e.OwnerUserID @@ -131,7 +134,8 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, if err != nil { return err } - return nil + newFriendIDs = append(newFriendIDs, ownerUserID) + return f.cache.DelFriendIDs(newFriendIDs...).ExecDel(ctx) }) } @@ -200,18 +204,30 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest * if err != nil { return err } - return nil + return f.cache.DelFriendIDs(ownerUserID, friendRequest.ToUserID).ExecDel(ctx) }) } // 删除好友 外部判断是否好友关系 func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) { - return f.friend.Delete(ctx, ownerUserID, friendUserIDs) + return f.tx.Transaction(func(tx any) error { + if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { + return err + } + return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) + }) + } // 更新好友备注 零值也支持 func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) { - return f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark) + return f.tx.Transaction(func(tx any) error { + err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark) + if err != nil { + return err + } + return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) + }) } // 获取ownerUserID的好友列表 无结果不返回错误 @@ -247,5 +263,5 @@ func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID s } func (f *friendDatabase) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) { - return f.friend.FindFriendUserIDs(ctx, ownerUserID) + return f.cache.GetFriendIDs(ctx, ownerUserID) } diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index de73e790a..1a690f452 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation" @@ -104,26 +105,26 @@ func (g *groupDatabase) GetGroupIDsByGroupType(ctx context.Context, groupType in return g.groupDB.GetGroupIDsByGroupType(ctx, groupType) } -func (g *groupDatabase) delGroupMemberCache(ctx context.Context, groupID string, userIDs []string) error { - for _, userID := range userIDs { - if err := g.cache.DelJoinedGroupID(ctx, userID); err != nil { - return err - } - if err := g.cache.DelJoinedSuperGroupIDs(ctx, userID); err != nil { - return err - } - } - if err := g.cache.DelGroupMemberIDs(ctx, groupID); err != nil { - return err - } - if err := g.cache.DelGroupMemberNum(ctx, groupID); err != nil { - return err - } - if err := g.cache.DelGroupMembersHash(ctx, groupID); err != nil { - return err - } - return nil -} +// func (g *groupDatabase) delGroupMemberCache(ctx context.Context, groupID string, userIDs []string) error { +// for _, userID := range userIDs { +// if err := g.cache.DelJoinedGroupID(ctx, userID); err != nil { +// return err +// } +// if err := g.cache.DelJoinedSuperGroupIDs(ctx, userID); err != nil { +// return err +// } +// } +// if err := g.cache.DelGroupMemberIDs(ctx, groupID); err != nil { +// return err +// } +// if err := g.cache.DelGroupMemberNum(ctx, groupID); err != nil { +// return err +// } +// if err := g.cache.DelGroupMembersHash(ctx, groupID); err != nil { +// return err +// } +// return nil +// } func (g *groupDatabase) FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error) { return g.cache.GetGroupMemberIDs(ctx, groupID) @@ -162,10 +163,7 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma if err := g.groupDB.NewTx(tx).UpdateMap(ctx, groupID, data); err != nil { return err } - if err := g.cache.DelGroupInfo(ctx, groupID); err != nil { - return err - } - return nil + return g.cache.DelGroupsInfo(groupID).ExecDel(ctx) }) } @@ -181,10 +179,7 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error if err != nil { return err } - if err := g.delGroupMemberCache(ctx, groupID, userIDs); err != nil { - return err - } - return nil + return g.cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID).ExecDel(ctx) }) } @@ -197,7 +192,7 @@ func (g *groupDatabase) TakeGroupOwner(ctx context.Context, groupID string) (*re } func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error) { - return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels) // todo cache group find + return g.cache.GetGroupMembersInfo(ctx, groupIDs[0], userIDs, roleLevels) // todo cache group find } func (g *groupDatabase) PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) { @@ -217,9 +212,7 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil { return err } - if err := g.delGroupMemberCache(ctx, groupID, []string{userID}); err != nil { - return err - } + return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx) } return nil }) @@ -230,15 +223,12 @@ func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, u if err := g.groupMemberDB.NewTx(tx).Delete(ctx, groupID, userIDs); err != nil { return err } - if err := g.delGroupMemberCache(ctx, groupID, userIDs); err != nil { - return err - } - return nil + return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).ExecDel(ctx) }) } func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) { - return g.cache.GetGroupMemberHash1(ctx, groupIDs) + return g.cache.GetGroupMemberHashMap(ctx, groupIDs) } func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error) { @@ -261,10 +251,7 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, if rowsAffected != 1 { return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "") } - if err := g.delGroupMemberCache(ctx, groupID, []string{oldOwnerUserID, newOwnerUserID}); err != nil { - return err - } - return nil + return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx) }) } @@ -273,24 +260,20 @@ func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, u if err := g.groupMemberDB.NewTx(tx).Update(ctx, groupID, userID, data); err != nil { return err } - if err := g.cache.DelGroupMemberInfo(ctx, groupID, userID); err != nil { - return err - } - return nil + return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx) }) } func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error { return g.tx.Transaction(func(tx any) error { + var cache = g.cache.NewCache() for _, item := range data { if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil { return err } - if err := g.cache.DelGroupMemberInfo(ctx, item.GroupID, item.UserID); err != nil { - return err - } + cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID) } - return nil + return cache.ExecDel(ctx) }) } diff --git a/pkg/common/db/relation/chat_log_model.go b/pkg/common/db/relation/chat_log_model.go index ecd8cd2a2..05597312c 100644 --- a/pkg/common/db/relation/chat_log_model.go +++ b/pkg/common/db/relation/chat_log_model.go @@ -2,6 +2,7 @@ package relation import ( "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" @@ -21,7 +22,7 @@ func NewChatLogGorm(db *gorm.DB) relation.ChatLogModelInterface { return &ChatLogGorm{NewMetaDB(db, &relation.ChatLogModel{})} } -func (c *ChatLogGorm) Create(msg pbMsg.MsgDataToMQ) error { +func (c *ChatLogGorm) Create(msg *pbMsg.MsgDataToMQ) error { chatLog := new(relation.ChatLogModel) copier.Copy(chatLog, msg.MsgData) switch msg.MsgData.SessionType { diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index 5955bc886..72c3710a9 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -2,6 +2,7 @@ package relation import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -21,43 +22,56 @@ func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface { } func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) { - return utils.Wrap(c.DB.Create(&conversations).Error, "") + return utils.Wrap(c.db(ctx).Create(&conversations).Error, "") } func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) { - return utils.Wrap(c.DB.Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") + return utils.Wrap(c.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") } func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) { - return utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "") + return utils.Wrap(c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "") } func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel) (err error) { - return utils.Wrap(c.DB.Updates(&conversations).Error, "") + return utils.Wrap(c.db(ctx).Updates(&conversations).Error, "") } func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) { - err = utils.Wrap(c.DB.Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "") + err = utils.Wrap(c.db(ctx).Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "") return conversations, err } func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) { cc := &relation.ConversationModel{} - return cc, utils.Wrap(c.DB.Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "") + return cc, utils.Wrap(c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "") } func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string) (existUserID []string, err error) { - return existUserID, utils.Wrap(c.DB.Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "") + return existUserID, utils.Wrap(c.db(ctx).Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "") } func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) { - return existConversationID, utils.Wrap(c.DB.Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "") + return existConversationID, utils.Wrap(c.db(ctx).Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "") } func (c *ConversationGorm) FindUserIDAllConversationID(ctx context.Context, userID string) (conversationIDList []string, err error) { - return conversationIDList, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "") + return conversationIDList, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "") +} + +func (c *ConversationGorm) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*relation.ConversationModel, err error) { + return conversations, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Find(&conversations).Error, "") } func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { - return userIDs, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "") + return userIDs, utils.Wrap(c.db(ctx).Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage).Pluck("user_id", &userIDs).Error, "") +} + +func (c *ConversationGorm) FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { + return userIDs, utils.Wrap(c.db(ctx).Where("group_id = ? and recv_msg_opt = ? and conversation_type = ?", groupID, constant.ReceiveNotNotifyMessage, constant.SuperGroupChatType).Pluck("user_id", &userIDs).Error, "") +} + +func (c *ConversationGorm) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { + var conversation relation.ConversationModel + return int(conversation.RecvMsgOpt), utils.Wrap(c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID).Select("recv_msg_opt").Find(&conversation).Error, "") } diff --git a/pkg/common/db/relation/group_member_model.go b/pkg/common/db/relation/group_member_model.go index d63416e89..5e82dd60b 100644 --- a/pkg/common/db/relation/group_member_model.go +++ b/pkg/common/db/relation/group_member_model.go @@ -2,6 +2,7 @@ package relation import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/ormutil" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" @@ -46,7 +47,7 @@ func (g *GroupMemberGorm) UpdateRoleLevel(ctx context.Context, groupID string, u return db.RowsAffected, utils.Wrap(db.Error, "") } -func (g *GroupMemberGorm) Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupList []*relation.GroupMemberModel, err error) { +func (g *GroupMemberGorm) Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupMembers []*relation.GroupMemberModel, err error) { db := g.DB if len(groupIDs) > 0 { db = db.Where("group_id in (?)", groupIDs) @@ -57,7 +58,7 @@ func (g *GroupMemberGorm) Find(ctx context.Context, groupIDs []string, userIDs [ if len(roleLevels) > 0 { db = db.Where("role_level in (?)", roleLevels) } - return groupList, utils.Wrap(db.Find(&groupList).Error, "") + return groupMembers, utils.Wrap(db.Find(&groupMembers).Error, "") } func (g *GroupMemberGorm) Take(ctx context.Context, groupID string, userID string) (groupMember *relation.GroupMemberModel, err error) { @@ -100,3 +101,11 @@ func (g *GroupMemberGorm) FindJoinUserID(ctx context.Context, groupIDs []string) func (g *GroupMemberGorm) FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) { return userIDs, utils.Wrap(g.DB.Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Pluck("user_id", &userIDs).Error, "") } + +func (g *GroupMemberGorm) FindUserJoinedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) { + return groupIDs, utils.Wrap(g.DB.Model(&relation.GroupMemberModel{}).Where("user_id = ?", userID).Pluck("group_id", &groupIDs).Error, "") +} + +func (g *GroupMemberGorm) TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error) { + return count, utils.Wrap(g.DB.Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Count(&count).Error, "") +} diff --git a/pkg/common/db/relation/user_model.go b/pkg/common/db/relation/user_model.go index 5f29cb268..e147a91f0 100644 --- a/pkg/common/db/relation/user_model.go +++ b/pkg/common/db/relation/user_model.go @@ -2,6 +2,7 @@ package relation import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "gorm.io/gorm" @@ -58,3 +59,8 @@ func (u *UserGorm) GetAllUserID(ctx context.Context) (userIDs []string, err erro err = u.db(ctx).Pluck("user_id", &userIDs).Error return userIDs, err } + +func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) { + err = u.db(ctx).Model(&relation.UserModel{}).Where("user_id = ?", userID).Pluck("global_recv_msg_opt", &opt).Error + return opt, err +} diff --git a/pkg/common/db/table/relation/chatlog.go b/pkg/common/db/table/relation/chatlog.go index fc9680d2a..8f34f2854 100644 --- a/pkg/common/db/table/relation/chatlog.go +++ b/pkg/common/db/table/relation/chatlog.go @@ -33,6 +33,6 @@ func (ChatLogModel) TableName() string { } type ChatLogModelInterface interface { - Create(msg pbMsg.MsgDataToMQ) error + Create(msg *pbMsg.MsgDataToMQ) error GetChatLog(chatLog *ChatLogModel, pageNumber, showNumber int32, contentTypes []int32) (int64, []ChatLogModel, error) } diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 00e3a26a6..6971d06f3 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -32,13 +32,16 @@ func (ConversationModel) TableName() string { type ConversationModelInterface interface { Create(ctx context.Context, conversations []*ConversationModel) (err error) Delete(ctx context.Context, groupIDs []string) (err error) - UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) + UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) (err error) Update(ctx context.Context, conversations []*ConversationModel) (err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*ConversationModel, err error) - FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error) + FindUserID(ctx context.Context, userIDs []string, conversationID string) ([]string, error) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error) - FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) + FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) + FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) + GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) + FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) NewTx(tx any) ConversationModelInterface } diff --git a/pkg/common/db/table/relation/group_member.go b/pkg/common/db/table/relation/group_member.go index c05332828..fb1acf3ab 100644 --- a/pkg/common/db/table/relation/group_member.go +++ b/pkg/common/db/table/relation/group_member.go @@ -29,16 +29,18 @@ func (GroupMemberModel) TableName() string { type GroupMemberModelInterface interface { NewTx(tx any) GroupMemberModelInterface - Create(ctx context.Context, groupMemberList []*GroupMemberModel) (err error) + Create(ctx context.Context, groupMembers []*GroupMemberModel) (err error) Delete(ctx context.Context, groupID string, userIDs []string) (err error) DeleteGroup(ctx context.Context, groupIDs []string) (err error) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) (rowsAffected int64, err error) - Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupList []*GroupMemberModel, err error) + Find(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) (groupMembers []*GroupMemberModel, err error) FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) Take(ctx context.Context, groupID string, userID string) (groupMember *GroupMemberModel, err error) TakeOwner(ctx context.Context, groupID string) (groupMember *GroupMemberModel, err error) SearchMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (total uint32, groupList []*GroupMemberModel, err error) MapGroupMemberNum(ctx context.Context, groupIDs []string) (count map[string]uint32, err error) FindJoinUserID(ctx context.Context, groupIDs []string) (groupUsers map[string][]string, err error) + FindUserJoinedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) + TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error) } diff --git a/pkg/common/db/table/relation/user.go b/pkg/common/db/table/relation/user.go index 6773b8412..bbfb483a3 100644 --- a/pkg/common/db/table/relation/user.go +++ b/pkg/common/db/table/relation/user.go @@ -39,4 +39,5 @@ type UserModelInterface interface { // 获取用户信息 不存在,不返回错误 Page(ctx context.Context, pageNumber, showNumber int32) (users []*UserModel, count int64, err error) GetAllUserID(ctx context.Context) (userIDs []string, err error) + GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) }