diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index e2177c895..67b0fba83 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -38,6 +38,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e } conversationDB := relation.NewConversationGorm(db) pbConversation.RegisterConversationServer(server, &conversationServer{ + notify: notification.NewCheck(client), groupChecker: check.NewGroupChecker(client), ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)), }) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a5c2d77d2..cb6e42778 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -49,6 +49,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e pbGroup.RegisterGroupServer(server, &groupServer{ GroupDatabase: controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()), UserCheck: check.NewUserCheck(client), + Notification: notification.NewCheck(client), ConversationChecker: check.NewConversationChecker(client), }) return nil @@ -152,6 +153,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR groupMember.JoinSource = constant.JoinByInvitation groupMember.InviterUserID = mcontext.GetOpUserID(ctx) groupMember.JoinTime = time.Now() + groupMember.MuteEndTime = time.Unix(0, 0) if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue { return err } @@ -313,6 +315,8 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite member.OperatorUserID = opUserID member.InviterUserID = opUserID member.JoinSource = constant.JoinByInvitation + member.JoinTime = time.Now() + member.MuteEndTime = time.Unix(0, 0) if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil && err != errs.ErrCallbackContinue { return nil, err } @@ -640,6 +644,8 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) groupMember.OperatorUserID = mcontext.GetOpUserID(ctx) groupMember.JoinSource = constant.JoinByInvitation groupMember.InviterUserID = mcontext.GetOpUserID(ctx) + groupMember.JoinTime = time.Now() + groupMember.MuteEndTime = time.Unix(0, 0) if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil && err != errs.ErrCallbackContinue { return nil, err } diff --git a/pkg/apiresp/resp.go b/pkg/apiresp/resp.go index b724e1703..cd215a984 100644 --- a/pkg/apiresp/resp.go +++ b/pkg/apiresp/resp.go @@ -14,6 +14,9 @@ type ApiResponse struct { func isAllFieldsPrivate(v any) bool { typeOf := reflect.TypeOf(v) + if typeOf == nil { + return false + } if typeOf.Kind() == reflect.Ptr { typeOf = typeOf.Elem() } diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index ae4e9d680..bd4640eba 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -46,6 +46,7 @@ func (b *BlackCacheRedis) NewCache() BlackCache { expireTime: b.expireTime, rcClient: b.rcClient, blackDB: b.blackDB, + metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDeleteKeys()...), } } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 073f6ff55..57b97c165 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -68,7 +68,7 @@ func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation } func (c *ConversationRedisCache) NewCache() ConversationCache { - return &ConversationRedisCache{rcClient: c.rcClient, metaCache: c.metaCache, conversationDB: c.conversationDB, expireTime: c.expireTime} + return &ConversationRedisCache{rcClient: c.rcClient, metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDeleteKeys()...), conversationDB: c.conversationDB, expireTime: c.expireTime} } func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string { @@ -157,16 +157,6 @@ func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ow }) } -func (c *ConversationRedisCache) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ConversationCache { - var keys []string - for _, conversationID := range conversationIDs { - keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) - } - cache := c.NewCache() - cache.AddKeys(keys...) - return cache -} - func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { return getCache(ctx, c.rcClient, c.getRecvMsgOptKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (opt int, err error) { return c.conversationDB.GetUserRecvMsgOpt(ctx, ownerUserID, conversationID) diff --git a/pkg/common/db/cache/extend_msg_set.go b/pkg/common/db/cache/extend_msg_set.go index 37ef2cf42..a4440437a 100644 --- a/pkg/common/db/cache/extend_msg_set.go +++ b/pkg/common/db/cache/extend_msg_set.go @@ -40,7 +40,7 @@ func NewExtendMsgSetCacheRedis(rdb redis.UniversalClient, extendMsgSetDB unrelat func (e *ExtendMsgSetCacheRedis) NewCache() ExtendMsgSetCache { return &ExtendMsgSetCacheRedis{ - metaCache: e.metaCache, + metaCache: NewMetaCacheRedis(e.rcClient, e.metaCache.GetPreDeleteKeys()...), expireTime: e.expireTime, extendMsgSetDB: e.extendMsgSetDB, rcClient: e.rcClient, diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index f74df8975..a7540c56e 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -48,7 +48,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationTb.FriendMo } func (c *FriendCacheRedis) NewCache() FriendCache { - return &FriendCacheRedis{rcClient: c.rcClient, metaCache: c.metaCache, friendDB: c.friendDB, expireTime: c.expireTime} + return &FriendCacheRedis{rcClient: c.rcClient, metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDeleteKeys()...), friendDB: c.friendDB, expireTime: c.expireTime} } func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string { diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 657452f31..93954f3e1 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -70,7 +70,7 @@ func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModel } func (g *GroupCacheRedis) NewCache() GroupCache { - return &GroupCacheRedis{rcClient: g.rcClient, expireTime: g.expireTime, groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, mongoDB: g.mongoDB, metaCache: g.metaCache} + return &GroupCacheRedis{rcClient: g.rcClient, expireTime: g.expireTime, groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, mongoDB: g.mongoDB, metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDeleteKeys()...)} } func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string { diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index 7c6a43bed..36c2f449c 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "errors" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/dtm-labs/rockscache" ) @@ -22,8 +24,8 @@ type metaCache interface { GetPreDeleteKeys() []string } -func NewMetaCacheRedis(rcClient *rockscache.Client) metaCache { - return &metaCacheRedis{rcClient: rcClient} +func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache { + return &metaCacheRedis{rcClient: rcClient, keys: keys} } type metaCacheRedis struct { @@ -33,6 +35,7 @@ type metaCacheRedis struct { func (m *metaCacheRedis) ExecDel(ctx context.Context) error { if len(m.keys) > 0 { + log.ZDebug(ctx, "DelKey", "keys", m.keys) return m.rcClient.TagAsDeletedBatch2(ctx, m.keys) } return nil @@ -86,10 +89,9 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin } func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) { - var tArrays []T batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { values := make(map[int]string) - tArrays, err = fn(ctx) + tArrays, err := fn(ctx) if err != nil { return nil, err } @@ -109,6 +111,7 @@ func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys if err != nil { return nil, err } + var tArrays []T for _, v := range batchMap { if v != "" { var t T diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 60ed9d8c9..0679720d2 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -44,7 +44,7 @@ func NewUserCacheRedis(rdb redis.UniversalClient, userDB relationTb.UserModelInt func (u *UserCacheRedis) NewCache() UserCache { return &UserCacheRedis{ - metaCache: u.metaCache, + metaCache: NewMetaCacheRedis(u.rcClient, u.metaCache.GetPreDeleteKeys()...), userDB: u.userDB, expireTime: u.expireTime, rcClient: u.rcClient, diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 0a3b90f84..e72b06850 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -75,8 +75,10 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, return err } cache = cache.DelConversationIDs(NotUserIDs) + log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDeleteKeys(), "addr", &cache) } // clear cache + log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDeleteKeys(), "addr", &cache) return cache.DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx) }) }