diff --git a/go.mod b/go.mod index 2aa573a54..0ddea16ea 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( firebase.google.com/go v3.13.0+incompatible - github.com/OpenIMSDK/openKeeper v0.0.5 + github.com/OpenIMSDK/openKeeper v0.0.4 github.com/OpenIMSDK/open_utils v1.0.8 github.com/Shopify/sarama v1.32.0 github.com/antonfisher/nested-logrus-formatter v1.3.1 diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 95bac5725..c745373c5 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -9,13 +9,15 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway" + "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" "github.com/OpenIMSDK/Open-IM-Server/pkg/startrpc" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "google.golang.org/grpc" ) func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - msggateway.RegisterMsgGatewayServer(server, &Server{}) + s.notification = notification.NewCheck(client) + msggateway.RegisterMsgGatewayServer(server, s) return nil } @@ -24,6 +26,7 @@ func (s *Server) Start() error { } type Server struct { + notification *notification.Check rpcPort int prometheusPort int LongConnServer LongConnServer @@ -31,6 +34,10 @@ type Server struct { //rpcServer *RpcServer } +func (s *Server) Notification() *notification.Check { + return s.notification +} + func NewServer(rpcPort int, longConnServer LongConnServer) *Server { return &Server{rpcPort: rpcPort, LongConnServer: longConnServer, pushTerminal: []int{constant.IOSPlatformID, constant.AndroidPlatformID}} } diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 5aba286f4..af17cff17 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -23,6 +23,7 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { return err } hubServer := NewServer(rpcPort, longServer) + longServer.SetMessageHandler(hubServer.Notification()) go hubServer.Start() go hubServer.LongConnServer.Run() wg.Wait() diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 561b4ab3c..d0964b3fd 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/go-playground/validator/v10" "net/http" @@ -42,12 +43,17 @@ type WsServer struct { onlineUserConnNum int64 handshakeTimeout time.Duration readBufferSize, WriteBufferSize int + hubServer *Server validate *validator.Validate Compressor Encoder MessageHandler } +func (ws *WsServer) SetMessageHandler(rpcClient *notification.Check) { + ws.MessageHandler = NewGrpcHandler(ws.validate, rpcClient) +} + func (ws *WsServer) UnRegister(c *Client) { ws.unregisterChan <- c } @@ -90,7 +96,6 @@ func NewWsServer(opts ...Option) (*WsServer, error) { clients: newUserMap(), Compressor: NewGzipCompressor(), Encoder: NewGobEncoder(), - MessageHandler: NewGrpcHandler(v, nil), //handler: NewGrpcHandler(validate), }, nil }