diff --git a/go.mod b/go.mod index fd188e978..fc0b411a0 100644 --- a/go.mod +++ b/go.mod @@ -219,3 +219,7 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace ( + github.com/openimsdk/tools => /Users/chao/Desktop/code/tools +) \ No newline at end of file diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 24dd823f6..0731074c0 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -223,15 +223,19 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C if err != nil { return err } + if len(conns) == 0 || (len(conns) == 1 && ws.disCov.IsSelfNode(conns[0])) { + return nil + } + wg := errgroup.Group{} wg.SetLimit(concurrentRequest) // Online push user online message to other node for _, v := range conns { v := v - log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target()) - if v.Target() == ws.disCov.GetSelfConnTarget() { - log.ZDebug(ctx, "Filter out this node", "node", v.Target()) + log.ZDebug(ctx, "sendUserOnlineInfoToOtherNode conn") + if ws.disCov.IsSelfNode(v) { + log.ZDebug(ctx, "Filter out this node") continue } @@ -242,7 +246,7 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C PlatformID: int32(client.PlatformID), Token: client.token, }) if err != nil { - log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) + log.ZWarn(ctx, "MultiTerminalLoginCheck err", err) } return nil }) diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 23e68339c..8bec6e60b 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -166,7 +166,7 @@ func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg } } log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) - var usersConns = make(map[*grpc.ClientConn][]string) + var usersConns = make(map[grpc.ClientConnInterface][]string) for host, userIds := range usersHost { tconn, _ := k.disCov.GetConn(ctx, host) usersConns[tconn] = userIds diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3e096aa64..2e64c365c 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -192,7 +192,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID return err } for _, v := range conns { - log.ZDebug(ctx, "forceKickOff", "conn", v.Target()) + log.ZDebug(ctx, "forceKickOff", "userID", userID, "platformID", platformID) client := msggateway.NewMsgGatewayClient(v) kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} _, err := client.KickUserOffline(ctx, kickReq) diff --git a/pkg/service/a_test.go b/pkg/service/a_test.go index fbbddaee8..34b621456 100644 --- a/pkg/service/a_test.go +++ b/pkg/service/a_test.go @@ -1,56 +1,134 @@ package service import ( - "crypto/tls" - "net" + "context" + "fmt" + "sync" "testing" - "time" + + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/user" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +var ( + _ user.UnimplementedUserServer + _ group.UnimplementedGroupServer ) func TestName1(t *testing.T) { - - tls.Client(&testConn{}, &tls.Config{}).Handshake() - - time.Sleep(time.Hour) + cc := newStandaloneConn() + user.RegisterUserServer(cc.Registry(), &user.UnimplementedUserServer{}) + group.RegisterGroupServer(cc.Registry(), &group.UnimplementedGroupServer{}) + ctx := context.Background() + resp, err := user.NewUserClient(cc).GetUserStatus(ctx, &user.GetUserStatusReq{UserID: "imAdmin", UserIDs: []string{"10000", "20000"}}) + if err != nil { + t.Error(err) + return + } + t.Log(resp) } -type testConn struct { +func newStandaloneConn() *standaloneConn { + return &standaloneConn{ + registry: newStandaloneRegistry(), + serializer: NewProtoSerializer(), + } } -func (testConn) Read(b []byte) (n int, err error) { - panic("implement me") +type standaloneConn struct { + registry *standaloneRegistry + serializer Serializer } -func (testConn) Write(b []byte) (n int, err error) { - panic("implement me") +func (x *standaloneConn) Registry() grpc.ServiceRegistrar { + return x.registry } -func (testConn) Close() error { - //TODO implement me - panic("implement me") +func (x *standaloneConn) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error { + handler := x.registry.getMethod(method) + if handler == nil { + return fmt.Errorf("service %s not found", method) + } + resp, err := handler(ctx, args, nil) + if err != nil { + return err + } + tmp, err := x.serializer.Marshal(resp) + if err != nil { + return err + } + return x.serializer.Unmarshal(tmp, reply) } -func (testConn) LocalAddr() net.Addr { - //TODO implement me - panic("implement me") +func (x *standaloneConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, status.Errorf(codes.Unimplemented, "method stream not implemented") } -func (testConn) RemoteAddr() net.Addr { - //TODO implement me - panic("implement me") +type serverHandler func(ctx context.Context, req any, interceptor grpc.UnaryServerInterceptor) (any, error) + +func newStandaloneRegistry() *standaloneRegistry { + return &standaloneRegistry{ + methods: make(map[string]serverHandler), + serializer: NewProtoSerializer(), + } } -func (testConn) SetDeadline(t time.Time) error { - //TODO implement me - panic("implement me") +type standaloneRegistry struct { + lock sync.RWMutex + methods map[string]serverHandler + serializer Serializer } -func (testConn) SetReadDeadline(t time.Time) error { - //TODO implement me - panic("implement me") +func (x *standaloneConn) emptyDec(req any) error { + return nil } -func (testConn) SetWriteDeadline(t time.Time) error { - //TODO implement me - panic("implement me") +func (x *standaloneRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { + x.lock.Lock() + defer x.lock.Unlock() + for i := range desc.Methods { + method := desc.Methods[i] + name := fmt.Sprintf("/%s/%s", desc.ServiceName, method.MethodName) + if _, ok := x.methods[name]; ok { + panic(fmt.Errorf("service %s already registered, method %s", desc.ServiceName, method.MethodName)) + } + x.methods[name] = func(ctx context.Context, req any, interceptor grpc.UnaryServerInterceptor) (any, error) { + return method.Handler(impl, ctx, func(in any) error { + tmp, err := x.serializer.Marshal(req) + if err != nil { + return err + } + return x.serializer.Unmarshal(tmp, in) + }, interceptor) + } + } +} + +func (x *standaloneRegistry) getMethod(name string) serverHandler { + x.lock.RLock() + defer x.lock.RUnlock() + return x.methods[name] +} + +type Serializer interface { + Marshal(any) ([]byte, error) + Unmarshal([]byte, any) error +} + +func NewProtoSerializer() Serializer { + return protoSerializer{} +} + +type protoSerializer struct{} + +func (protoSerializer) Marshal(in any) ([]byte, error) { + return proto.Marshal(in.(proto.Message)) +} + +func (protoSerializer) Unmarshal(b []byte, out any) error { + return proto.Unmarshal(b, out.(proto.Message)) } diff --git a/pkg/service/registry.go b/pkg/service/registry.go index 38a4575ac..a847a4d81 100644 --- a/pkg/service/registry.go +++ b/pkg/service/registry.go @@ -1,67 +1,34 @@ package service -import ( - "context" - "fmt" - - "google.golang.org/grpc" -) - -type Conn interface { - GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) //1 - GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) //2 - GetSelfConnTarget() string //3 - AddOption(opts ...grpc.DialOption) //4 - CloseConn(conn *grpc.ClientConn) //5 - // do not use this method for call rpc -} -type SvcDiscoveryRegistry interface { - Conn - Register(serviceName, host string, port int, opts ...grpc.DialOption) error //6 - UnRegister() error //7 - Close() - GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) // -} - -var _ SvcDiscoveryRegistry = (*DiscoveryRegistry)(nil) - -type DiscoveryRegistry struct { -} - -func (x *DiscoveryRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { - fmt.Println("RegisterService", desc, impl) -} - -func (x *DiscoveryRegistry) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { - //TODO implement me - panic("implement me") -} - -func (x *DiscoveryRegistry) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { - //TODO implement me - panic("implement me") -} - -func (x *DiscoveryRegistry) GetSelfConnTarget() string { - return "" -} - -func (x *DiscoveryRegistry) AddOption(opts ...grpc.DialOption) {} - -func (x *DiscoveryRegistry) CloseConn(conn *grpc.ClientConn) { - _ = conn.Close() -} - -func (x *DiscoveryRegistry) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - return nil -} - -func (x *DiscoveryRegistry) UnRegister() error { - return nil -} - -func (x *DiscoveryRegistry) Close() {} - -func (x *DiscoveryRegistry) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { - return "", nil -} +// +//import ( +// "context" +// "fmt" +// "sync" +// +// "google.golang.org/grpc" +//) +// +//type DiscoveryRegistry struct { +// lock sync.RWMutex +// services map[string]grpc.ClientConnInterface +//} +// +//func (x *DiscoveryRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { +// fmt.Println("RegisterService", desc, impl) +//} +// +//func (x *DiscoveryRegistry) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { +// //TODO implement me +// panic("implement me") +//} +// +//func (x *DiscoveryRegistry) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { +// //TODO implement me +// panic("implement me") +//} +// +//func (x *DiscoveryRegistry) IsSelfNode(cc grpc.ClientConnInterface) bool { +// +// return false +//}