diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index 1dc3ab6fb..97311f1cc 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -15,6 +15,7 @@ import ( "Open_IM/internal/api/user" "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/middleware" "Open_IM/pkg/utils" "flag" "fmt" @@ -49,6 +50,7 @@ func main() { r := gin.New() r.Use(gin.Recovery()) r.Use(utils.CorsHandler()) + r.Use(middleware.GinParseOperationID) log.Info("load config: ", config.Config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) if config.Config.Prometheus.Enable { diff --git a/internal/api_to_rpc/api.go b/internal/api_to_rpc/api.go index e77f8d340..b736bac37 100644 --- a/internal/api_to_rpc/api.go +++ b/internal/api_to_rpc/api.go @@ -1,24 +1,22 @@ package common import ( - "Open_IM/pkg/common/constant" "Open_IM/pkg/common/trace_log" utils2 "Open_IM/pkg/utils" - "context" utils "github.com/OpenIMSDK/open_utils" "github.com/gin-gonic/gin" + "google.golang.org/grpc/metadata" "reflect" ) func ApiToRpc(c *gin.Context, apiReq, apiResp interface{}, rpcName string, fn interface{}, rpcFuncName string, tokenFunc func(token string, operationID string) (string, error)) { + operationID := c.GetHeader("operationID") nCtx := trace_log.NewCtx(c, rpcFuncName) defer trace_log.ShowLog(nCtx) if err := c.BindJSON(apiReq); err != nil { trace_log.WriteErrorResponse(nCtx, "BindJSON", err) return } - reqValue := reflect.ValueOf(apiReq).Elem() - operationID := reqValue.FieldByName("OperationID").String() trace_log.SetOperationID(nCtx, operationID) trace_log.SetContextInfo(nCtx, "BindJSON", nil, "params", apiReq) etcdConn, err := utils2.GetConn(c, rpcName) @@ -39,23 +37,14 @@ func ApiToRpc(c *gin.Context, apiReq, apiResp interface{}, rpcName string, fn in return } } - if opID := rpcReqPtr.Elem().FieldByName("OperationID"); opID.IsValid() { - opID.SetString(operationID) - if opU := rpcReqPtr.Elem().FieldByName("OpUserID"); opU.IsValid() { - opU.SetString(opUserID) - } - } else { - op := rpcReqPtr.Elem().FieldByName("Operation").Elem() - op.FieldByName("OperationID").SetString(operationID) - op.FieldByName("OpUserID").SetString(opUserID) - } if err := utils.CopyStructFields(rpcReqPtr.Interface(), apiReq); err != nil { trace_log.WriteErrorResponse(nCtx, "CopyStructFields_RpcReq", err) return } + md := metadata.Pairs("operationID", operationID, "opUserID", opUserID) respArr := rpc.Call([]reflect.Value{ - reflect.ValueOf(context.Context(c)), // context.Context - rpcReqPtr, // rpc apiReq + reflect.ValueOf(metadata.NewOutgoingContext(c, md)), // context.Context + rpcReqPtr, // rpc apiReq }) // respArr => (apiResp, error) if !respArr[1].IsNil() { // rpc err != nil err := respArr[1].Interface().(error) @@ -64,17 +53,6 @@ func ApiToRpc(c *gin.Context, apiReq, apiResp interface{}, rpcName string, fn in } rpcResp := respArr[0].Elem() trace_log.SetContextInfo(nCtx, rpcFuncName, nil, "rpc req", rpcReqPtr.Interface(), "resp", rpcResp.Interface()) - commonResp := rpcResp.FieldByName("CommonResp").Elem() - errCodeVal := commonResp.FieldByName("ErrCode") - errMsgVal := commonResp.FieldByName("ErrMsg").Interface().(string) - errCode := errCodeVal.Interface().(int32) - if errCode != 0 { - trace_log.WriteErrorResponse(nCtx, "RpcErrCode", &constant.ErrInfo{ - ErrCode: errCode, - ErrMsg: errMsgVal, - }) - return - } if apiResp != nil { if err := utils.CopyStructFields(apiResp, rpcResp.Interface()); err != nil { trace_log.WriteErrorResponse(nCtx, "CopyStructFields_RpcResp", err) @@ -83,3 +61,77 @@ func ApiToRpc(c *gin.Context, apiReq, apiResp interface{}, rpcName string, fn in } trace_log.SetSuccess(nCtx, rpcFuncName, apiResp) } + +//func ApiToRpc(c *gin.Context, apiReq, apiResp interface{}, rpcName string, fn interface{}, rpcFuncName string, tokenFunc func(token string, operationID string) (string, error)) { +// nCtx := trace_log.NewCtx(c, rpcFuncName) +// defer trace_log.ShowLog(nCtx) +// if err := c.BindJSON(apiReq); err != nil { +// trace_log.WriteErrorResponse(nCtx, "BindJSON", err) +// return +// } +// reqValue := reflect.ValueOf(apiReq).Elem() +// operationID := reqValue.FieldByName("OperationID").String() +// trace_log.SetOperationID(nCtx, operationID) +// trace_log.SetContextInfo(nCtx, "BindJSON", nil, "params", apiReq) +// etcdConn, err := utils2.GetConn(c, rpcName) +// if err != nil { +// trace_log.WriteErrorResponse(nCtx, "GetDefaultConn", err) +// return +// } +// rpc := reflect.ValueOf(fn).Call([]reflect.Value{ +// reflect.ValueOf(etcdConn), +// })[0].MethodByName(rpcFuncName) // rpc func +// rpcReqPtr := reflect.New(rpc.Type().In(1).Elem()) // *req参数 +// var opUserID string +// if tokenFunc != nil { +// var err error +// opUserID, err = tokenFunc(c.GetHeader("token"), operationID) +// if err != nil { +// trace_log.WriteErrorResponse(nCtx, "TokenFunc", err) +// return +// } +// } +// if opID := rpcReqPtr.Elem().FieldByName("OperationID"); opID.IsValid() { +// opID.SetString(operationID) +// if opU := rpcReqPtr.Elem().FieldByName("OpUserID"); opU.IsValid() { +// opU.SetString(opUserID) +// } +// } else { +// op := rpcReqPtr.Elem().FieldByName("Operation").Elem() +// op.FieldByName("OperationID").SetString(operationID) +// op.FieldByName("OpUserID").SetString(opUserID) +// } +// if err := utils.CopyStructFields(rpcReqPtr.Interface(), apiReq); err != nil { +// trace_log.WriteErrorResponse(nCtx, "CopyStructFields_RpcReq", err) +// return +// } +// respArr := rpc.Call([]reflect.Value{ +// reflect.ValueOf(context.Context(c)), // context.Context +// rpcReqPtr, // rpc apiReq +// }) // respArr => (apiResp, error) +// if !respArr[1].IsNil() { // rpc err != nil +// err := respArr[1].Interface().(error) +// trace_log.WriteErrorResponse(nCtx, rpcFuncName, err, "rpc req", rpcReqPtr.Interface()) +// return +// } +// rpcResp := respArr[0].Elem() +// trace_log.SetContextInfo(nCtx, rpcFuncName, nil, "rpc req", rpcReqPtr.Interface(), "resp", rpcResp.Interface()) +// commonResp := rpcResp.FieldByName("CommonResp").Elem() +// errCodeVal := commonResp.FieldByName("ErrCode") +// errMsgVal := commonResp.FieldByName("ErrMsg").Interface().(string) +// errCode := errCodeVal.Interface().(int32) +// if errCode != 0 { +// trace_log.WriteErrorResponse(nCtx, "RpcErrCode", &constant.ErrInfo{ +// ErrCode: errCode, +// ErrMsg: errMsgVal, +// }) +// return +// } +// if apiResp != nil { +// if err := utils.CopyStructFields(apiResp, rpcResp.Interface()); err != nil { +// trace_log.WriteErrorResponse(nCtx, "CopyStructFields_RpcResp", err) +// return +// } +// } +// trace_log.SetSuccess(nCtx, rpcFuncName, apiResp) +//} diff --git a/internal/rpc/fault_tolerant/conn.go b/internal/rpc/fault_tolerant/conn.go index 0d911346c..5ba212400 100644 --- a/internal/rpc/fault_tolerant/conn.go +++ b/internal/rpc/fault_tolerant/conn.go @@ -3,6 +3,7 @@ package fault_tolerant import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/middleware" "Open_IM/pkg/utils" "github.com/OpenIMSDK/getcdv3" "google.golang.org/grpc" @@ -75,7 +76,7 @@ func GetConfigConn(serviceName string, operationID string) *grpc.ClientConn { } target := rpcRegisterIP + ":" + utils.Int32ToString(int32(configPortList[0])) log.Info(operationID, "rpcRegisterIP ", rpcRegisterIP, " port ", configPortList, " grpc target: ", target, " serviceName: ", serviceName) - conn, err := grpc.Dial(target, grpc.WithInsecure()) + conn, err := grpc.Dial(target, grpc.WithInsecure(), grpc.WithUnaryInterceptor(middleware.RpcClientInterceptor)) if err != nil { log.Error(operationID, "grpc.Dail failed ", err.Error()) return nil diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 4159485b3..84fbd0bae 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -9,25 +9,20 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/middleware" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/trace_log" cp "Open_IM/pkg/common/utils" - "Open_IM/pkg/getcdv3" - open_im_sdk "Open_IM/pkg/proto/sdk_ws" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - "path" - "runtime/debug" - pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" pbGroup "Open_IM/pkg/proto/group" + open_im_sdk "Open_IM/pkg/proto/sdk_ws" pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" "errors" + "github.com/OpenIMSDK/getcdv3" "math/big" "net" "strconv" @@ -58,42 +53,6 @@ func NewGroupServer(port int) *groupServer { } } -func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - defer func() { - if r := recover(); r != nil { - log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack())) - } - }() - funcName := path.Base(info.FullMethod) - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return nil, errors.New("not metadata") - } - operationID := md.Get("operationID")[0] - opUserID := md.Get("opUserID")[0] - ctx = trace_log.NewRpcCtx(ctx, funcName, operationID) - defer trace_log.ShowLog(ctx) - _ = opUserID - trace_log.SetContextInfo(ctx, funcName, err, "rpc req", req.(interface{ String() string }).String()) - resp, err = handler(ctx, req) - if err != nil { - errInfo := constant.ToAPIErrWithErr(err) - var code codes.Code - if errInfo.ErrCode == 0 { - code = codes.Unknown - } else { - code = codes.Code(errInfo.ErrCode) - } - sta, err := status.New(code, errInfo.ErrMsg).WithDetails(wrapperspb.String(errInfo.DetailErrMsg)) - if err != nil { - return nil, err - } - return nil, sta.Err() - } - trace_log.SetContextInfo(ctx, funcName, err, "rpc resp", resp.(interface{ String() string }).String()) - return -} - func (s *groupServer) Run() { log.NewInfo("", "group rpc start ") listenIP := "" @@ -116,7 +75,7 @@ func (s *groupServer) Run() { var grpcOpts = []grpc.ServerOption{ grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), - grpc.UnaryInterceptor(UnaryServerInterceptor), + grpc.UnaryInterceptor(middleware.RpcServerInterceptor), } if config.Config.Prometheus.Enable { promePkg.NewGrpcRequestCounter() @@ -263,145 +222,15 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR return resp, nil } -func (s *groupServer) CreateGroup1(ctx context.Context, req *pbGroup.CreateGroupReq) (resp *pbGroup.CreateGroupResp, _ error) { - resp = &pbGroup.CreateGroupResp{CommonResp: &open_im_sdk.CommonResp{}, GroupInfo: &open_im_sdk.GroupInfo{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetFuncName(1), nil, "req", req.String(), "resp", resp.String()) - trace_log.ShowLog(ctx) - }() - if err := token_verify.CheckAccessV2(ctx, req.OpUserID, req.OwnerUserID); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - var groupOwnerNum int - var userIDs []string - for _, info := range req.InitMemberList { - if info.RoleLevel == constant.GroupOwner { - groupOwnerNum++ - } - userIDs = append(userIDs, info.UserID) - } - if req.OwnerUserID != "" { - groupOwnerNum++ - userIDs = append(userIDs, req.OwnerUserID) - } - if groupOwnerNum != 1 { - constant.SetErrorForResp(constant.ErrArgs, resp.CommonResp) - return - } - if utils.IsRepeatStringSlice(userIDs) { - constant.SetErrorForResp(constant.ErrArgs, resp.CommonResp) - return - } - users, err := rocksCache.GetUserInfoFromCacheBatch(ctx, userIDs) - if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - if len(users) != len(userIDs) { - constant.SetErrorForResp(constant.ErrArgs, resp.CommonResp) - return - } - userMap := make(map[string]*imdb.User) - for i, user := range users { - userMap[user.UserID] = users[i] - } - if err := s.DelGroupAndUserCache(ctx, "", userIDs); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - if err := callbackBeforeCreateGroup(ctx, req); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - groupId := req.GroupInfo.GroupID - if groupId == "" { - groupId = utils.Md5(req.OperationID + strconv.FormatInt(time.Now().UnixNano(), 10)) - bi := big.NewInt(0) - bi.SetString(groupId[0:8], 16) - groupId = bi.String() - } - groupInfo := imdb.Group{} - utils.CopyStructFields(&groupInfo, req.GroupInfo) - groupInfo.CreatorUserID = req.OpUserID - groupInfo.GroupID = groupId - groupInfo.CreateTime = time.Now() - if groupInfo.NotificationUpdateTime.Unix() < 0 { - groupInfo.NotificationUpdateTime = utils.UnixSecondToTime(0) - } - if req.GroupInfo.GroupType != constant.SuperGroup { - var groupMembers []*imdb.GroupMember - joinGroup := func(userID string, roleLevel int32) error { - groupMember := &imdb.GroupMember{GroupID: groupId, RoleLevel: roleLevel, OperatorUserID: req.OpUserID, JoinSource: constant.JoinByInvitation, InviterUserID: req.OpUserID} - user := userMap[userID] - utils.CopyStructFields(&groupMember, user) - if err := CallbackBeforeMemberJoinGroup(ctx, req.OperationID, groupMember, groupInfo.Ex); err != nil { - return err - } - groupMembers = append(groupMembers, groupMember) - return nil - } - if req.OwnerUserID == "" { - if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - } - for _, info := range req.InitMemberList { - if err := joinGroup(info.UserID, info.RoleLevel); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - } - if err := (*imdb.GroupMember)(nil).Create(ctx, groupMembers); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - } else { - if err := db.DB.CreateSuperGroup(groupId, userIDs, len(userIDs)); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - } - if err := (*imdb.Group)(nil).Create(ctx, []*imdb.Group{&groupInfo}); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return - } - utils.CopyStructFields(resp.GroupInfo, groupInfo) - resp.GroupInfo.MemberCount = uint32(len(userIDs)) - if req.GroupInfo.GroupType != constant.SuperGroup { - chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, userIDs) - } else { - for _, userID := range userIDs { - if err := rocksCache.DelJoinedSuperGroupIDListFromCache(ctx, userID); err != nil { - trace_log.SetContextInfo(ctx, "DelJoinedSuperGroupIDListFromCache", err, "userID", userID) - } - } - go func() { - for _, v := range userIDs { - chat.SuperGroupNotification(req.OperationID, v, v) - } - }() - } - return -} +func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJoinedGroupListReq) (*pbGroup.GetJoinedGroupListResp, error) { + resp := &pbGroup.GetJoinedGroupListResp{} -func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJoinedGroupListReq) (resp *pbGroup.GetJoinedGroupListResp, _ error) { - resp = &pbGroup.GetJoinedGroupListResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetFuncName(1), nil, "req", req.String(), "resp", resp.String()) - trace_log.ShowLog(ctx) - }() if err := token_verify.CheckAccessV2(ctx, req.OpUserID, req.FromUserID); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } joinedGroupList, err := rocksCache.GetJoinedGroupIDListFromCache(ctx, req.FromUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } for _, groupID := range joinedGroupList { var groupNode open_im_sdk.GroupInfo @@ -438,28 +267,22 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo groupNode.OwnerUserID = owner.UserID resp.GroupList = append(resp.GroupList, &groupNode) } - return + return resp, nil } -func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.InviteUserToGroupReq) (resp *pbGroup.InviteUserToGroupResp, _ error) { - resp = &pbGroup.InviteUserToGroupResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetFuncName(1), nil, "req", req.String(), "resp", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.InviteUserToGroupReq) (*pbGroup.InviteUserToGroupResp, error) { + resp := &pbGroup.InviteUserToGroupResp{} + if !imdb.IsExistGroupMember(req.GroupID, req.OpUserID) && !token_verify.IsManagerUserID(req.OpUserID) { constant.SetErrorForResp(constant.ErrIdentity, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrIdentity, "") } groupInfo, err := (*imdb.Group)(nil).Take(ctx, req.GroupID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if groupInfo.Status == constant.GroupStatusDismissed { - constant.SetErrorForResp(constant.ErrDismissedAlready, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrDismissedAlready, "") } if groupInfo.NeedVerification == constant.AllNeedVerification && !imdb.IsGroupOwnerAdmin(req.GroupID, req.OpUserID) && !token_verify.IsManagerUserID(req.OpUserID) { @@ -489,11 +312,10 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite chat.JoinGroupApplicationNotification(&joinReq) } } - return + return resp, nil } if err := s.DelGroupAndUserCache(ctx, req.GroupID, req.InvitedUserIDList); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } //from User: invite: applicant //to user: invite: invited @@ -525,8 +347,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite toInsertInfo.InviterUserID = req.OpUserID toInsertInfo.JoinSource = constant.JoinByInvitation if err := CallbackBeforeMemberJoinGroup(ctx, req.OperationID, &toInsertInfo, groupInfo.Ex); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } err = imdb.InsertIntoGroupMember(toInsertInfo) if err != nil { @@ -545,8 +366,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite } else { okUserIDList = req.InvitedUserIDList if err := db.DB.AddUserToSuperGroup(req.GroupID, req.InvitedUserIDList); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } } @@ -562,11 +382,12 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite chat.SuperGroupNotification(req.OperationID, v, v) } } - return + return resp, nil } -func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGroupAllMemberReq) (resp *pbGroup.GetGroupAllMemberResp, err error) { - resp = &pbGroup.GetGroupAllMemberResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGroupAllMemberReq) (*pbGroup.GetGroupAllMemberResp, error) { + resp := &pbGroup.GetGroupAllMemberResp{} + groupInfo, err := rocksCache.GetGroupInfoFromCache(ctx, req.GroupID) if err != nil { return nil, err @@ -585,17 +406,12 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro return resp, nil } -func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGroupMemberListReq) (resp *pbGroup.GetGroupMemberListResp, err error) { - resp = &pbGroup.GetGroupMemberListResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetFuncName(1), nil, "req", req.String(), "resp", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGroupMemberListReq) (*pbGroup.GetGroupMemberListResp, error) { + resp := &pbGroup.GetGroupMemberListResp{} + memberList, err := imdb.GetGroupMemberByGroupID(req.GroupID, req.Filter, req.NextSeq, 30) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } for _, v := range memberList { @@ -609,7 +425,7 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGr } else { resp.NextSeq = req.NextSeq + int32(len(memberList)) } - return + return resp, nil } func (s *groupServer) getGroupUserLevel(groupID, userID string) (int, error) { @@ -632,17 +448,12 @@ func (s *groupServer) getGroupUserLevel(groupID, userID string) (int, error) { return opFlag, nil } -func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGroupMemberReq) (resp *pbGroup.KickGroupMemberResp, _ error) { - resp = &pbGroup.KickGroupMemberResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetFuncName(1), nil, "req", req.String(), "resp", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGroupMemberReq) (*pbGroup.KickGroupMemberResp, error) { + resp := &pbGroup.KickGroupMemberResp{} + groupInfo, err := rocksCache.GetGroupInfoFromCache(ctx, req.GroupID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } var okUserIDList []string if groupInfo.GroupType != constant.SuperGroup { @@ -651,11 +462,10 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou opInfo, err := rocksCache.GetGroupMemberInfoFromCache(ctx, req.GroupID, req.OpUserID) if err != nil { constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if opInfo.RoleLevel == constant.GroupOrdinaryUsers { - constant.SetErrorForResp(constant.ErrNoPermission, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrNoPermission, "") } else if opInfo.RoleLevel == constant.GroupOwner { opFlag = 2 //owner } else { @@ -669,12 +479,10 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou if len(req.KickedUserIDList) == 0 { //log.NewError(req.OperationID, "failed, kick list 0") //return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrArgs.ErrCode, ErrMsg: constant.ErrArgs.ErrMsg}, nil - constant.SetErrorForResp(constant.ErrArgs, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrArgs, "") } if err := s.DelGroupAndUserCache(ctx, req.GroupID, req.KickedUserIDList); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } //remove for _, v := range req.KickedUserIDList { @@ -716,10 +524,9 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou c.GroupID = req.GroupID c.IsNotInGroup = true reqPb.Conversation = &c - etcdConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImUserName) + etcdConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImUserName) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } client := pbUser.NewUserClient(etcdConn) respPb, err := client.SetConversation(context.Background(), &reqPb) @@ -728,8 +535,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou } else { okUserIDList = req.KickedUserIDList if err := db.DB.RemoverUserFromSuperGroup(req.GroupID, okUserIDList); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } } @@ -753,21 +559,16 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou }() } - return + return resp, nil } -func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetGroupMembersInfoReq) (resp *pbGroup.GetGroupMembersInfoResp, err error) { - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "rpc req", req.String(), "rpc resp", resp.String()) - trace_log.ShowLog(ctx) - }() - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - resp = &pbGroup.GetGroupMembersInfoResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetGroupMembersInfoReq) (*pbGroup.GetGroupMembersInfoResp, error) { + resp := &pbGroup.GetGroupMembersInfoResp{} resp.MemberList = []*open_im_sdk.GroupMemberFullInfo{} for _, userID := range req.MemberList { groupMember, err := rocksCache.GetGroupMemberInfoFromCache(ctx, req.GroupID, userID) if err != nil { - return resp, nil + return nil, err } var memberNode open_im_sdk.GroupMemberFullInfo utils.CopyStructFields(&memberNode, groupMember) @@ -800,16 +601,11 @@ func FillPublicUserInfoByUserID(operationID, userID string, userInfo *open_im_sd return nil } -func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (resp *pbGroup.GetGroupApplicationListResp, err error) { - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), err, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - - resp = &pbGroup.GetGroupApplicationListResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup.GetGroupApplicationListReq) (*pbGroup.GetGroupApplicationListResp, error) { + resp := &pbGroup.GetGroupApplicationListResp{} reply, err := imdb.GetRecvGroupApplicationList(req.FromUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return resp, nil + return nil, err } var errResult error trace_log.SetContextInfo(ctx, "GetRecvGroupApplicationList", nil, " FromUserID: ", req.FromUserID, "GroupApplicationList: ", reply) @@ -832,20 +628,14 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup. resp.GroupRequestList = append(resp.GroupRequestList, &node) } if errResult != nil && len(resp.GroupRequestList) == 0 { - constant.SetErrorForResp(err, resp.CommonResp) - return resp, nil + return nil, err } trace_log.SetRpcRespInfo(ctx, utils.GetSelfFuncName(), resp.String()) return resp, nil } -func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsInfoReq) (resp *pbGroup.GetGroupsInfoResp, _ error) { - resp = &pbGroup.GetGroupsInfoResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), nil, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsInfoReq) (*pbGroup.GetGroupsInfoResp, error) { + resp := &pbGroup.GetGroupsInfoResp{} groupsInfoList := make([]*open_im_sdk.GroupInfo, 0) for _, groupID := range req.GroupIDList { groupInfoFromRedis, err := rocksCache.GetGroupInfoFromCache(ctx, groupID) @@ -859,7 +649,7 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsI groupsInfoList = append(groupsInfoList, &groupInfo) } resp.GroupInfoList = groupsInfoList - return + return resp, nil } func CheckPermission(ctx context.Context, groupID string, userID string) (err error) { @@ -872,16 +662,11 @@ func CheckPermission(ctx context.Context, groupID string, userID string) (err er return nil } -func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup.GroupApplicationResponseReq) (resp *pbGroup.GroupApplicationResponseResp, _ error) { - resp = &pbGroup.GroupApplicationResponseResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), nil, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup.GroupApplicationResponseReq) (*pbGroup.GroupApplicationResponseResp, error) { + resp := &pbGroup.GroupApplicationResponseResp{} + if err := CheckPermission(ctx, req.GroupID, req.OpUserID); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } groupRequest := imdb.GroupRequest{} utils.CopyStructFields(&groupRequest, req) @@ -889,24 +674,20 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup groupRequest.HandleUserID = req.OpUserID groupRequest.HandledTime = time.Now() if err := (&imdb.GroupRequest{}).Update(ctx, []*imdb.GroupRequest{&groupRequest}); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } groupInfo, err := rocksCache.GetGroupInfoFromCache(ctx, req.GroupID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if req.HandleResult == constant.GroupResponseAgree { user, err := imdb.GetUserByUserID(req.FromUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } request, err := (&imdb.GroupRequest{}).Take(ctx, req.GroupID, req.FromUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } member := imdb.GroupMember{} member.GroupID = req.GroupID @@ -920,29 +701,27 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup member.MuteEndTime = time.Unix(int64(time.Now().Second()), 0) err = CallbackBeforeMemberJoinGroup(ctx, req.OperationID, &member, groupInfo.Ex) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } err = (&imdb.GroupMember{}).Create(ctx, []*imdb.GroupMember{&member}) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } etcdCacheConn, err := fault_tolerant.GetDefaultConn(config.Config.RpcRegisterName.OpenImCacheName, req.OperationID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } cacheClient := pbCache.NewCacheClient(etcdCacheConn) cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{OperationID: req.OperationID, GroupID: req.GroupID}) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if cacheResp.CommonResp.ErrCode != 0 { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, utils.Wrap(&constant.ErrInfo{ + ErrCode: cacheResp.CommonResp.ErrCode, + ErrMsg: cacheResp.CommonResp.ErrMsg, + }, "") } _ = rocksCache.DelGroupMemberListHashFromCache(ctx, req.GroupID) _ = rocksCache.DelJoinedGroupIDListFromCache(ctx, req.FromUserID) @@ -952,55 +731,44 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup } else if req.HandleResult == constant.GroupResponseRefuse { chat.GroupApplicationRejectedNotification(req) } else { - constant.SetErrorForResp(constant.ErrArgs, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrArgs, "") } - return + return resp, nil } -func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) (resp *pbGroup.JoinGroupResp, _ error) { - resp = &pbGroup.JoinGroupResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), nil, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) (*pbGroup.JoinGroupResp, error) { + resp := &pbGroup.JoinGroupResp{} + if _, err := imdb.GetUserByUserID(req.OpUserID); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } groupInfo, err := rocksCache.GetGroupInfoFromCache(ctx, req.GroupID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if groupInfo.Status == constant.GroupStatusDismissed { constant.SetErrorForResp(constant.ErrDismissedAlready, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrDismissedAlready, "") } if groupInfo.NeedVerification == constant.Directly { if groupInfo.GroupType != constant.SuperGroup { us, err := imdb.GetUserByUserID(req.OpUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } //to group member groupMember := imdb.GroupMember{GroupID: req.GroupID, RoleLevel: constant.GroupOrdinaryUsers, OperatorUserID: req.OpUserID} utils.CopyStructFields(&groupMember, us) if err := CallbackBeforeMemberJoinGroup(ctx, req.OperationID, &groupMember, groupInfo.Ex); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if err := s.DelGroupAndUserCache(ctx, req.GroupID, []string{req.OpUserID}); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } err = imdb.InsertIntoGroupMember(groupMember) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } var sessionType int @@ -1019,20 +787,18 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) c.IsNotInGroup = false c.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill() reqPb.Conversation = &c - etcdConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImUserName) + etcdConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImUserName) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } client := pbUser.NewUserClient(etcdConn) respPb, err := client.SetConversation(context.Background(), &reqPb) trace_log.SetContextInfo(ctx, "SetConversation", err, "req", reqPb, "resp", respPb) chat.MemberEnterDirectlyNotification(req.GroupID, req.OpUserID, req.OperationID) - return + return resp, nil } else { constant.SetErrorForResp(constant.ErrGroupTypeNotSupport, resp.CommonResp) - log.Error(req.OperationID, "JoinGroup rpc failed, group type: ", groupInfo.GroupType, "not support directly") - return + return resp, nil } } var groupRequest imdb.GroupRequest @@ -1042,45 +808,35 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) groupRequest.JoinSource = req.JoinSource err = imdb.InsertIntoGroupRequest(groupRequest) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } chat.JoinGroupApplicationNotification(req) - return + return resp, nil } -func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq) (resp *pbGroup.QuitGroupResp, _ error) { - resp = &pbGroup.QuitGroupResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), nil, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq) (*pbGroup.QuitGroupResp, error) { + resp := &pbGroup.QuitGroupResp{} + groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if groupInfo.GroupType != constant.SuperGroup { _, err = rocksCache.GetGroupMemberInfoFromCache(ctx, req.GroupID, req.OpUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if err := s.DelGroupAndUserCache(ctx, req.GroupID, []string{req.OpUserID}); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } err = imdb.DeleteGroupMemberByGroupIDAndUserID(req.GroupID, req.OpUserID) if err != nil { - log.Error(req.OperationID, "JoinGroup rpc failed, group type: ", groupInfo.GroupType, "not support directly") - return + return nil, err } } else { okUserIDList := []string{req.OpUserID} if err := db.DB.RemoverUserFromSuperGroup(req.GroupID, okUserIDList); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } } @@ -1092,7 +848,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq) _ = rocksCache.DelGroupMemberListHashFromCache(ctx, req.GroupID) chat.SuperGroupNotification(req.OperationID, req.OpUserID, req.OpUserID) } - return + return resp, nil } func hasAccess(req *pbGroup.SetGroupInfoReq) bool { @@ -1103,7 +859,6 @@ func hasAccess(req *pbGroup.SetGroupInfoReq) bool { if err != nil { log.NewError(req.OperationID, "GetGroupMemberInfoByGroupIDAndUserID failed, ", err.Error(), req.GroupInfoForSet.GroupID, req.OpUserID) return false - } if groupUserInfo.RoleLevel == constant.GroupOwner || groupUserInfo.RoleLevel == constant.GroupAdmin { return true @@ -1111,25 +866,18 @@ func hasAccess(req *pbGroup.SetGroupInfoReq) bool { return false } -func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInfoReq) (resp *pbGroup.SetGroupInfoResp, err error) { - resp = &pbGroup.SetGroupInfoResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), nil, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInfoReq) (*pbGroup.SetGroupInfoResp, error) { + resp := &pbGroup.SetGroupInfoResp{} + if !hasAccess(req) { - constant.SetErrorForResp(constant.ErrIdentity, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrIdentity, "") } group, err := imdb.GetGroupInfoByGroupID(req.GroupInfoForSet.GroupID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if group.Status == constant.GroupStatusDismissed { - constant.SetErrorForResp(constant.ErrDismissedAlready, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrDismissedAlready, "") } var changedType int32 @@ -1198,20 +946,17 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf if req.GroupInfoForSet.Notification != "" { //get group member user id getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: req.OperationID, GroupID: req.GroupInfoForSet.GroupID} - etcdConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImCacheName) + etcdConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImCacheName) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return+ + return nil, err } client := pbCache.NewCacheClient(etcdConn) cacheResp, err := client.GetGroupMemberIDListFromCache(ctx, getGroupMemberIDListFromCacheReq) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if err = constant.CommonResp2Err(cacheResp.CommonResp); err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } var conversationReq pbConversation.ModifyConversationFieldReq conversation := pbConversation.Conversation{ @@ -1225,34 +970,26 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf conversationReq.FieldType = constant.FieldGroupAtType conversation.GroupAtType = constant.GroupNotification conversationReq.UserIDList = cacheResp.UserIDList - nEtcdConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) + nEtcdConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } nClient := pbConversation.NewConversationClient(nEtcdConn) conversationReply, err := nClient.ModifyConversationField(context.Background(), &conversationReq) trace_log.SetContextInfo(ctx, "ModifyConversationField", err, "req", &conversationReq, "resp", conversationReply) } - return + return resp, nil } -func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.TransferGroupOwnerReq) (resp *pbGroup.TransferGroupOwnerResp, _ error) { - resp = &pbGroup.TransferGroupOwnerResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), nil, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.TransferGroupOwnerReq) (*pbGroup.TransferGroupOwnerResp, error) { + resp := &pbGroup.TransferGroupOwnerResp{} groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if groupInfo.Status == constant.GroupStatusDismissed { - constant.SetErrorForResp(constant.ErrDismissedAlready, resp.CommonResp) - return + return nil, utils.Wrap(constant.ErrDismissedAlready, "") } if req.OldOwnerUserID == req.NewOwnerUserID { @@ -1278,15 +1015,15 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.Trans return nil, err } chat.GroupOwnerTransferredNotification(req) - return + return resp, nil } -func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq) (resp *pbGroup.GetGroupsResp, err error) { - resp = &pbGroup.GetGroupsResp{ - CommonResp: &open_im_sdk.CommonResp{}, +func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq) (*pbGroup.GetGroupsResp, error) { + resp := &pbGroup.GetGroupsResp{ Groups: []*pbGroup.CMSGroup{}, Pagination: &open_im_sdk.ResponsePagination{CurrentPage: req.Pagination.PageNumber, ShowNumber: req.Pagination.ShowNumber}, } + if req.GroupID != "" { groupInfoDB, err := imdb.GetGroupInfoByGroupID(req.GroupID) if err != nil { @@ -1332,8 +1069,8 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq) return resp, nil } -func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbGroup.GetGroupMembersCMSReq) (resp *pbGroup.GetGroupMembersCMSResp, _ error) { - resp = &pbGroup.GetGroupMembersCMSResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbGroup.GetGroupMembersCMSReq) (*pbGroup.GetGroupMembersCMSResp, error) { + resp := &pbGroup.GetGroupMembersCMSResp{} groupMembers, err := imdb.GetGroupMembersByGroupIdCMS(req.GroupID, req.UserName, req.Pagination.ShowNumber, req.Pagination.PageNumber) if err != nil { return nil, err @@ -1358,8 +1095,8 @@ func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbGroup.GetGr return resp, nil } -func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGroup.GetUserReqApplicationListReq) (resp *pbGroup.GetUserReqApplicationListResp, _ error) { - resp = &pbGroup.GetUserReqApplicationListResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGroup.GetUserReqApplicationListReq) (*pbGroup.GetUserReqApplicationListResp, error) { + resp := &pbGroup.GetUserReqApplicationListResp{} groupRequests, err := imdb.GetUserReqGroupByUserID(req.UserID) if err != nil { return nil, err @@ -1384,10 +1121,11 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou return resp, nil } -func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGroupReq) (resp *pbGroup.DismissGroupResp, err error) { - resp = &pbGroup.DismissGroupResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGroupReq) (*pbGroup.DismissGroupResp, error) { + resp := &pbGroup.DismissGroupResp{} + if !token_verify.IsManagerUserID(req.OpUserID) && !imdb.IsGroupOwnerAdmin(req.GroupID, req.OpUserID) { - return nil, utils.Wrap(constant.ErrNoPermission, "") + return nil, utils.Wrap(constant.ErrIdentity, "") } if err := rocksCache.DelGroupInfoFromCache(ctx, req.GroupID); err != nil { @@ -1397,7 +1135,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou return nil, err } - err = imdb.OperateGroupStatus(req.GroupID, constant.GroupStatusDismissed) + err := imdb.OperateGroupStatus(req.GroupID, constant.GroupStatusDismissed) if err != nil { return nil, err } @@ -1421,7 +1159,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou c.GroupID = req.GroupID c.IsNotInGroup = true reqPb.Conversation = &c - etcdConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImUserName) + etcdConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImUserName) client := pbUser.NewUserClient(etcdConn) respPb, err := client.SetConversation(context.Background(), &reqPb) trace_log.SetContextInfo(ctx, "SetConversation", err, "req", &reqPb, "resp", respPb) @@ -1440,19 +1178,15 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou return resp, nil } -func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGroupMemberReq) (resp *pbGroup.MuteGroupMemberResp, err error) { - resp = &pbGroup.MuteGroupMemberResp{CommonResp: &open_im_sdk.CommonResp{}} - ctx = trace_log.NewRpcCtx(ctx, utils.GetSelfFuncName(), req.OperationID) - defer func() { - trace_log.SetContextInfo(ctx, utils.GetSelfFuncName(), nil, "rpc req ", req.String(), "rpc resp ", resp.String()) - trace_log.ShowLog(ctx) - }() +func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGroupMemberReq) (*pbGroup.MuteGroupMemberResp, error) { + resp := &pbGroup.MuteGroupMemberResp{} + opFlag, err := s.getGroupUserLevel(req.GroupID, req.OpUserID) if err != nil { - return nil, err + return nil, err } if opFlag == 0 { - return nil, utils.Wrap(constant.ErrNoPermission, "") + return nil, err } mutedInfo, err := rocksCache.GetGroupMemberInfoFromCache(ctx, req.GroupID, req.UserID) @@ -1460,7 +1194,6 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou return nil, err } if mutedInfo.RoleLevel == constant.GroupOwner && opFlag != 1 { - constant.SetErrorForResp(err, resp.CommonResp) return nil, err } if mutedInfo.RoleLevel == constant.GroupAdmin && opFlag == 3 { @@ -1480,15 +1213,17 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou return resp, nil } -func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.CancelMuteGroupMemberReq) (resp *pbGroup.CancelMuteGroupMemberResp, err error) { - resp = &pbGroup.CancelMuteGroupMemberResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.CancelMuteGroupMemberReq) (*pbGroup.CancelMuteGroupMemberResp, error) { + resp := &pbGroup.CancelMuteGroupMemberResp{} + opFlag, err := s.getGroupUserLevel(req.GroupID, req.OpUserID) if err != nil { - return nil, err + return nil, err } if opFlag == 0 { - return nil, utils.Wrap(constant.ErrNoPermission, "") + return nil, err } + mutedInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, req.UserID) if err != nil { return nil, err @@ -1513,14 +1248,17 @@ func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.Ca return resp, nil } -func (s *groupServer) MuteGroup(ctx context.Context, req *pbGroup.MuteGroupReq) (resp *pbGroup.MuteGroupResp, err error) { - resp = &pbGroup.MuteGroupResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) MuteGroup(ctx context.Context, req *pbGroup.MuteGroupReq) (*pbGroup.MuteGroupResp, error) { + resp := &pbGroup.MuteGroupResp{} + opFlag, err := s.getGroupUserLevel(req.GroupID, req.OpUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) - return + return nil, err } if opFlag == 0 { + //errMsg := req.OperationID + "opFlag == 0 " + req.GroupID + req.OpUserID + //log.Error(req.OperationID, errMsg) + //return &pbGroup.MuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil return nil, utils.Wrap(constant.ErrNoPermission, "") } @@ -1550,15 +1288,14 @@ func (s *groupServer) MuteGroup(ctx context.Context, req *pbGroup.MuteGroupReq) return resp, nil } -func (s *groupServer) CancelMuteGroup(ctx context.Context, req *pbGroup.CancelMuteGroupReq) (resp *pbGroup.CancelMuteGroupResp, err error) { - resp = &pbGroup.CancelMuteGroupResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) CancelMuteGroup(ctx context.Context, req *pbGroup.CancelMuteGroupReq) (*pbGroup.CancelMuteGroupResp, error) { + resp := &pbGroup.CancelMuteGroupResp{} + opFlag, err := s.getGroupUserLevel(req.GroupID, req.OpUserID) if err != nil { - constant.SetErrorForResp(err, resp.CommonResp) return nil, err } if opFlag == 0 { - constant.SetErrorForResp(err, resp.CommonResp) return nil, err } //mutedInfo, err := imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, req.) @@ -1586,10 +1323,11 @@ func (s *groupServer) CancelMuteGroup(ctx context.Context, req *pbGroup.CancelMu return resp, nil } -func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.SetGroupMemberNicknameReq) (resp *pbGroup.SetGroupMemberNicknameResp, err error) { - resp = &pbGroup.SetGroupMemberNicknameResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.SetGroupMemberNicknameReq) (*pbGroup.SetGroupMemberNicknameResp, error) { + resp := &pbGroup.SetGroupMemberNicknameResp{} + if req.OpUserID != req.UserID && !token_verify.IsManagerUserID(req.OpUserID) { - return nil, utils.Wrap(constant.ErrNoPermission, "") + return nil, utils.Wrap(constant.ErrIdentity, "") } cbReq := &pbGroup.SetGroupMemberInfoReq{ GroupID: req.GroupID, @@ -1626,8 +1364,9 @@ func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.S return resp, nil } -func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGroupMemberInfoReq) (resp *pbGroup.SetGroupMemberInfoResp, err error) { - resp = &pbGroup.SetGroupMemberInfoResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGroupMemberInfoReq) (*pbGroup.SetGroupMemberInfoResp, error) { + resp := &pbGroup.SetGroupMemberInfoResp{} + if err := rocksCache.DelGroupMemberInfoFromCache(ctx, req.GroupID, req.UserID); err != nil { return nil, err } @@ -1671,8 +1410,9 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr return resp, nil } -func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.GetGroupAbstractInfoReq) (resp *pbGroup.GetGroupAbstractInfoResp, err error) { - resp = &pbGroup.GetGroupAbstractInfoResp{CommonResp: &open_im_sdk.CommonResp{}} +func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.GetGroupAbstractInfoReq) (*pbGroup.GetGroupAbstractInfoResp, error) { + resp := &pbGroup.GetGroupAbstractInfoResp{} + hashCode, err := rocksCache.GetGroupMemberListHashFromCache(ctx, req.GroupID) if err != nil { return nil, err @@ -1689,7 +1429,7 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.Get func (s *groupServer) DelGroupAndUserCache(ctx context.Context, groupID string, userIDList []string) error { operationID := trace_log.GetOperationID(ctx) if groupID != "" { - etcdConn, err := getcdv3.GetConn(ctx, config.Config.RpcRegisterName.OpenImCacheName) + etcdConn, err := utils.GetConn(ctx, config.Config.RpcRegisterName.OpenImCacheName) if err != nil { return err } diff --git a/pkg/common/middleware/gin.go b/pkg/common/middleware/gin.go new file mode 100644 index 000000000..8155086cd --- /dev/null +++ b/pkg/common/middleware/gin.go @@ -0,0 +1,43 @@ +package middleware + +import ( + "bytes" + "encoding/json" + "github.com/gin-gonic/gin" + "io/ioutil" + "net/http" +) + +func GinParseOperationID(c *gin.Context) { + if c.Request.Method == http.MethodPost { + operationID := c.Request.Header.Get("operationID") + if operationID == "" { + body, err := ioutil.ReadAll(c.Request.Body) + if err != nil { + c.String(400, "read request body error: "+err.Error()) + c.Abort() + return + } + req := struct { + OperationID string `json:"operationID"` + }{} + if err := json.Unmarshal(body, &req); err != nil { + c.String(400, "get operationID error: "+err.Error()) + c.Abort() + return + } + if req.OperationID == "" { + c.String(400, "operationID empty") + c.Abort() + return + } + c.Request.Body = ioutil.NopCloser(bytes.NewReader(body)) + operationID = req.OperationID + c.Request.Header.Set("operationID", operationID) + } + c.Set("operationID", operationID) + c.Next() + return + } + c.Next() +} diff --git a/pkg/common/middleware/rpc.go b/pkg/common/middleware/rpc.go new file mode 100644 index 000000000..75b94d64e --- /dev/null +++ b/pkg/common/middleware/rpc.go @@ -0,0 +1,78 @@ +package middleware + +import ( + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/log" + "Open_IM/pkg/common/trace_log" + "Open_IM/pkg/utils" + "context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/wrapperspb" + "path" + "runtime/debug" +) + +func RpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer func() { + if r := recover(); r != nil { + log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack())) + } + }() + funcName := path.Base(info.FullMethod) + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.New(codes.InvalidArgument, "missing metadata").Err() + } + var operationID string + if opts := md.Get("operationID"); len(opts) != 1 || opts[0] == "" { + return nil, status.New(codes.InvalidArgument, "operationID error").Err() + } else { + operationID = opts[0] + } + var opUserID string + if opts := md.Get("opUserID"); len(opts) != 1 { + return nil, status.New(codes.InvalidArgument, "opUserID error").Err() + } else { + opUserID = opts[0] + } + ctx = trace_log.NewRpcCtx(ctx, funcName, operationID) + defer trace_log.ShowLog(ctx) + trace_log.SetContextInfo(ctx, funcName, err, "opUserID", opUserID, "rpcReq", req.(interface{ String() string }).String()) + resp, err = handler(ctx, req) + if err != nil { + trace_log.SetContextInfo(ctx, funcName, err) + errInfo := constant.ToAPIErrWithErr(err) + var code codes.Code + if errInfo.ErrCode == 0 { + code = codes.Unknown + } else { + code = codes.Code(errInfo.ErrCode) + } + sta, err := status.New(code, errInfo.ErrMsg).WithDetails(wrapperspb.String(errInfo.DetailErrMsg)) + if err != nil { + return nil, err + } + return nil, sta.Err() + } + trace_log.SetContextInfo(ctx, funcName, nil, "rpcResp", resp.(interface{ String() string }).String()) + return +} + +func RpcClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { + //if cc == nil { + // return utils.Wrap(constant.ErrRpcConn, "") + //} + operationID, ok := ctx.Value("operationID").(string) + if !ok { + return utils.Wrap(constant.ErrArgs, "ctx missing operationID") + } + opUserID, ok := ctx.Value("opUserID").(string) + if !ok { + return utils.Wrap(constant.ErrArgs, "ctx missing opUserID") + } + md := metadata.Pairs("operationID", operationID, "opUserID", opUserID) + return invoker(metadata.NewOutgoingContext(ctx, md), method, req, reply, cc, opts...) +}