diff --git a/cmd/main.go b/cmd/main.go index dd225fa13..4a693f31c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,11 +6,15 @@ import ( "flag" "fmt" "os" + "os/signal" "path" "path/filepath" "reflect" "runtime" "strings" + "sync" + "syscall" + "time" "github.com/mitchellh/mapstructure" "github.com/openimsdk/open-im-server/v3/internal/api" @@ -27,6 +31,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/standalone" + "github.com/openimsdk/tools/log" "github.com/spf13/viper" "google.golang.org/grpc" ) @@ -188,9 +193,42 @@ func (x *cmds) run(ctx context.Context) error { return err } ctx, cancel := context.WithCancelCause(ctx) + + go func() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + select { + case <-ctx.Done(): + return + case val := <-sigs: + cancel(fmt.Errorf("signal %s", val.String())) + } + }() + for i := range x.cmds { cmd := x.cmds[i] + if cmd.Block { + continue + } + if err := cmd.Func(ctx); err != nil { + cancel(fmt.Errorf("server %s exit %w", cmd.Name, err)) + return err + } go func() { + if cmd.Block { + cancel(fmt.Errorf("server %s exit", cmd.Name)) + } + }() + } + var wg sync.WaitGroup + for i := range x.cmds { + cmd := x.cmds[i] + if !cmd.Block { + continue + } + wg.Add(1) + go func() { + defer wg.Done() if err := cmd.Func(ctx); err != nil { cancel(fmt.Errorf("server %s exit %w", cmd.Name, err)) return @@ -201,7 +239,22 @@ func (x *cmds) run(ctx context.Context) error { }() } <-ctx.Done() - return context.Cause(ctx) + + exitCause := context.Cause(ctx) + log.ZWarn(ctx, "server exit cause", exitCause) + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-time.After(time.Second * 20): + log.ZError(ctx, "server exit timeout", nil) + case <-done: + log.ZInfo(ctx, "server exit done") + } + return exitCause } func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { diff --git a/internal/api/init.go b/internal/api/init.go index 3fecc119d..3c71c6b4c 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -23,16 +23,13 @@ import ( "os" "os/signal" "strconv" + "sync" "syscall" "time" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" - disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/network" "github.com/openimsdk/tools/utils/runtimeenv" @@ -46,7 +43,7 @@ type Config struct { Index conf.Index } -func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index)) if err != nil { return err @@ -60,12 +57,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se //} //client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - var ( - netDone = make(chan struct{}, 1) - netErr error - prometheusPort int - ) - //registerIP, err := network.GetRpcRegisterIP("") //if err != nil { // return err @@ -127,44 +118,56 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se // }() // //} - address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) - - httpServer := &http.Server{Addr: address, Handler: router} - log.CInfo(ctx, "API server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) + var wg sync.WaitGroup + ctx, cancel := context.WithCancelCause(ctx) go func() { - err = httpServer.ListenAndServe() - if err != nil && !errors.Is(err, http.ErrServerClosed) { - netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", httpServer.Addr)) - netDone <- struct{}{} + wg.Add(1) + httpServer := &http.Server{ + Handler: router, + Addr: net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)), } + log.CInfo(ctx, "api server is init", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", httpServer.Addr, "apiPort", apiPort) + go func() { + defer wg.Done() + <-ctx.Done() + if err := httpServer.Shutdown(context.Background()); err != nil { + log.ZWarn(ctx, "api server shutdown err", err) + } + }() + err := httpServer.ListenAndServe() + if err == nil { + err = errors.New("api done") + } + cancel(err) }() - if config.Discovery.Enable == conf.ETCD { - cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) - cm.Watch(ctx) - } + //if config.Discovery.Enable == conf.ETCD { + // cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) + // cm.Watch(ctx) + //} sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) - - shutdown := func() error { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - err := httpServer.Shutdown(ctx) - if err != nil { - return errs.WrapMsg(err, "shutdown err") - } - return nil - } select { - case <-sigs: - program.SIGTERMExit() - if err := shutdown(); err != nil { - return err - } - case <-netDone: - close(netDone) - return netErr + case val := <-sigs: + log.ZDebug(ctx, "recv exit", "signal", val.String()) + cancel(fmt.Errorf("signal %s", val.String())) + case <-ctx.Done(): } - return nil + exitCause := context.Cause(ctx) + log.ZWarn(ctx, "api server exit", exitCause) + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + timer := time.NewTimer(time.Second * 15) + defer timer.Stop() + select { + case <-timer.C: + log.ZWarn(ctx, "api server graceful stop timeout", nil) + case <-done: + log.ZDebug(ctx, "api server graceful stop done") + } + return exitCause }