diff --git a/internal/api/init.go b/internal/api/init.go new file mode 100644 index 000000000..dcce69a12 --- /dev/null +++ b/internal/api/init.go @@ -0,0 +1,118 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/network" + "net" + "net/http" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/system/program" +) + +type Config struct { + RpcConfig config.API + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + MinioConfig config.Minio +} + +func Start(ctx context.Context, index int, config *Config) error { + apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index) + if err != nil { + return err + } + prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) + if err != nil { + return err + } + + var client discovery.SvcDiscoveryRegistry + + // Determine whether zk is passed according to whether it is a clustered deployment + client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + if err != nil { + return errs.WrapMsg(err, "failed to register discovery service") + } + + if err = client.CreateRpcRootNodes(config.Share.RpcRegisterName.GetServiceNames()); err != nil { + return errs.WrapMsg(err, "failed to create RPC root nodes") + } + + var ( + netDone = make(chan struct{}, 1) + netErr error + ) + + router := newGinRouter(client, config) + if config.RpcConfig.Prometheus.Enable { + go func() { + p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) + p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) + if err = p.Use(router); err != nil && err != http.ErrServerClosed { + netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) + netDone <- struct{}{} + } + }() + + } + address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort)) + + server := http.Server{Addr: address, Handler: router} + log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) + go func() { + err = server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) + netDone <- struct{}{} + + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + select { + case <-sigs: + program.SIGTERMExit() + err := server.Shutdown(ctx) + if err != nil { + return errs.WrapMsg(err, "shutdown err") + } + case <-netDone: + close(netDone) + return netErr + } + return nil +} diff --git a/internal/api/route.go b/internal/api/router.go similarity index 73% rename from internal/api/route.go rename to internal/api/router.go index e66e4342a..bd2de99db 100644 --- a/internal/api/route.go +++ b/internal/api/router.go @@ -1,140 +1,23 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package api import ( - "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/network" - "net" - "net/http" - "os" - "os/signal" - "strconv" - "syscall" - "time" - "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" - ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" - "github.com/openimsdk/tools/system/program" - "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "net/http" ) -type Config struct { - RpcConfig config.API - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - MinioConfig config.Minio -} - -func Start(ctx context.Context, index int, config *Config) error { - apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index) - if err != nil { - return err - } - prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) - if err != nil { - return err - } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) - if err != nil { - return err - } - - var client discovery.SvcDiscoveryRegistry - - // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) - if err != nil { - return errs.WrapMsg(err, "failed to register discovery service") - } - - if err = client.CreateRpcRootNodes(config.Share.RpcRegisterName.GetServiceNames()); err != nil { - return errs.WrapMsg(err, "failed to create RPC root nodes") - } - - var ( - netDone = make(chan struct{}, 1) - netErr error - ) - - router := newGinRouter(client, rdb, config) - if config.RpcConfig.Prometheus.Enable { - go func() { - p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) - p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) - if err = p.Use(router); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) - netDone <- struct{}{} - } - }() - - } - address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort)) - - server := http.Server{Addr: address, Handler: router} - log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) - go func() { - err = server.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) - netDone <- struct{}{} - - } - }() - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM) - - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - select { - case <-sigs: - program.SIGTERMExit() - err := server.Shutdown(ctx) - if err != nil { - return errs.WrapMsg(err, "shutdown err") - } - case <-netDone: - close(netDone) - return netErr - } - return nil -} - -func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *Config) *gin.Engine { +func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine { disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) gin.SetMode(gin.ReleaseMode) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 78fbda6d3..a1f10b3b3 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -66,7 +66,8 @@ type Config struct { } func Start(ctx context.Context, index int, config *Config) error { - log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) + log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", + config.MsgTransfer.Prometheus.Ports, "index", index) mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err @@ -84,7 +85,8 @@ func Start(ctx context.Context, index int, config *Config) error { return err } - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) //todo MsgCacheTimeout msgModel := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) seqModel := cache.NewSeqCache(rdb) @@ -105,7 +107,8 @@ func Start(ctx context.Context, index int, config *Config) error { return msgTransfer.Start(index, config) } -func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) { +func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, + conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) { historyCH, err := NewOnlineHistoryRedisConsumerHandler(kafkaConf, msgDatabase, conversationRpcClient, groupRpcClient) if err != nil { return nil, err diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 460fd0ac9..022fb1097 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -34,7 +34,6 @@ func NewApiCmd() *ApiCmd { ret := &ApiCmd{apiConfig: &apiConfig} ret.configMap = map[string]any{ OpenIMAPICfgFileName: &apiConfig.RpcConfig, - RedisConfigFileName: &apiConfig.RedisConfig, ZookeeperConfigFileName: &apiConfig.ZookeeperConfig, ShareFileName: &apiConfig.Share, }