diff --git a/go.mod b/go.mod index fb839f0b5..99fa917fa 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.65 + github.com/openimsdk/tools v0.0.50-alpha.67 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 160111163..0b8c86325 100644 --- a/go.sum +++ b/go.sum @@ -317,12 +317,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= -github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= +github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.65 h1:BRtxkyWxDWPHuHphSwEyHZj7kJSR98am/fHOH84naK8= -github.com/openimsdk/tools v0.0.50-alpha.65/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/tools v0.0.50-alpha.67 h1:K7kguqvPbjldHAi7pGhcG2ERkctCqG9ZFlteT7UKaxM= +github.com/openimsdk/tools v0.0.50-alpha.67/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index c61b2cb0b..a3540d1cf 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -1,312 +1,313 @@ package api -import ( - "encoding/json" - "reflect" - "strconv" - "time" - - "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/apistruct" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" - "github.com/openimsdk/open-im-server/v3/version" - "github.com/openimsdk/tools/apiresp" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/runtimeenv" - clientv3 "go.etcd.io/etcd/client/v3" -) - -const ( - // wait for Restart http call return - waitHttp = time.Millisecond * 200 -) - -type ConfigManager struct { - imAdminUserID []string - config *config.AllConfig - client *clientv3.Client - - configPath string - runtimeEnv string -} - -func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { - cm := &ConfigManager{ - imAdminUserID: IMAdminUserID, - config: cfg, - client: client, - configPath: configPath, - runtimeEnv: runtimeEnv, - } - return cm -} - -func (cm *ConfigManager) CheckAdmin(c *gin.Context) { - if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil { - apiresp.GinError(c, err) - c.Abort() - } -} - -func (cm *ConfigManager) GetConfig(c *gin.Context) { - var req apistruct.GetConfigReq - if err := c.BindJSON(&req); err != nil { - apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) - return - } - conf := cm.config.Name2Config(req.ConfigName) - if conf == nil { - apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) - return - } - b, err := json.Marshal(conf) - if err != nil { - apiresp.GinError(c, err) - return - } - apiresp.GinSuccess(c, string(b)) -} - -func (cm *ConfigManager) GetConfigList(c *gin.Context) { - var resp apistruct.GetConfigListResp - resp.ConfigNames = cm.config.GetConfigNames() - resp.Environment = runtimeenv.PrintRuntimeEnvironment() - resp.Version = version.Version - - apiresp.GinSuccess(c, resp) -} - -func (cm *ConfigManager) SetConfig(c *gin.Context) { - if cm.config.Discovery.Enable != config.ETCD { - apiresp.GinError(c, errs.New("only etcd support set config").Wrap()) - return - } - var req apistruct.SetConfigReq - if err := c.BindJSON(&req); err != nil { - apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) - return - } - var err error - switch req.ConfigName { - case cm.config.Discovery.GetConfigFileName(): - err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Kafka.GetConfigFileName(): - err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.LocalCache.GetConfigFileName(): - err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Log.GetConfigFileName(): - err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Minio.GetConfigFileName(): - err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Mongo.GetConfigFileName(): - err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Notification.GetConfigFileName(): - err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.API.GetConfigFileName(): - err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.CronTask.GetConfigFileName(): - err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.MsgGateway.GetConfigFileName(): - err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.MsgTransfer.GetConfigFileName(): - err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Push.GetConfigFileName(): - err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Auth.GetConfigFileName(): - err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Conversation.GetConfigFileName(): - err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Friend.GetConfigFileName(): - err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Group.GetConfigFileName(): - err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Msg.GetConfigFileName(): - err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Third.GetConfigFileName(): - err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.User.GetConfigFileName(): - err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Redis.GetConfigFileName(): - err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Share.GetConfigFileName(): - err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm) - case cm.config.Webhooks.GetConfigFileName(): - err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm) - default: - apiresp.GinError(c, errs.ErrArgs.Wrap()) - return - } - if err != nil { - apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) - return - } - apiresp.GinSuccess(c, nil) -} - -func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error { - conf := new(T) - err := json.Unmarshal([]byte(req.Data), &conf) - if err != nil { - return errs.ErrArgs.WithDetail(err.Error()).Wrap() - } - eq := reflect.DeepEqual(old, conf) - if eq { - return nil - } - data, err := json.Marshal(conf) - if err != nil { - return errs.ErrArgs.WithDetail(err.Error()).Wrap() - } - _, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) - if err != nil { - return errs.WrapMsg(err, "save to etcd failed") - } - return nil -} - -func (cm *ConfigManager) ResetConfig(c *gin.Context) { - go func() { - if err := cm.resetConfig(c, true); err != nil { - log.ZError(c, "reset config err", err) - } - }() - apiresp.GinSuccess(c, nil) -} - -func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...clientv3.Op) error { - txn := cm.client.Txn(c) - type initConf struct { - old any - new any - } - configMap := map[string]*initConf{ - cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)}, - cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)}, - cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)}, - cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)}, - cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)}, - cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)}, - cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)}, - cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)}, - cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)}, - cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)}, - cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)}, - cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)}, - cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)}, - cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)}, - cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)}, - cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)}, - cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)}, - cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)}, - cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)}, - cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)}, - cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)}, - cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)}, - } - - changedKeys := make([]string, 0, len(configMap)) - for k, v := range configMap { - err := config.Load( - cm.configPath, - k, - config.EnvPrefixMap[k], - cm.runtimeEnv, - v.new, - ) - if err != nil { - log.ZError(c, "load config failed", err) - continue - } - equal := reflect.DeepEqual(v.old, v.new) - if !checkChange || !equal { - changedKeys = append(changedKeys, k) - } - } - - for _, k := range changedKeys { - data, err := json.Marshal(configMap[k].new) - if err != nil { - log.ZError(c, "marshal config failed", err) - continue - } - ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data))) - } - if len(ops) > 0 { - txn.Then(ops...) - _, err := txn.Commit() - if err != nil { - return errs.WrapMsg(err, "commit etcd txn failed") - } - } - return nil -} - -func (cm *ConfigManager) Restart(c *gin.Context) { - go cm.restart(c) - apiresp.GinSuccess(c, nil) -} - -func (cm *ConfigManager) restart(c *gin.Context) { - time.Sleep(waitHttp) // wait for Restart http call return - t := time.Now().Unix() - _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) - if err != nil { - log.ZError(c, "restart etcd put key failed", err) - } -} - -func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) { - if cm.config.Discovery.Enable != config.ETCD { - apiresp.GinError(c, errs.New("only etcd support config manager").Wrap()) - return - } - var req apistruct.SetEnableConfigManagerReq - if err := c.BindJSON(&req); err != nil { - apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) - return - } - var enableStr string - if req.Enable { - enableStr = etcd.Enable - } else { - enableStr = etcd.Disable - } - resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) - if err != nil { - apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) - return - } - if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable { - go func() { - time.Sleep(waitHttp) // wait for Restart http call return - err := cm.resetConfig(c, false, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)) - if err != nil { - log.ZError(c, "resetConfig failed", err) - } - }() - } else { - _, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr) - if err != nil { - apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed")) - return - } - } - - apiresp.GinSuccess(c, nil) -} - -func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) { - resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) - if err != nil { - apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) - return - } - var enable bool - if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable { - enable = true - } - apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable}) -} +// +//import ( +// "encoding/json" +// "reflect" +// "strconv" +// "time" +// +// "github.com/gin-gonic/gin" +// "github.com/openimsdk/open-im-server/v3/pkg/apistruct" +// "github.com/openimsdk/open-im-server/v3/pkg/authverify" +// "github.com/openimsdk/open-im-server/v3/pkg/common/config" +// "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" +// "github.com/openimsdk/open-im-server/v3/version" +// "github.com/openimsdk/tools/apiresp" +// "github.com/openimsdk/tools/errs" +// "github.com/openimsdk/tools/log" +// "github.com/openimsdk/tools/utils/runtimeenv" +// clientv3 "go.etcd.io/etcd/client/v3" +//) +// +//const ( +// // wait for Restart http call return +// waitHttp = time.Millisecond * 200 +//) +// +//type ConfigManager struct { +// imAdminUserID []string +// config *config.AllConfig +// client *clientv3.Client +// +// configPath string +// runtimeEnv string +//} +// +//func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { +// cm := &ConfigManager{ +// imAdminUserID: IMAdminUserID, +// config: cfg, +// client: client, +// configPath: configPath, +// runtimeEnv: runtimeEnv, +// } +// return cm +//} +// +//func (cm *ConfigManager) CheckAdmin(c *gin.Context) { +// if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil { +// apiresp.GinError(c, err) +// c.Abort() +// } +//} +// +//func (cm *ConfigManager) GetConfig(c *gin.Context) { +// var req apistruct.GetConfigReq +// if err := c.BindJSON(&req); err != nil { +// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) +// return +// } +// conf := cm.config.Name2Config(req.ConfigName) +// if conf == nil { +// apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) +// return +// } +// b, err := json.Marshal(conf) +// if err != nil { +// apiresp.GinError(c, err) +// return +// } +// apiresp.GinSuccess(c, string(b)) +//} +// +//func (cm *ConfigManager) GetConfigList(c *gin.Context) { +// var resp apistruct.GetConfigListResp +// resp.ConfigNames = cm.config.GetConfigNames() +// resp.Environment = runtimeenv.PrintRuntimeEnvironment() +// resp.Version = version.Version +// +// apiresp.GinSuccess(c, resp) +//} +// +//func (cm *ConfigManager) SetConfig(c *gin.Context) { +// if cm.config.Discovery.Enable != config.ETCD { +// apiresp.GinError(c, errs.New("only etcd support set config").Wrap()) +// return +// } +// var req apistruct.SetConfigReq +// if err := c.BindJSON(&req); err != nil { +// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) +// return +// } +// var err error +// switch req.ConfigName { +// case cm.config.Discovery.GetConfigFileName(): +// err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Kafka.GetConfigFileName(): +// err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.LocalCache.GetConfigFileName(): +// err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Log.GetConfigFileName(): +// err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Minio.GetConfigFileName(): +// err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Mongo.GetConfigFileName(): +// err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Notification.GetConfigFileName(): +// err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.API.GetConfigFileName(): +// err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.CronTask.GetConfigFileName(): +// err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.MsgGateway.GetConfigFileName(): +// err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.MsgTransfer.GetConfigFileName(): +// err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Push.GetConfigFileName(): +// err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Auth.GetConfigFileName(): +// err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Conversation.GetConfigFileName(): +// err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Friend.GetConfigFileName(): +// err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Group.GetConfigFileName(): +// err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Msg.GetConfigFileName(): +// err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Third.GetConfigFileName(): +// err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.User.GetConfigFileName(): +// err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Redis.GetConfigFileName(): +// err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Share.GetConfigFileName(): +// err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// case cm.config.Webhooks.GetConfigFileName(): +// err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm) +// default: +// apiresp.GinError(c, errs.ErrArgs.Wrap()) +// return +// } +// if err != nil { +// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) +// return +// } +// apiresp.GinSuccess(c, nil) +//} +// +//func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error { +// conf := new(T) +// err := json.Unmarshal([]byte(req.Data), &conf) +// if err != nil { +// return errs.ErrArgs.WithDetail(err.Error()).Wrap() +// } +// eq := reflect.DeepEqual(old, conf) +// if eq { +// return nil +// } +// data, err := json.Marshal(conf) +// if err != nil { +// return errs.ErrArgs.WithDetail(err.Error()).Wrap() +// } +// _, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) +// if err != nil { +// return errs.WrapMsg(err, "save to etcd failed") +// } +// return nil +//} +// +//func (cm *ConfigManager) ResetConfig(c *gin.Context) { +// go func() { +// if err := cm.resetConfig(c, true); err != nil { +// log.ZError(c, "reset config err", err) +// } +// }() +// apiresp.GinSuccess(c, nil) +//} +// +//func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...clientv3.Op) error { +// txn := cm.client.Txn(c) +// type initConf struct { +// old any +// new any +// } +// configMap := map[string]*initConf{ +// cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)}, +// cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)}, +// cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)}, +// cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)}, +// cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)}, +// cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)}, +// cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)}, +// cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)}, +// cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)}, +// cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)}, +// cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)}, +// cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)}, +// cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)}, +// cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)}, +// cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)}, +// cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)}, +// cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)}, +// cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)}, +// cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)}, +// cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)}, +// cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)}, +// cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)}, +// } +// +// changedKeys := make([]string, 0, len(configMap)) +// for k, v := range configMap { +// err := config.Load( +// cm.configPath, +// k, +// config.EnvPrefixMap[k], +// cm.runtimeEnv, +// v.new, +// ) +// if err != nil { +// log.ZError(c, "load config failed", err) +// continue +// } +// equal := reflect.DeepEqual(v.old, v.new) +// if !checkChange || !equal { +// changedKeys = append(changedKeys, k) +// } +// } +// +// for _, k := range changedKeys { +// data, err := json.Marshal(configMap[k].new) +// if err != nil { +// log.ZError(c, "marshal config failed", err) +// continue +// } +// ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data))) +// } +// if len(ops) > 0 { +// txn.Then(ops...) +// _, err := txn.Commit() +// if err != nil { +// return errs.WrapMsg(err, "commit etcd txn failed") +// } +// } +// return nil +//} +// +//func (cm *ConfigManager) Restart(c *gin.Context) { +// go cm.restart(c) +// apiresp.GinSuccess(c, nil) +//} +// +//func (cm *ConfigManager) restart(c *gin.Context) { +// time.Sleep(waitHttp) // wait for Restart http call return +// t := time.Now().Unix() +// _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) +// if err != nil { +// log.ZError(c, "restart etcd put key failed", err) +// } +//} +// +//func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) { +// if cm.config.Discovery.Enable != config.ETCD { +// apiresp.GinError(c, errs.New("only etcd support config manager").Wrap()) +// return +// } +// var req apistruct.SetEnableConfigManagerReq +// if err := c.BindJSON(&req); err != nil { +// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) +// return +// } +// var enableStr string +// if req.Enable { +// enableStr = etcd.Enable +// } else { +// enableStr = etcd.Disable +// } +// resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) +// if err != nil { +// apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) +// return +// } +// if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable { +// go func() { +// time.Sleep(waitHttp) // wait for Restart http call return +// err := cm.resetConfig(c, false, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)) +// if err != nil { +// log.ZError(c, "resetConfig failed", err) +// } +// }() +// } else { +// _, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr) +// if err != nil { +// apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed")) +// return +// } +// } +// +// apiresp.GinSuccess(c, nil) +//} +// +//func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) { +// resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) +// if err != nil { +// apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) +// return +// } +// var enable bool +// if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable { +// enable = true +// } +// apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable}) +//} diff --git a/internal/api/router.go b/internal/api/router.go index d23e84bee..9b3fac24f 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -2,6 +2,15 @@ package api import ( "context" + "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + pbAuth "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/relation" + "github.com/openimsdk/protocol/third" + "github.com/openimsdk/protocol/user" "net/http" "strings" @@ -22,7 +31,6 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" - clientv3 "go.etcd.io/etcd/client/v3" ) const ( @@ -47,8 +55,10 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { - authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) +func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config) (*gin.Engine, error) { + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth) if err != nil { return nil, err } diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 559c937c1..5731ffcdf 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -42,6 +42,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (dis return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, discovery.Etcd.Address, + nil, etcd.WithDialTimeout(10*time.Second), etcd.WithMaxCallSendMsgSize(20*1024*1024), etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))