From a5380a971b2ba56440e1c3cd40da386252e75953 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 23 Mar 2023 15:42:31 +0800 Subject: [PATCH] ws update --- cmd/Open-IM-SDK-Core | 2 +- internal/msggateway/client.go | 8 +-- pkg/common/mw/rpc_client_interceptor.go | 76 ++++++++++++++----------- pkg/common/mw/rpc_server_interceptor.go | 3 + 4 files changed, 51 insertions(+), 38 deletions(-) diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index 9fd1af471..94da8cc10 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit 9fd1af471356fa122fb87587fb9501e9292fb416 +Subproject commit 94da8cc1074e9b6d14a94a41bf37885b27253a2d diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 6e84ac25b..fa0729eb1 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "runtime/debug" @@ -119,11 +121,7 @@ func (c *Client) handleMessage(message []byte) error { if binaryReq.SendID != c.userID { return errors.New("exception conn userID not same to req userID") } - ctx := context.Background() - ctx = context.WithValue(ctx, ConnID, c.ctx.GetConnID()) - ctx = context.WithValue(ctx, OperationID, binaryReq.OperationID) - ctx = context.WithValue(ctx, CommonUserID, binaryReq.SendID) - ctx = context.WithValue(ctx, PlatformID, c.platformID) + ctx := mcontext.WithMustInfoCtx([]string{binaryReq.OperationID, binaryReq.SendID, constant.PlatformIDToName(c.platformID), c.ctx.GetConnID()}) var messageErr error var resp []byte switch binaryReq.ReqIdentifier { diff --git a/pkg/common/mw/rpc_client_interceptor.go b/pkg/common/mw/rpc_client_interceptor.go index eda323836..9c8757e6c 100644 --- a/pkg/common/mw/rpc_client_interceptor.go +++ b/pkg/common/mw/rpc_client_interceptor.go @@ -23,39 +23,11 @@ func rpcClientInterceptor(ctx context.Context, method string, req, resp interfac return errs.ErrInternalServer.Wrap("call rpc request context is nil") } log.ZInfo(ctx, "rpc client req", "funcName", method, "req", rpcString(req)) - md := metadata.Pairs() - if keys, _ := ctx.Value(constant.RpcCustomHeader).([]string); len(keys) > 0 { - for _, key := range keys { - val, ok := ctx.Value(key).([]string) - if !ok { - return errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx missing key %s", key)) - } - if len(val) == 0 { - return errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx key %s value is empty", key)) - } - md.Set(key, val...) - } - md.Set(constant.RpcCustomHeader, keys...) + ctx, err = getRpcContext(ctx, method) + if err != nil { + return err } - operationID, ok := ctx.Value(constant.OperationID).(string) - if !ok { - log.ZWarn(ctx, "ctx missing operationID", errors.New("ctx missing operationID"), "funcName", method) - return errs.ErrArgs.Wrap("ctx missing operationID") - } - md.Set(constant.OperationID, operationID) - args := make([]string, 0, 4) - args = append(args, constant.OperationID, operationID) - opUserID, ok := ctx.Value(constant.OpUserID).(string) - if ok { - md.Set(constant.OpUserID, opUserID) - args = append(args, constant.OpUserID, opUserID) - } - opUserIDPlatformID, ok := ctx.Value(constant.OpUserPlatform).(string) - if ok { - md.Set(constant.OpUserPlatform, opUserIDPlatformID) - } - md.Set(constant.CheckKey, genReqKey(args)) - err = invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...) + err = invoker(ctx, method, req, resp, cc, opts...) if err == nil { log.ZInfo(ctx, "rpc client resp", "funcName", method, "resp", rpcString(resp)) return nil @@ -78,3 +50,43 @@ func rpcClientInterceptor(ctx context.Context, method string, req, resp interfac } return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap() } + +func getRpcContext(ctx context.Context, method string) (context.Context, error) { + md := metadata.Pairs() + if keys, _ := ctx.Value(constant.RpcCustomHeader).([]string); len(keys) > 0 { + for _, key := range keys { + val, ok := ctx.Value(key).([]string) + if !ok { + return nil, errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx missing key %s", key)) + } + if len(val) == 0 { + return nil, errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx key %s value is empty", key)) + } + md.Set(key, val...) + } + md.Set(constant.RpcCustomHeader, keys...) + } + operationID, ok := ctx.Value(constant.OperationID).(string) + if !ok { + log.ZWarn(ctx, "ctx missing operationID", errors.New("ctx missing operationID"), "funcName", method) + return nil, errs.ErrArgs.Wrap("ctx missing operationID") + } + md.Set(constant.OperationID, operationID) + var checkArgs []string + checkArgs = append(checkArgs, constant.OperationID, operationID) + opUserID, ok := ctx.Value(constant.OpUserID).(string) + if ok { + md.Set(constant.OpUserID, opUserID) + checkArgs = append(checkArgs, constant.OpUserID, opUserID) + } + opUserIDPlatformID, ok := ctx.Value(constant.OpUserPlatform).(string) + if ok { + md.Set(constant.OpUserPlatform, opUserIDPlatformID) + } + connID, ok := ctx.Value(constant.ConnID).(string) + if ok { + md.Set(constant.ConnID, connID) + } + md.Set(constant.CheckKey, genReqKey(checkArgs)) + return metadata.NewOutgoingContext(ctx, md), nil +} diff --git a/pkg/common/mw/rpc_server_interceptor.go b/pkg/common/mw/rpc_server_interceptor.go index f47a4321b..e923f1319 100644 --- a/pkg/common/mw/rpc_server_interceptor.go +++ b/pkg/common/mw/rpc_server_interceptor.go @@ -79,6 +79,9 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary if opts := md.Get(constant.OpUserPlatform); len(opts) == 1 { ctx = context.WithValue(ctx, constant.OpUserPlatform, opts[0]) } + if opts := md.Get(constant.ConnID); len(opts) == 1 { + ctx = context.WithValue(ctx, constant.ConnID, opts[0]) + } if opts := md.Get(constant.CheckKey); len(opts) != 1 || opts[0] == "" { return nil, status.New(codes.InvalidArgument, "check key empty").Err() } else {