diff --git a/cmd/main.go b/cmd/main.go index 56749861f..40bce080d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -45,17 +45,17 @@ func main() { } } cmd := newCmds(configPath) - putCmd1(cmd, auth.Start) - putCmd1(cmd, conversation.Start) - putCmd1(cmd, relation.Start) - putCmd1(cmd, group.Start) - putCmd1(cmd, msg.Start) - putCmd1(cmd, third.Start) - putCmd1(cmd, user.Start) - putCmd1(cmd, push.Start) - putCmd2(cmd, msggateway.Start) - putCmd2(cmd, msgtransfer.Start) - putCmd2(cmd, api.Start) + putCmd1(cmd, false, auth.Start) + putCmd1(cmd, false, conversation.Start) + putCmd1(cmd, false, relation.Start) + putCmd1(cmd, false, group.Start) + putCmd1(cmd, false, msg.Start) + putCmd1(cmd, false, third.Start) + putCmd1(cmd, false, user.Start) + putCmd1(cmd, false, push.Start) + putCmd3(cmd, true, msggateway.Start) + putCmd2(cmd, true, msgtransfer.Start) + putCmd2(cmd, true, api.Start) ctx := context.Background() if err := cmd.run(ctx); err != nil { fmt.Println(err) @@ -176,8 +176,8 @@ func (x *cmds) parseConf(conf any) error { return nil } -func (x *cmds) add(name string, fn func(ctx context.Context) error) { - x.cmds = append(x.cmds, cmdName{Name: name, Func: fn}) +func (x *cmds) add(name string, block bool, fn func(ctx context.Context) error) { + x.cmds = append(x.cmds, cmdName{Name: name, Block: block, Func: fn}) } func (x *cmds) run(ctx context.Context) error { @@ -193,13 +193,14 @@ func (x *cmds) run(ctx context.Context) error { for i := range x.cmds { cmd := x.cmds[i] go func() { - fmt.Println("start", cmd.Name) + //fmt.Println("start", cmd.Name) if err := cmd.Func(ctx); err != nil { - fmt.Println("start failed", cmd.Name, err) - cancel(err) + cancel(fmt.Errorf("server %s exit %w", cmd.Name, err)) return } - fmt.Println("start end", cmd.Name) + if cmd.Block { + cancel(fmt.Errorf("server %s exit", cmd.Name)) + } }() } <-ctx.Done() @@ -207,8 +208,9 @@ func (x *cmds) run(ctx context.Context) error { } type cmdName struct { - Name string - Func func(ctx context.Context) error + Name string + Func func(ctx context.Context) error + Block bool } func getFuncPacketName(fn any) string { @@ -219,8 +221,8 @@ func getFuncPacketName(fn any) string { return name } -func putCmd1[C any](cmd *cmds, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { - cmd.add(getFuncPacketName(fn), func(ctx context.Context) error { +func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { + cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { var conf C if err := cmd.parseConf(&conf); err != nil { return err @@ -229,8 +231,8 @@ func putCmd1[C any](cmd *cmds, fn func(ctx context.Context, config *C, client di }) } -func putCmd2[C any](cmd *cmds, fn func(ctx context.Context, index int, config *C) error) { - cmd.add(getFuncPacketName(fn), func(ctx context.Context) error { +func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) { + cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { var conf C if err := cmd.parseConf(&conf); err != nil { return err @@ -238,3 +240,13 @@ func putCmd2[C any](cmd *cmds, fn func(ctx context.Context, index int, config *C return fn(ctx, 0, &conf) }) } + +func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) { + cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { + var conf C + if err := cmd.parseConf(&conf); err != nil { + return err + } + return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0) + }) +} diff --git a/go.mod b/go.mod index 6c6afc496..714578de3 100644 --- a/go.mod +++ b/go.mod @@ -220,4 +220,5 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect ) -replace github.com/openimsdk/tools => github.com/withchao/tools v0.0.50-alpha.38.0.20250207092500-cc9b95a0cdcc +//replace github.com/openimsdk/tools => github.com/withchao/tools v0.0.50-alpha.38.0.20250207092500-cc9b95a0cdcc +replace github.com/openimsdk/tools => /Users/chao/Desktop/code/tools diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 7e8448880..376a697fd 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -22,7 +22,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/sdkws" @@ -51,26 +50,26 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover return nil } -func (s *Server) Start(ctx context.Context, index int, conf *Config) error { - return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, - conf.MsgGateway.RPC.RegisterIP, - conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index, - conf.Discovery.RpcService.MessageGateway, - nil, - conf, - []string{ - conf.Share.GetConfigFileName(), - conf.Discovery.GetConfigFileName(), - conf.MsgGateway.GetConfigFileName(), - conf.WebhooksConfig.GetConfigFileName(), - conf.RedisConfig.GetConfigFileName(), - }, - []string{ - conf.Discovery.RpcService.MessageGateway, - }, - s.InitServer, - ) -} +//func (s *Server) Start(ctx context.Context, index int, conf *Config) error { +// return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, +// conf.MsgGateway.RPC.RegisterIP, +// conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index, +// conf.Discovery.RpcService.MessageGateway, +// nil, +// conf, +// []string{ +// conf.Share.GetConfigFileName(), +// conf.Discovery.GetConfigFileName(), +// conf.MsgGateway.GetConfigFileName(), +// conf.WebhooksConfig.GetConfigFileName(), +// conf.RedisConfig.GetConfigFileName(), +// }, +// []string{ +// conf.Discovery.RpcService.MessageGateway, +// }, +// s.InitServer, +// ) +//} type Server struct { msggateway.UnimplementedMsgGatewayServer diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 3cbd1a06b..916dd0385 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -21,8 +21,10 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/runtimeenv" + "google.golang.org/grpc" "github.com/openimsdk/tools/log" ) @@ -36,7 +38,7 @@ type Config struct { } // Start run ws server. -func Start(ctx context.Context, index int, conf *Config) error { +func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar, index int) error { log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) @@ -63,12 +65,55 @@ func Start(ctx context.Context, index int, conf *Config) error { return err }) + if err := hubServer.InitServer(ctx, conf, client, server); err != nil { + return err + } + go longServer.ChangeOnlineStatus(4) - netDone := make(chan error) - go func() { - err = hubServer.Start(ctx, index, conf) - netDone <- err - }() - return hubServer.LongConnServer.Run(netDone) + //netDone := make(chan error) + //go func() { + // err = hubServer.Start(ctx, index, conf) + // netDone <- err + //}() + return hubServer.LongConnServer.Run(ctx) } + +// +//// Start run ws server. +//func Start(ctx context.Context, index int, conf *Config) error { +// log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), +// "rpcPorts", conf.MsgGateway.RPC.Ports, +// "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) +// wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) +// if err != nil { +// return err +// } +// +// rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build()) +// if err != nil { +// return err +// } +// longServer := NewWsServer( +// conf, +// WithPort(wsPort), +// WithMaxConnNum(int64(conf.MsgGateway.LongConnSvr.WebsocketMaxConnNum)), +// WithHandshakeTimeout(time.Duration(conf.MsgGateway.LongConnSvr.WebsocketTimeout)*time.Second), +// WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen), +// ) +// +// hubServer := NewServer(longServer, conf, func(srv *Server) error { +// var err error +// longServer.online, err = rpccache.NewOnlineCache(srv.userClient, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) +// return err +// }) +// +// go longServer.ChangeOnlineStatus(4) +// +// netDone := make(chan error) +// go func() { +// err = hubServer.Start(ctx, index, conf) +// netDone <- err +// }() +// return hubServer.LongConnServer.Run(netDone) +//} diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index d62a6c6dc..3f7fd7ec0 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -2,7 +2,6 @@ package msggateway import ( "context" - "errors" "fmt" "net/http" "sync" @@ -11,7 +10,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpcli" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" pbAuth "github.com/openimsdk/protocol/auth" @@ -30,7 +28,7 @@ import ( ) type LongConnServer interface { - Run(done chan error) error + Run(ctx context.Context) error wsHandler(w http.ResponseWriter, r *http.Request) GetUserAllCons(userID string) ([]*Client, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) @@ -158,19 +156,14 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { } } -func (ws *WsServer) Run(done chan error) error { - var ( - client *Client - netErr error - shutdownDone = make(chan struct{}, 1) - ) - - server := http.Server{Addr: ":" + stringutil.IntToString(ws.port), Handler: nil} +func (ws *WsServer) Run(ctx context.Context) error { + var client *Client + ctx, cancel := context.WithCancelCause(ctx) go func() { for { select { - case <-shutdownDone: + case <-ctx.Done(): return case client = <-ws.registerChan: ws.registerClient(client) @@ -181,42 +174,31 @@ func (ws *WsServer) Run(done chan error) error { } } }() - netDone := make(chan struct{}, 1) + + server := http.Server{Addr: fmt.Sprintf(":%d", ws.port), Handler: nil} go func() { http.HandleFunc("/", ws.wsHandler) err := server.ListenAndServe() - if err != nil && !errors.Is(err, http.ErrServerClosed) { - netErr = errs.WrapMsg(err, "ws start err", server.Addr) - netDone <- struct{}{} + if err == nil { + err = fmt.Errorf("http server closed") } + cancel(fmt.Errorf("msg gateway %w", err)) }() - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + shutDown := func() error { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() sErr := server.Shutdown(ctx) if sErr != nil { return errs.WrapMsg(sErr, "shutdown err") } - close(shutdownDone) return nil } - etcd.RegisterShutDown(shutDown) - defer cancel() - var err error - select { - case err = <-done: - if err := shutDown(); err != nil { - return err - } - if err != nil { - return err - } - case <-netDone: - } - return netErr - + <-ctx.Done() + return shutDown() } -var concurrentRequest = 3 +const concurrentRequest = 3 func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error { conns, err := ws.disCov.GetConns(ctx, ws.msgGatewayConfig.Discovery.RpcService.MessageGateway) diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index f4a3285ad..e55765706 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -2,7 +2,7 @@ package push import ( "context" - "errors" + "fmt" "sync" "github.com/openimsdk/protocol/msggateway" @@ -40,18 +40,17 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * return nil } -func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher { +func NewOnlinePusher(disCov discovery.Conn, config *Config) (OnlinePusher, error) { if runtimeenv.RuntimeEnvironment() == conf.KUBERNETES { - return NewDefaultAllNode(disCov, config) + return NewDefaultAllNode(disCov, config), nil } switch config.Discovery.Enable { case conf.ETCD: - return NewDefaultAllNode(disCov, config) + return NewDefaultAllNode(disCov, config), nil case conf.Standalone: - return nil + return NewDefaultAllNode(disCov, config), nil default: - log.ZWarn(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable) - return nil + return nil, errs.New(fmt.Sprintf("unsupported discovery type %s", config.Discovery.Enable)) } } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index e8240f031..707782c70 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -45,8 +45,6 @@ type ConsumerHandler struct { } func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, client discovery.Conn) (*ConsumerHandler, error) { - var consumerHandler ConsumerHandler - var err error userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return nil, err @@ -63,13 +61,18 @@ func NewConsumerHandler(ctx context.Context, config *Config, database controller if err != nil { return nil, err } + onlinePusher, err := NewOnlinePusher(client, config) + if err != nil { + return nil, err + } + var consumerHandler ConsumerHandler consumerHandler.userClient = rpcli.NewUserClient(userConn) consumerHandler.groupClient = rpcli.NewGroupClient(groupConn) consumerHandler.msgClient = rpcli.NewMsgClient(msgConn) consumerHandler.conversationClient = rpcli.NewConversationClient(conversationConn) consumerHandler.offlinePusher = offlinePusher - consumerHandler.onlinePusher = NewOnlinePusher(client, config) + consumerHandler.onlinePusher = onlinePusher consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupClient, &config.LocalCacheConfig, rdb) consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)