diff --git a/go.mod b/go.mod index 065f719a1..fb839f0b5 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.70 - github.com/openimsdk/tools v0.0.50-alpha.63 + github.com/openimsdk/protocol v0.0.72-alpha.71 + github.com/openimsdk/tools v0.0.50-alpha.65 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index bfe6d2928..160111163 100644 --- a/go.sum +++ b/go.sum @@ -317,12 +317,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ= -github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY= -github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= +github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.65 h1:BRtxkyWxDWPHuHphSwEyHZj7kJSR98am/fHOH84naK8= +github.com/openimsdk/tools v0.0.50-alpha.65/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go new file mode 100644 index 000000000..c61b2cb0b --- /dev/null +++ b/internal/api/config_manager.go @@ -0,0 +1,312 @@ +package api + +import ( + "encoding/json" + "reflect" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" + "github.com/openimsdk/open-im-server/v3/version" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/runtimeenv" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + // wait for Restart http call return + waitHttp = time.Millisecond * 200 +) + +type ConfigManager struct { + imAdminUserID []string + config *config.AllConfig + client *clientv3.Client + + configPath string + runtimeEnv string +} + +func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { + cm := &ConfigManager{ + imAdminUserID: IMAdminUserID, + config: cfg, + client: client, + configPath: configPath, + runtimeEnv: runtimeEnv, + } + return cm +} + +func (cm *ConfigManager) CheckAdmin(c *gin.Context) { + if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil { + apiresp.GinError(c, err) + c.Abort() + } +} + +func (cm *ConfigManager) GetConfig(c *gin.Context) { + var req apistruct.GetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + conf := cm.config.Name2Config(req.ConfigName) + if conf == nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) + return + } + b, err := json.Marshal(conf) + if err != nil { + apiresp.GinError(c, err) + return + } + apiresp.GinSuccess(c, string(b)) +} + +func (cm *ConfigManager) GetConfigList(c *gin.Context) { + var resp apistruct.GetConfigListResp + resp.ConfigNames = cm.config.GetConfigNames() + resp.Environment = runtimeenv.PrintRuntimeEnvironment() + resp.Version = version.Version + + apiresp.GinSuccess(c, resp) +} + +func (cm *ConfigManager) SetConfig(c *gin.Context) { + if cm.config.Discovery.Enable != config.ETCD { + apiresp.GinError(c, errs.New("only etcd support set config").Wrap()) + return + } + var req apistruct.SetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var err error + switch req.ConfigName { + case cm.config.Discovery.GetConfigFileName(): + err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Kafka.GetConfigFileName(): + err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.LocalCache.GetConfigFileName(): + err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Log.GetConfigFileName(): + err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Minio.GetConfigFileName(): + err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Mongo.GetConfigFileName(): + err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Notification.GetConfigFileName(): + err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.API.GetConfigFileName(): + err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.CronTask.GetConfigFileName(): + err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.MsgGateway.GetConfigFileName(): + err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.MsgTransfer.GetConfigFileName(): + err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Push.GetConfigFileName(): + err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Auth.GetConfigFileName(): + err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Conversation.GetConfigFileName(): + err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Friend.GetConfigFileName(): + err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Group.GetConfigFileName(): + err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Msg.GetConfigFileName(): + err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Third.GetConfigFileName(): + err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.User.GetConfigFileName(): + err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Redis.GetConfigFileName(): + err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Share.GetConfigFileName(): + err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Webhooks.GetConfigFileName(): + err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm) + default: + apiresp.GinError(c, errs.ErrArgs.Wrap()) + return + } + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + apiresp.GinSuccess(c, nil) +} + +func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error { + conf := new(T) + err := json.Unmarshal([]byte(req.Data), &conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + eq := reflect.DeepEqual(old, conf) + if eq { + return nil + } + data, err := json.Marshal(conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + _, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) + if err != nil { + return errs.WrapMsg(err, "save to etcd failed") + } + return nil +} + +func (cm *ConfigManager) ResetConfig(c *gin.Context) { + go func() { + if err := cm.resetConfig(c, true); err != nil { + log.ZError(c, "reset config err", err) + } + }() + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...clientv3.Op) error { + txn := cm.client.Txn(c) + type initConf struct { + old any + new any + } + configMap := map[string]*initConf{ + cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)}, + cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)}, + cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)}, + cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)}, + cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)}, + cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)}, + cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)}, + cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)}, + cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)}, + cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)}, + cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)}, + cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)}, + cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)}, + cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)}, + cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)}, + cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)}, + cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)}, + cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)}, + cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)}, + cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)}, + cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)}, + cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)}, + } + + changedKeys := make([]string, 0, len(configMap)) + for k, v := range configMap { + err := config.Load( + cm.configPath, + k, + config.EnvPrefixMap[k], + cm.runtimeEnv, + v.new, + ) + if err != nil { + log.ZError(c, "load config failed", err) + continue + } + equal := reflect.DeepEqual(v.old, v.new) + if !checkChange || !equal { + changedKeys = append(changedKeys, k) + } + } + + for _, k := range changedKeys { + data, err := json.Marshal(configMap[k].new) + if err != nil { + log.ZError(c, "marshal config failed", err) + continue + } + ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data))) + } + if len(ops) > 0 { + txn.Then(ops...) + _, err := txn.Commit() + if err != nil { + return errs.WrapMsg(err, "commit etcd txn failed") + } + } + return nil +} + +func (cm *ConfigManager) Restart(c *gin.Context) { + go cm.restart(c) + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) restart(c *gin.Context) { + time.Sleep(waitHttp) // wait for Restart http call return + t := time.Now().Unix() + _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) + if err != nil { + log.ZError(c, "restart etcd put key failed", err) + } +} + +func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) { + if cm.config.Discovery.Enable != config.ETCD { + apiresp.GinError(c, errs.New("only etcd support config manager").Wrap()) + return + } + var req apistruct.SetEnableConfigManagerReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var enableStr string + if req.Enable { + enableStr = etcd.Enable + } else { + enableStr = etcd.Disable + } + resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) + return + } + if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable { + go func() { + time.Sleep(waitHttp) // wait for Restart http call return + err := cm.resetConfig(c, false, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)) + if err != nil { + log.ZError(c, "resetConfig failed", err) + } + }() + } else { + _, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed")) + return + } + } + + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) { + resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) + return + } + var enable bool + if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable { + enable = true + } + apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable}) +} diff --git a/internal/api/router.go b/internal/api/router.go index 9b3fac24f..d23e84bee 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -2,15 +2,6 @@ package api import ( "context" - "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" - pbAuth "github.com/openimsdk/protocol/auth" - "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/relation" - "github.com/openimsdk/protocol/third" - "github.com/openimsdk/protocol/user" "net/http" "strings" @@ -31,6 +22,7 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" + clientv3 "go.etcd.io/etcd/client/v3" ) const ( @@ -55,10 +47,8 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config) (*gin.Engine, error) { - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth) +func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { + authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) if err != nil { return nil, err } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 9df012e86..2bf08783d 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -17,6 +17,7 @@ package auth import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -118,9 +119,13 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR if authverify.IsManagerUserID(req.UserID, s.config.Share.IMAdminUserID) { return nil, errs.ErrNoPermission.WrapMsg("don't get Admin token") } - if err := s.userClient.CheckUser(ctx, []string{req.UserID}); err != nil { + user, err := s.userClient.GetUserInfo(ctx, req.UserID) + if err != nil { return nil, err } + if user.AppMangerLevel >= constant.AppNotificationAdmin { + return nil, errs.ErrArgs.WrapMsg("app account can`t get token") + } token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID)) if err != nil { return nil, err diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 02b3dc81d..f0788215d 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -17,7 +17,6 @@ package user import ( "context" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "math/rand" "strings" "sync" @@ -32,6 +31,7 @@ import ( tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/group" friendpb "github.com/openimsdk/protocol/relation" "github.com/openimsdk/tools/db/redisutil" @@ -480,7 +480,9 @@ func (s *userServer) AddNotificationAccount(ctx context.Context, req *pbuser.Add if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { return nil, err } - + if req.AppMangerLevel < constant.AppNotificationAdmin { + return nil, errs.ErrArgs.WithDetail("app level not supported") + } if req.UserID == "" { for i := 0; i < 20; i++ { userId := s.genUserID() @@ -506,16 +508,17 @@ func (s *userServer) AddNotificationAccount(ctx context.Context, req *pbuser.Add Nickname: req.NickName, FaceURL: req.FaceURL, CreateTime: time.Now(), - AppMangerLevel: constant.AppNotificationAdmin, + AppMangerLevel: req.AppMangerLevel, } if err := s.db.Create(ctx, []*tablerelation.User{user}); err != nil { return nil, err } return &pbuser.AddNotificationAccountResp{ - UserID: req.UserID, - NickName: req.NickName, - FaceURL: req.FaceURL, + UserID: req.UserID, + NickName: req.NickName, + FaceURL: req.FaceURL, + AppMangerLevel: req.AppMangerLevel, }, nil } @@ -595,8 +598,13 @@ func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.Get if err != nil { return nil, servererrs.ErrUserIDNotFound.Wrap() } - if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel == constant.AppNotificationAdmin { - return &pbuser.GetNotificationAccountResp{}, nil + if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel >= constant.AppNotificationAdmin { + return &pbuser.GetNotificationAccountResp{Account: &pbuser.NotificationAccountInfo{ + UserID: user.UserID, + FaceURL: user.FaceURL, + NickName: user.Nickname, + AppMangerLevel: user.AppMangerLevel, + }}, nil } return nil, errs.ErrNoPermission.WrapMsg("notification messages cannot be sent for this ID") @@ -621,11 +629,12 @@ func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pag accounts := make([]*pbuser.NotificationAccountInfo, 0) var total int64 for _, v := range users { - if v.AppMangerLevel == constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) { + if v.AppMangerLevel >= constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) { temp := &pbuser.NotificationAccountInfo{ - UserID: v.UserID, - FaceURL: v.FaceURL, - NickName: v.Nickname, + UserID: v.UserID, + FaceURL: v.FaceURL, + NickName: v.Nickname, + AppMangerLevel: v.AppMangerLevel, } accounts = append(accounts, temp) total += 1