diff --git a/cmd/main.go b/cmd/main.go index 75bb39eed..c633b8800 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -30,7 +30,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/rpc/relation" "github.com/openimsdk/open-im-server/v3/internal/rpc/third" "github.com/openimsdk/open-im-server/v3/internal/rpc/user" - "github.com/openimsdk/open-im-server/v3/internal/tools" + "github.com/openimsdk/open-im-server/v3/internal/tools/cron" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -70,7 +70,7 @@ func main() { putCmd(cmd, true, msggateway.Start) putCmd(cmd, true, msgtransfer.Start) putCmd(cmd, true, api.Start) - putCmd(cmd, true, tools.Start) + putCmd(cmd, true, cron.Start) ctx := context.Background() if err := cmd.run(ctx); err != nil { fmt.Println(err) @@ -279,40 +279,45 @@ func (x *cmds) run(ctx context.Context) error { } }() } - var wg sync.WaitGroup + + var wait cmdManger for i := range x.cmds { cmd := x.cmds[i] if !cmd.Block { continue } - wg.Add(1) + wait.Start(cmd.Name) go func() { - defer wg.Done() + defer wait.Shutdown(cmd.Name) if err := cmd.Func(ctx); err != nil { cancel(fmt.Errorf("server %s exit %w", cmd.Name, err)) return } - if cmd.Block { - cancel(fmt.Errorf("server %s exit", cmd.Name)) - } + cancel(fmt.Errorf("server %s exit", cmd.Name)) }() } <-ctx.Done() exitCause := context.Cause(ctx) - log.ZWarn(ctx, "server exit cause", exitCause) - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case <-time.After(time.Second * 20): - log.ZError(ctx, "server exit timeout", nil) - case <-done: - log.ZInfo(ctx, "server exit done") + log.ZWarn(ctx, "notification of service closure", exitCause) + done := wait.Wait() + <-done + timeout := time.NewTimer(time.Second * 10) + defer timeout.Stop() + for { + select { + case <-timeout.C: + log.ZWarn(ctx, "server exit timeout", nil, "running", wait.Running()) + return exitCause + case _, ok := <-done: + if ok { + log.ZWarn(ctx, "waiting for the service to exit", nil, "running", wait.Running()) + } else { + log.ZInfo(ctx, "all server exit done") + return exitCause + } + } } - return exitCause } func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { @@ -328,3 +333,65 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) }) } + +type cmdManger struct { + lock sync.Mutex + done chan struct{} + count int + names map[string]struct{} +} + +func (x *cmdManger) Start(name string) { + x.lock.Lock() + defer x.lock.Unlock() + if x.names == nil { + x.names = make(map[string]struct{}) + } + if x.done == nil { + x.done = make(chan struct{}, 1) + } + if _, ok := x.names[name]; ok { + panic(fmt.Errorf("cmd %s already exists", name)) + } + x.count++ + x.names[name] = struct{}{} +} + +func (x *cmdManger) Shutdown(name string) { + x.lock.Lock() + defer x.lock.Unlock() + if _, ok := x.names[name]; !ok { + panic(fmt.Errorf("cmd %s not exists", name)) + } + delete(x.names, name) + x.count-- + if x.count == 0 { + close(x.done) + } else { + select { + case x.done <- struct{}{}: + default: + } + } +} + +func (x *cmdManger) Wait() <-chan struct{} { + x.lock.Lock() + defer x.lock.Unlock() + if x.count == 0 || x.done == nil { + tmp := make(chan struct{}) + close(tmp) + return tmp + } + return x.done +} + +func (x *cmdManger) Running() []string { + x.lock.Lock() + defer x.lock.Unlock() + names := make([]string, 0, len(x.names)) + for name := range x.names { + names = append(names, name) + } + return names +} diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index e02940853..a24eba7ca 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -16,9 +16,7 @@ package msgtransfer import ( "context" - "os" - "os/signal" - "syscall" + "fmt" disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" @@ -35,7 +33,6 @@ import ( conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/system/program" "google.golang.org/grpc" ) @@ -49,7 +46,7 @@ type MsgTransfer struct { //This consumer handle message to mongo historyMongoHandler *OnlineHistoryMongoConsumerHandler ctx context.Context - cancel context.CancelFunc + //cancel context.CancelFunc } type Config struct { @@ -148,19 +145,17 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr historyMongoHandler: historyMongoHandler, } - return msgTransfer.Start(ctx, int(config.Index), config, client) + return msgTransfer.Start(ctx) } -func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, client discovery.Conn) error { - m.ctx, m.cancel = context.WithCancel(context.Background()) - var ( - netDone = make(chan struct{}, 1) - netErr error - ) +func (m *MsgTransfer) Start(ctx context.Context) error { + var cancel context.CancelCauseFunc + m.ctx, cancel = context.WithCancelCause(ctx) go func() { for { if err := m.historyConsumer.Subscribe(m.ctx, m.historyHandler.HandlerRedisMessage); err != nil { + cancel(fmt.Errorf("history consumer %w", err)) log.ZError(m.ctx, "historyConsumer err", err) return } @@ -174,6 +169,7 @@ func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, clie } for { if err := m.historyMongoConsumer.Subscribe(m.ctx, fn); err != nil { + cancel(fmt.Errorf("history mongo consumer %w", err)) log.ZError(m.ctx, "historyMongoConsumer err", err) return } @@ -181,31 +177,11 @@ func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, clie }() go m.historyHandler.HandleUserHasReadSeqMessages(m.ctx) + err := m.historyHandler.redisMessageBatches.Start() if err != nil { return err } - - // todo - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM) - select { - case <-sigs: - program.SIGTERMExit() - // graceful close kafka client. - _ = m.historyConsumer.Close() - _ = m.historyMongoConsumer.Close() - m.cancel() - m.historyHandler.redisMessageBatches.Close() - m.historyHandler.Close() - return nil - case <-netDone: - _ = m.historyConsumer.Close() - _ = m.historyMongoConsumer.Close() - m.cancel() - m.historyHandler.redisMessageBatches.Close() - m.historyHandler.Close() - close(netDone) - return netErr - } + <-m.ctx.Done() + return context.Cause(m.ctx) } diff --git a/internal/tools/cron_task.go b/internal/tools/cron/cron_task.go similarity index 99% rename from internal/tools/cron_task.go rename to internal/tools/cron/cron_task.go index fcf00ba41..7ae314193 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -1,4 +1,4 @@ -package tools +package cron import ( "context" diff --git a/internal/tools/cron_test.go b/internal/tools/cron/cron_test.go similarity index 99% rename from internal/tools/cron_test.go rename to internal/tools/cron/cron_test.go index 96068520c..b98b14f14 100644 --- a/internal/tools/cron_test.go +++ b/internal/tools/cron/cron_test.go @@ -1,4 +1,4 @@ -package tools +package cron import ( "context" diff --git a/internal/tools/msg.go b/internal/tools/cron/msg.go similarity index 98% rename from internal/tools/msg.go rename to internal/tools/cron/msg.go index cc00cc5b8..c75189047 100644 --- a/internal/tools/msg.go +++ b/internal/tools/cron/msg.go @@ -1,12 +1,13 @@ -package tools +package cron import ( "fmt" + "os" + "time" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "os" - "time" ) func (c *cronServer) deleteMsg() { diff --git a/internal/tools/s3.go b/internal/tools/cron/s3.go similarity index 99% rename from internal/tools/s3.go rename to internal/tools/cron/s3.go index 9b6b9c408..c42e223b8 100644 --- a/internal/tools/s3.go +++ b/internal/tools/cron/s3.go @@ -1,12 +1,13 @@ -package tools +package cron import ( "fmt" + "os" + "time" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "os" - "time" ) func (c *cronServer) clearS3() { diff --git a/internal/tools/user_msg.go b/internal/tools/cron/user_msg.go similarity index 98% rename from internal/tools/user_msg.go rename to internal/tools/cron/user_msg.go index a4afa769e..0c00640a1 100644 --- a/internal/tools/user_msg.go +++ b/internal/tools/cron/user_msg.go @@ -1,12 +1,13 @@ -package tools +package cron import ( "fmt" + "os" + "time" + pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "os" - "time" ) func (c *cronServer) clearUserMsg() { diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index eb8b1489e..c90bce174 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -17,7 +17,7 @@ package cmd import ( "context" - "github.com/openimsdk/open-im-server/v3/internal/tools" + "github.com/openimsdk/open-im-server/v3/internal/tools/cron" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" @@ -29,11 +29,11 @@ type CronTaskCmd struct { *RootCmd ctx context.Context configMap map[string]any - cronTaskConfig *tools.Config + cronTaskConfig *cron.Config } func NewCronTaskCmd() *CronTaskCmd { - var cronTaskConfig tools.Config + var cronTaskConfig cron.Config ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} ret.configMap = map[string]any{ config.OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, @@ -65,6 +65,6 @@ func (a *CronTaskCmd) runE() error { a.cronTaskConfig, []string{}, []string{}, - tools.Start, + cron.Start, ) }