diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a3ec213bd..7bb470cd6 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -15,6 +15,9 @@ import ( cp "Open_IM/pkg/common/utils" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "github.com/OpenIMSDK/getcdv3" + "google.golang.org/grpc/metadata" + "path" + "runtime/debug" pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" @@ -53,6 +56,34 @@ func NewGroupServer(port int) *groupServer { } } +func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer func() { + if r := recover(); r != nil { + log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack())) + } + }() + funcName := path.Base(info.FullMethod) + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errors.New("not metadata") + } + operationID := md.Get("operationID")[0] + opUserID := md.Get("opUserID")[0] + ctx = trace_log.NewRpcCtx(ctx, funcName, operationID) + defer trace_log.ShowLog(ctx) + _ = opUserID + trace_log.SetContextInfo(ctx, funcName, err, "rpc req", req.(interface{ String() string }).String()) + resp, err = handler(ctx, req) + if err != nil { + constant.SetErrorForResp(err, resp.(interface { + GetCommonResp() *open_im_sdk.CommonResp + }).GetCommonResp()) + err = nil + } + trace_log.SetContextInfo(ctx, funcName, err, "rpc resp", resp.(interface{ String() string }).String()) + return +} + func (s *groupServer) Run() { log.NewInfo("", "group rpc start ") listenIP := "" @@ -75,6 +106,7 @@ func (s *groupServer) Run() { var grpcOpts = []grpc.ServerOption{ grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), + grpc.UnaryInterceptor(UnaryServerInterceptor), } if config.Config.Prometheus.Enable { promePkg.NewGrpcRequestCounter() @@ -114,7 +146,114 @@ func (s *groupServer) Run() { log.NewInfo("", "group rpc success") } -func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (resp *pbGroup.CreateGroupResp, _ error) { +func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (*pbGroup.CreateGroupResp, error) { + resp := &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}} + if err := token_verify.CheckAccessV2(ctx, req.OpUserID, req.OwnerUserID); err != nil { + return resp, err + } + var groupOwnerNum int + var userIDs []string + for _, info := range req.InitMemberList { + if info.RoleLevel == constant.GroupOwner { + groupOwnerNum++ + } + userIDs = append(userIDs, info.UserID) + } + if req.OwnerUserID != "" { + groupOwnerNum++ + userIDs = append(userIDs, req.OwnerUserID) + } + if groupOwnerNum != 1 { + return resp, utils.Wrap(constant.ErrArgs, "") + } + if utils.IsRepeatStringSlice(userIDs) { + return resp, utils.Wrap(constant.ErrArgs, "") + } + users, err := rocksCache.GetUserInfoFromCacheBatch(ctx, userIDs) + if err != nil { + return resp, err + } + if len(users) != len(userIDs) { + return resp, utils.Wrap(constant.ErrArgs, "") + } + userMap := make(map[string]*imdb.User) + for i, user := range users { + userMap[user.UserID] = users[i] + } + if err := s.DelGroupAndUserCache(ctx, "", userIDs); err != nil { + return resp, err + } + if err := callbackBeforeCreateGroup(ctx, req); err != nil { + return resp, err + } + groupId := req.GroupInfo.GroupID + if groupId == "" { + groupId = utils.Md5(req.OperationID + strconv.FormatInt(time.Now().UnixNano(), 10)) + bi := big.NewInt(0) + bi.SetString(groupId[0:8], 16) + groupId = bi.String() + } + groupInfo := imdb.Group{} + utils.CopyStructFields(&groupInfo, req.GroupInfo) + groupInfo.CreatorUserID = req.OpUserID + groupInfo.GroupID = groupId + groupInfo.CreateTime = time.Now() + if groupInfo.NotificationUpdateTime.Unix() < 0 { + groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0) + } + if req.GroupInfo.GroupType != constant.SuperGroup { + var groupMembers []*imdb.GroupMember + joinGroup := func(userID string, roleLevel int32) error { + groupMember := &imdb.GroupMember{GroupID: groupId, RoleLevel: roleLevel, OperatorUserID: req.OpUserID, JoinSource: constant.JoinByInvitation, InviterUserID: req.OpUserID} + user := userMap[userID] + utils.CopyStructFields(&groupMember, user) + if err := CallbackBeforeMemberJoinGroup(ctx, req.OperationID, groupMember, groupInfo.Ex); err != nil { + return err + } + groupMembers = append(groupMembers, groupMember) + return nil + } + if req.OwnerUserID == "" { + if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil { + return resp, err + } + } + for _, info := range req.InitMemberList { + if err := joinGroup(info.UserID, info.RoleLevel); err != nil { + return resp, err + } + } + if err := (*imdb.GroupMember)(nil).Create(ctx, groupMembers); err != nil { + return resp, err + } + } else { + if err := db.DB.CreateSuperGroup(groupId, userIDs, len(userIDs)); err != nil { + return resp, err + } + } + if err := (*imdb.Group)(nil).Create(ctx, []*imdb.Group{&groupInfo}); err != nil { + return resp, err + } + utils.CopyStructFields(resp.GroupInfo, groupInfo) + resp.GroupInfo.MemberCount = uint32(len(userIDs)) + if req.GroupInfo.GroupType != constant.SuperGroup { + chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, userIDs) + } else { + for _, userID := range userIDs { + if err := rocksCache.DelJoinedSuperGroupIDListFromCache(ctx, userID); err != nil { + trace_log.SetContextInfo(ctx, "DelJoinedSuperGroupIDListFromCache", err, "userID", userID) + } + } + go func() { + for _, v := range userIDs { + chat.SuperGroupNotification(req.OperationID, v, v) + } + }() + } + return resp, nil +} + +func (s *groupServer) CreateGroup1(ctx context.Context, req *pbGroup.CreateGroupReq) (resp *pbGroup.CreateGroupResp, _ error) { resp = &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}} ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) defer func() {