diff --git a/go.mod b/go.mod index 03a7a4d4d..10ff7e6d0 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.70 - github.com/openimsdk/tools v0.0.50-alpha.63 + github.com/openimsdk/tools v0.0.50-alpha.64 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 4c297134d..98ba2e7e2 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrk github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ= github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY= -github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/tools v0.0.50-alpha.64 h1:KmtE8V2K8atQJJg1xq2ySSrPQyf8ldwk8fw6jRnsxCw= +github.com/openimsdk/tools v0.0.50-alpha.64/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index c330cad46..7a36bb605 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -19,22 +19,29 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) +const ( + // wait for Restart http call return + waitHttp = time.Millisecond * 200 +) + type ConfigManager struct { imAdminUserID []string config *config.AllConfig client *clientv3.Client - configPath string - runtimeEnv string + + configPath string + runtimeEnv string } func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { - return &ConfigManager{ + cm := &ConfigManager{ imAdminUserID: IMAdminUserID, config: cfg, client: client, configPath: configPath, runtimeEnv: runtimeEnv, } + return cm } func (cm *ConfigManager) CheckAdmin(c *gin.Context) { @@ -85,49 +92,49 @@ func (cm *ConfigManager) SetConfig(c *gin.Context) { var err error switch req.ConfigName { case cm.config.Discovery.GetConfigFileName(): - err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Kafka.GetConfigFileName(): - err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.LocalCache.GetConfigFileName(): - err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Log.GetConfigFileName(): - err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Minio.GetConfigFileName(): - err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Mongo.GetConfigFileName(): - err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Notification.GetConfigFileName(): - err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.API.GetConfigFileName(): - err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.CronTask.GetConfigFileName(): - err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.MsgGateway.GetConfigFileName(): - err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.MsgTransfer.GetConfigFileName(): - err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Push.GetConfigFileName(): - err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Auth.GetConfigFileName(): - err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Conversation.GetConfigFileName(): - err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Friend.GetConfigFileName(): - err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Group.GetConfigFileName(): - err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Msg.GetConfigFileName(): - err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Third.GetConfigFileName(): - err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.User.GetConfigFileName(): - err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Redis.GetConfigFileName(): - err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Share.GetConfigFileName(): - err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm) case cm.config.Webhooks.GetConfigFileName(): - err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm) default: apiresp.GinError(c, errs.ErrArgs.Wrap()) return @@ -139,7 +146,7 @@ func (cm *ConfigManager) SetConfig(c *gin.Context) { apiresp.GinSuccess(c, nil) } -func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error { +func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error { conf := new(T) err := json.Unmarshal([]byte(req.Data), &conf) if err != nil { @@ -153,7 +160,7 @@ func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, if err != nil { return errs.ErrArgs.WithDetail(err.Error()).Wrap() } - _, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) + _, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) if err != nil { return errs.WrapMsg(err, "save to etcd failed") } @@ -161,16 +168,19 @@ func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, } func (cm *ConfigManager) ResetConfig(c *gin.Context) { - go cm.resetConfig(c) + go func() { + if err := cm.resetConfig(c, true); err != nil { + log.ZError(c, "reset config err", err) + } + }() apiresp.GinSuccess(c, nil) } -func (cm *ConfigManager) resetConfig(c *gin.Context) { +func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...clientv3.Op) error { txn := cm.client.Txn(c) type initConf struct { - old any - new any - isChanged bool + old any + new any } configMap := map[string]*initConf{ cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)}, @@ -210,13 +220,12 @@ func (cm *ConfigManager) resetConfig(c *gin.Context) { log.ZError(c, "load config failed", err) continue } - v.isChanged = reflect.DeepEqual(v.old, v.new) - if !v.isChanged { + equal := reflect.DeepEqual(v.old, v.new) + if !checkChange || !equal { changedKeys = append(changedKeys, k) } } - ops := make([]clientv3.Op, 0) for _, k := range changedKeys { data, err := json.Marshal(configMap[k].new) if err != nil { @@ -229,10 +238,10 @@ func (cm *ConfigManager) resetConfig(c *gin.Context) { txn.Then(ops...) _, err := txn.Commit() if err != nil { - log.ZError(c, "commit etcd txn failed", err) - return + return errs.WrapMsg(err, "commit etcd txn failed") } } + return nil } func (cm *ConfigManager) Restart(c *gin.Context) { @@ -241,10 +250,59 @@ func (cm *ConfigManager) Restart(c *gin.Context) { } func (cm *ConfigManager) restart(c *gin.Context) { - time.Sleep(time.Millisecond * 200) // wait for Restart http call return + time.Sleep(waitHttp) // wait for Restart http call return t := time.Now().Unix() _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) if err != nil { log.ZError(c, "restart etcd put key failed", err) } } + +func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) { + var req apistruct.SetEnableConfigManagerReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var enableStr string + if req.Enable { + enableStr = etcd.Enable + } else { + enableStr = etcd.Disable + } + resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) + return + } + if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable { + go func() { + time.Sleep(waitHttp) // wait for Restart http call return + err := cm.resetConfig(c, false, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)) + if err != nil { + log.ZError(c, "resetConfig failed", err) + } + }() + } else { + _, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed")) + return + } + } + + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) { + resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) + return + } + var enable bool + if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable { + enable = true + } + apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable}) +} diff --git a/internal/api/init.go b/internal/api/init.go index 780ecb913..20237ebc2 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -58,7 +58,9 @@ func Start(ctx context.Context, index int, config *Config) error { config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{ + config.Discovery.RpcService.MessageGateway, + }) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/internal/api/router.go b/internal/api/router.go index e516d8ca8..0ae0eab05 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -314,6 +314,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf configGroup.POST("/get_config", cm.GetConfig) configGroup.POST("/set_config", cm.SetConfig) configGroup.POST("/reset_config", cm.ResetConfig) + configGroup.POST("/set_enable_config_manager", cm.SetEnableConfigManager) + configGroup.POST("/get_enable_config_manager", cm.GetEnableConfigManager) } { r.POST("/restart", cm.CheckAdmin, cm.Restart) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 490711cfe..887a90d7a 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -16,9 +16,10 @@ package msggateway import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sync/atomic" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" @@ -64,6 +65,9 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { conf.WebhooksConfig.GetConfigFileName(), conf.RedisConfig.GetConfigFileName(), }, + []string{ + conf.Discovery.RpcService.MessageGateway, + }, s.InitServer, ) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index fcd6152dc..96e6bbde0 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -87,7 +87,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv) + client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv, nil) if err != nil { return err } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 71fd886f6..da1c6320e 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -2,11 +2,14 @@ package tools import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" @@ -27,38 +30,47 @@ type CronTaskConfig struct { runTimeEnv string } -func Start(ctx context.Context, config *CronTaskConfig) error { - config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() +func Start(ctx context.Context, conf *CronTaskConfig) error { + conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() - log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", config.runTimeEnv, "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) - if config.CronTask.RetainChatRecords < 1 { + log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) + if conf.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.runTimeEnv) + client, err := kdisc.NewDiscoveryRegister(&conf.Discovery, conf.runTimeEnv, nil) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) + ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0]) - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg) if err != nil { return err } - thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third) + thirdConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Third) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Conversation) if err != nil { return err } + if conf.Discovery.Enable == config.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ + conf.CronTask.GetConfigFileName(), + conf.Share.GetConfigFileName(), + conf.Discovery.GetConfigFileName(), + }) + cm.Watch(ctx) + } + srv := &cronServer{ ctx: ctx, - config: config, + config: conf, cron: cron.New(), msgClient: msg.NewMsgClient(msgConn), conversationClient: pbconversation.NewConversationClient(conversationConn), @@ -74,7 +86,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err := srv.registerClearUserMsg(); err != nil { return err } - log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) + log.ZDebug(ctx, "start cron task", "CronExecuteTime", conf.CronTask.CronExecuteTime) srv.cron.Start() <-ctx.Done() return nil diff --git a/pkg/apistruct/config_manager.go b/pkg/apistruct/config_manager.go index 84b8fb36b..9b8641c9d 100644 --- a/pkg/apistruct/config_manager.go +++ b/pkg/apistruct/config_manager.go @@ -14,3 +14,11 @@ type SetConfigReq struct { ConfigName string `json:"configName"` Data string `json:"data"` } + +type SetEnableConfigManagerReq struct { + Enable bool `json:"enable"` +} + +type GetEnableConfigManagerResp struct { + Enable bool `json:"enable"` +} diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index a5ab3fea7..b09d4153f 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -64,5 +64,8 @@ func (a *AuthRpcCmd) runE() error { a.authConfig.RedisConfig.GetConfigFileName(), a.authConfig.Discovery.GetConfigFileName(), }, + []string{ + a.authConfig.Discovery.RpcService.MessageGateway, + }, auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 12c29a873..2f8769897 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -68,6 +68,6 @@ func (a *ConversationRpcCmd) runE() error { a.conversationConfig.Share.GetConfigFileName(), a.conversationConfig.LocalCacheConfig.GetConfigFileName(), a.conversationConfig.Discovery.GetConfigFileName(), - }, + }, nil, conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 209d481bb..dd850cf17 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -70,6 +70,6 @@ func (a *FriendRpcCmd) runE() error { a.relationConfig.WebhooksConfig.GetConfigFileName(), a.relationConfig.LocalCacheConfig.GetConfigFileName(), a.relationConfig.Discovery.GetConfigFileName(), - }, + }, nil, relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 23fd460f7..7a599077f 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -71,6 +71,6 @@ func (a *GroupRpcCmd) runE() error { a.groupConfig.WebhooksConfig.GetConfigFileName(), a.groupConfig.LocalCacheConfig.GetConfigFileName(), a.groupConfig.Discovery.GetConfigFileName(), - }, + }, nil, group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index b6f0b6131..c4049be05 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -72,6 +72,6 @@ func (a *MsgRpcCmd) runE() error { a.msgConfig.WebhooksConfig.GetConfigFileName(), a.msgConfig.LocalCacheConfig.GetConfigFileName(), a.msgConfig.Discovery.GetConfigFileName(), - }, + }, nil, msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 41b9d56e6..c4ae84952 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -72,5 +72,8 @@ func (a *PushRpcCmd) runE() error { a.pushConfig.LocalCacheConfig.GetConfigFileName(), a.pushConfig.Discovery.GetConfigFileName(), }, + []string{ + a.pushConfig.Discovery.RpcService.MessageGateway, + }, push.Start) } diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 4c5256d80..0a405fb6e 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -93,7 +93,7 @@ func (r *RootCmd) initEtcd() error { return err } if disConfig.Enable == config.ETCD { - discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env) + discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil) r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient() } return nil @@ -113,7 +113,9 @@ func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) e if err := r.initializeLogger(cmdOpts); err != nil { return errs.WrapMsg(err, "failed to initialize logger") } - + if err := r.etcdClient.Close(); err != nil { + return errs.WrapMsg(err, "failed to close etcd client") + } return nil } @@ -141,9 +143,24 @@ func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error { if r.etcdClient == nil { return nil } + ctx := context.TODO() + + res, err := r.etcdClient.Get(ctx, disetcd.BuildKey(disetcd.EnableConfigCenterKey)) + if err != nil { + log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get EnableConfigCenterKey err: %v", errs.Wrap(err)) + return nil + } + if res.Count == 0 { + return nil + } else { + if string(res.Kvs[0].Value) == disetcd.Disable { + return nil + } else if string(res.Kvs[0].Value) != disetcd.Enable { + return errs.New("unknown EnableConfigCenter value").Wrap() + } + } update := func(configFileName string, configStruct any) error { - ctx := context.TODO() key := disetcd.BuildKey(configFileName) etcdRes, err := r.etcdClient.Get(ctx, key) if err != nil { diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 5086116b5..e567234e4 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -70,6 +70,6 @@ func (a *ThirdRpcCmd) runE() error { a.thirdConfig.MinioConfig.GetConfigFileName(), a.thirdConfig.LocalCacheConfig.GetConfigFileName(), a.thirdConfig.Discovery.GetConfigFileName(), - }, + }, nil, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 61125e0c3..190f6f892 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -72,6 +72,6 @@ func (a *UserRpcCmd) runE() error { a.userConfig.WebhooksConfig.GetConfigFileName(), a.userConfig.LocalCacheConfig.GetConfigFileName(), a.userConfig.Discovery.GetConfigFileName(), - }, + }, nil, user.Start) } diff --git a/pkg/common/discovery/discoveryregister.go b/pkg/common/discovery/discoveryregister.go index bc9fd0f5a..1b64c3e78 100644 --- a/pkg/common/discovery/discoveryregister.go +++ b/pkg/common/discovery/discoveryregister.go @@ -28,7 +28,7 @@ import ( ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { if runtimeEnv == config.KUBERNETES { return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, grpc.WithDefaultCallOptions( @@ -42,6 +42,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (disco return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, discovery.Etcd.Address, + watchNames, etcd.WithDialTimeout(10*time.Second), etcd.WithMaxCallSendMsgSize(20*1024*1024), etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) diff --git a/pkg/common/discovery/etcd/config_manager.go b/pkg/common/discovery/etcd/config_manager.go index 013e2cce3..70d37c323 100644 --- a/pkg/common/discovery/etcd/config_manager.go +++ b/pkg/common/discovery/etcd/config_manager.go @@ -14,11 +14,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -const ( - ConfigKeyPrefix = "/open-im/config/" - RestartKey = "restart" -) - var ( ShutDowns []func() error ) diff --git a/pkg/common/discovery/etcd/const.go b/pkg/common/discovery/etcd/const.go new file mode 100644 index 000000000..c9b00fc2c --- /dev/null +++ b/pkg/common/discovery/etcd/const.go @@ -0,0 +1,9 @@ +package etcd + +const ( + ConfigKeyPrefix = "/open-im/config/" + RestartKey = "restart" + EnableConfigCenterKey = "enable-config-center" + Enable = "enable" + Disable = "disable" +) diff --git a/pkg/common/discovery/etcd/doc.go b/pkg/common/discovery/etcd/doc.go deleted file mode 100644 index fedf5ad51..000000000 --- a/pkg/common/discovery/etcd/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcd // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 5facc8f73..27aabca95 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -49,7 +49,7 @@ import ( // Start rpc server. func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, - watchConfigNames []string, + watchConfigNames []string, watchServiceNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { @@ -95,7 +95,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf if autoSetPorts && discovery.Enable != conf.ETCD { return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() } - client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv) + client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames) if err != nil { return err }