From cfb157d2e4b291d2b94c44fdf968a9dc1c73bf89 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 27 Mar 2023 15:05:40 +0800 Subject: [PATCH] group --- internal/rpc/group/super_group.go | 19 +++++---- pkg/common/db/cache/group.go | 69 +++++++++++++++++++++++-------- pkg/common/db/cache/rockscache.go | 37 +++++++++++++++++ pkg/common/db/controller/group.go | 41 +++++++++++++----- pkg/utils/utils_v2.go | 2 +- 5 files changed, 130 insertions(+), 38 deletions(-) diff --git a/internal/rpc/group/super_group.go b/internal/rpc/group/super_group.go index 5b34eb483..28edf3748 100644 --- a/internal/rpc/group/super_group.go +++ b/internal/rpc/group/super_group.go @@ -3,6 +3,8 @@ package group import ( "context" "fmt" + "strings" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" @@ -10,46 +12,45 @@ import ( pbGroup "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "strings" ) func (s *groupServer) GetJoinedSuperGroupList(ctx context.Context, req *pbGroup.GetJoinedSuperGroupListReq) (*pbGroup.GetJoinedSuperGroupListResp, error) { resp := &pbGroup.GetJoinedSuperGroupListResp{} - joinSuperGroup, err := s.GroupDatabase.FindJoinSuperGroup(ctx, req.UserID) + groupIDs, err := s.GroupDatabase.FindJoinSuperGroup(ctx, req.UserID) if err != nil { return nil, err } - if len(joinSuperGroup.GroupIDs) == 0 { + if len(groupIDs) == 0 { return resp, nil } - owners, err := s.GroupDatabase.FindGroupMember(ctx, joinSuperGroup.GroupIDs, nil, []int32{constant.GroupOwner}) + owners, err := s.GroupDatabase.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner}) if err != nil { return nil, err } ownerMap := utils.SliceToMap(owners, func(e *relation.GroupMemberModel) string { return e.GroupID }) - if ids := utils.Single(joinSuperGroup.GroupIDs, utils.Keys(ownerMap)); len(ids) > 0 { + if ids := utils.Single(groupIDs, utils.Keys(ownerMap)); len(ids) > 0 { return nil, errs.ErrData.Wrap(fmt.Sprintf("super group %s not owner", strings.Join(ids, ","))) } - groups, err := s.GroupDatabase.FindGroup(ctx, joinSuperGroup.GroupIDs) + groups, err := s.GroupDatabase.FindGroup(ctx, groupIDs) if err != nil { return nil, err } groupMap := utils.SliceToMap(groups, func(e *relation.GroupModel) string { return e.GroupID }) - if ids := utils.Single(joinSuperGroup.GroupIDs, utils.Keys(groupMap)); len(ids) > 0 { + if ids := utils.Single(groupIDs, utils.Keys(groupMap)); len(ids) > 0 { return nil, errs.ErrData.Wrap(fmt.Sprintf("super group info %s not found", strings.Join(ids, ","))) } - superGroupMembers, err := s.GroupDatabase.FindSuperGroup(ctx, joinSuperGroup.GroupIDs) + superGroupMembers, err := s.GroupDatabase.FindSuperGroup(ctx, groupIDs) if err != nil { return nil, err } superGroupMemberMap := utils.SliceToMapAny(superGroupMembers, func(e *unrelation.SuperGroupModel) (string, []string) { return e.GroupID, e.MemberIDs }) - resp.Groups = utils.Slice(joinSuperGroup.GroupIDs, func(groupID string) *sdkws.GroupInfo { + resp.Groups = utils.Slice(groupIDs, func(groupID string) *sdkws.GroupInfo { return DbToPbGroupInfo(groupMap[groupID], ownerMap[groupID].UserID, uint32(len(superGroupMemberMap))) }) return resp, nil diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 93954f3e1..c4c984fce 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -14,14 +14,15 @@ import ( ) const ( - groupExpireTime = time.Second * 60 * 60 * 12 - groupInfoKey = "GROUP_INFO:" - groupMemberIDsKey = "GROUP_MEMBER_IDS:" - groupMembersHashKey = "GROUP_MEMBERS_HASH:" - groupMemberInfoKey = "GROUP_MEMBER_INFO:" - joinedSuperGroupsKey = "JOIN_SUPER_GROUPS:" - joinedGroupsKey = "JOIN_GROUPS_KEY:" - groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + groupExpireTime = time.Second * 60 * 60 * 12 + groupInfoKey = "GROUP_INFO:" + groupMemberIDsKey = "GROUP_MEMBER_IDS:" + groupMembersHashKey = "GROUP_MEMBERS_HASH:" + groupMemberInfoKey = "GROUP_MEMBER_INFO:" + joinedSuperGroupsKey = "JOIN_SUPER_GROUPS:" + SuperGroupMemberIDsKey = "SUPER_GROUP_MEMBER_IDS:" + joinedGroupsKey = "JOIN_GROUPS_KEY:" + groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" ) type GroupCache interface { @@ -29,8 +30,12 @@ type GroupCache interface { NewCache() GroupCache GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) + DelGroupsInfo(groupIDs ...string) GroupCache + GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache + GetSuperGroupMemberIDs(ctx context.Context, groupIDs ...string) (models []*unrelationTb.SuperGroupModel, err error) + DelSuperGroupMemberIDs(groupIDs ...string) GroupCache GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) @@ -48,7 +53,6 @@ type GroupCache interface { GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) DelGroupsMemberNum(groupID ...string) GroupCache - DelGroupsInfo(groupIDs ...string) GroupCache } type GroupCacheRedis struct { @@ -85,6 +89,10 @@ func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string { return joinedGroupsKey + userID } +func (g *GroupCacheRedis) getSuperGroupMemberIDsKey(groupID string) string { + return SuperGroupMemberIDsKey + groupID +} + func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string { return groupMembersHashKey + groupID } @@ -148,6 +156,33 @@ func (g *GroupCacheRedis) DelGroupsInfo(groupIDs ...string) GroupCache { return new } +func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) { + return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { + userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID) + if err != nil { + return nil, err + } + return userGroup.GroupIDs, nil + }) +} + +func (g *GroupCacheRedis) GetSuperGroupMemberIDs(ctx context.Context, groupIDs ...string) (models []*unrelationTb.SuperGroupModel, err error) { + var keys []string + for _, group := range groupIDs { + keys = append(keys, g.getSuperGroupMemberIDsKey(group)) + } + return batchGetCache(ctx, g.rcClient, keys, g.expireTime, func(model *unrelationTb.SuperGroupModel, keys []string) (int, error) { + for i, key := range keys { + if g.getSuperGroupMemberIDsKey(model.GroupID) == key { + return i, nil + } + } + return 0, errIndex + }, func(ctx context.Context) ([]*unrelationTb.SuperGroupModel, error) { + return g.mongoDB.FindSuperGroup(ctx, groupIDs) + }) +} + // userJoinSuperGroup func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache { new := g.NewCache() @@ -159,14 +194,14 @@ func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(userIDs ...string) GroupCache { return new } -func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) { - return getCache(ctx, g.rcClient, g.getJoinedSuperGroupsIDKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { - userGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID) - if err != nil { - return nil, err - } - return userGroup.GroupIDs, nil - }) +func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache { + new := g.NewCache() + var keys []string + for _, groupID := range groupIDs { + keys = append(keys, g.getSuperGroupMemberIDsKey(groupID)) + } + new.AddKeys(keys...) + return new } // groupMembersHash diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index 36c2f449c..8153245dd 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -124,3 +124,40 @@ func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys } return tArrays, nil } + +func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, originKeys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) { + batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { + values := make(map[int]string) + tArrays, err := fn(ctx) + if err != nil { + return nil, err + } + for _, v := range tArrays { + index, err := keyIndexFn(v, keys) + if err != nil { + continue + } + bs, err := json.Marshal(v) + if err != nil { + return nil, utils.Wrap(err, "marshal failed") + } + values[index] = string(bs) + } + return values, nil + }) + if err != nil { + return nil, err + } + tMap := make(map[string]T) + for i, v := range batchMap { + if v != "" { + var t T + err = json.Unmarshal([]byte(v), &t) + if err != nil { + return nil, utils.Wrap(err, "unmarshal failed") + } + tMap[keys[i]] = t + } + } + return tMap, nil +} diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 08cef8f87..e620ca1c6 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -13,7 +13,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/dtm-labs/rockscache" - _ "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/mongo" "gorm.io/gorm" @@ -48,7 +47,7 @@ type GroupDatabase interface { PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error) // SuperGroupModelInterface FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unRelationTb.SuperGroupModel, error) - FindJoinSuperGroup(ctx context.Context, userID string) (*unRelationTb.UserToSuperGroupModel, error) + FindJoinSuperGroup(ctx context.Context, userID string) ([]string, error) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error DeleteSuperGroup(ctx context.Context, groupID string) error DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error @@ -276,34 +275,54 @@ func (g *groupDatabase) PageGroupRequestUser(ctx context.Context, userID string, return g.groupRequestDB.Page(ctx, userID, pageNumber, showNumber) } -func (g *groupDatabase) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unRelationTb.SuperGroupModel, error) { - return g.mongoDB.FindSuperGroup(ctx, groupIDs) +func (g *groupDatabase) FindSuperGroup(ctx context.Context, groupIDs []string) (models []*unRelationTb.SuperGroupModel, err error) { + return g.cache.GetSuperGroupMemberIDs(ctx, groupIDs...) } -func (g *groupDatabase) FindJoinSuperGroup(ctx context.Context, userID string) (*unRelationTb.UserToSuperGroupModel, error) { - return g.mongoDB.GetSuperGroupByUserID(ctx, userID) +func (g *groupDatabase) FindJoinSuperGroup(ctx context.Context, userID string) ([]string, error) { + return g.cache.GetJoinedSuperGroupIDs(ctx, userID) } -func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error { +func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - return g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDList) + if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx) }) } func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - return g.mongoDB.DeleteSuperGroup(ctx, groupID) + if err := g.mongoDB.DeleteSuperGroup(ctx, groupID); err != nil { + return err + } + models, err := g.cache.GetSuperGroupMemberIDs(ctx, groupID) + if err != nil { + return err + } + cache := g.cache.DelSuperGroupMemberIDs(groupID) + if len(models) > 0 { + cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...) + } + return cache.ExecDel(ctx) }) } func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - return g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs) + if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) }) } func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - return g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs) + if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) }) } diff --git a/pkg/utils/utils_v2.go b/pkg/utils/utils_v2.go index 15433cd8b..faf03869a 100644 --- a/pkg/utils/utils_v2.go +++ b/pkg/utils/utils_v2.go @@ -175,7 +175,7 @@ func SliceToMapAny[E any, K comparable, V any](es []E, fn func(e E) (K, V)) map[ // SliceToMap slice to map func SliceToMap[E any, K comparable](es []E, fn func(e E) K) map[K]E { - return SliceToMapOkAny[E, K, E](es, func(e E) (K, E, bool) { + return SliceToMapOkAny(es, func(e E) (K, E, bool) { k := fn(e) return k, e, true })