From aa2b43293dedc7367e3c820e26142a54fb734f3d Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 9 Jan 2023 14:51:31 +0800 Subject: [PATCH 1/3] group --- internal/rpc/group/group.go | 15 ++++++--------- .../im_mysql_model/group_member_model_k.go | 12 ++++++------ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 9c8060416..e42047a1d 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -259,7 +259,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo var groupNode open_im_sdk.GroupInfo num, err := rocksCache.GetGroupMemberNumFromCache(groupID) if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), groupID) + trace_log.SetContextInfo(ctx, "GetGroupMemberNumFromCache", err, "groupID", groupID) continue } owner, err := (*imdb.GroupMember)(nil).TakeOwnerInfo(ctx, groupID) @@ -277,7 +277,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo continue } if group.Status == constant.GroupStatusDismissed { - trace_log.SetContextInfo(ctx, "GetGroupInfoFromCache", err, "groupID", groupID) + trace_log.SetContextInfo(ctx, "group.Status == constant.GroupStatusDismissed", nil, "groupID", groupID) continue } utils.CopyStructFields(&groupNode, group) @@ -698,9 +698,7 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG for _, userID := range req.MemberList { groupMember, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, userID) if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, userID, err.Error()) - var errResult error - + trace_log.SetContextInfo(ctx, "GetGroupMemberInfoFromCache", err, "groupID", req.GroupID, "userID", userID) continue } var memberNode open_im_sdk.GroupMemberFullInfo @@ -708,9 +706,8 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG memberNode.JoinTime = int32(groupMember.JoinTime.Unix()) resp.MemberList = append(resp.MemberList, &memberNode) } - if SetErr(ctx, "GetGroupMemberInfoFromCache", err, &resp.CommonResp.ErrCode, &resp.CommonResp.ErrMsg, "groupID", req.GroupID) - return resp, nil + return } func FillGroupInfoByGroupID(operationID, groupID string, groupInfo *open_im_sdk.GroupInfo) error { @@ -1788,7 +1785,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr } else { m["ex"] = nil } - if err := imdb.UpdateGroupMemberInfoByMap(groupMember, m);err != nil { + if err := imdb.UpdateGroupMemberInfoByMap(groupMember, m); err != nil { SetErrorForResp(err, resp.CommonResp) return } @@ -1807,7 +1804,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr return } -func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.GetGroupAbstractInfoReq) (resp *pbGroup.GetGroupAbstractInfoResp,_ error) { +func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.GetGroupAbstractInfoReq) (resp *pbGroup.GetGroupAbstractInfoResp, _ error) { resp = &pbGroup.GetGroupAbstractInfoResp{CommonResp: &open_im_sdk.CommonResp{}} ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) defer func() { diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_member_model_k.go b/pkg/common/db/mysql_model/im_mysql_model/group_member_model_k.go index b92833dc0..7739e13b2 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_member_model_k.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_member_model_k.go @@ -38,33 +38,33 @@ type GroupMember struct { func (*GroupMember) Create(ctx context.Context, groupMemberList []*GroupMember) (err error) { defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "groupMemberList", groupMemberList) + trace_log.SetContextInfo(ctx, utils.GetFuncName(1), err, "groupMemberList", groupMemberList) }() return utils.Wrap(db.DB.MysqlDB.DefaultGormDB().Create(&groupMemberList).Error, "") } func (*GroupMember) Delete(ctx context.Context, groupMembers []*GroupMember) (err error) { defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "groupMembers", groupMembers) + trace_log.SetContextInfo(ctx, utils.GetFuncName(1), err, "groupMembers", groupMembers) }() return utils.Wrap(db.DB.MysqlDB.DefaultGormDB().Delete(groupMembers).Error, "") } func (*GroupMember) UpdateByMap(ctx context.Context, groupID string, userID string, args map[string]interface{}) (err error) { defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "groupID", groupID, "userID", userID, "args", args) + trace_log.SetContextInfo(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "args", args) }() return utils.Wrap(db.DB.MysqlDB.DefaultGormDB().Model(&GroupMember{}).Where("group_id = ? and user_id = ?", groupID, userID).Updates(args).Error, "") } func (*GroupMember) Update(ctx context.Context, groupMembers []*GroupMember) (err error) { - defer func() { trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "groupMembers", groupMembers) }() + defer func() { trace_log.SetContextInfo(ctx, utils.GetFuncName(1), err, "groupMembers", groupMembers) }() return utils.Wrap(db.DB.MysqlDB.DefaultGormDB().Updates(&groupMembers).Error, "") } func (*GroupMember) Find(ctx context.Context, groupMembers []*GroupMember) (groupList []*GroupMember, err error) { defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "groupMembers", groupMembers, "groupList", groupList) + trace_log.SetContextInfo(ctx, utils.GetFuncName(1), err, "groupMembers", groupMembers, "groupList", groupList) }() var where [][]interface{} for _, groupMember := range groupMembers { @@ -76,7 +76,7 @@ func (*GroupMember) Find(ctx context.Context, groupMembers []*GroupMember) (grou func (*GroupMember) Take(ctx context.Context, groupID string, userID string) (groupMember *GroupMember, err error) { defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember) + trace_log.SetContextInfo(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember) }() groupMember = &GroupMember{} return groupMember, utils.Wrap(db.DB.MysqlDB.DefaultGormDB().Where("group_id = ? and user_id = ?", groupID, userID).Take(groupMember).Error, "") From 8f51749786306cda09d5c08293bce1e6139d092b Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 10 Jan 2023 16:17:55 +0800 Subject: [PATCH 2/3] group --- internal/rpc/group/group.go | 141 +++++++++++++++++++++++++++++++++++- 1 file changed, 140 insertions(+), 1 deletion(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a3ec213bd..7bb470cd6 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -15,6 +15,9 @@ import ( cp "Open_IM/pkg/common/utils" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "github.com/OpenIMSDK/getcdv3" + "google.golang.org/grpc/metadata" + "path" + "runtime/debug" pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" @@ -53,6 +56,34 @@ func NewGroupServer(port int) *groupServer { } } +func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer func() { + if r := recover(); r != nil { + log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack())) + } + }() + funcName := path.Base(info.FullMethod) + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errors.New("not metadata") + } + operationID := md.Get("operationID")[0] + opUserID := md.Get("opUserID")[0] + ctx = trace_log.NewRpcCtx(ctx, funcName, operationID) + defer trace_log.ShowLog(ctx) + _ = opUserID + trace_log.SetContextInfo(ctx, funcName, err, "rpc req", req.(interface{ String() string }).String()) + resp, err = handler(ctx, req) + if err != nil { + constant.SetErrorForResp(err, resp.(interface { + GetCommonResp() *open_im_sdk.CommonResp + }).GetCommonResp()) + err = nil + } + trace_log.SetContextInfo(ctx, funcName, err, "rpc resp", resp.(interface{ String() string }).String()) + return +} + func (s *groupServer) Run() { log.NewInfo("", "group rpc start ") listenIP := "" @@ -75,6 +106,7 @@ func (s *groupServer) Run() { var grpcOpts = []grpc.ServerOption{ grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), + grpc.UnaryInterceptor(UnaryServerInterceptor), } if config.Config.Prometheus.Enable { promePkg.NewGrpcRequestCounter() @@ -114,7 +146,114 @@ func (s *groupServer) Run() { log.NewInfo("", "group rpc success") } -func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (resp *pbGroup.CreateGroupResp, _ error) { +func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (*pbGroup.CreateGroupResp, error) { + resp := &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}} + if err := token_verify.CheckAccessV2(ctx, req.OpUserID, req.OwnerUserID); err != nil { + return resp, err + } + var groupOwnerNum int + var userIDs []string + for _, info := range req.InitMemberList { + if info.RoleLevel == constant.GroupOwner { + groupOwnerNum++ + } + userIDs = append(userIDs, info.UserID) + } + if req.OwnerUserID != "" { + groupOwnerNum++ + userIDs = append(userIDs, req.OwnerUserID) + } + if groupOwnerNum != 1 { + return resp, utils.Wrap(constant.ErrArgs, "") + } + if utils.IsRepeatStringSlice(userIDs) { + return resp, utils.Wrap(constant.ErrArgs, "") + } + users, err := rocksCache.GetUserInfoFromCacheBatch(ctx, userIDs) + if err != nil { + return resp, err + } + if len(users) != len(userIDs) { + return resp, utils.Wrap(constant.ErrArgs, "") + } + userMap := make(map[string]*imdb.User) + for i, user := range users { + userMap[user.UserID] = users[i] + } + if err := s.DelGroupAndUserCache(ctx, "", userIDs); err != nil { + return resp, err + } + if err := callbackBeforeCreateGroup(ctx, req); err != nil { + return resp, err + } + groupId := req.GroupInfo.GroupID + if groupId == "" { + groupId = utils.Md5(req.OperationID + strconv.FormatInt(time.Now().UnixNano(), 10)) + bi := big.NewInt(0) + bi.SetString(groupId[0:8], 16) + groupId = bi.String() + } + groupInfo := imdb.Group{} + utils.CopyStructFields(&groupInfo, req.GroupInfo) + groupInfo.CreatorUserID = req.OpUserID + groupInfo.GroupID = groupId + groupInfo.CreateTime = time.Now() + if groupInfo.NotificationUpdateTime.Unix() < 0 { + groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0) + } + if req.GroupInfo.GroupType != constant.SuperGroup { + var groupMembers []*imdb.GroupMember + joinGroup := func(userID string, roleLevel int32) error { + groupMember := &imdb.GroupMember{GroupID: groupId, RoleLevel: roleLevel, OperatorUserID: req.OpUserID, JoinSource: constant.JoinByInvitation, InviterUserID: req.OpUserID} + user := userMap[userID] + utils.CopyStructFields(&groupMember, user) + if err := CallbackBeforeMemberJoinGroup(ctx, req.OperationID, groupMember, groupInfo.Ex); err != nil { + return err + } + groupMembers = append(groupMembers, groupMember) + return nil + } + if req.OwnerUserID == "" { + if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil { + return resp, err + } + } + for _, info := range req.InitMemberList { + if err := joinGroup(info.UserID, info.RoleLevel); err != nil { + return resp, err + } + } + if err := (*imdb.GroupMember)(nil).Create(ctx, groupMembers); err != nil { + return resp, err + } + } else { + if err := db.DB.CreateSuperGroup(groupId, userIDs, len(userIDs)); err != nil { + return resp, err + } + } + if err := (*imdb.Group)(nil).Create(ctx, []*imdb.Group{&groupInfo}); err != nil { + return resp, err + } + utils.CopyStructFields(resp.GroupInfo, groupInfo) + resp.GroupInfo.MemberCount = uint32(len(userIDs)) + if req.GroupInfo.GroupType != constant.SuperGroup { + chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, userIDs) + } else { + for _, userID := range userIDs { + if err := rocksCache.DelJoinedSuperGroupIDListFromCache(ctx, userID); err != nil { + trace_log.SetContextInfo(ctx, "DelJoinedSuperGroupIDListFromCache", err, "userID", userID) + } + } + go func() { + for _, v := range userIDs { + chat.SuperGroupNotification(req.OperationID, v, v) + } + }() + } + return resp, nil +} + +func (s *groupServer) CreateGroup1(ctx context.Context, req *pbGroup.CreateGroupReq) (resp *pbGroup.CreateGroupResp, _ error) { resp = &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}} ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) defer func() { From 198cae5ade8679ae94b1710e2dc0d4893bcd1826 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 10 Jan 2023 18:05:13 +0800 Subject: [PATCH 3/3] group --- internal/rpc/group/group.go | 44 +++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 7bb470cd6..8f025888b 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -15,7 +15,9 @@ import ( cp "Open_IM/pkg/common/utils" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "github.com/OpenIMSDK/getcdv3" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "path" "runtime/debug" @@ -75,10 +77,18 @@ func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.Una trace_log.SetContextInfo(ctx, funcName, err, "rpc req", req.(interface{ String() string }).String()) resp, err = handler(ctx, req) if err != nil { - constant.SetErrorForResp(err, resp.(interface { - GetCommonResp() *open_im_sdk.CommonResp - }).GetCommonResp()) - err = nil + errInfo := constant.ToAPIErrWithErr(err) + var code codes.Code + if errInfo.ErrCode == 0 { + code = codes.Unknown + } else { + code = codes.Code(errInfo.ErrCode) + } + sta, err := status.New(code, errInfo.ErrMsg).WithDetails(wrapperspb.String(errInfo.DetailErrMsg)) + if err != nil { + return nil, err + } + return nil, sta.Err() } trace_log.SetContextInfo(ctx, funcName, err, "rpc resp", resp.(interface{ String() string }).String()) return @@ -147,9 +157,9 @@ func (s *groupServer) Run() { } func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (*pbGroup.CreateGroupResp, error) { - resp := &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}} + resp := &pbGroup.CreateGroupResp{GroupInfo: &open_im_sdk.GroupInfo{}} if err := token_verify.CheckAccessV2(ctx, req.OpUserID, req.OwnerUserID); err != nil { - return resp, err + return nil, err } var groupOwnerNum int var userIDs []string @@ -164,27 +174,27 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR userIDs = append(userIDs, req.OwnerUserID) } if groupOwnerNum != 1 { - return resp, utils.Wrap(constant.ErrArgs, "") + return nil, utils.Wrap(constant.ErrArgs, "") } if utils.IsRepeatStringSlice(userIDs) { - return resp, utils.Wrap(constant.ErrArgs, "") + return nil, utils.Wrap(constant.ErrArgs, "") } users, err := rocksCache.GetUserInfoFromCacheBatch(ctx, userIDs) if err != nil { - return resp, err + return nil, err } if len(users) != len(userIDs) { - return resp, utils.Wrap(constant.ErrArgs, "") + return nil, utils.Wrap(constant.ErrArgs, "") } userMap := make(map[string]*imdb.User) for i, user := range users { userMap[user.UserID] = users[i] } if err := s.DelGroupAndUserCache(ctx, "", userIDs); err != nil { - return resp, err + return nil, err } if err := callbackBeforeCreateGroup(ctx, req); err != nil { - return resp, err + return nil, err } groupId := req.GroupInfo.GroupID if groupId == "" { @@ -215,24 +225,24 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR } if req.OwnerUserID == "" { if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil { - return resp, err + return nil, err } } for _, info := range req.InitMemberList { if err := joinGroup(info.UserID, info.RoleLevel); err != nil { - return resp, err + return nil, err } } if err := (*imdb.GroupMember)(nil).Create(ctx, groupMembers); err != nil { - return resp, err + return nil, err } } else { if err := db.DB.CreateSuperGroup(groupId, userIDs, len(userIDs)); err != nil { - return resp, err + return nil, err } } if err := (*imdb.Group)(nil).Create(ctx, []*imdb.Group{&groupInfo}); err != nil { - return resp, err + return nil, err } utils.CopyStructFields(resp.GroupInfo, groupInfo) resp.GroupInfo.MemberCount = uint32(len(userIDs))