From 31a912c013bacaee503a62b545c6b28d041dcc0b Mon Sep 17 00:00:00 2001
From: withchao <993506633@qq.com>
Date: Thu, 6 Feb 2025 18:54:13 +0800
Subject: [PATCH] fix: mq

---
 cmd/main.go                                   | 45 +++++++---
 internal/api/config_manager.go                | 14 +---
 internal/api/init.go                          | 11 +--
 internal/api/router.go                        |  2 +-
 internal/msggateway/init.go                   |  6 +-
 internal/msgtransfer/init.go                  | 78 +++++++++++------
 .../msgtransfer/online_history_msg_handler.go |  9 +-
 .../online_msg_to_mongo_handler.go            | 43 ++--------
 internal/push/onlinepusher.go                 |  5 +-
 internal/push/push.go                         |  9 +-
 internal/tools/cron_test.go                   |  2 +-
 pkg/common/config/config.go                   |  2 +
 pkg/common/config/constant.go                 |  4 +-
 pkg/common/config/load_config.go              |  5 +-
 pkg/common/discovery/discoveryregister.go     |  5 +-
 pkg/common/startrpc/start.go                  |  6 +-
 pkg/common/storage/database/black.go          | 83 +++++++++++++++++++
 17 files changed, 206 insertions(+), 123 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 23f325fac..56749861f 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -137,11 +137,25 @@ func (x *cmds) parseConf(conf any) error {
 		pkt := getTypePath(field.Type())
 		confData, ok := x.conf[pkt]
 		if !ok {
-			if typeField.Name == "FcmConfigPath" && field.Kind() == reflect.String {
+			switch field.Interface().(type) {
+			case config.Path:
 				field.SetString(x.confPath)
-				continue
+			case config.AllConfig:
+				var allConf config.AllConfig
+				if err := x.parseConf(&allConf); err != nil {
+					return err
+				}
+				field.Set(reflect.ValueOf(allConf))
+			case *config.AllConfig:
+				var allConf config.AllConfig
+				if err := x.parseConf(&allConf); err != nil {
+					return err
+				}
+				field.Set(reflect.ValueOf(&allConf))
+			default:
+				return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name)
 			}
-			return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name)
+			continue
 		}
 		if confData == nil {
 			continue
@@ -172,15 +186,24 @@ func (x *cmds) run(ctx context.Context) error {
 			return err
 		}
 	}
-	for _, cmd := range x.cmds {
-		fmt.Println("start", cmd.Name)
-		if err := cmd.Func(ctx); err != nil {
-			fmt.Println("start failed", cmd.Name, err)
-			return err
-		}
-		fmt.Println("start ok", cmd.Name)
+	if len(x.cmds) == 0 {
+		return fmt.Errorf("no command to run")
 	}
-	return nil
+	ctx, cancel := context.WithCancelCause(ctx)
+	for i := range x.cmds {
+		cmd := x.cmds[i]
+		go func() {
+			fmt.Println("start", cmd.Name)
+			if err := cmd.Func(ctx); err != nil {
+				fmt.Println("start failed", cmd.Name, err)
+				cancel(err)
+				return
+			}
+			fmt.Println("start end", cmd.Name)
+		}()
+	}
+	<-ctx.Done()
+	return context.Cause(ctx)
 }
 
 type cmdName struct {
diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go
index c61b2cb0b..15f5e7004 100644
--- a/internal/api/config_manager.go
+++ b/internal/api/config_manager.go
@@ -30,16 +30,14 @@ type ConfigManager struct {
 	client        *clientv3.Client
 
 	configPath string
-	runtimeEnv string
 }
 
-func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
+func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string) *ConfigManager {
 	cm := &ConfigManager{
 		imAdminUserID: IMAdminUserID,
 		config:        cfg,
 		client:        client,
 		configPath:    configPath,
-		runtimeEnv:    runtimeEnv,
 	}
 	return cm
 }
@@ -73,7 +71,7 @@ func (cm *ConfigManager) GetConfig(c *gin.Context) {
 func (cm *ConfigManager) GetConfigList(c *gin.Context) {
 	var resp apistruct.GetConfigListResp
 	resp.ConfigNames = cm.config.GetConfigNames()
-	resp.Environment = runtimeenv.PrintRuntimeEnvironment()
+	resp.Environment = runtimeenv.RuntimeEnvironment()
 	resp.Version = version.Version
 
 	apiresp.GinSuccess(c, resp)
@@ -209,13 +207,7 @@ func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...cl
 
 	changedKeys := make([]string, 0, len(configMap))
 	for k, v := range configMap {
-		err := config.Load(
-			cm.configPath,
-			k,
-			config.EnvPrefixMap[k],
-			cm.runtimeEnv,
-			v.new,
-		)
+		err := config.Load(cm.configPath, k, config.EnvPrefixMap[k], v.new)
 		if err != nil {
 			log.ZError(c, "load config failed", err)
 			continue
diff --git a/internal/api/init.go b/internal/api/init.go
index 20237ebc2..3b6a40913 100644
--- a/internal/api/init.go
+++ b/internal/api/init.go
@@ -44,10 +44,9 @@ import (
 )
 
 type Config struct {
-	*conf.AllConfig
+	conf.AllConfig
 
-	RuntimeEnv string
-	ConfigPath string
+	ConfigPath conf.Path
 }
 
 func Start(ctx context.Context, index int, config *Config) error {
@@ -56,9 +55,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 		return err
 	}
 
-	config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment()
-
-	client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{
+	client, err := kdisc.NewDiscoveryRegister(&config.Discovery, []string{
 		config.Discovery.RpcService.MessageGateway,
 	})
 	if err != nil {
@@ -135,7 +132,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 	address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))
 
 	server := http.Server{Addr: address, Handler: router}
-	log.CInfo(ctx, "API server is initializing", "runtimeEnv", config.RuntimeEnv, "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
+	log.CInfo(ctx, "API server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
 	go func() {
 		err = server.ListenAndServe()
 		if err != nil && !errors.Is(err, http.ErrServerClosed) {
diff --git a/internal/api/router.go b/internal/api/router.go
index da9d22463..d2f5d2c61 100644
--- a/internal/api/router.go
+++ b/internal/api/router.go
@@ -301,7 +301,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
 	if cfg.Discovery.Enable == config.ETCD {
 		etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
 	}
-	cm := NewConfigManager(cfg.Share.IMAdminUserID, cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv)
+	cm := NewConfigManager(cfg.Share.IMAdminUserID, &cfg.AllConfig, etcdClient, string(cfg.ConfigPath))
 	{
 
 		configGroup := r.Group("/config", cm.CheckAdmin)
diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go
index f11bc3faf..3cbd1a06b 100644
--- a/internal/msggateway/init.go
+++ b/internal/msggateway/init.go
@@ -33,15 +33,11 @@ type Config struct {
 	RedisConfig    config.Redis
 	WebhooksConfig config.Webhooks
 	Discovery      config.Discovery
-
-	runtimeEnv string
 }
 
 // Start run ws server.
 func Start(ctx context.Context, index int, conf *Config) error {
-	conf.runtimeEnv = runtimeenv.PrintRuntimeEnvironment()
-
-	log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.runtimeEnv,
+	log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
 		"rpcPorts", conf.MsgGateway.RPC.Ports,
 		"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
 	wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index)
diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go
index be0e94ba7..bf1c98f45 100644
--- a/internal/msgtransfer/init.go
+++ b/internal/msgtransfer/init.go
@@ -29,6 +29,7 @@ import (
 	"github.com/openimsdk/open-im-server/v3/pkg/mqbuild"
 	"github.com/openimsdk/tools/discovery"
 	"github.com/openimsdk/tools/discovery/etcd"
+	"github.com/openimsdk/tools/mq"
 	"github.com/openimsdk/tools/utils/jsonutil"
 	"github.com/openimsdk/tools/utils/network"
 
@@ -52,16 +53,16 @@ import (
 )
 
 type MsgTransfer struct {
+	historyConsumer      mq.Consumer
+	historyMongoConsumer mq.Consumer
 	// This consumer aggregated messages, subscribed to the topic:toRedis,
 	//  the message is stored in redis, Incr Redis, and then the message is sent to toPush topic for push,
 	// and the message is sent to toMongo topic for persistence
-	historyCH *OnlineHistoryRedisConsumerHandler
+	historyHandler *OnlineHistoryRedisConsumerHandler
 	//This consumer handle message to mongo
-	historyMongoCH *OnlineHistoryMongoConsumerHandler
-	ctx            context.Context
-	cancel         context.CancelFunc
-
-	runTimeEnv string
+	historyMongoHandler *OnlineHistoryMongoConsumerHandler
+	ctx                 context.Context
+	cancel              context.CancelFunc
 }
 
 type Config struct {
@@ -75,11 +76,10 @@ type Config struct {
 }
 
 func Start(ctx context.Context, index int, config *Config) error {
-	runTimeEnv := runtimeenv.PrintRuntimeEnvironment()
 
 	builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig)
 
-	log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts",
+	log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
 		config.MsgTransfer.Prometheus.Ports, "index", index)
 
 	mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
@@ -90,7 +90,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 	if err != nil {
 		return err
 	}
-	client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv, nil)
+	client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil)
 	if err != nil {
 		return err
 	}
@@ -137,19 +137,25 @@ func Start(ctx context.Context, index int, config *Config) error {
 	if err != nil {
 		return err
 	}
-	historyCH, err := NewOnlineHistoryRedisConsumerHandler(ctx, client, config, msgTransferDatabase)
+	historyConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToRedisTopic)
 	if err != nil {
 		return err
 	}
-	historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgTransferDatabase)
+	historyMongoConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToMongoTopic)
 	if err != nil {
 		return err
 	}
+	historyHandler, err := NewOnlineHistoryRedisConsumerHandler(ctx, client, config, msgTransferDatabase)
+	if err != nil {
+		return err
+	}
+	historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase)
 
 	msgTransfer := &MsgTransfer{
-		historyCH:      historyCH,
-		historyMongoCH: historyMongoCH,
-		runTimeEnv:     runTimeEnv,
+		historyConsumer:      historyConsumer,
+		historyMongoConsumer: historyMongoConsumer,
+		historyHandler:       historyHandler,
+		historyMongoHandler:  historyMongoHandler,
 	}
 
 	return msgTransfer.Start(index, config, client)
@@ -162,10 +168,30 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco
 		netErr  error
 	)
 
-	go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH)
-	go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH)
-	go m.historyCH.HandleUserHasReadSeqMessages(m.ctx)
-	err := m.historyCH.redisMessageBatches.Start()
+	go func() {
+		for {
+			if err := m.historyConsumer.Subscribe(m.ctx, m.historyHandler.HandlerRedisMessage); err != nil {
+				log.ZError(m.ctx, "historyConsumer err", err)
+				return
+			}
+		}
+	}()
+
+	go func() {
+		fn := func(ctx context.Context, key string, value []byte) error {
+			m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value)
+			return nil
+		}
+		for {
+			if err := m.historyMongoConsumer.Subscribe(m.ctx, fn); err != nil {
+				log.ZError(m.ctx, "historyMongoConsumer err", err)
+				return
+			}
+		}
+	}()
+
+	go m.historyHandler.HandleUserHasReadSeqMessages(m.ctx)
+	err := m.historyHandler.redisMessageBatches.Start()
 	if err != nil {
 		return err
 	}
@@ -237,18 +263,18 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco
 	case <-sigs:
 		program.SIGTERMExit()
 		// graceful close kafka client.
+		_ = m.historyConsumer.Close()
+		_ = m.historyMongoConsumer.Close()
 		m.cancel()
-		m.historyCH.redisMessageBatches.Close()
-		m.historyCH.Close()
-		m.historyCH.historyConsumerGroup.Close()
-		m.historyMongoCH.historyConsumerGroup.Close()
+		m.historyHandler.redisMessageBatches.Close()
+		m.historyHandler.Close()
 		return nil
 	case <-netDone:
+		_ = m.historyConsumer.Close()
+		_ = m.historyMongoConsumer.Close()
 		m.cancel()
-		m.historyCH.redisMessageBatches.Close()
-		m.historyCH.Close()
-		m.historyCH.historyConsumerGroup.Close()
-		m.historyMongoCH.historyConsumerGroup.Close()
+		m.historyHandler.redisMessageBatches.Close()
+		m.historyHandler.Close()
 		close(netDone)
 		return netErr
 	}
diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go
index 66bc562dc..bbf204b47 100644
--- a/internal/msgtransfer/online_history_msg_handler.go
+++ b/internal/msgtransfer/online_history_msg_handler.go
@@ -19,13 +19,12 @@ import (
 	"encoding/json"
 	"errors"
 
-	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
-	"github.com/openimsdk/tools/discovery"
-	"github.com/openimsdk/tools/mq"
-
 	"sync"
 	"time"
 
+	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
+	"github.com/openimsdk/tools/discovery"
+
 	"github.com/go-redis/redis"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
@@ -79,7 +78,7 @@ type ConsumerMessage struct {
 	Value []byte
 }
 
-func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase, historyConsumer mq.Consumer) (*OnlineHistoryRedisConsumerHandler, error) {
+func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
 	groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
 	if err != nil {
 		return nil, err
diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go
index d8836d54e..3e08596ec 100644
--- a/internal/msgtransfer/online_msg_to_mongo_handler.go
+++ b/internal/msgtransfer/online_msg_to_mongo_handler.go
@@ -17,36 +17,24 @@ package msgtransfer
 import (
 	"context"
 
-	"github.com/IBM/sarama"
-	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
 	pbmsg "github.com/openimsdk/protocol/msg"
 	"github.com/openimsdk/tools/log"
-	"github.com/openimsdk/tools/mq/kafka"
 	"google.golang.org/protobuf/proto"
 )
 
 type OnlineHistoryMongoConsumerHandler struct {
-	historyConsumerGroup *kafka.MConsumerGroup
-	msgTransferDatabase  controller.MsgTransferDatabase
+	msgTransferDatabase controller.MsgTransferDatabase
 }
 
-func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
-	historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true)
-	if err != nil {
-		return nil, err
+func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler {
+	return &OnlineHistoryMongoConsumerHandler{
+		msgTransferDatabase: database,
 	}
-
-	mc := &OnlineHistoryMongoConsumerHandler{
-		historyConsumerGroup: historyConsumerGroup,
-		msgTransferDatabase:  database,
-	}
-	return mc, nil
 }
 
-func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, key string, session sarama.ConsumerGroupSession) {
-	msg := cMsg.Value
+func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) {
 	msgFromMQ := pbmsg.MsgDataToMongoByMQ{}
 	err := proto.Unmarshal(msg, &msgFromMQ)
 	if err != nil {
@@ -54,7 +42,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
 		return
 	}
 	if len(msgFromMQ.MsgData) == 0 {
-		log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg)
+		log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "key", key, "msg", msg)
 		return
 	}
 	log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
@@ -78,22 +66,3 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
 		seqs = append(seqs, msg.Seq)
 	}
 }
-
-func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
-
-func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
-
-func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group
-	log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
-		claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
-	for msg := range claim.Messages() {
-		ctx := mc.historyConsumerGroup.GetContextFromMsg(msg)
-		if len(msg.Value) != 0 {
-			mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess)
-		} else {
-			log.ZError(ctx, "mongo msg get from kafka but is nil", nil, "conversationID", msg.Key)
-		}
-		sess.MarkMessage(msg, "")
-	}
-	return nil
-}
diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go
index dc0b37089..4ebe4413a 100644
--- a/internal/push/onlinepusher.go
+++ b/internal/push/onlinepusher.go
@@ -11,6 +11,7 @@ import (
 	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
 	"github.com/openimsdk/tools/utils/datautil"
+	"github.com/openimsdk/tools/utils/runtimeenv"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 
@@ -40,14 +41,14 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
 }
 
 func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher {
-	if config.runTimeEnv == conf.KUBERNETES {
+	if runtimeenv.RuntimeEnvironment() == conf.KUBERNETES {
 		return NewDefaultAllNode(disCov, config)
 	}
 	switch config.Discovery.Enable {
 	case conf.ETCD:
 		return NewDefaultAllNode(disCov, config)
 	default:
-		log.ZError(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable)
+		log.ZWarn(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable)
 		return nil
 	}
 }
diff --git a/internal/push/push.go b/internal/push/push.go
index cab910da6..5c2e34ab4 100644
--- a/internal/push/push.go
+++ b/internal/push/push.go
@@ -15,7 +15,6 @@ import (
 	"github.com/openimsdk/tools/discovery"
 	"github.com/openimsdk/tools/log"
 	"github.com/openimsdk/tools/mcontext"
-	"github.com/openimsdk/tools/utils/runtimeenv"
 	"google.golang.org/grpc"
 )
 
@@ -35,9 +34,7 @@ type Config struct {
 	WebhooksConfig     config.Webhooks
 	LocalCacheConfig   config.LocalCache
 	Discovery          config.Discovery
-	FcmConfigPath      string
-
-	runTimeEnv string
+	FcmConfigPath      config.Path
 }
 
 func (p pushServer) DelUserPushToken(ctx context.Context,
@@ -49,14 +46,12 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
 }
 
 func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
-	config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()
-
 	rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
 	if err != nil {
 		return err
 	}
 	cacheModel := redis.NewThirdCache(rdb)
-	offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel, config.FcmConfigPath)
+	offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel, string(config.FcmConfigPath))
 	if err != nil {
 		return err
 	}
diff --git a/internal/tools/cron_test.go b/internal/tools/cron_test.go
index b4082a5a5..96068520c 100644
--- a/internal/tools/cron_test.go
+++ b/internal/tools/cron_test.go
@@ -24,7 +24,7 @@ func TestName(t *testing.T) {
 			Address:       []string{"localhost:12379"},
 		},
 	}
-	client, err := kdisc.NewDiscoveryRegister(conf, "source")
+	client, err := kdisc.NewDiscoveryRegister(conf, nil)
 	if err != nil {
 		panic(err)
 	}
diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go
index 2315522f3..1a87fe1d6 100644
--- a/pkg/common/config/config.go
+++ b/pkg/common/config/config.go
@@ -30,6 +30,8 @@ import (
 
 const StructTagName = "yaml"
 
+type Path string
+
 type CacheConfig struct {
 	Topic         string `yaml:"topic"`
 	SlotNum       int    `yaml:"slotNum"`
diff --git a/pkg/common/config/constant.go b/pkg/common/config/constant.go
index 3a83faaa3..066fcd5c0 100644
--- a/pkg/common/config/constant.go
+++ b/pkg/common/config/constant.go
@@ -14,12 +14,14 @@
 
 package config
 
+import "github.com/openimsdk/tools/utils/runtimeenv"
+
 const ConfKey = "conf"
 
 const (
 	MountConfigFilePath = "CONFIG_PATH"
 	DeploymentType      = "DEPLOYMENT_TYPE"
-	KUBERNETES          = "kubernetes"
+	KUBERNETES          = runtimeenv.Kubernetes
 	ETCD                = "etcd"
 	Standalone          = "standalone"
 )
diff --git a/pkg/common/config/load_config.go b/pkg/common/config/load_config.go
index 76c724b2a..142b704e1 100644
--- a/pkg/common/config/load_config.go
+++ b/pkg/common/config/load_config.go
@@ -7,11 +7,12 @@ import (
 
 	"github.com/mitchellh/mapstructure"
 	"github.com/openimsdk/tools/errs"
+	"github.com/openimsdk/tools/utils/runtimeenv"
 	"github.com/spf13/viper"
 )
 
-func Load(configDirectory string, configFileName string, envPrefix string, runtimeEnv string, config any) error {
-	if runtimeEnv == KUBERNETES {
+func Load(configDirectory string, configFileName string, envPrefix string, config any) error {
+	if runtimeenv.RuntimeEnvironment() == KUBERNETES {
 		mountPath := os.Getenv(MountConfigFilePath)
 		if mountPath == "" {
 			return errs.ErrArgs.WrapMsg(MountConfigFilePath + " env is empty")
diff --git a/pkg/common/discovery/discoveryregister.go b/pkg/common/discovery/discoveryregister.go
index 97b5c4988..56160b9a9 100644
--- a/pkg/common/discovery/discoveryregister.go
+++ b/pkg/common/discovery/discoveryregister.go
@@ -20,6 +20,7 @@ import (
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/tools/discovery"
 	"github.com/openimsdk/tools/discovery/standalone"
+	"github.com/openimsdk/tools/utils/runtimeenv"
 	"google.golang.org/grpc"
 
 	"github.com/openimsdk/tools/discovery/kubernetes"
@@ -29,8 +30,8 @@ import (
 )
 
 // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
-func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
-	if runtimeEnv == config.KUBERNETES {
+func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
+	if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
 		return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
 			grpc.WithDefaultCallOptions(
 				grpc.MaxCallSendMsgSize(1024*1024*20),
diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go
index 41e1edc1e..b0b6c9a36 100644
--- a/pkg/common/startrpc/start.go
+++ b/pkg/common/startrpc/start.go
@@ -33,8 +33,6 @@ import (
 	"github.com/openimsdk/tools/utils/jsonutil"
 	"google.golang.org/grpc/status"
 
-	"github.com/openimsdk/tools/utils/runtimeenv"
-
 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/tools/discovery"
@@ -74,8 +72,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
 		return err
 	}
 
-	runTimeEnv := runtimeenv.PrintRuntimeEnvironment()
-
 	if !autoSetPorts {
 		rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
 		if err != nil {
@@ -99,7 +95,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
 	if autoSetPorts && discovery.Enable != conf.ETCD {
 		return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap()
 	}
-	client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames)
+	client, err := kdisc.NewDiscoveryRegister(discovery, watchServiceNames)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/common/storage/database/black.go b/pkg/common/storage/database/black.go
index b53fdd14d..5af960abe 100644
--- a/pkg/common/storage/database/black.go
+++ b/pkg/common/storage/database/black.go
@@ -16,6 +16,7 @@ package database
 
 import (
 	"context"
+
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 	"github.com/openimsdk/tools/db/pagination"
 )
@@ -29,3 +30,85 @@ type Black interface {
 	FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error)
 	FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error)
 }
+
+var (
+	_ Black = (*mgoImpl)(nil)
+	_ Black = (*redisImpl)(nil)
+)
+
+type mgoImpl struct {
+}
+
+func (m *mgoImpl) Create(ctx context.Context, blacks []*model.Black) (err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (m *mgoImpl) Delete(ctx context.Context, blacks []*model.Black) (err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (m *mgoImpl) Find(ctx context.Context, blacks []*model.Black) (blackList []*model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (m *mgoImpl) Take(ctx context.Context, ownerUserID, blockUserID string) (black *model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (m *mgoImpl) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (m *mgoImpl) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (m *mgoImpl) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+type redisImpl struct {
+}
+
+func (r *redisImpl) Create(ctx context.Context, blacks []*model.Black) (err error) {
+
+	//TODO implement me
+	panic("implement me")
+}
+
+func (r *redisImpl) Delete(ctx context.Context, blacks []*model.Black) (err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (r *redisImpl) Find(ctx context.Context, blacks []*model.Black) (blackList []*model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (r *redisImpl) Take(ctx context.Context, ownerUserID, blockUserID string) (black *model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (r *redisImpl) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (r *redisImpl) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (r *redisImpl) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) {
+	//TODO implement me
+	panic("implement me")
+}