ws update

This commit is contained in:
Gordon 2023-03-23 15:42:31 +08:00
parent 2ba38d30f9
commit a5380a971b
4 changed files with 51 additions and 38 deletions

@ -1 +1 @@
Subproject commit 9fd1af471356fa122fb87587fb9501e9292fb416 Subproject commit 94da8cc1074e9b6d14a94a41bf37885b27253a2d

View File

@ -4,7 +4,9 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"runtime/debug" "runtime/debug"
@ -119,11 +121,7 @@ func (c *Client) handleMessage(message []byte) error {
if binaryReq.SendID != c.userID { if binaryReq.SendID != c.userID {
return errors.New("exception conn userID not same to req userID") return errors.New("exception conn userID not same to req userID")
} }
ctx := context.Background() ctx := mcontext.WithMustInfoCtx([]string{binaryReq.OperationID, binaryReq.SendID, constant.PlatformIDToName(c.platformID), c.ctx.GetConnID()})
ctx = context.WithValue(ctx, ConnID, c.ctx.GetConnID())
ctx = context.WithValue(ctx, OperationID, binaryReq.OperationID)
ctx = context.WithValue(ctx, CommonUserID, binaryReq.SendID)
ctx = context.WithValue(ctx, PlatformID, c.platformID)
var messageErr error var messageErr error
var resp []byte var resp []byte
switch binaryReq.ReqIdentifier { switch binaryReq.ReqIdentifier {

View File

@ -23,39 +23,11 @@ func rpcClientInterceptor(ctx context.Context, method string, req, resp interfac
return errs.ErrInternalServer.Wrap("call rpc request context is nil") return errs.ErrInternalServer.Wrap("call rpc request context is nil")
} }
log.ZInfo(ctx, "rpc client req", "funcName", method, "req", rpcString(req)) log.ZInfo(ctx, "rpc client req", "funcName", method, "req", rpcString(req))
md := metadata.Pairs() ctx, err = getRpcContext(ctx, method)
if keys, _ := ctx.Value(constant.RpcCustomHeader).([]string); len(keys) > 0 { if err != nil {
for _, key := range keys { return err
val, ok := ctx.Value(key).([]string)
if !ok {
return errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx missing key %s", key))
}
if len(val) == 0 {
return errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx key %s value is empty", key))
}
md.Set(key, val...)
}
md.Set(constant.RpcCustomHeader, keys...)
} }
operationID, ok := ctx.Value(constant.OperationID).(string) err = invoker(ctx, method, req, resp, cc, opts...)
if !ok {
log.ZWarn(ctx, "ctx missing operationID", errors.New("ctx missing operationID"), "funcName", method)
return errs.ErrArgs.Wrap("ctx missing operationID")
}
md.Set(constant.OperationID, operationID)
args := make([]string, 0, 4)
args = append(args, constant.OperationID, operationID)
opUserID, ok := ctx.Value(constant.OpUserID).(string)
if ok {
md.Set(constant.OpUserID, opUserID)
args = append(args, constant.OpUserID, opUserID)
}
opUserIDPlatformID, ok := ctx.Value(constant.OpUserPlatform).(string)
if ok {
md.Set(constant.OpUserPlatform, opUserIDPlatformID)
}
md.Set(constant.CheckKey, genReqKey(args))
err = invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...)
if err == nil { if err == nil {
log.ZInfo(ctx, "rpc client resp", "funcName", method, "resp", rpcString(resp)) log.ZInfo(ctx, "rpc client resp", "funcName", method, "resp", rpcString(resp))
return nil return nil
@ -78,3 +50,43 @@ func rpcClientInterceptor(ctx context.Context, method string, req, resp interfac
} }
return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap() return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap()
} }
func getRpcContext(ctx context.Context, method string) (context.Context, error) {
md := metadata.Pairs()
if keys, _ := ctx.Value(constant.RpcCustomHeader).([]string); len(keys) > 0 {
for _, key := range keys {
val, ok := ctx.Value(key).([]string)
if !ok {
return nil, errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx missing key %s", key))
}
if len(val) == 0 {
return nil, errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx key %s value is empty", key))
}
md.Set(key, val...)
}
md.Set(constant.RpcCustomHeader, keys...)
}
operationID, ok := ctx.Value(constant.OperationID).(string)
if !ok {
log.ZWarn(ctx, "ctx missing operationID", errors.New("ctx missing operationID"), "funcName", method)
return nil, errs.ErrArgs.Wrap("ctx missing operationID")
}
md.Set(constant.OperationID, operationID)
var checkArgs []string
checkArgs = append(checkArgs, constant.OperationID, operationID)
opUserID, ok := ctx.Value(constant.OpUserID).(string)
if ok {
md.Set(constant.OpUserID, opUserID)
checkArgs = append(checkArgs, constant.OpUserID, opUserID)
}
opUserIDPlatformID, ok := ctx.Value(constant.OpUserPlatform).(string)
if ok {
md.Set(constant.OpUserPlatform, opUserIDPlatformID)
}
connID, ok := ctx.Value(constant.ConnID).(string)
if ok {
md.Set(constant.ConnID, connID)
}
md.Set(constant.CheckKey, genReqKey(checkArgs))
return metadata.NewOutgoingContext(ctx, md), nil
}

View File

@ -79,6 +79,9 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary
if opts := md.Get(constant.OpUserPlatform); len(opts) == 1 { if opts := md.Get(constant.OpUserPlatform); len(opts) == 1 {
ctx = context.WithValue(ctx, constant.OpUserPlatform, opts[0]) ctx = context.WithValue(ctx, constant.OpUserPlatform, opts[0])
} }
if opts := md.Get(constant.ConnID); len(opts) == 1 {
ctx = context.WithValue(ctx, constant.ConnID, opts[0])
}
if opts := md.Get(constant.CheckKey); len(opts) != 1 || opts[0] == "" { if opts := md.Get(constant.CheckKey); len(opts) != 1 || opts[0] == "" {
return nil, status.New(codes.InvalidArgument, "check key empty").Err() return nil, status.New(codes.InvalidArgument, "check key empty").Err()
} else { } else {