diff --git a/cmd/main.go b/cmd/main.go index 4a693f31c..e0e73888d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,6 +5,7 @@ import ( "context" "flag" "fmt" + "net" "os" "os/signal" "path" @@ -29,9 +30,11 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/rpc/third" "github.com/openimsdk/open-im-server/v3/internal/rpc/user" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/standalone" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" "github.com/spf13/viper" "google.golang.org/grpc" ) @@ -192,11 +195,39 @@ func (x *cmds) run(ctx context.Context) error { if err := x.initAllConfig(); err != nil { return err } + ctx, cancel := context.WithCancelCause(ctx) + if prometheus := x.config.API.Prometheus; prometheus.Enable { + var ( + port int + err error + ) + if !prometheus.AutoSetPorts { + port, err = datautil.GetElemByIndex(prometheus.Ports, 0) + if err != nil { + return err + } + } + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return fmt.Errorf("prometheus listen %d error %w", port, err) + } + defer listener.Close() + prommetrics.RegistryAll() + log.ZDebug(ctx, "prometheus start", "addr", listener.Addr()) + go func() { + err := prommetrics.Start(listener) + if err == nil { + err = fmt.Errorf("http done") + } + cancel(fmt.Errorf("prometheus %w", err)) + }() + } + go func() { sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) select { case <-ctx.Done(): return diff --git a/internal/api/init.go b/internal/api/init.go index 3c71c6b4c..7bb8cc1ac 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -23,7 +23,6 @@ import ( "os" "os/signal" "strconv" - "sync" "syscall" "time" @@ -49,87 +48,23 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g return err } - //client, err := kdisc.NewDiscoveryRegister(&config.Discovery, []string{ - // config.Discovery.RpcService.MessageGateway, - //}) - //if err != nil { - // return errs.WrapMsg(err, "failed to register discovery service") - //} - //client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - - //registerIP, err := network.GetRpcRegisterIP("") - //if err != nil { - // return err - //} - // - // todo - //getAutoPort := func() (net.Listener, int, error) { - // registerAddr := net.JoinHostPort(registerIP, "0") - // listener, err := net.Listen("tcp", registerAddr) - // if err != nil { - // return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) - // } - // _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - // port, _ := strconv.Atoi(portStr) - // return listener, port, nil - //} - // - //if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD { - // return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() - //} - router, err := newGinRouter(ctx, client, config) if err != nil { return err } - //if config.API.Prometheus.Enable { - // var ( - // listener net.Listener - // ) - // - // if config.API.Prometheus.AutoSetPorts { - // listener, prometheusPort, err = getAutoPort() - // if err != nil { - // return err - // } - // - // etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - // - // _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - // if err != nil { - // return errs.WrapMsg(err, "etcd put err") - // } - // } else { - // prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index) - // if err != nil { - // return err - // } - // listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) - // if err != nil { - // return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) - // } - // } - // - // go func() { - // if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { - // netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) - // netDone <- struct{}{} - // } - // }() - // - //} - var wg sync.WaitGroup + ctx, cancel := context.WithCancelCause(ctx) + done := make(chan struct{}) go func() { - wg.Add(1) httpServer := &http.Server{ Handler: router, Addr: net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)), } log.CInfo(ctx, "api server is init", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", httpServer.Addr, "apiPort", apiPort) go func() { - defer wg.Done() + defer close(done) <-ctx.Done() + log.ZDebug(ctx, "api server is shutting down") if err := httpServer.Shutdown(context.Background()); err != nil { log.ZWarn(ctx, "api server shutdown err", err) } @@ -156,11 +91,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g } exitCause := context.Cause(ctx) log.ZWarn(ctx, "api server exit", exitCause) - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() timer := time.NewTimer(time.Second * 15) defer timer.Stop() select { diff --git a/internal/api/router.go b/internal/api/router.go index 9febaade0..e8c1173a5 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -303,7 +303,6 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin } cm := NewConfigManager(cfg.Share.IMAdminUserID, &cfg.AllConfig, etcdClient, string(cfg.ConfigPath)) { - configGroup := r.Group("/config", cm.CheckAdmin) configGroup.POST("/get_config_list", cm.GetConfigList) configGroup.POST("/get_config", cm.GetConfig) diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 3f7fd7ec0..304cb3b5c 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -21,7 +21,6 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/stringutil" "golang.org/x/sync/errgroup" @@ -175,27 +174,33 @@ func (ws *WsServer) Run(ctx context.Context) error { } }() - server := http.Server{Addr: fmt.Sprintf(":%d", ws.port), Handler: nil} + done := make(chan struct{}) go func() { + wsServer := http.Server{Addr: fmt.Sprintf(":%d", ws.port), Handler: nil} http.HandleFunc("/", ws.wsHandler) - err := server.ListenAndServe() + go func() { + defer close(done) + <-ctx.Done() + _ = wsServer.Shutdown(context.Background()) + }() + err := wsServer.ListenAndServe() if err == nil { err = fmt.Errorf("http server closed") } cancel(fmt.Errorf("msg gateway %w", err)) }() - shutDown := func() error { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - sErr := server.Shutdown(ctx) - if sErr != nil { - return errs.WrapMsg(sErr, "shutdown err") - } - return nil - } <-ctx.Done() - return shutDown() + + timeout := time.NewTimer(time.Second * 15) + defer timeout.Stop() + select { + case <-timeout.C: + log.ZWarn(ctx, "msg gateway graceful stop timeout", nil) + case <-done: + log.ZDebug(ctx, "msg gateway graceful stop done") + } + return context.Cause(ctx) } const concurrentRequest = 3 diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 3c73b5032..3ca6c6b46 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -155,10 +155,10 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr historyMongoHandler: historyMongoHandler, } - return msgTransfer.Start(int(config.Index), config, client) + return msgTransfer.Start(ctx, int(config.Index), config, client) } -func (m *MsgTransfer) Start(index int, config *Config, client discovery.Conn) error { +func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, client discovery.Conn) error { m.ctx, m.cancel = context.WithCancel(context.Background()) var ( netDone = make(chan struct{}, 1) @@ -254,6 +254,7 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.Conn) er }() } + // todo sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) select { diff --git a/pkg/common/prommetrics/api.go b/pkg/common/prommetrics/api.go index 2dc5cb65d..b1368f511 100644 --- a/pkg/common/prommetrics/api.go +++ b/pkg/common/prommetrics/api.go @@ -1,10 +1,11 @@ package prommetrics import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "net" "strconv" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" ) var ( @@ -24,6 +25,10 @@ var ( ) ) +func RegistryApi() { + registry.MustRegister(apiCounter, httpCounter) +} + func ApiInit(listener net.Listener) error { apiRegistry := prometheus.NewRegistry() cs := append( @@ -41,9 +46,3 @@ func APICall(path string, method string, apiCode int) { func HttpCall(path string, method string, status int) { httpCounter.With(prometheus.Labels{"path": path, "method": method, "status": strconv.Itoa(status)}).Inc() } - -//func ApiHandler() http.Handler { -// return promhttp.InstrumentMetricHandler( -// apiRegistry, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), -// ) -//} diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go deleted file mode 100644 index 8f03bc2ae..000000000 --- a/pkg/common/prommetrics/discovery.go +++ /dev/null @@ -1,31 +0,0 @@ -package prommetrics - -import "fmt" - -const ( - APIKeyName = "api" - MessageTransferKeyName = "message-transfer" -) - -type Target struct { - Target string `json:"target"` - Labels map[string]string `json:"labels"` -} - -type RespTarget struct { - Targets []string `json:"targets"` - Labels map[string]string `json:"labels"` -} - -func BuildDiscoveryKey(name string) string { - return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) -} - -func BuildDefaultTarget(host string, ip int) Target { - return Target{ - Target: fmt.Sprintf("%s:%d", host, ip), - Labels: map[string]string{ - "namespace": "default", - }, - } -} diff --git a/pkg/common/prommetrics/doc.go b/pkg/common/prommetrics/doc.go deleted file mode 100644 index c5108b4cb..000000000 --- a/pkg/common/prommetrics/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prommetrics // import "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" diff --git a/pkg/common/prommetrics/grpc_auth.go b/pkg/common/prommetrics/grpc_auth.go index 30dd5f1b1..a102c5d26 100644 --- a/pkg/common/prommetrics/grpc_auth.go +++ b/pkg/common/prommetrics/grpc_auth.go @@ -24,3 +24,7 @@ var ( Help: "The number of user login", }) ) + +func RegistryAuth() { + registry.MustRegister(UserLoginCounter) +} diff --git a/pkg/common/prommetrics/grpc_msg.go b/pkg/common/prommetrics/grpc_msg.go index 758879b90..909fddd3d 100644 --- a/pkg/common/prommetrics/grpc_msg.go +++ b/pkg/common/prommetrics/grpc_msg.go @@ -36,3 +36,12 @@ var ( Help: "The number of group chat msg failed processed", }) ) + +func RegistryMsg() { + registry.MustRegister( + SingleChatMsgProcessSuccessCounter, + SingleChatMsgProcessFailedCounter, + GroupChatMsgProcessSuccessCounter, + GroupChatMsgProcessFailedCounter, + ) +} diff --git a/pkg/common/prommetrics/grpc_msggateway.go b/pkg/common/prommetrics/grpc_msggateway.go index 98d5a3267..0377b2fa9 100644 --- a/pkg/common/prommetrics/grpc_msggateway.go +++ b/pkg/common/prommetrics/grpc_msggateway.go @@ -24,3 +24,7 @@ var ( Help: "The number of online user num", }) ) + +func RegistryMsgGateway() { + registry.MustRegister(OnlineUserGauge) +} diff --git a/pkg/common/prommetrics/grpc_push.go b/pkg/common/prommetrics/grpc_push.go index 5c966310f..c6280ec76 100644 --- a/pkg/common/prommetrics/grpc_push.go +++ b/pkg/common/prommetrics/grpc_push.go @@ -28,3 +28,10 @@ var ( Help: "The number of messages with a push time exceeding 10 seconds", }) ) + +func RegistryPush() { + registry.MustRegister( + MsgOfflinePushFailedCounter, + MsgLoneTimePushCounter, + ) +} diff --git a/pkg/common/prommetrics/grpc_user.go b/pkg/common/prommetrics/grpc_user.go index cc2fc42e6..1c8c94c7a 100644 --- a/pkg/common/prommetrics/grpc_user.go +++ b/pkg/common/prommetrics/grpc_user.go @@ -8,3 +8,7 @@ var ( Help: "The number of user login", }) ) + +func RegistryUser() { + registry.MustRegister(UserRegisterCounter) +} diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 2fc5d76b4..153314bbb 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -15,14 +15,42 @@ package prommetrics import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" + "errors" + "fmt" "net" "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" ) const commonPath = "/metrics" +var registry = &prometheusRegistry{prometheus.NewRegistry()} + +type prometheusRegistry struct { + *prometheus.Registry +} + +func (x *prometheusRegistry) MustRegister(cs ...prometheus.Collector) { + for _, c := range cs { + if err := x.Registry.Register(c); err != nil { + if errors.As(err, &prometheus.AlreadyRegisteredError{}) { + continue + } + panic(err) + } + } +} + +func init() { + registry.MustRegister( + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + collectors.NewGoCollector(), + ) +} + var ( baseCollector = []prometheus.Collector{ collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), @@ -36,3 +64,48 @@ func Init(registry *prometheus.Registry, listener net.Listener, path string, han srv.Handle(path, handler) return http.Serve(listener, srv) } + +func RegistryAll() { + RegistryApi() + RegistryAuth() + RegistryMsg() + RegistryMsgGateway() + RegistryPush() + RegistryUser() + RegistryRpc() + RegistryTransfer() +} + +func Start(listener net.Listener) error { + srv := http.NewServeMux() + srv.Handle(commonPath, promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + return http.Serve(listener, srv) +} + +const ( + APIKeyName = "api" + MessageTransferKeyName = "message-transfer" +) + +type Target struct { + Target string `json:"target"` + Labels map[string]string `json:"labels"` +} + +type RespTarget struct { + Targets []string `json:"targets"` + Labels map[string]string `json:"labels"` +} + +func BuildDiscoveryKey(name string) string { + return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) +} + +func BuildDefaultTarget(host string, ip int) Target { + return Target{ + Target: fmt.Sprintf("%s:%d", host, ip), + Labels: map[string]string{ + "namespace": "default", + }, + } +} diff --git a/pkg/common/prommetrics/prommetrics_test.go b/pkg/common/prommetrics/prommetrics_test.go index 14b1aaff3..be2dff7f2 100644 --- a/pkg/common/prommetrics/prommetrics_test.go +++ b/pkg/common/prommetrics/prommetrics_test.go @@ -14,6 +14,8 @@ package prommetrics +import "testing" + //func TestNewGrpcPromObj(t *testing.T) { // // Create a custom metric to pass into the NewGrpcPromObj function. // customMetric := prometheus.NewCounter(prometheus.CounterOpts{ @@ -67,3 +69,9 @@ package prommetrics // }) // } //} + +func TestName(t *testing.T) { + RegistryApi() + RegistryApi() + +} diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go index 3f115d30b..a04e4fd5e 100644 --- a/pkg/common/prommetrics/rpc.go +++ b/pkg/common/prommetrics/rpc.go @@ -1,12 +1,13 @@ package prommetrics import ( + "net" + "strconv" + gp "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "net" - "strconv" ) const rpcPath = commonPath @@ -22,6 +23,10 @@ var ( ) ) +func RegistryRpc() { + registry.MustRegister(rpcCounter) +} + func RpcInit(cs []prometheus.Collector, listener net.Listener) error { reg := prometheus.NewRegistry() cs = append(append( diff --git a/pkg/common/prommetrics/transfer.go b/pkg/common/prommetrics/transfer.go index 36fe1d568..51a4ca872 100644 --- a/pkg/common/prommetrics/transfer.go +++ b/pkg/common/prommetrics/transfer.go @@ -15,9 +15,10 @@ package prommetrics import ( + "net" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "net" ) var ( @@ -43,6 +44,16 @@ var ( }) ) +func RegistryTransfer() { + registry.MustRegister( + MsgInsertRedisSuccessCounter, + MsgInsertRedisFailedCounter, + MsgInsertMongoSuccessCounter, + MsgInsertMongoFailedCounter, + SeqSetFailedCounter, + ) +} + func TransferInit(listener net.Listener) error { reg := prometheus.NewRegistry() cs := append(