diff --git a/internal/common/rpc_server/register.go b/internal/common/rpcserver/register.go similarity index 92% rename from internal/common/rpc_server/register.go rename to internal/common/rpcserver/register.go index 107c0cbf8..bc9e25fd7 100644 --- a/internal/common/rpc_server/register.go +++ b/internal/common/rpcserver/register.go @@ -1,4 +1,4 @@ -package rpc_server +package rpcserver import ( "Open_IM/internal/common/network" @@ -31,12 +31,11 @@ func NewRpcServer(registerIPInConfig string, port int, registerName string, zkSe if err != nil { return nil, err } - err = zkClient.Register(s.RegisterName, registerIP, s.Port) + s.RegisterCenter = zkClient + err = s.RegisterCenter.Register(s.RegisterName, registerIP, s.Port) if err != nil { return nil, err } - s.RegisterCenter = zkClient - return s, nil } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index f723731e4..c6df00f05 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -20,7 +20,7 @@ import ( ) func NewRpcAuthServer(port int) *rpcAuth { - r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) + r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) if err != nil { panic(err) } @@ -35,7 +35,7 @@ func NewRpcAuthServer(port int) *rpcAuth { func (s *rpcAuth) Run() { operationID := utils.OperationIDGenerator() log.NewInfo(operationID, "rpc auth start...") - listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) + listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) if err != nil { panic(err) } @@ -141,6 +141,6 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in } type rpcAuth struct { - *rpc_server.RpcServer + *rpcserver.RpcServer controller.AuthInterface } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index e04700a91..00704381e 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -23,14 +23,14 @@ import ( ) type friendServer struct { - *rpc_server.RpcServer + *rpcserver.RpcServer controller.FriendInterface controller.BlackInterface } func NewFriendServer(port int) *friendServer { - r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) + r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) if err != nil { panic(err) } @@ -64,7 +64,7 @@ func NewFriendServer(port int) *friendServer { func (s *friendServer) Run() { operationID := utils.OperationIDGenerator() log.NewInfo(operationID, "friendServer run...") - listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) + listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) if err != nil { panic(err) } diff --git a/internal/rpc/group/g.go b/internal/rpc/group/g.go index db056ef63..3bbbb4afa 100644 --- a/internal/rpc/group/g.go +++ b/internal/rpc/group/g.go @@ -10,6 +10,7 @@ import ( "errors" "math/big" "strconv" + "strings" "time" ) @@ -45,6 +46,24 @@ func GetPublicUserInfoMap(ctx context.Context, userIDs []string) (map[string]*sd }), nil } +func GetUsername(ctx context.Context, userIDs []string) (map[string]string, error) { + if len(userIDs) == 0 { + return map[string]string{}, nil + } + users, err := GetPublicUserInfo(ctx, userIDs) + if err != nil { + return nil, err + } + if ids := utils.Single(userIDs, utils.Slice(users, func(e *sdkws.PublicUserInfo) string { + return e.UserID + })); len(ids) > 0 { + return nil, constant.ErrUserIDNotFound.Wrap(strings.Join(ids, ",")) + } + return utils.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, string) { + return e.UserID, e.Nickname + }), nil +} + func GroupNotification(ctx context.Context, groupID string) { var conversationReq pbConversation.ModifyConversationFieldReq conversation := pbConversation.Conversation{ diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 08a32bc35..9bdb77165 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -159,6 +159,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR group.GroupID = genGroupID(ctx, req.GroupInfo.GroupID) joinGroup := func(userID string, roleLevel int32) error { groupMember := PbToDbGroupMember(userMap[userID]) + groupMember.Nickname = "" groupMember.GroupID = group.GroupID groupMember.RoleLevel = roleLevel groupMember.OperatorUserID = tracelog.GetOpUserID(ctx) @@ -240,7 +241,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo resp.Groups = utils.Slice(utils.Order(groupIDs, groups, func(group *relationTb.GroupModel) string { return group.GroupID }), func(group *relationTb.GroupModel) *open_im_sdk.GroupInfo { - return DbToPbGroupInfo(group, ownerMap[group.GroupID].UserID, uint32(groupMemberNum[group.GroupID])) + return DbToPbGroupInfo(group, ownerMap[group.GroupID].UserID, groupMemberNum[group.GroupID]) }) return resp, nil } @@ -321,6 +322,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite var groupMembers []*relationTb.GroupMemberModel for _, userID := range req.InvitedUserIDs { member := PbToDbGroupMember(userMap[userID]) + member.Nickname = "" member.GroupID = req.GroupID member.RoleLevel = constant.GroupOrdinaryUsers member.OperatorUserID = opUserID @@ -352,7 +354,16 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro if err != nil { return nil, err } + nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) { + return e.UserID, e.Nickname == "" + })) + if err != nil { + return nil, err + } resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + if e.Nickname == "" { + e.Nickname = nameMap[e.UserID] + } return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -365,7 +376,16 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGr return nil, err } resp.Total = total + nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) { + return e.UserID, e.Nickname == "" + })) + if err != nil { + return nil, err + } resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + if e.Nickname == "" { + e.Nickname = nameMap[e.UserID] + } return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -450,7 +470,16 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG if err != nil { return nil, err } + nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) { + return e.UserID, e.Nickname == "" + })) + if err != nil { + return nil, err + } resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + if e.Nickname == "" { + e.Nickname = nameMap[e.UserID] + } return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -801,7 +830,16 @@ func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbGroup.GetGr return nil, err } resp.Total = total + nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) { + return e.UserID, e.Nickname == "" + })) + if err != nil { + return nil, err + } resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + if e.Nickname == "" { + e.Nickname = nameMap[e.UserID] + } return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -1011,8 +1049,8 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr return nil, err } } - err = s.GroupInterface.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbGroup.SetGroupMemberInfo) *controller.BatchUpdateGroupMember { - return &controller.BatchUpdateGroupMember{ + err = s.GroupInterface.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbGroup.SetGroupMemberInfo) *relationTb.BatchUpdateGroupMember { + return &relationTb.BatchUpdateGroupMember{ GroupID: e.GroupID, UserID: e.UserID, Map: UpdateGroupMemberMap(e), @@ -1053,7 +1091,7 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.Get } resp.GroupAbstractInfos = utils.Slice(groups, func(group *relationTb.GroupModel) *pbGroup.GroupAbstractInfo { users := groupUserMap[group.GroupID] - return DbToPbGroupAbstractInfo(group.GroupID, uint32(len(users.UserIDs)), users.Hash) + return DbToPbGroupAbstractInfo(group.GroupID, users.MemberNum, users.Hash) }) return resp, nil } @@ -1067,7 +1105,16 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbGroup.Ge if err != nil { return nil, err } + nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) { + return e.UserID, e.Nickname == "" + })) + if err != nil { + return nil, err + } resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + if e.Nickname == "" { + e.Nickname = nameMap[e.UserID] + } return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -1075,10 +1122,10 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbGroup.Ge func (s *groupServer) GetGroupMemberUserID(ctx context.Context, req *pbGroup.GetGroupMemberUserIDReq) (*pbGroup.GetGroupMemberUserIDResp, error) { resp := &pbGroup.GetGroupMemberUserIDResp{} - userIDs, err := s.GroupInterface.FindGroupMemberUserID(ctx, req.GroupID) + var err error + resp.UserIDs, err = s.GroupInterface.FindGroupMemberUserID(ctx, req.GroupID) if err != nil { return nil, err } - resp.UserIDs = userIDs return resp, nil } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 3afedc5f6..a7c50d823 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -25,12 +25,12 @@ import ( type userServer struct { rpcPort int rpcRegisterName string - *rpc_server.RpcServer + *rpcserver.RpcServer controller.UserInterface } func NewUserServer(port int) *userServer { - r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) + r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) if err != nil { panic(err) } @@ -52,7 +52,7 @@ func NewUserServer(port int) *userServer { func (s *userServer) Run() { operationID := utils.OperationIDGenerator() log.NewInfo(operationID, "rpc user start...") - listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) + listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) if err != nil { panic(err) } diff --git a/pkg/common/db/cache/extend_msg_set.go b/pkg/common/db/cache/extend_msg_set.go index dc5d0e3a5..6a02f0af3 100644 --- a/pkg/common/db/cache/extend_msg_set.go +++ b/pkg/common/db/cache/extend_msg_set.go @@ -9,6 +9,11 @@ import ( "time" ) +const ( + extendMsgSetCache = "EXTEND_MSG_SET_CACHE:" + extendMsgCache = "EXTEND_MSG_CACHE:" +) + type ExtendMsgSetCache struct { expireTime time.Duration rcClient *rockscache.Client diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index c5a9ef51b..8376a1d56 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -1,7 +1,6 @@ package cache import ( - "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/relation" relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/db/unrelation" @@ -12,8 +11,8 @@ import ( "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" "math/big" - "sort" "strconv" + "strings" "time" ) @@ -117,22 +116,6 @@ func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (gro }) } -func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) { - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) - }() - return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID)) -} - -func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error { - for _, groupID := range groupIDs { - if err := g.DelGroupInfo(ctx, groupID); err != nil { - return err - } - } - return nil -} - // userJoinSuperGroup func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) { for _, userID := range userIDs { @@ -160,43 +143,18 @@ func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID str }) } -//// groupMembersHash -//func (g *GroupCacheRedis) GetGroupsMembersHash(ctx context.Context, groupIDs []string) (map[string]uint64, error) { -// return GetCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, "") -//} - // groupMembersHash func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) { - generateHash := func() (string, error) { - groupInfo, err := g.GetGroupInfo(ctx, groupID) + return GetCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) { + userIDs, err := g.GetGroupMemberIDs(ctx, groupID) if err != nil { - return "", err - } - if groupInfo.Status == constant.GroupStatusDismissed { - return "0", nil - } - groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID) - if err != nil { - return "", err - } - sort.Strings(groupMemberIDList) - var all string - for _, v := range groupMemberIDList { - all += v + return 0, err } + utils.Sort(userIDs, true) bi := big.NewInt(0) - bi.SetString(utils.Md5(all)[0:8], 16) - return strconv.Itoa(int(bi.Uint64())), nil - } - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "hashCodeUint64", hashCodeUint64) - }() - hashCodeStr, err := g.rcClient.Fetch(g.getGroupMembersHashKey(groupID), time.Second*30*60, generateHash) - if err != nil { - return 0, utils.Wrap(err, "fetch failed") - } - hashCode, err := strconv.Atoi(hashCodeStr) - return uint64(hashCode), err + bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) + return bi.Uint64(), nil + }) } func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) { @@ -207,41 +165,10 @@ func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID strin } // groupMemberIDs -// from redis func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { - f := func() (string, error) { - groupInfo, err := g.GetGroupInfo(ctx, groupID) - if err != nil { - return "", err - } - var groupMemberIDList []string - if groupInfo.GroupType == constant.SuperGroup { - superGroup, err := g.mongoDB.GetSuperGroup(ctx, groupID) - if err != nil { - return "", err - } - groupMemberIDList = superGroup.MemberIDList - } else { - groupMemberIDList, err = relation.GetGroupMemberIDListByGroupID(groupID) - if err != nil { - return "", err - } - } - bytes, err := json.Marshal(groupMemberIDList) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMemberIDList", groupMemberIDs) - }() - groupIDListStr, err := g.rcClient.Fetch(g.getGroupMemberIDsKey(groupID), time.Second*30*60, f) - if err != nil { - return nil, err - } - err = json.Unmarshal([]byte(groupIDListStr), &groupMemberIDs) - return groupMemberIDs, nil + return GetCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) { + return g.groupMember.FindMemberUserID(ctx, groupID) + }) } func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) { @@ -389,3 +316,19 @@ func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) }() return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID)) } + +func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) + }() + return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID)) +} + +func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error { + for _, groupID := range groupIDs { + if err := g.DelGroupInfo(ctx, groupID); err != nil { + return err + } + } + return nil +} diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index c036de5f8..066c7367f 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -1,42 +1,15 @@ package cache import ( - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db/relation" - "Open_IM/pkg/common/log" - "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" "encoding/json" - "math/big" - "sort" - "strconv" + "github.com/dtm-labs/rockscache" "time" ) -const ( - //userInfoCache = "USER_INFO_CACHE:" - //friendRelationCache = "FRIEND_RELATION_CACHE:" - blackListCache = "BLACK_LIST_CACHE:" - //groupCache = "GROUP_CACHE:" - //groupInfoCache = "GROUP_INFO_CACHE:" - //groupOwnerIDCache = "GROUP_OWNER_ID:" - //joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:" - //groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:" - //groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:" - //allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:" - //joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:" - //groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:" - //groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:" - conversationCache = "CONVERSATION_CACHE:" - conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:" - - extendMsgSetCache = "EXTEND_MSG_SET_CACHE:" - extendMsgCache = "EXTEND_MSG_CACHE:" -) - const scanCount = 3000 -const RandomExpireAdjustment = 0.2 + func (rc *RcClient) DelKeys() { for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, @@ -68,3 +41,44 @@ func (rc *RcClient) DelKeys() { } } } + + +func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { + var t T + var write bool + v, err := rcClient.Fetch(key, expire, func() (s string, err error) { + t, err = fn(ctx) + if err != nil { + return "", err + } + bs, err := json.Marshal(t) + if err != nil { + return "", utils.Wrap(err, "") + } + write = true + return string(bs), nil + }) + if err != nil { + return t, err + } + if write { + return t, nil + } + err = json.Unmarshal([]byte(v), &t) + if err != nil { + return t, utils.Wrap(err, "") + } + return t, nil +} + +func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) { + rs := make([]T, 0, len(list)) + for _, e := range list { + r, err := fn(ctx, e) + if err != nil { + return nil, err + } + rs = append(rs, r) + } + return rs, nil +} diff --git a/pkg/common/db/cache/utils.go b/pkg/common/db/cache/utils.go deleted file mode 100644 index d007ff58a..000000000 --- a/pkg/common/db/cache/utils.go +++ /dev/null @@ -1,49 +0,0 @@ -package cache - -import ( - "Open_IM/pkg/utils" - "context" - "encoding/json" - "github.com/dtm-labs/rockscache" - "time" -) - -func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { - var t T - var write bool - v, err := rcClient.Fetch(key, expire, func() (s string, err error) { - t, err = fn(ctx) - if err != nil { - return "", err - } - bs, err := json.Marshal(t) - if err != nil { - return "", utils.Wrap(err, "") - } - write = true - return string(bs), nil - }) - if err != nil { - return t, err - } - if write { - return t, nil - } - err = json.Unmarshal([]byte(v), &t) - if err != nil { - return t, utils.Wrap(err, "") - } - return t, nil -} - -func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) { - rs := make([]T, 0, len(list)) - for _, e := range list { - r, err := fn(ctx, e) - if err != nil { - return nil, err - } - rs = append(rs, r) - } - return rs, nil -} diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 96085d128..ba39bb511 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -21,17 +21,6 @@ import ( //type GroupInterface GroupDataBaseInterface -type BatchUpdateGroupMember struct { - GroupID string - UserID string - Map map[string]any -} - -type GroupSimpleUserID struct { - Hash uint64 - UserIDs []string -} - type GroupInterface interface { CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) @@ -48,11 +37,11 @@ type GroupInterface interface { SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error - MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error) + MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群 UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error - UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error + UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error // GroupRequest CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error) @@ -132,7 +121,7 @@ func (g *GroupController) DeleteGroupMember(ctx context.Context, groupID string, return g.database.DeleteGroupMember(ctx, groupID, userIDs) } -func (g *GroupController) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error) { +func (g *GroupController) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) { return g.database.MapGroupMemberUserID(ctx, groupIDs) } @@ -144,7 +133,7 @@ func (g *GroupController) TransferGroupOwner(ctx context.Context, groupID string return g.database.TransferGroupOwner(ctx, groupID, oldOwnerUserID, newOwnerUserID, roleLevel) } -func (g *GroupController) UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error { +func (g *GroupController) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error { return g.database.UpdateGroupMembers(ctx, data) } @@ -204,11 +193,11 @@ type GroupDataBaseInterface interface { SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error - MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error) + MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群 UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error - UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error + UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error // GroupRequest CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error) @@ -393,20 +382,20 @@ func (g *GroupDataBase) DeleteGroupMember(ctx context.Context, groupID string, u }) } -func (g *GroupDataBase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error) { +func (g *GroupDataBase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) { mapGroupUserIDs, err := g.groupMemberDB.FindJoinUserID(ctx, groupIDs) if err != nil { return nil, err } - res := make(map[string]*GroupSimpleUserID) + res := make(map[string]*relationTb.GroupSimpleUserID) for _, groupID := range groupIDs { - users := &GroupSimpleUserID{ - UserIDs: mapGroupUserIDs[groupID], - } - if len(users.UserIDs) > 0 { - utils.Sort(users.UserIDs, true) + userIDs := mapGroupUserIDs[groupID] + users := &relationTb.GroupSimpleUserID{} + if len(userIDs) > 0 { + utils.Sort(userIDs, true) bi := big.NewInt(0) - bi.SetString(utils.Md5(strings.Join(users.UserIDs, ";"))[0:8], 16) + bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) + users.Hash = bi.Uint64() } res[groupID] = users } @@ -452,7 +441,7 @@ func (g *GroupDataBase) UpdateGroupMember(ctx context.Context, groupID string, u }) } -func (g *GroupDataBase) UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error { +func (g *GroupDataBase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error { return g.db.Transaction(func(tx *gorm.DB) error { for _, item := range data { if err := g.groupMemberDB.Update(ctx, item.GroupID, item.UserID, item.Map, tx); err != nil { @@ -487,25 +476,25 @@ func (g *GroupDataBase) FindJoinSuperGroup(ctx context.Context, userID string) ( } func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error { - return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error { - return s.CreateSuperGroup(ctx, groupID, initMemberIDList, tx) + return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error { + return g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDList, tx) }) } func (g *GroupDataBase) DeleteSuperGroup(ctx context.Context, groupID string) error { - return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error { - return s.DeleteSuperGroup(ctx, groupID, tx) + return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error { + return g.mongoDB.DeleteSuperGroup(ctx, groupID, tx) }) } func (g *GroupDataBase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error { - return s.RemoverUserFromSuperGroup(ctx, groupID, userIDs, tx) + return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error { + return g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs, tx) }) } func (g *GroupDataBase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error { - return s.AddUserToSuperGroup(ctx, groupID, userIDs, tx) + return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error { + return g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs, tx) }) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go new file mode 100644 index 000000000..776b86b0f --- /dev/null +++ b/pkg/common/db/controller/msg.go @@ -0,0 +1,21 @@ +package controller + +import ( + pbMsg "Open_IM/pkg/proto/msg" + "context" +) + +type MsgInterface interface { + BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error + BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64) + DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) + DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error + DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error) + ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) +} + +type MsgDatabaseInterface interface { + BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error + BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64) + DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) +} diff --git a/pkg/common/db/table/relation/utils.go b/pkg/common/db/table/relation/utils.go new file mode 100644 index 000000000..a046de1e8 --- /dev/null +++ b/pkg/common/db/table/relation/utils.go @@ -0,0 +1,12 @@ +package relation + +type BatchUpdateGroupMember struct { + GroupID string + UserID string + Map map[string]any +} + +type GroupSimpleUserID struct { + Hash uint64 + MemberNum uint32 +} diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 980dfa653..0fee6fc26 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -7,9 +7,9 @@ const ( CChat = "msg" ) -type UserChatModel struct { - UID string `bson:"uid"` - Msg []MsgInfoModel `bson:"msg"` +type UserMsgDocModel struct { + DocID string `bson:"uid"` + Msg []MsgInfoModel `bson:"msg"` } type MsgInfoModel struct { @@ -17,20 +17,20 @@ type MsgInfoModel struct { Msg []byte `bson:"msg"` } -func (UserChatModel) TableName() string { +func (UserMsgDocModel) TableName() string { return CChat } -func (UserChatModel) GetSingleGocMsgNum() int { +func (UserMsgDocModel) GetSingleDocMsgNum() int { return singleGocMsgNum } -func (u UserChatModel) getSeqUid(uid string, seq uint32) string { +func (u UserMsgDocModel) getSeqUid(uid string, seq uint32) string { seqSuffix := seq / singleGocMsgNum return u.indexGen(uid, seqSuffix) } -func (u UserChatModel) getSeqUserIDList(userID string, maxSeq uint32) []string { +func (u UserMsgDocModel) getSeqUserIDList(userID string, maxSeq uint32) []string { seqMaxSuffix := maxSeq / singleGocMsgNum var seqUserIDList []string for i := 0; i <= int(seqMaxSuffix); i++ { @@ -40,16 +40,16 @@ func (u UserChatModel) getSeqUserIDList(userID string, maxSeq uint32) []string { return seqUserIDList } -func (UserChatModel) getSeqSuperGroupID(groupID string, seq uint32) string { +func (UserMsgDocModel) getSeqSuperGroupID(groupID string, seq uint32) string { seqSuffix := seq / singleGocMsgNum return superGroupIndexGen(groupID, seqSuffix) } -func (u UserChatModel) GetSeqUid(uid string, seq uint32) string { +func (u UserMsgDocModel) GetSeqUid(uid string, seq uint32) string { return u.getSeqUid(uid, seq) } -func (UserChatModel) getMsgIndex(seq uint32) int { +func (UserMsgDocModel) getMsgIndex(seq uint32) int { seqSuffix := seq / singleGocMsgNum var index uint32 if seqSuffix == 0 { @@ -60,6 +60,6 @@ func (UserChatModel) getMsgIndex(seq uint32) int { return int(index) } -func (UserChatModel) indexGen(uid string, seqSuffix uint32) string { +func (UserMsgDocModel) indexGen(uid string, seqSuffix uint32) string { return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10) } diff --git a/pkg/common/db/unrelation/batch_insert_chat.go b/pkg/common/db/unrelation/batch_insert_chat.go index fdfe1f3b0..ce78e4725 100644 --- a/pkg/common/db/unrelation/batch_insert_chat.go +++ b/pkg/common/db/unrelation/batch_insert_chat.go @@ -169,117 +169,3 @@ func (d *db.DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.M } return utils.Wrap(err, ""), lastMaxSeq } - -//func (d *DataBases) BatchInsertChatBoth(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) { -// err, lastMaxSeq := d.BatchInsertChat2Cache(userID, msgList, operationID) -// if err != nil { -// log.Error(operationID, "BatchInsertChat2Cache failed ", err.Error(), userID, len(msgList)) -// return err, 0 -// } -// for { -// if runtime.NumGoroutine() > 50000 { -// log.NewWarn(operationID, "too many NumGoroutine ", runtime.NumGoroutine()) -// time.Sleep(10 * time.Millisecond) -// } else { -// break -// } -// } -// return nil, lastMaxSeq -//} -// -//func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error { -// newTime := getCurrentTimestampByMill() -// if len(msgList) > GetSingleGocMsgNum() { -// return errors.New("too large") -// } -// isInit := false -// currentMaxSeq, err := d.GetUserMaxSeq(userID) -// if err == nil { -// -// } else if err == go_redis.Nil { -// isInit = true -// currentMaxSeq = 0 -// } else { -// return utils.Wrap(err, "") -// } -// var remain uint64 -// //if currentMaxSeq < uint64(GetSingleGocMsgNum()) { -// // remain = uint64(GetSingleGocMsgNum()-1) - (currentMaxSeq % uint64(GetSingleGocMsgNum())) -// //} else { -// // remain = uint64(GetSingleGocMsgNum()) - ((currentMaxSeq - (uint64(GetSingleGocMsgNum()) - 1)) % uint64(GetSingleGocMsgNum())) -// //} -// -// blk0 := uint64(GetSingleGocMsgNum() - 1) -// if currentMaxSeq < uint64(GetSingleGocMsgNum()) { -// remain = blk0 - currentMaxSeq -// } else { -// excludeBlk0 := currentMaxSeq - blk0 -// remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum()) -// } -// -// insertCounter := uint64(0) -// msgListToMongo := make([]MsgInfo, 0) -// msgListToMongoNext := make([]MsgInfo, 0) -// seqUid := "" -// seqUidNext := "" -// log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList)) -// //4998 remain ==1 -// //4999 -// for _, m := range msgList { -// log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) -// currentMaxSeq++ -// sMsg := MsgInfo{} -// sMsg.SendTime = m.MsgData.SendTime -// m.MsgData.Seq = uint32(currentMaxSeq) -// if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { -// return utils.Wrap(err, "") -// } -// if isInit { -// msgListToMongoNext = append(msgListToMongoNext, sMsg) -// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) -// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) -// continue -// } -// if insertCounter < remain { -// msgListToMongo = append(msgListToMongo, sMsg) -// insertCounter++ -// seqUid = getSeqUid(userID, uint32(currentMaxSeq)) -// log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) -// } else { -// msgListToMongoNext = append(msgListToMongoNext, sMsg) -// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) -// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) -// } -// } -// // ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) -// -// ctx := context.Background() -// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) -// -// if seqUid != "" { -// filter := bson.M{"uid": seqUid} -// log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo) -// err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err() -// if err != nil { -// log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter) -// return utils.Wrap(err, "") -// } -// } -// if seqUidNext != "" { -// filter := bson.M{"uid": seqUidNext} -// sChat := UserChat{} -// sChat.UID = seqUidNext -// sChat.Msg = msgListToMongoNext -// log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext) -// if _, err = c.InsertOne(ctx, &sChat); err != nil { -// log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) -// return utils.Wrap(err, "") -// } -// } -// log.NewWarn(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList)) -// return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), "") -//} - -//func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) { -// -//} diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index a359e11d0..a20fb1a65 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -74,8 +74,8 @@ func (m *Mongo) CreateTagIndex() { } func (m *Mongo) CreateMsgIndex() { - if err := m.createMongoIndex(cChat, false, "uid"); err != nil { - fmt.Println(err.Error() + " index create failed " + cChat + " uid, please create index by yourself in field uid") + if err := m.createMongoIndex(unrelation.CChat, false, "uid"); err != nil { + fmt.Println(err.Error() + " index create failed " + unrelation.CChat + " uid, please create index by yourself in field uid") } } @@ -150,3 +150,12 @@ func MongoTransaction(ctx context.Context, mgo *mongo.Client, fn func(ctx mongo. } return utils.Wrap(sess.CommitTransaction(sCtx), "") } + +func getTxCtx(ctx context.Context, tx []any) context.Context { + if len(tx) > 0 { + if ctx, ok := tx[0].(mongo.SessionContext); ok { + return ctx + } + } + return ctx +} diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 9436b06b3..677d00798 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -27,6 +27,16 @@ func NewMsgMongoDriver(mgoDB *mongo.Database) *MsgMongoDriver { return &MsgMongoDriver{mgoDB: mgoDB, MsgCollection: mgoDB.Collection(unrelation.CChat)} } +func (m *MsgMongoDriver) FindOneAndUpdate(ctx context.Context, filter, update, output interface{}, opts ...*options.FindOneAndUpdateOptions) error { + return m.MsgCollection.FindOneAndUpdate(ctx, filter, update, opts...).Decode(output) +} + +func (m *MsgMongoDriver) UpdateOne(ctx context.Context, filter, update interface{}, opts ...*options.UpdateOptions) error { + _, err := m.MsgCollection.UpdateOne(ctx, filter, update, opts...) + return err +} + +// database controller func (m *MsgMongoDriver) DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) { sortkeys.Uint32s(seqList) suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 { @@ -73,6 +83,7 @@ func (m *MsgMongoDriver) DelMsgBySeqListInOneDoc(ctx context.Context, suffixUser return unexistSeqList, nil } +// database func (m *MsgMongoDriver) DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error { sortkeys.Uint32s(seqList) seqMsgs, err := d.GetMsgBySeqListMongo2(ctx, uid, seqList) @@ -88,6 +99,7 @@ func (m *MsgMongoDriver) DelMsgLogic(ctx context.Context, uid string, seqList [] return nil } +// model func (m *MsgMongoDriver) ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error { log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) diff --git a/pkg/common/db/unrelation/super_group.go b/pkg/common/db/unrelation/super_group.go index a5763ea56..720693d6d 100644 --- a/pkg/common/db/unrelation/super_group.go +++ b/pkg/common/db/unrelation/super_group.go @@ -111,15 +111,6 @@ type SuperGroupMongoDriver struct { // panic("implement me") // } -func (s *SuperGroupMongoDriver) getTxCtx(ctx context.Context, tx []any) context.Context { - if len(tx) > 0 { - if ctx, ok := tx[0].(mongo.SessionContext); ok { - return ctx - } - } - return ctx -} - //func (s *SuperGroupMongoDriver) Transaction(ctx context.Context, fn func(ctx mongo.SessionContext) error) error { // sess, err := s.MgoClient.StartSession() // if err != nil { diff --git a/pkg/utils/utils_v2.go b/pkg/utils/utils_v2.go index c01103bfa..91c8fecd9 100644 --- a/pkg/utils/utils_v2.go +++ b/pkg/utils/utils_v2.go @@ -144,11 +144,12 @@ func SliceSetAny[E any, K comparable](es []E, fn func(e E) K) map[K]struct{} { }) } -func Filter[E any](es []E, fn func(e E) bool) []E { - rs := make([]E, 0, len(es)) +func Filter[E, T any](es []E, fn func(e E) (T, bool)) []T { + rs := make([]T, 0, len(es)) for i := 0; i < len(es); i++ { - if e := es[i]; fn(e) { - rs = append(rs, e) + e := es[i] + if t, ok := fn(e); ok { + rs = append(rs, t) } } return rs