This commit is contained in:
withchao 2025-02-08 16:22:14 +08:00
parent 0c20ec5a9c
commit cac877e884
2 changed files with 99 additions and 43 deletions

View File

@ -6,11 +6,15 @@ import (
"flag"
"fmt"
"os"
"os/signal"
"path"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"syscall"
"time"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/open-im-server/v3/internal/api"
@ -27,6 +31,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/standalone"
"github.com/openimsdk/tools/log"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
@ -188,9 +193,42 @@ func (x *cmds) run(ctx context.Context) error {
return err
}
ctx, cancel := context.WithCancelCause(ctx)
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case <-ctx.Done():
return
case val := <-sigs:
cancel(fmt.Errorf("signal %s", val.String()))
}
}()
for i := range x.cmds {
cmd := x.cmds[i]
if cmd.Block {
continue
}
if err := cmd.Func(ctx); err != nil {
cancel(fmt.Errorf("server %s exit %w", cmd.Name, err))
return err
}
go func() {
if cmd.Block {
cancel(fmt.Errorf("server %s exit", cmd.Name))
}
}()
}
var wg sync.WaitGroup
for i := range x.cmds {
cmd := x.cmds[i]
if !cmd.Block {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
if err := cmd.Func(ctx); err != nil {
cancel(fmt.Errorf("server %s exit %w", cmd.Name, err))
return
@ -201,7 +239,22 @@ func (x *cmds) run(ctx context.Context) error {
}()
}
<-ctx.Done()
return context.Cause(ctx)
exitCause := context.Cause(ctx)
log.ZWarn(ctx, "server exit cause", exitCause)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-time.After(time.Second * 20):
log.ZError(ctx, "server exit timeout", nil)
case <-done:
log.ZInfo(ctx, "server exit done")
}
return exitCause
}
func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {

View File

@ -23,16 +23,13 @@ import (
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
"github.com/openimsdk/tools/utils/runtimeenv"
@ -46,7 +43,7 @@ type Config struct {
Index conf.Index
}
func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.ServiceRegistrar) error {
func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index))
if err != nil {
return err
@ -60,12 +57,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se
//}
//client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
var (
netDone = make(chan struct{}, 1)
netErr error
prometheusPort int
)
//registerIP, err := network.GetRpcRegisterIP("")
//if err != nil {
// return err
@ -127,44 +118,56 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se
// }()
//
//}
address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))
httpServer := &http.Server{Addr: address, Handler: router}
log.CInfo(ctx, "API server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
var wg sync.WaitGroup
ctx, cancel := context.WithCancelCause(ctx)
go func() {
err = httpServer.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", httpServer.Addr))
netDone <- struct{}{}
wg.Add(1)
httpServer := &http.Server{
Handler: router,
Addr: net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)),
}
log.CInfo(ctx, "api server is init", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", httpServer.Addr, "apiPort", apiPort)
go func() {
defer wg.Done()
<-ctx.Done()
if err := httpServer.Shutdown(context.Background()); err != nil {
log.ZWarn(ctx, "api server shutdown err", err)
}
}()
err := httpServer.ListenAndServe()
if err == nil {
err = errors.New("api done")
}
cancel(err)
}()
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
cm.Watch(ctx)
}
//if config.Discovery.Enable == conf.ETCD {
// cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
// cm.Watch(ctx)
//}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
shutdown := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := httpServer.Shutdown(ctx)
if err != nil {
return errs.WrapMsg(err, "shutdown err")
}
return nil
}
select {
case <-sigs:
program.SIGTERMExit()
if err := shutdown(); err != nil {
return err
}
case <-netDone:
close(netDone)
return netErr
case val := <-sigs:
log.ZDebug(ctx, "recv exit", "signal", val.String())
cancel(fmt.Errorf("signal %s", val.String()))
case <-ctx.Done():
}
return nil
exitCause := context.Cause(ctx)
log.ZWarn(ctx, "api server exit", exitCause)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
timer := time.NewTimer(time.Second * 15)
defer timer.Stop()
select {
case <-timer.C:
log.ZWarn(ctx, "api server graceful stop timeout", nil)
case <-done:
log.ZDebug(ctx, "api server graceful stop done")
}
return exitCause
}