diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 8f5113a93..e29ed2a59 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -22,10 +22,7 @@ import ( ) func main() { - apiCmd := cmd.NewApiCmd(cmd.ApiServer) - //apiCmd.AddPortFlag() - //apiCmd.AddPrometheusPortFlag() - if err := apiCmd.Execute(); err != nil { + if err := cmd.NewApiCmd().Exec(); err != nil { program.ExitWithError(err) } } diff --git a/cmd/openim-crontask/main.go b/cmd/openim-crontask/main.go index 8b63968c5..674506518 100644 --- a/cmd/openim-crontask/main.go +++ b/cmd/openim-crontask/main.go @@ -20,8 +20,7 @@ import ( ) func main() { - cronTaskCmd := cmd.NewCronTaskCmd(cmd.CronTaskServer) - if err := cronTaskCmd.Exec(); err != nil { + if err := cmd.NewCronTaskCmd().Exec(); err != nil { program.ExitWithError(err) } } diff --git a/internal/api/msg.go b/internal/api/msg.go index 42d5d8775..ee1bd98f5 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -40,15 +40,13 @@ type MessageApi struct { *rpcclient.Message validate *validator.Validate userRpcClient *rpcclient.UserRpcClient - manager *config.Manager imAdmin *config.IMAdmin } -func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.User, manager *config.Manager, +func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.User, imAdmin *config.IMAdmin) MessageApi { return MessageApi{Message: msgRpcClient, validate: validator.New(), - userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient), - manager: manager, imAdmin: imAdmin} + userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient), imAdmin: imAdmin} } func (MessageApi) SetOptions(options map[string]bool, value bool) { @@ -206,7 +204,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) { } // Check if the user has the app manager role. - if !authverify.IsAppManagerUid(c, m.manager, m.imAdmin) { + if !authverify.IsAppManagerUid(c, m.imAdmin) { // Respond with a permission error if the user is not an app manager. apiresp.GinError(c, errs.ErrNoPermission.WrapMsg("only app manager can send message")) return @@ -261,7 +259,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { return } - if !authverify.IsAppManagerUid(c, m.manager, m.imAdmin) { + if !authverify.IsAppManagerUid(c, m.imAdmin) { apiresp.GinError(c, errs.ErrNoPermission.WrapMsg("only app manager can send message")) return } @@ -280,7 +278,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { SessionType: constant.SingleChatType, CreateTime: timeutil.GetCurrentTimestampByMill(), ClientMsgID: idutil.GetMsgIDByMD5(mcontext.GetOpUserID(c)), - Options: config.GetOptionsByNotification(config.NotificationConf{ + Options: config.GetOptionsByNotification(config.NotificationConfig{ IsSendMsg: false, ReliabilityLevel: 1, UnreadCount: false, @@ -304,7 +302,7 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) { apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) return } - if err := authverify.CheckAdmin(c, m.manager, m.imAdmin); err != nil { + if err := authverify.CheckAdmin(c, m.imAdmin); err != nil { apiresp.GinError(c, errs.ErrNoPermission.WrapMsg("only app manager can send message")) return } diff --git a/internal/api/route.go b/internal/api/route.go index 766a199b9..374ccbb48 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -17,7 +17,10 @@ package api import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/network" "net" "net/http" "os" @@ -51,11 +54,16 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort int) error { - if port == 0 || proPort == 0 { - return errs.New("port or proPort is empty", "port", port, "proPort", proPort).Wrap() +func Start(ctx context.Context, index int, config *cmd.ApiConfig) error { + apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index) + if err != nil { + return err } - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) + prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) + if err != nil { + return err + } + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } @@ -63,46 +71,36 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i var client discovery.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(config) + client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } - if err = client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { + if err = client.CreateRpcRootNodes(config.Share.RpcRegisterName.GetServiceNames()); err != nil { return errs.WrapMsg(err, "failed to create RPC root nodes") } - if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil { - return errs.WrapMsg(err, "failed to register configuration to registry") - } - var ( netDone = make(chan struct{}, 1) netErr error ) router := newGinRouter(client, rdb, config) - if config.Prometheus.Enable { + if config.RpcConfig.Prometheus.Enable { go func() { p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) - p.SetListenAddress(fmt.Sprintf(":%d", proPort)) + p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) if err = p.Use(router); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", proPort)) + netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) netDone <- struct{}{} } }() } - - var address string - if config.Api.ListenIP != "" { - address = net.JoinHostPort(config.Api.ListenIP, strconv.Itoa(port)) - } else { - address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port)) - } + address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort)) server := http.Server{Addr: address, Handler: router} - log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", port, "prometheusPort", proPort) + log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) go func() { err = server.ListenAndServe() if err != nil && err != http.ErrServerClosed { @@ -131,8 +129,9 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i return nil } -func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *config.GlobalConfig) *gin.Engine { - disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) +func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *cmd.ApiConfig) *gin.Engine { + disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) gin.SetMode(gin.ReleaseMode) r := gin.New() if v, ok := binding.Validator.Engine().(*validator.Validate); ok { @@ -140,17 +139,17 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClie } r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) // init rpc client here - userRpc := rpcclient.NewUser(disCov, config.RpcRegisterName.OpenImUserName, config.RpcRegisterName.OpenImMessageGatewayName, - &config.Manager, &config.IMAdmin) - groupRpc := rpcclient.NewGroup(disCov, config.RpcRegisterName.OpenImGroupName) - friendRpc := rpcclient.NewFriend(disCov, config.RpcRegisterName.OpenImFriendName) - messageRpc := rpcclient.NewMessage(disCov, config.RpcRegisterName.OpenImMsgName) - conversationRpc := rpcclient.NewConversation(disCov, config.RpcRegisterName.OpenImConversationName) - authRpc := rpcclient.NewAuth(disCov, config.RpcRegisterName.OpenImAuthName) - thirdRpc := rpcclient.NewThird(disCov, config.RpcRegisterName.OpenImThirdName, config.Prometheus.GrafanaUrl) + userRpc := rpcclient.NewUser(disCov, config.Share.RpcRegisterName.User, config.Share.RpcRegisterName.MessageGateway, + &config.Share.IMAdmin) + groupRpc := rpcclient.NewGroup(disCov, config.Share.RpcRegisterName.Group) + friendRpc := rpcclient.NewFriend(disCov, config.Share.RpcRegisterName.Friend) + messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg) + conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation) + authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) + thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.RpcConfig.Prometheus.GrafanaURL) u := NewUserApi(*userRpc) - m := NewMessageApi(messageRpc, userRpc, &config.Manager, &config.IMAdmin) + m := NewMessageApi(messageRpc, userRpc, &config.Share.IMAdmin) ParseToken := GinParseToken(rdb, config) userRouterGroup := r.Group("/user") { diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index a0a297211..b94fe1a79 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -16,46 +16,49 @@ package cmd import ( "context" - "github.com/openimsdk/open-im-server/v3/internal/api" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) type ApiCmd struct { *RootCmd - ctx context.Context + ctx context.Context + configMap map[string]StructEnvPrefix + apiConfig ApiConfig +} +type ApiConfig struct { + RpcConfig config.API + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + MinioConfig config.Minio } -func NewApiCmd(name string) *ApiCmd { - ret := &ApiCmd{RootCmd: NewRootCmd(program.GetProcessName(), name)} - ret.ctx = context.WithValue(context.Background(), "version", config2.Version) - ret.SetRootCmdPt(ret) - ret.addPreRun() - ret.addRunE() +func NewApiCmd() *ApiCmd { + var apiConfig ApiConfig + ret := &ApiCmd{apiConfig: apiConfig} + ret.configMap = map[string]StructEnvPrefix{ + OpenIMAPICfgFileName: {EnvPrefix: apiEnvPrefix, ConfigStruct: &apiConfig.RpcConfig}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &apiConfig.RedisConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &apiConfig.ZookeeperConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &apiConfig.Share}, + } + ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) + ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error { + return ret.preRunE() + } return ret } -func (a *ApiCmd) addPreRun() { - a.Command.PreRun = func(cmd *cobra.Command, args []string) { - a.port = a.getPortFlag(cmd) - a.prometheusPort = a.getPrometheusPortFlag(cmd) - } +func (a *ApiCmd) Exec() error { + return a.Execute() } -func (a *ApiCmd) addRunE() { - a.Command.RunE = func(cmd *cobra.Command, args []string) error { - return api.Start(a.ctx, a.config, a.port, a.prometheusPort) - } -} - -func (a *ApiCmd) GetPortFromConfig(portType string) int { - if portType == constant.FlagPort { - return a.config.Api.OpenImApiPort[0] - } else if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.ApiPrometheusPort[0] - } - return 0 +func (a *ApiCmd) preRunE() error { + return api.Start(a.ctx, a.Index(), &a.apiConfig) } diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go index 28836f3ec..935d24538 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/cmd/constant.go @@ -63,6 +63,7 @@ const ( minioEnvPrefix = "openim-minio" kafkaEnvPrefix = "openim-kafka" zoopkeeperEnvPrefix = "openim-zookeeper" + apiEnvPrefix = "openim-api" authEnvPrefix = "openim-auth" conversationEnvPrefix = "openim-conversation" friendEnvPrefix = "openim-friend" diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 86f592db2..5057dc105 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -325,20 +325,36 @@ type WebhookConfig struct { } type Share struct { - Env string `mapstructure:"env"` - RpcRegisterName struct { - User string `mapstructure:"user"` - Friend string `mapstructure:"friend"` - Msg string `mapstructure:"msg"` - Push string `mapstructure:"push"` - MessageGateway string `mapstructure:"messageGateway"` - Group string `mapstructure:"group"` - Auth string `mapstructure:"auth"` - Conversation string `mapstructure:"conversation"` - Third string `mapstructure:"third"` - } `mapstructure:"rpcRegisterName"` - IMAdmin IMAdmin `mapstructure:"imAdmin"` + Env string `mapstructure:"env"` + RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` + IMAdmin IMAdmin `mapstructure:"imAdmin"` } +type RpcRegisterName struct { + User string `mapstructure:"user"` + Friend string `mapstructure:"friend"` + Msg string `mapstructure:"msg"` + Push string `mapstructure:"push"` + MessageGateway string `mapstructure:"messageGateway"` + Group string `mapstructure:"group"` + Auth string `mapstructure:"auth"` + Conversation string `mapstructure:"conversation"` + Third string `mapstructure:"third"` +} + +func (r *RpcRegisterName) GetServiceNames() []string { + return []string{ + r.User, + r.Friend, + r.Msg, + r.Push, + r.MessageGateway, + r.Group, + r.Auth, + r.Conversation, + r.Third, + } +} + type IMAdmin struct { UserID []string `mapstructure:"userID"` Nickname []string `mapstructure:"nickname"` diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 1c6cb973a..25077a387 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -16,14 +16,19 @@ package discoveryregister import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" - "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "time" ) +const ( + zookeeper = "zoopkeeper" + kubenetes = "k8s" + + direct = "direct" +) + // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper) (discovery.SvcDiscoveryRegistry, error) { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index e8f1ba414..c4861bd16 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -18,6 +18,7 @@ import ( "context" "fmt" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/utils/datautil" "github.com/prometheus/client_golang/prometheus" "net" "net/http" @@ -47,11 +48,11 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome registerIP string, rpcPorts []int, index int, rpcRegisterName string, config T, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { - rpcPort, err := getElemByIndex(rpcPorts, index) + rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) if err != nil { return err } - prometheusPort, err := getElemByIndex(prometheusConfig.Ports, index) + prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) if err != nil { return err } @@ -173,11 +174,3 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error { return nil } } - -func getElemByIndex(array []int, index int) (int, error) { - if index < 0 || index >= len(array) { - return 0, errs.New("index out of range", "index", index, "array", array).Wrap() - } - - return array[index], nil -}