From 1fc7c4434c714550358da8badd12f2395fc30fbd Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 11 Jan 2023 14:29:37 +0800 Subject: [PATCH] group --- cmd/open_im_api/main.go | 2 + internal/rpc/fault_tolerant/conn.go | 3 +- internal/rpc/group/group.go | 49 ++---------------- pkg/common/middleware/gin.go | 42 ++++++++++++++++ pkg/common/middleware/rpc.go | 78 +++++++++++++++++++++++++++++ 5 files changed, 128 insertions(+), 46 deletions(-) create mode 100644 pkg/common/middleware/gin.go create mode 100644 pkg/common/middleware/rpc.go diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index 1dc3ab6fb..97311f1cc 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -15,6 +15,7 @@ import ( "Open_IM/internal/api/user" "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/middleware" "Open_IM/pkg/utils" "flag" "fmt" @@ -49,6 +50,7 @@ func main() { r := gin.New() r.Use(gin.Recovery()) r.Use(utils.CorsHandler()) + r.Use(middleware.GinParseOperationID) log.Info("load config: ", config.Config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) if config.Config.Prometheus.Enable { diff --git a/internal/rpc/fault_tolerant/conn.go b/internal/rpc/fault_tolerant/conn.go index 0d911346c..5ba212400 100644 --- a/internal/rpc/fault_tolerant/conn.go +++ b/internal/rpc/fault_tolerant/conn.go @@ -3,6 +3,7 @@ package fault_tolerant import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/middleware" "Open_IM/pkg/utils" "github.com/OpenIMSDK/getcdv3" "google.golang.org/grpc" @@ -75,7 +76,7 @@ func GetConfigConn(serviceName string, operationID string) *grpc.ClientConn { } target := rpcRegisterIP + ":" + utils.Int32ToString(int32(configPortList[0])) log.Info(operationID, "rpcRegisterIP ", rpcRegisterIP, " port ", configPortList, " grpc target: ", target, " serviceName: ", serviceName) - conn, err := grpc.Dial(target, grpc.WithInsecure()) + conn, err := grpc.Dial(target, grpc.WithInsecure(), grpc.WithUnaryInterceptor(middleware.RpcClientInterceptor)) if err != nil { log.Error(operationID, "grpc.Dail failed ", err.Error()) return nil diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 8f025888b..c5c19c198 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -9,25 +9,20 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/middleware" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/trace_log" cp "Open_IM/pkg/common/utils" - open_im_sdk "Open_IM/pkg/proto/sdk_ws" - "github.com/OpenIMSDK/getcdv3" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - "path" - "runtime/debug" - pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" pbGroup "Open_IM/pkg/proto/group" + open_im_sdk "Open_IM/pkg/proto/sdk_ws" pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" "errors" + "github.com/OpenIMSDK/getcdv3" "math/big" "net" "strconv" @@ -58,42 +53,6 @@ func NewGroupServer(port int) *groupServer { } } -func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - defer func() { - if r := recover(); r != nil { - log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack())) - } - }() - funcName := path.Base(info.FullMethod) - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return nil, errors.New("not metadata") - } - operationID := md.Get("operationID")[0] - opUserID := md.Get("opUserID")[0] - ctx = trace_log.NewRpcCtx(ctx, funcName, operationID) - defer trace_log.ShowLog(ctx) - _ = opUserID - trace_log.SetContextInfo(ctx, funcName, err, "rpc req", req.(interface{ String() string }).String()) - resp, err = handler(ctx, req) - if err != nil { - errInfo := constant.ToAPIErrWithErr(err) - var code codes.Code - if errInfo.ErrCode == 0 { - code = codes.Unknown - } else { - code = codes.Code(errInfo.ErrCode) - } - sta, err := status.New(code, errInfo.ErrMsg).WithDetails(wrapperspb.String(errInfo.DetailErrMsg)) - if err != nil { - return nil, err - } - return nil, sta.Err() - } - trace_log.SetContextInfo(ctx, funcName, err, "rpc resp", resp.(interface{ String() string }).String()) - return -} - func (s *groupServer) Run() { log.NewInfo("", "group rpc start ") listenIP := "" @@ -116,7 +75,7 @@ func (s *groupServer) Run() { var grpcOpts = []grpc.ServerOption{ grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), - grpc.UnaryInterceptor(UnaryServerInterceptor), + grpc.UnaryInterceptor(middleware.RpcServerInterceptor), } if config.Config.Prometheus.Enable { promePkg.NewGrpcRequestCounter() diff --git a/pkg/common/middleware/gin.go b/pkg/common/middleware/gin.go new file mode 100644 index 000000000..c25b0c3fc --- /dev/null +++ b/pkg/common/middleware/gin.go @@ -0,0 +1,42 @@ +package middleware + +import ( + "bytes" + "encoding/json" + "github.com/gin-gonic/gin" + "io/ioutil" + "net/http" +) + +func GinParseOperationID(c *gin.Context) { + if c.Request.Method == http.MethodPost { + operationID := c.Request.Header.Get("operationID") + if operationID == "" { + body, err := ioutil.ReadAll(c.Request.Body) + if err != nil { + c.String(400, "read request body error: "+err.Error()) + c.Abort() + return + } + req := struct { + OperationID string `json:"operationID"` + }{} + if err := json.Unmarshal(body, &req); err != nil { + c.String(400, "get operationID error: "+err.Error()) + c.Abort() + return + } + if req.OperationID == "" { + c.String(400, "operationID empty") + c.Abort() + return + } + c.Request.Body = ioutil.NopCloser(bytes.NewReader(body)) + operationID = req.OperationID + } + c.Set("operationID", operationID) + c.Next() + return + } + c.Next() +} diff --git a/pkg/common/middleware/rpc.go b/pkg/common/middleware/rpc.go new file mode 100644 index 000000000..c1282bbb4 --- /dev/null +++ b/pkg/common/middleware/rpc.go @@ -0,0 +1,78 @@ +package middleware + +import ( + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/log" + "Open_IM/pkg/common/trace_log" + "Open_IM/pkg/utils" + "context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/wrapperspb" + "path" + "runtime/debug" +) + +func RpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer func() { + if r := recover(); r != nil { + log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack())) + } + }() + funcName := path.Base(info.FullMethod) + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.New(codes.InvalidArgument, "missing metadata").Err() + } + var operationID string + if opts := md.Get("operationID"); len(opts) != 1 || opts[0] == "" { + return nil, status.New(codes.InvalidArgument, "operationID error").Err() + } else { + operationID = opts[0] + } + var opUserID string + if opts := md.Get("opUserID"); len(opts) != 1 { + return nil, status.New(codes.InvalidArgument, "opUserID error").Err() + } else { + opUserID = opts[0] + } + ctx = trace_log.NewRpcCtx(ctx, funcName, operationID) + defer trace_log.ShowLog(ctx) + trace_log.SetContextInfo(ctx, funcName, err, "opUserID", opUserID, "rpcReq", req.(interface{ String() string }).String()) + resp, err = handler(ctx, req) + if err != nil { + trace_log.SetContextInfo(ctx, funcName, err) + errInfo := constant.ToAPIErrWithErr(err) + var code codes.Code + if errInfo.ErrCode == 0 { + code = codes.Unknown + } else { + code = codes.Code(errInfo.ErrCode) + } + sta, err := status.New(code, errInfo.ErrMsg).WithDetails(wrapperspb.String(errInfo.DetailErrMsg)) + if err != nil { + return nil, err + } + return nil, sta.Err() + } + trace_log.SetContextInfo(ctx, funcName, nil, "rpcResp", resp.(interface{ String() string }).String()) + return +} + +func RpcClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { + if cc == nil { + return utils.Wrap(constant.ErrRpcConn, "") + } + operationID, ok := ctx.Value("operationID").(string) + if !ok { + return utils.Wrap(constant.ErrArgs, "ctx missing operationID") + } + opUserID, ok := ctx.Value("opUserID").(string) + if !ok { + return utils.Wrap(constant.ErrArgs, "ctx missing opUserID") + } + md := metadata.Pairs("operationID", operationID, "opUserID", opUserID) + return invoker(metadata.NewOutgoingContext(ctx, md), method, req, reply, cc, opts...) +}