diff --git a/cmd/main.go b/cmd/main.go index 23f325fac..56749861f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -137,11 +137,25 @@ func (x *cmds) parseConf(conf any) error { pkt := getTypePath(field.Type()) confData, ok := x.conf[pkt] if !ok { - if typeField.Name == "FcmConfigPath" && field.Kind() == reflect.String { + switch field.Interface().(type) { + case config.Path: field.SetString(x.confPath) - continue + case config.AllConfig: + var allConf config.AllConfig + if err := x.parseConf(&allConf); err != nil { + return err + } + field.Set(reflect.ValueOf(allConf)) + case *config.AllConfig: + var allConf config.AllConfig + if err := x.parseConf(&allConf); err != nil { + return err + } + field.Set(reflect.ValueOf(&allConf)) + default: + return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name) } - return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name) + continue } if confData == nil { continue @@ -172,15 +186,24 @@ func (x *cmds) run(ctx context.Context) error { return err } } - for _, cmd := range x.cmds { - fmt.Println("start", cmd.Name) - if err := cmd.Func(ctx); err != nil { - fmt.Println("start failed", cmd.Name, err) - return err - } - fmt.Println("start ok", cmd.Name) + if len(x.cmds) == 0 { + return fmt.Errorf("no command to run") } - return nil + ctx, cancel := context.WithCancelCause(ctx) + for i := range x.cmds { + cmd := x.cmds[i] + go func() { + fmt.Println("start", cmd.Name) + if err := cmd.Func(ctx); err != nil { + fmt.Println("start failed", cmd.Name, err) + cancel(err) + return + } + fmt.Println("start end", cmd.Name) + }() + } + <-ctx.Done() + return context.Cause(ctx) } type cmdName struct { diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index c61b2cb0b..15f5e7004 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -30,16 +30,14 @@ type ConfigManager struct { client *clientv3.Client configPath string - runtimeEnv string } -func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { +func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string) *ConfigManager { cm := &ConfigManager{ imAdminUserID: IMAdminUserID, config: cfg, client: client, configPath: configPath, - runtimeEnv: runtimeEnv, } return cm } @@ -73,7 +71,7 @@ func (cm *ConfigManager) GetConfig(c *gin.Context) { func (cm *ConfigManager) GetConfigList(c *gin.Context) { var resp apistruct.GetConfigListResp resp.ConfigNames = cm.config.GetConfigNames() - resp.Environment = runtimeenv.PrintRuntimeEnvironment() + resp.Environment = runtimeenv.RuntimeEnvironment() resp.Version = version.Version apiresp.GinSuccess(c, resp) @@ -209,13 +207,7 @@ func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...cl changedKeys := make([]string, 0, len(configMap)) for k, v := range configMap { - err := config.Load( - cm.configPath, - k, - config.EnvPrefixMap[k], - cm.runtimeEnv, - v.new, - ) + err := config.Load(cm.configPath, k, config.EnvPrefixMap[k], v.new) if err != nil { log.ZError(c, "load config failed", err) continue diff --git a/internal/api/init.go b/internal/api/init.go index 20237ebc2..3b6a40913 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -44,10 +44,9 @@ import ( ) type Config struct { - *conf.AllConfig + conf.AllConfig - RuntimeEnv string - ConfigPath string + ConfigPath conf.Path } func Start(ctx context.Context, index int, config *Config) error { @@ -56,9 +55,7 @@ func Start(ctx context.Context, index int, config *Config) error { return err } - config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() - - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{ + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, []string{ config.Discovery.RpcService.MessageGateway, }) if err != nil { @@ -135,7 +132,7 @@ func Start(ctx context.Context, index int, config *Config) error { address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) server := http.Server{Addr: address, Handler: router} - log.CInfo(ctx, "API server is initializing", "runtimeEnv", config.RuntimeEnv, "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) + log.CInfo(ctx, "API server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) go func() { err = server.ListenAndServe() if err != nil && !errors.Is(err, http.ErrServerClosed) { diff --git a/internal/api/router.go b/internal/api/router.go index da9d22463..d2f5d2c61 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -301,7 +301,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf if cfg.Discovery.Enable == config.ETCD { etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() } - cm := NewConfigManager(cfg.Share.IMAdminUserID, cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv) + cm := NewConfigManager(cfg.Share.IMAdminUserID, &cfg.AllConfig, etcdClient, string(cfg.ConfigPath)) { configGroup := r.Group("/config", cm.CheckAdmin) diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index f11bc3faf..3cbd1a06b 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -33,15 +33,11 @@ type Config struct { RedisConfig config.Redis WebhooksConfig config.Webhooks Discovery config.Discovery - - runtimeEnv string } // Start run ws server. func Start(ctx context.Context, index int, conf *Config) error { - conf.runtimeEnv = runtimeenv.PrintRuntimeEnvironment() - - log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.runtimeEnv, + log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index be0e94ba7..bf1c98f45 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -29,6 +29,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/mq" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" @@ -52,16 +53,16 @@ import ( ) type MsgTransfer struct { + historyConsumer mq.Consumer + historyMongoConsumer mq.Consumer // This consumer aggregated messages, subscribed to the topic:toRedis, // the message is stored in redis, Incr Redis, and then the message is sent to toPush topic for push, // and the message is sent to toMongo topic for persistence - historyCH *OnlineHistoryRedisConsumerHandler + historyHandler *OnlineHistoryRedisConsumerHandler //This consumer handle message to mongo - historyMongoCH *OnlineHistoryMongoConsumerHandler - ctx context.Context - cancel context.CancelFunc - - runTimeEnv string + historyMongoHandler *OnlineHistoryMongoConsumerHandler + ctx context.Context + cancel context.CancelFunc } type Config struct { @@ -75,11 +76,10 @@ type Config struct { } func Start(ctx context.Context, index int, config *Config) error { - runTimeEnv := runtimeenv.PrintRuntimeEnvironment() builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) - log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts", + log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) @@ -90,7 +90,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv, nil) + client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil) if err != nil { return err } @@ -137,19 +137,25 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - historyCH, err := NewOnlineHistoryRedisConsumerHandler(ctx, client, config, msgTransferDatabase) + historyConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToRedisTopic) if err != nil { return err } - historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgTransferDatabase) + historyMongoConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToMongoTopic) if err != nil { return err } + historyHandler, err := NewOnlineHistoryRedisConsumerHandler(ctx, client, config, msgTransferDatabase) + if err != nil { + return err + } + historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase) msgTransfer := &MsgTransfer{ - historyCH: historyCH, - historyMongoCH: historyMongoCH, - runTimeEnv: runTimeEnv, + historyConsumer: historyConsumer, + historyMongoConsumer: historyMongoConsumer, + historyHandler: historyHandler, + historyMongoHandler: historyMongoHandler, } return msgTransfer.Start(index, config, client) @@ -162,10 +168,30 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco netErr error ) - go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) - go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) - go m.historyCH.HandleUserHasReadSeqMessages(m.ctx) - err := m.historyCH.redisMessageBatches.Start() + go func() { + for { + if err := m.historyConsumer.Subscribe(m.ctx, m.historyHandler.HandlerRedisMessage); err != nil { + log.ZError(m.ctx, "historyConsumer err", err) + return + } + } + }() + + go func() { + fn := func(ctx context.Context, key string, value []byte) error { + m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value) + return nil + } + for { + if err := m.historyMongoConsumer.Subscribe(m.ctx, fn); err != nil { + log.ZError(m.ctx, "historyMongoConsumer err", err) + return + } + } + }() + + go m.historyHandler.HandleUserHasReadSeqMessages(m.ctx) + err := m.historyHandler.redisMessageBatches.Start() if err != nil { return err } @@ -237,18 +263,18 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco case <-sigs: program.SIGTERMExit() // graceful close kafka client. + _ = m.historyConsumer.Close() + _ = m.historyMongoConsumer.Close() m.cancel() - m.historyCH.redisMessageBatches.Close() - m.historyCH.Close() - m.historyCH.historyConsumerGroup.Close() - m.historyMongoCH.historyConsumerGroup.Close() + m.historyHandler.redisMessageBatches.Close() + m.historyHandler.Close() return nil case <-netDone: + _ = m.historyConsumer.Close() + _ = m.historyMongoConsumer.Close() m.cancel() - m.historyCH.redisMessageBatches.Close() - m.historyCH.Close() - m.historyCH.historyConsumerGroup.Close() - m.historyMongoCH.historyConsumerGroup.Close() + m.historyHandler.redisMessageBatches.Close() + m.historyHandler.Close() close(netDone) return netErr } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 66bc562dc..bbf204b47 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -19,13 +19,12 @@ import ( "encoding/json" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/mq" - "sync" "time" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/tools/discovery" + "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" @@ -79,7 +78,7 @@ type ConsumerMessage struct { Value []byte } -func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase, historyConsumer mq.Consumer) (*OnlineHistoryRedisConsumerHandler, error) { +func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) if err != nil { return nil, err diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index d8836d54e..3e08596ec 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -17,36 +17,24 @@ package msgtransfer import ( "context" - "github.com/IBM/sarama" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "google.golang.org/protobuf/proto" ) type OnlineHistoryMongoConsumerHandler struct { - historyConsumerGroup *kafka.MConsumerGroup - msgTransferDatabase controller.MsgTransferDatabase + msgTransferDatabase controller.MsgTransferDatabase } -func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true) - if err != nil { - return nil, err +func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler { + return &OnlineHistoryMongoConsumerHandler{ + msgTransferDatabase: database, } - - mc := &OnlineHistoryMongoConsumerHandler{ - historyConsumerGroup: historyConsumerGroup, - msgTransferDatabase: database, - } - return mc, nil } -func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, key string, session sarama.ConsumerGroupSession) { - msg := cMsg.Value +func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) { msgFromMQ := pbmsg.MsgDataToMongoByMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { @@ -54,7 +42,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont return } if len(msgFromMQ.MsgData) == 0 { - log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg) + log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "key", key, "msg", msg) return } log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) @@ -78,22 +66,3 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont seqs = append(seqs, msg.Seq) } } - -func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } - -func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } - -func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group - log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", - claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) - for msg := range claim.Messages() { - ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) - if len(msg.Value) != 0 { - mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess) - } else { - log.ZError(ctx, "mongo msg get from kafka but is nil", nil, "conversationID", msg.Key) - } - sess.MarkMessage(msg, "") - } - return nil -} diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index dc0b37089..4ebe4413a 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -11,6 +11,7 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/runtimeenv" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -40,14 +41,14 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * } func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher { - if config.runTimeEnv == conf.KUBERNETES { + if runtimeenv.RuntimeEnvironment() == conf.KUBERNETES { return NewDefaultAllNode(disCov, config) } switch config.Discovery.Enable { case conf.ETCD: return NewDefaultAllNode(disCov, config) default: - log.ZError(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable) + log.ZWarn(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable) return nil } } diff --git a/internal/push/push.go b/internal/push/push.go index cab910da6..5c2e34ab4 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -15,7 +15,6 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" ) @@ -35,9 +34,7 @@ type Config struct { WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache Discovery config.Discovery - FcmConfigPath string - - runTimeEnv string + FcmConfigPath config.Path } func (p pushServer) DelUserPushToken(ctx context.Context, @@ -49,14 +46,12 @@ func (p pushServer) DelUserPushToken(ctx context.Context, } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } cacheModel := redis.NewThirdCache(rdb) - offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel, config.FcmConfigPath) + offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel, string(config.FcmConfigPath)) if err != nil { return err } diff --git a/internal/tools/cron_test.go b/internal/tools/cron_test.go index b4082a5a5..96068520c 100644 --- a/internal/tools/cron_test.go +++ b/internal/tools/cron_test.go @@ -24,7 +24,7 @@ func TestName(t *testing.T) { Address: []string{"localhost:12379"}, }, } - client, err := kdisc.NewDiscoveryRegister(conf, "source") + client, err := kdisc.NewDiscoveryRegister(conf, nil) if err != nil { panic(err) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 2315522f3..1a87fe1d6 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -30,6 +30,8 @@ import ( const StructTagName = "yaml" +type Path string + type CacheConfig struct { Topic string `yaml:"topic"` SlotNum int `yaml:"slotNum"` diff --git a/pkg/common/config/constant.go b/pkg/common/config/constant.go index 3a83faaa3..066fcd5c0 100644 --- a/pkg/common/config/constant.go +++ b/pkg/common/config/constant.go @@ -14,12 +14,14 @@ package config +import "github.com/openimsdk/tools/utils/runtimeenv" + const ConfKey = "conf" const ( MountConfigFilePath = "CONFIG_PATH" DeploymentType = "DEPLOYMENT_TYPE" - KUBERNETES = "kubernetes" + KUBERNETES = runtimeenv.Kubernetes ETCD = "etcd" Standalone = "standalone" ) diff --git a/pkg/common/config/load_config.go b/pkg/common/config/load_config.go index 76c724b2a..142b704e1 100644 --- a/pkg/common/config/load_config.go +++ b/pkg/common/config/load_config.go @@ -7,11 +7,12 @@ import ( "github.com/mitchellh/mapstructure" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/runtimeenv" "github.com/spf13/viper" ) -func Load(configDirectory string, configFileName string, envPrefix string, runtimeEnv string, config any) error { - if runtimeEnv == KUBERNETES { +func Load(configDirectory string, configFileName string, envPrefix string, config any) error { + if runtimeenv.RuntimeEnvironment() == KUBERNETES { mountPath := os.Getenv(MountConfigFilePath) if mountPath == "" { return errs.ErrArgs.WrapMsg(MountConfigFilePath + " env is empty") diff --git a/pkg/common/discovery/discoveryregister.go b/pkg/common/discovery/discoveryregister.go index 97b5c4988..56160b9a9 100644 --- a/pkg/common/discovery/discoveryregister.go +++ b/pkg/common/discovery/discoveryregister.go @@ -20,6 +20,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/standalone" + "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" "github.com/openimsdk/tools/discovery/kubernetes" @@ -29,8 +30,8 @@ import ( ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { - if runtimeEnv == config.KUBERNETES { +func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { + if runtimeenv.RuntimeEnvironment() == config.KUBERNETES { return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(1024*1024*20), diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 41e1edc1e..b0b6c9a36 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -33,8 +33,6 @@ import ( "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/grpc/status" - "github.com/openimsdk/tools/utils/runtimeenv" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -74,8 +72,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf return err } - runTimeEnv := runtimeenv.PrintRuntimeEnvironment() - if !autoSetPorts { rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) if err != nil { @@ -99,7 +95,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf if autoSetPorts && discovery.Enable != conf.ETCD { return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() } - client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames) + client, err := kdisc.NewDiscoveryRegister(discovery, watchServiceNames) if err != nil { return err } diff --git a/pkg/common/storage/database/black.go b/pkg/common/storage/database/black.go index b53fdd14d..5af960abe 100644 --- a/pkg/common/storage/database/black.go +++ b/pkg/common/storage/database/black.go @@ -16,6 +16,7 @@ package database import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/pagination" ) @@ -29,3 +30,85 @@ type Black interface { FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) } + +var ( + _ Black = (*mgoImpl)(nil) + _ Black = (*redisImpl)(nil) +) + +type mgoImpl struct { +} + +func (m *mgoImpl) Create(ctx context.Context, blacks []*model.Black) (err error) { + //TODO implement me + panic("implement me") +} + +func (m *mgoImpl) Delete(ctx context.Context, blacks []*model.Black) (err error) { + //TODO implement me + panic("implement me") +} + +func (m *mgoImpl) Find(ctx context.Context, blacks []*model.Black) (blackList []*model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (m *mgoImpl) Take(ctx context.Context, ownerUserID, blockUserID string) (black *model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (m *mgoImpl) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (m *mgoImpl) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (m *mgoImpl) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) { + //TODO implement me + panic("implement me") +} + +type redisImpl struct { +} + +func (r *redisImpl) Create(ctx context.Context, blacks []*model.Black) (err error) { + + //TODO implement me + panic("implement me") +} + +func (r *redisImpl) Delete(ctx context.Context, blacks []*model.Black) (err error) { + //TODO implement me + panic("implement me") +} + +func (r *redisImpl) Find(ctx context.Context, blacks []*model.Black) (blackList []*model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (r *redisImpl) Take(ctx context.Context, ownerUserID, blockUserID string) (black *model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (r *redisImpl) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (r *redisImpl) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*model.Black, err error) { + //TODO implement me + panic("implement me") +} + +func (r *redisImpl) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) { + //TODO implement me + panic("implement me") +}