From 3d0827a6da22a2bc3b659a192f5ca68ad31ee8ee Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 4 Aug 2022 17:20:33 +0800 Subject: [PATCH] callback fix --- internal/rpc/group/group.go | 143 ++++++++++++++--------- internal/rpc/user/user.go | 15 ++- pkg/base_info/group_api_struct.go | 2 + pkg/common/constant/constant.go | 7 ++ pkg/common/db/RedisModel.go | 6 + pkg/common/db/rocks_cache/rocks_cache.go | 43 ++++++- 6 files changed, 158 insertions(+), 58 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index d6d9e2a9f..640b7eb07 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -467,9 +467,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID) } } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) - } + chat.MemberInvitedNotification(req.OperationID, req.GroupID, req.OpUserID, req.Reason, okUserIDList) } else { for _, v := range req.InvitedUserIDList { @@ -491,7 +489,6 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGroupAllMemberReq) (*pbGroup.GetGroupAllMemberResp, error) { log.NewInfo(req.OperationID, "GetGroupAllMember, args ", req.String()) var resp pbGroup.GetGroupAllMemberResp - //groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID) groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) @@ -500,7 +497,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro return &resp, nil } if groupInfo.GroupType != constant.SuperGroup { - memberList, err := rocksCache.GetAllGroupMembersInfoFromCache(req.GroupID) + memberList, err := rocksCache.GetGroupMembersInfoFromCache(req.Count, req.Offset, req.GroupID) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) resp.ErrCode = constant.ErrDB.ErrCode @@ -693,10 +690,11 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID) } + if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, userID); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + } } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) - } + chat.MemberKickedNotification(req, okUserIDList) } else { for _, userID := range okUserIDList { @@ -721,22 +719,35 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG log.NewInfo(req.OperationID, "GetGroupMembersInfo args ", req.String()) var resp pbGroup.GetGroupMembersInfoResp resp.MemberList = []*open_im_sdk.GroupMemberFullInfo{} - groupMembers, err := rocksCache.GetAllGroupMembersInfoFromCache(req.GroupID) - if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error()) - resp.ErrCode = constant.ErrDB.ErrCode - resp.ErrMsg = constant.ErrDB.ErrMsg - return &resp, nil - } - for _, member := range groupMembers { - if utils.IsContain(member.UserID, req.MemberList) { - var memberNode open_im_sdk.GroupMemberFullInfo - utils.CopyStructFields(&memberNode, member) - memberNode.JoinTime = int32(member.JoinTime.Unix()) - resp.MemberList = append(resp.MemberList, &memberNode) + + for _, userID := range req.MemberList { + groupMember, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, userID, "") + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, userID, err.Error()) + continue } + var memberNode open_im_sdk.GroupMemberFullInfo + utils.CopyStructFields(&memberNode, groupMember) + memberNode.JoinTime = int32(groupMember.JoinTime.Unix()) + resp.MemberList = append(resp.MemberList, &memberNode) } + //groupMembers, err := rocksCache.GetAllGroupMembersInfoFromCache(req.GroupID) + //if err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error()) + // resp.ErrCode = constant.ErrDB.ErrCode + // resp.ErrMsg = constant.ErrDB.ErrMsg + // return &resp, nil + //} + //for _, member := range groupMembers { + // if utils.IsContain(member.UserID, req.MemberList) { + // var memberNode open_im_sdk.GroupMemberFullInfo + // utils.CopyStructFields(&memberNode, member) + // memberNode.JoinTime = int32(member.JoinTime.Unix()) + // resp.MemberList = append(resp.MemberList, &memberNode) + // } + //} + resp.ErrCode = 0 log.NewInfo(req.OperationID, "GetGroupMembersInfo rpc return ", resp.String()) return &resp, nil @@ -899,17 +910,17 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - group, err := rocksCache.GetGroupInfoFromCache(req.GroupID) - if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error()) - } - if group != nil { - if group.GroupType != constant.SuperGroup { - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) - } - } - } + //group, err := rocksCache.GetGroupInfoFromCache(req.GroupID) + //if err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error()) + //} + //if group != nil { + // if group.GroupType != constant.SuperGroup { + // if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + // } + // } + //} chat.GroupApplicationAcceptedNotification(req) chat.MemberEnterNotification(req) @@ -987,10 +998,10 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) } - err = rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID) - if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) - } + //err = rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID) + //if err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) + //} chat.MemberEnterDirectlyNotification(req.GroupID, req.OpUserID, req.OperationID) log.NewInfo(req.OperationID, "JoinGroup rpc return ") @@ -1100,7 +1111,10 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq) } if groupInfo.GroupType != constant.SuperGroup { - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + //} + if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.OpUserID); err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) } if err := rocksCache.DelJoinedGroupIDListFromCache(req.OpUserID); err != nil { @@ -1304,8 +1318,16 @@ func (s *groupServer) TransferGroupOwner(_ context.Context, req *pbGroup.Transfe log.NewError(req.OperationID, "UpdateGroupMemberInfo failed ", groupMemberInfo) return &pbGroup.TransferGroupOwnerResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error()) + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error()) + //} + err = rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.NewOwnerUserID) + if err != nil { + log.NewError(req.OperationID, "DelGroupMemberInfoFromCache failed ", req.GroupID, req.NewOwnerUserID) + } + err = rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.OldOwnerUserID) + if err != nil { + log.NewError(req.OperationID, "DelGroupMemberInfoFromCache failed ", req.GroupID, req.OldOwnerUserID) } chat.GroupOwnerTransferredNotification(req) return &pbGroup.TransferGroupOwnerResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil @@ -1581,8 +1603,13 @@ func (s *groupServer) RemoveGroupMembersCMS(_ context.Context, req *pbGroup.Remo log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) return resp, http.WrapError(constant.ErrDB) } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId) + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId) + //} + for _, userID := range resp.Success { + if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupId, userID); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId, userID) + } } chat.MemberKickedNotification(reqKick, resp.Success) @@ -1643,9 +1670,9 @@ func (s *groupServer) AddGroupMembersCMS(_ context.Context, req *pbGroup.AddGrou log.NewError(req.OperationId, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) return resp, http.WrapError(constant.ErrDB) } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil { - log.NewError(req.OperationId, utils.GetSelfFuncName(), err.Error(), req.GroupId) - } + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil { + // log.NewError(req.OperationId, utils.GetSelfFuncName(), err.Error(), req.GroupId) + //} chat.MemberInvitedNotification(req.OperationId, req.GroupId, req.OpUserId, "admin add you to group", resp.Success) return resp, nil @@ -1767,9 +1794,9 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}, nil } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) - } + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + //} log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}) return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil } @@ -1815,7 +1842,10 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou log.Error(req.OperationID, "UpdateGroupMemberInfo failed ", err.Error(), groupMemberInfo) return &pbGroup.MuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + //} + if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) } chat.GroupMemberMutedNotification(req.OperationID, req.OpUserID, req.GroupID, req.UserID, req.MutedSeconds) @@ -1859,7 +1889,10 @@ func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.Ca log.Error(req.OperationID, "UpdateGroupMemberInfo failed ", err.Error(), groupMemberInfo) return &pbGroup.CancelMuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + //} + if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) } chat.GroupMemberCancelMutedNotification(req.OperationID, req.OpUserID, req.GroupID, req.UserID) @@ -1979,7 +2012,10 @@ func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.S log.Error(req.OperationID, errMsg) return &pbGroup.SetGroupMemberNicknameResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + //} + if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) } chat.GroupMemberInfoSetNotification(req.OperationID, req.OpUserID, req.GroupID, req.UserID) @@ -2014,8 +2050,11 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg + ":" + err.Error() return resp, nil } - if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + //if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID) + //} + if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID, req.UserID) } if req.RoleLevel != nil { switch req.RoleLevel.Value { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 47f6036bf..57a456d4e 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -482,8 +482,12 @@ func (s *userServer) SyncJoinedGroupMemberFaceURL(userID string, faceURL string, log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupMemberInfo) continue } - if err := rocksCache.DelAllGroupMembersInfoFromCache(groupID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) + //if err := rocksCache.DelAllGroupMembersInfoFromCache(groupID); err != nil { + // log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) + // continue + //} + if err := rocksCache.DelGroupMemberInfoFromCache(groupID, userID); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID) continue } chat.GroupMemberInfoSetNotification(operationID, opUserID, groupID, userID) @@ -508,9 +512,12 @@ func (s *userServer) SyncJoinedGroupMemberNickname(userID string, newNickname, o log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupMemberInfo) continue } - if err := rocksCache.DelAllGroupMembersInfoFromCache(v); err != nil { + //if err := rocksCache.DelAllGroupMembersInfoFromCache(v); err != nil { + // log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), v) + // continue + //} + if err := rocksCache.DelGroupMemberInfoFromCache(v, userID); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), v) - continue } chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID) } diff --git a/pkg/base_info/group_api_struct.go b/pkg/base_info/group_api_struct.go index 6ba6b2d52..7424a843d 100644 --- a/pkg/base_info/group_api_struct.go +++ b/pkg/base_info/group_api_struct.go @@ -73,6 +73,8 @@ type GetGroupMemberListResp struct { type GetGroupAllMemberReq struct { GroupID string `json:"groupID" binding:"required"` OperationID string `json:"operationID" binding:"required"` + Offset int `json:"offset"` + Count int `json:"count"` } type GetGroupAllMemberResp struct { CommResp diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 25e2fe3dc..a542560da 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -225,6 +225,13 @@ const ( WorkMomentLikeNotification = 1 WorkMomentAtUserNotification = 2 ) + +const ( + // diffusionType + WriteDiffusion = 0 + ReadDiffusion = 1 +) + const ( AtAllString = "AtAllTag" AtNormal = 0 diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index 2ef11d59b..b4dfd6139 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -34,6 +34,7 @@ const ( FcmToken = "FCM_TOKEN:" groupUserMinSeq = "GROUP_USER_MIN_SEQ:" groupMaxSeq = "GROUP_MAX_SEQ:" + groupMinSeq = "GROUP_MIN_SEQ:" sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" ) @@ -115,6 +116,11 @@ func (d *DataBases) SetGroupMaxSeq(groupID string, maxSeq uint64) error { return d.RDB.Set(context.Background(), key, maxSeq, 0).Err() } +func (d *DataBases) SetGroupMinSeq(groupID string, minSeq uint32) error { + key := groupMinSeq + groupID + return d.RDB.Set(context.Background(), key, minSeq, 0).Err() +} + //Store userid and platform class to redis func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index 033731e8d..2300d051d 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -186,7 +186,7 @@ func DelUserInfoFromCache(userID string) error { return db.DB.Rc.TagAsDeleted(userInfoCache + userID) } -func GetGroupMemberInfoFromCache(groupID, userID string) (*db.GroupMember, error) { +func GetGroupMemberInfoFromCache(groupID, userID, fullKey string) (*db.GroupMember, error) { getGroupMemberInfo := func() (string, error) { groupMemberInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID) if err != nil { @@ -195,7 +195,10 @@ func GetGroupMemberInfoFromCache(groupID, userID string) (*db.GroupMember, error bytes, err := json.Marshal(groupMemberInfo) return string(bytes), utils.Wrap(err, "") } - groupMemberInfoStr, err := db.DB.Rc.Fetch(groupMemberInfoCache+groupID+"-"+userID, time.Second*30*60, getGroupMemberInfo) + if fullKey == "" { + fullKey = groupMemberInfoCache + groupID + "-" + userID + } + groupMemberInfoStr, err := db.DB.Rc.Fetch(fullKey, time.Second*30*60, getGroupMemberInfo) if err != nil { return nil, utils.Wrap(err, "") } @@ -208,6 +211,42 @@ func DelGroupMemberInfoFromCache(groupID, userID string) error { return db.DB.Rc.TagAsDeleted(groupMemberInfoCache + groupID + "-" + userID) } +func GetGroupMembersInfoFromCache(count, offset int32, groupID string) ([]*db.GroupMember, error) { + var cursor uint64 + var err error + var keys []string + key := groupMemberInfoCache + groupID + "-" + if count != 0 { + keys, cursor, err = db.DB.RDB.Scan(context.Background(), uint64(offset), key, int64(count)).Result() + if err != nil { + return nil, err + } + } else { + for { + var currentKeys []string + currentKeys, cursor, err = db.DB.RDB.Scan(context.Background(), cursor, key, 3000).Result() + if err != nil { + return nil, err + } + keys = append(keys, currentKeys...) + if cursor == 0 { + break + } + } + } + + var groupMemberList []*db.GroupMember + for _, key := range keys { + v, err := GetGroupMemberInfoFromCache("", "", key) + if err != nil { + log.NewError("", utils.GetSelfFuncName(), key, err.Error()) + continue + } + groupMemberList = append(groupMemberList, v) + } + return groupMemberList, nil +} + func GetAllGroupMembersInfoFromCache(groupID string) ([]*db.GroupMember, error) { getGroupMemberInfo := func() (string, error) { groupMembers, err := imdb.GetGroupMemberListByGroupID(groupID)