mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-26 13:12:12 +08:00
feat: add enable config center
This commit is contained in:
parent
7b02b4b59b
commit
1b6fda21c6
@ -1,6 +1,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -19,12 +20,18 @@ import (
|
|||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// wait for Restart http call return
|
||||||
|
waitHttp = time.Millisecond * 200
|
||||||
|
)
|
||||||
|
|
||||||
type ConfigManager struct {
|
type ConfigManager struct {
|
||||||
imAdminUserID []string
|
imAdminUserID []string
|
||||||
config *config.AllConfig
|
config *config.AllConfig
|
||||||
client *clientv3.Client
|
client *clientv3.Client
|
||||||
configPath string
|
|
||||||
runtimeEnv string
|
configPath string
|
||||||
|
runtimeEnv string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
|
func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
|
||||||
@ -85,49 +92,49 @@ func (cm *ConfigManager) SetConfig(c *gin.Context) {
|
|||||||
var err error
|
var err error
|
||||||
switch req.ConfigName {
|
switch req.ConfigName {
|
||||||
case cm.config.Discovery.GetConfigFileName():
|
case cm.config.Discovery.GetConfigFileName():
|
||||||
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Kafka.GetConfigFileName():
|
case cm.config.Kafka.GetConfigFileName():
|
||||||
err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.LocalCache.GetConfigFileName():
|
case cm.config.LocalCache.GetConfigFileName():
|
||||||
err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Log.GetConfigFileName():
|
case cm.config.Log.GetConfigFileName():
|
||||||
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Minio.GetConfigFileName():
|
case cm.config.Minio.GetConfigFileName():
|
||||||
err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Mongo.GetConfigFileName():
|
case cm.config.Mongo.GetConfigFileName():
|
||||||
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Notification.GetConfigFileName():
|
case cm.config.Notification.GetConfigFileName():
|
||||||
err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.API.GetConfigFileName():
|
case cm.config.API.GetConfigFileName():
|
||||||
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.CronTask.GetConfigFileName():
|
case cm.config.CronTask.GetConfigFileName():
|
||||||
err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.MsgGateway.GetConfigFileName():
|
case cm.config.MsgGateway.GetConfigFileName():
|
||||||
err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.MsgTransfer.GetConfigFileName():
|
case cm.config.MsgTransfer.GetConfigFileName():
|
||||||
err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Push.GetConfigFileName():
|
case cm.config.Push.GetConfigFileName():
|
||||||
err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Auth.GetConfigFileName():
|
case cm.config.Auth.GetConfigFileName():
|
||||||
err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Conversation.GetConfigFileName():
|
case cm.config.Conversation.GetConfigFileName():
|
||||||
err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Friend.GetConfigFileName():
|
case cm.config.Friend.GetConfigFileName():
|
||||||
err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Group.GetConfigFileName():
|
case cm.config.Group.GetConfigFileName():
|
||||||
err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Msg.GetConfigFileName():
|
case cm.config.Msg.GetConfigFileName():
|
||||||
err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Third.GetConfigFileName():
|
case cm.config.Third.GetConfigFileName():
|
||||||
err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.User.GetConfigFileName():
|
case cm.config.User.GetConfigFileName():
|
||||||
err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Redis.GetConfigFileName():
|
case cm.config.Redis.GetConfigFileName():
|
||||||
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Share.GetConfigFileName():
|
case cm.config.Share.GetConfigFileName():
|
||||||
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
case cm.config.Webhooks.GetConfigFileName():
|
case cm.config.Webhooks.GetConfigFileName():
|
||||||
err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
|
err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm)
|
||||||
default:
|
default:
|
||||||
apiresp.GinError(c, errs.ErrArgs.Wrap())
|
apiresp.GinError(c, errs.ErrArgs.Wrap())
|
||||||
return
|
return
|
||||||
@ -139,7 +146,7 @@ func (cm *ConfigManager) SetConfig(c *gin.Context) {
|
|||||||
apiresp.GinSuccess(c, nil)
|
apiresp.GinSuccess(c, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error {
|
func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error {
|
||||||
conf := new(T)
|
conf := new(T)
|
||||||
err := json.Unmarshal([]byte(req.Data), &conf)
|
err := json.Unmarshal([]byte(req.Data), &conf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -153,7 +160,7 @@ func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
|
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
|
||||||
}
|
}
|
||||||
_, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
|
_, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errs.WrapMsg(err, "save to etcd failed")
|
return errs.WrapMsg(err, "save to etcd failed")
|
||||||
}
|
}
|
||||||
@ -241,10 +248,101 @@ func (cm *ConfigManager) Restart(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cm *ConfigManager) restart(c *gin.Context) {
|
func (cm *ConfigManager) restart(c *gin.Context) {
|
||||||
time.Sleep(time.Millisecond * 200) // wait for Restart http call return
|
time.Sleep(waitHttp) // wait for Restart http call return
|
||||||
t := time.Now().Unix()
|
t := time.Now().Unix()
|
||||||
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
|
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(c, "restart etcd put key failed", err)
|
log.ZError(c, "restart etcd put key failed", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) {
|
||||||
|
var req apistruct.SetEnableConfigManagerReq
|
||||||
|
if err := c.BindJSON(&req); err != nil {
|
||||||
|
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var enableStr string
|
||||||
|
if req.Enable {
|
||||||
|
enableStr = etcd.Enable
|
||||||
|
} else {
|
||||||
|
enableStr = etcd.Disable
|
||||||
|
}
|
||||||
|
resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey))
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable {
|
||||||
|
go func() {
|
||||||
|
time.Sleep(waitHttp) // wait for Restart http call return
|
||||||
|
err := cm.writeAllConfig(c, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr))
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(c, "writeAllConfig failed", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
_, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
apiresp.GinSuccess(c, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) {
|
||||||
|
resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey))
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var enable bool
|
||||||
|
if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable {
|
||||||
|
enable = true
|
||||||
|
}
|
||||||
|
apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cm *ConfigManager) writeAllConfig(ctx context.Context, ops ...clientv3.Op) error {
|
||||||
|
getWriteConfigOp(ctx, cm.config.Discovery.GetConfigFileName(), cm.config.Discovery, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Kafka.GetConfigFileName(), cm.config.Kafka, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.LocalCache.GetConfigFileName(), cm.config.LocalCache, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Log.GetConfigFileName(), cm.config.Log, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Minio.GetConfigFileName(), cm.config.Minio, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Mongo.GetConfigFileName(), cm.config.Mongo, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Notification.GetConfigFileName(), cm.config.Notification, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.API.GetConfigFileName(), cm.config.API, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.CronTask.GetConfigFileName(), cm.config.CronTask, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.MsgGateway.GetConfigFileName(), cm.config.MsgGateway, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.MsgTransfer.GetConfigFileName(), cm.config.MsgTransfer, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Push.GetConfigFileName(), cm.config.Push, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Auth.GetConfigFileName(), cm.config.Auth, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Conversation.GetConfigFileName(), cm.config.Conversation, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Friend.GetConfigFileName(), cm.config.Friend, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Group.GetConfigFileName(), cm.config.Group, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Msg.GetConfigFileName(), cm.config.Msg, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Third.GetConfigFileName(), cm.config.Third, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.User.GetConfigFileName(), cm.config.User, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Redis.GetConfigFileName(), cm.config.Redis, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Share.GetConfigFileName(), cm.config.Share, &ops)
|
||||||
|
getWriteConfigOp(ctx, cm.config.Webhooks.GetConfigFileName(), cm.config.Webhooks, &ops)
|
||||||
|
txn := cm.client.Txn(ctx)
|
||||||
|
txn.Then(ops...)
|
||||||
|
_, err := txn.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return errs.WrapMsg(err, "writeAllConfig failed commit")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getWriteConfigOp[T any](ctx context.Context, key string, config T, ops *[]clientv3.Op) {
|
||||||
|
data, err := json.Marshal(config)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "marshal config failed", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*ops = append(*ops, clientv3.OpPut(key, string(data)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|||||||
@ -314,6 +314,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
|
|||||||
configGroup.POST("/get_config", cm.GetConfig)
|
configGroup.POST("/get_config", cm.GetConfig)
|
||||||
configGroup.POST("/set_config", cm.SetConfig)
|
configGroup.POST("/set_config", cm.SetConfig)
|
||||||
configGroup.POST("/reset_config", cm.ResetConfig)
|
configGroup.POST("/reset_config", cm.ResetConfig)
|
||||||
|
configGroup.POST("/set_enable_config_manager", cm.SetEnableConfigManager)
|
||||||
|
configGroup.POST("/ ", cm.GetEnableConfigManager)
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
r.POST("/restart", cm.CheckAdmin, cm.Restart)
|
r.POST("/restart", cm.CheckAdmin, cm.Restart)
|
||||||
|
|||||||
@ -14,3 +14,11 @@ type SetConfigReq struct {
|
|||||||
ConfigName string `json:"configName"`
|
ConfigName string `json:"configName"`
|
||||||
Data string `json:"data"`
|
Data string `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SetEnableConfigManagerReq struct {
|
||||||
|
Enable bool `json:"enable"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetEnableConfigManagerResp struct {
|
||||||
|
Enable bool `json:"enable"`
|
||||||
|
}
|
||||||
|
|||||||
@ -141,9 +141,24 @@ func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error {
|
|||||||
if r.etcdClient == nil {
|
if r.etcdClient == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
res, err := r.etcdClient.Get(ctx, disetcd.BuildKey(disetcd.EnableConfigCenterKey))
|
||||||
|
if err != nil {
|
||||||
|
log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get EnableConfigCenterKey err: %v", errs.Wrap(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if res.Count == 0 {
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
if string(res.Kvs[0].Value) == disetcd.Disable {
|
||||||
|
return nil
|
||||||
|
} else if string(res.Kvs[0].Value) != disetcd.Enable {
|
||||||
|
return errs.New("unknown EnableConfigCenter value").Wrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
update := func(configFileName string, configStruct any) error {
|
update := func(configFileName string, configStruct any) error {
|
||||||
ctx := context.TODO()
|
|
||||||
key := disetcd.BuildKey(configFileName)
|
key := disetcd.BuildKey(configFileName)
|
||||||
etcdRes, err := r.etcdClient.Get(ctx, key)
|
etcdRes, err := r.etcdClient.Get(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -14,11 +14,6 @@ import (
|
|||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
ConfigKeyPrefix = "/open-im/config/"
|
|
||||||
RestartKey = "restart"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ShutDowns []func() error
|
ShutDowns []func() error
|
||||||
)
|
)
|
||||||
|
|||||||
9
pkg/common/discovery/etcd/const.go
Normal file
9
pkg/common/discovery/etcd/const.go
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
package etcd
|
||||||
|
|
||||||
|
const (
|
||||||
|
ConfigKeyPrefix = "/open-im/config/"
|
||||||
|
RestartKey = "restart"
|
||||||
|
EnableConfigCenterKey = "enable-config-center"
|
||||||
|
Enable = "enable"
|
||||||
|
Disable = "disable"
|
||||||
|
)
|
||||||
@ -1,15 +0,0 @@
|
|||||||
// Copyright © 2024 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package etcd // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
|
|
||||||
Loading…
x
Reference in New Issue
Block a user