This commit is contained in:
withchao 2025-02-13 16:58:07 +08:00
parent a16b95b25f
commit a3e41c6dff
8 changed files with 116 additions and 70 deletions

View File

@ -30,7 +30,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/internal/tools/cron"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
@ -70,7 +70,7 @@ func main() {
putCmd(cmd, true, msggateway.Start)
putCmd(cmd, true, msgtransfer.Start)
putCmd(cmd, true, api.Start)
putCmd(cmd, true, tools.Start)
putCmd(cmd, true, cron.Start)
ctx := context.Background()
if err := cmd.run(ctx); err != nil {
fmt.Println(err)
@ -279,40 +279,45 @@ func (x *cmds) run(ctx context.Context) error {
}
}()
}
var wg sync.WaitGroup
var wait cmdManger
for i := range x.cmds {
cmd := x.cmds[i]
if !cmd.Block {
continue
}
wg.Add(1)
wait.Start(cmd.Name)
go func() {
defer wg.Done()
defer wait.Shutdown(cmd.Name)
if err := cmd.Func(ctx); err != nil {
cancel(fmt.Errorf("server %s exit %w", cmd.Name, err))
return
}
if cmd.Block {
cancel(fmt.Errorf("server %s exit", cmd.Name))
}
cancel(fmt.Errorf("server %s exit", cmd.Name))
}()
}
<-ctx.Done()
exitCause := context.Cause(ctx)
log.ZWarn(ctx, "server exit cause", exitCause)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-time.After(time.Second * 20):
log.ZError(ctx, "server exit timeout", nil)
case <-done:
log.ZInfo(ctx, "server exit done")
log.ZWarn(ctx, "notification of service closure", exitCause)
done := wait.Wait()
<-done
timeout := time.NewTimer(time.Second * 10)
defer timeout.Stop()
for {
select {
case <-timeout.C:
log.ZWarn(ctx, "server exit timeout", nil, "running", wait.Running())
return exitCause
case _, ok := <-done:
if ok {
log.ZWarn(ctx, "waiting for the service to exit", nil, "running", wait.Running())
} else {
log.ZInfo(ctx, "all server exit done")
return exitCause
}
}
}
return exitCause
}
func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
@ -328,3 +333,65 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar())
})
}
type cmdManger struct {
lock sync.Mutex
done chan struct{}
count int
names map[string]struct{}
}
func (x *cmdManger) Start(name string) {
x.lock.Lock()
defer x.lock.Unlock()
if x.names == nil {
x.names = make(map[string]struct{})
}
if x.done == nil {
x.done = make(chan struct{}, 1)
}
if _, ok := x.names[name]; ok {
panic(fmt.Errorf("cmd %s already exists", name))
}
x.count++
x.names[name] = struct{}{}
}
func (x *cmdManger) Shutdown(name string) {
x.lock.Lock()
defer x.lock.Unlock()
if _, ok := x.names[name]; !ok {
panic(fmt.Errorf("cmd %s not exists", name))
}
delete(x.names, name)
x.count--
if x.count == 0 {
close(x.done)
} else {
select {
case x.done <- struct{}{}:
default:
}
}
}
func (x *cmdManger) Wait() <-chan struct{} {
x.lock.Lock()
defer x.lock.Unlock()
if x.count == 0 || x.done == nil {
tmp := make(chan struct{})
close(tmp)
return tmp
}
return x.done
}
func (x *cmdManger) Running() []string {
x.lock.Lock()
defer x.lock.Unlock()
names := make([]string, 0, len(x.names))
for name := range x.names {
names = append(names, name)
}
return names
}

View File

@ -16,9 +16,7 @@ package msgtransfer
import (
"context"
"os"
"os/signal"
"syscall"
"fmt"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
@ -35,7 +33,6 @@ import (
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
"google.golang.org/grpc"
)
@ -49,7 +46,7 @@ type MsgTransfer struct {
//This consumer handle message to mongo
historyMongoHandler *OnlineHistoryMongoConsumerHandler
ctx context.Context
cancel context.CancelFunc
//cancel context.CancelFunc
}
type Config struct {
@ -148,19 +145,17 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
historyMongoHandler: historyMongoHandler,
}
return msgTransfer.Start(ctx, int(config.Index), config, client)
return msgTransfer.Start(ctx)
}
func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, client discovery.Conn) error {
m.ctx, m.cancel = context.WithCancel(context.Background())
var (
netDone = make(chan struct{}, 1)
netErr error
)
func (m *MsgTransfer) Start(ctx context.Context) error {
var cancel context.CancelCauseFunc
m.ctx, cancel = context.WithCancelCause(ctx)
go func() {
for {
if err := m.historyConsumer.Subscribe(m.ctx, m.historyHandler.HandlerRedisMessage); err != nil {
cancel(fmt.Errorf("history consumer %w", err))
log.ZError(m.ctx, "historyConsumer err", err)
return
}
@ -174,6 +169,7 @@ func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, clie
}
for {
if err := m.historyMongoConsumer.Subscribe(m.ctx, fn); err != nil {
cancel(fmt.Errorf("history mongo consumer %w", err))
log.ZError(m.ctx, "historyMongoConsumer err", err)
return
}
@ -181,31 +177,11 @@ func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, clie
}()
go m.historyHandler.HandleUserHasReadSeqMessages(m.ctx)
err := m.historyHandler.redisMessageBatches.Start()
if err != nil {
return err
}
// todo
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case <-sigs:
program.SIGTERMExit()
// graceful close kafka client.
_ = m.historyConsumer.Close()
_ = m.historyMongoConsumer.Close()
m.cancel()
m.historyHandler.redisMessageBatches.Close()
m.historyHandler.Close()
return nil
case <-netDone:
_ = m.historyConsumer.Close()
_ = m.historyMongoConsumer.Close()
m.cancel()
m.historyHandler.redisMessageBatches.Close()
m.historyHandler.Close()
close(netDone)
return netErr
}
<-m.ctx.Done()
return context.Cause(m.ctx)
}

View File

@ -1,4 +1,4 @@
package tools
package cron
import (
"context"

View File

@ -1,4 +1,4 @@
package tools
package cron
import (
"context"

View File

@ -1,12 +1,13 @@
package tools
package cron
import (
"fmt"
"os"
"time"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"os"
"time"
)
func (c *cronServer) deleteMsg() {

View File

@ -1,12 +1,13 @@
package tools
package cron
import (
"fmt"
"os"
"time"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"os"
"time"
)
func (c *cronServer) clearS3() {

View File

@ -1,12 +1,13 @@
package tools
package cron
import (
"fmt"
"os"
"time"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"os"
"time"
)
func (c *cronServer) clearUserMsg() {

View File

@ -17,7 +17,7 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/internal/tools/cron"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
@ -29,11 +29,11 @@ type CronTaskCmd struct {
*RootCmd
ctx context.Context
configMap map[string]any
cronTaskConfig *tools.Config
cronTaskConfig *cron.Config
}
func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig tools.Config
var cronTaskConfig cron.Config
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
ret.configMap = map[string]any{
config.OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
@ -65,6 +65,6 @@ func (a *CronTaskCmd) runE() error {
a.cronTaskConfig,
[]string{},
[]string{},
tools.Start,
cron.Start,
)
}