From a1f79f45081be2cbd9bc6e19844950a653bd8df0 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 01:22:20 +0800 Subject: [PATCH 1/9] prome --- cmd/open_im_api/main.go | 4 ++ internal/msg_gateway/gate/relay_rpc_server.go | 10 ++- internal/push/logic/push_rpc_server.go | 7 +- internal/rpc/auth/auth.go | 7 +- internal/rpc/conversation/conversaion.go | 7 +- internal/rpc/friend/friend.go | 9 ++- internal/rpc/group/group.go | 5 ++ internal/rpc/msg/rpcChat.go | 23 ++---- internal/rpc/user/user.go | 7 +- pkg/common/kafka/producer.go | 6 ++ pkg/common/prometheus/gather.go | 70 ++++++++++++++++++- pkg/common/prometheus/grpc.go | 35 ++++++++++ pkg/common/prometheus/prometheus.go | 23 ++++++ 13 files changed, 187 insertions(+), 26 deletions(-) create mode 100644 pkg/common/prometheus/grpc.go diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index a4db0e616..8f45a38c8 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -52,6 +52,10 @@ func main() { log.Info("load config: ", config.Config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) if config.Config.Prometheus.Enable { + promePkg.NewApiRequestCounter() + promePkg.NewApiRequestFailedCounter() + promePkg.NewApiRequestSuccessCounter() + r.Use(promePkg.PromeTheusMiddleware) r.GET("/metrics", promePkg.PrometheusHandler()) } // user routing group, which handles user registration and login services diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 406cb991e..e201061e3 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbRelay "Open_IM/pkg/proto/relay" @@ -12,11 +13,12 @@ import ( "bytes" "context" "encoding/gob" - "github.com/golang/protobuf/proto" "net" "strconv" "strings" + "github.com/golang/protobuf/proto" + "github.com/gorilla/websocket" "google.golang.org/grpc" ) @@ -52,7 +54,11 @@ func (r *RPCServer) run() { panic("listening err:" + err.Error() + r.rpcRegisterName) } defer listener.Close() - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() pbRelay.RegisterRelayServer(srv, r) diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 8fb99dbfb..a95afd24e 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" @@ -42,7 +43,11 @@ func (r *RPCServer) run() { panic("listening err:" + err.Error() + r.rpcRegisterName) } defer listener.Close() - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() pbPush.RegisterPushMsgServiceServer(srv, r) rpcRegisterIP := config.Config.RpcRegisterIP diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3f6a957db..46ec43f2b 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -126,8 +126,11 @@ func (rpc *rpcAuth) Run() { panic("listening err:" + err.Error() + rpc.rpcRegisterName) } log.NewInfo(operationID, "listen network success, ", address, listener) - //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //service registers with etcd diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 3a6af935e..19cd5ead7 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -7,6 +7,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" "Open_IM/pkg/utils" @@ -184,7 +185,11 @@ func (rpc *rpcConversation) Run() { } log.NewInfo("0", "listen network success, ", address, listener) //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //service registers with etcd diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 29ec99eab..6f174c697 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -6,8 +6,9 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" - "Open_IM/pkg/common/db/rocks_cache" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" cp "Open_IM/pkg/common/utils" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -60,7 +61,11 @@ func (s *friendServer) Run() { log.NewInfo("0", "listen ok ", address) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //User friend related services register to etcd pbFriend.RegisterFriendServer(srv, s) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a36547c2c..062765069 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" cp "Open_IM/pkg/common/utils" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -69,6 +70,10 @@ func (s *groupServer) Run() { grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), } + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } srv := grpc.NewServer(options...) defer srv.GracefulStop() //Service registers with etcd diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 2c4c6270c..bbd249622 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -6,6 +6,7 @@ import ( "Open_IM/pkg/common/db" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/proto/msg" "Open_IM/pkg/utils" @@ -13,16 +14,9 @@ import ( "strconv" "strings" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" ) -var ( - sendMsgSuccessCounter prometheus.Counter - sendMsgFailedCounter prometheus.Counter -) - type rpcChat struct { rpcPort int rpcRegisterName string @@ -55,14 +49,7 @@ func NewRpcChatServer(port int) *rpcChat { } func (rpc *rpcChat) initPrometheus() { - sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "send_msg_success", - Help: "The number of send msg success", - }) - sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "send_msg_failed", - Help: "The number of send msg failed", - }) + promePkg.NewSendMsgCount() } func (rpc *rpcChat) Run() { @@ -80,7 +67,11 @@ func (rpc *rpcChat) Run() { } log.Info("", "listen network success, address ", address) - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() rpcRegisterIP := config.Config.RpcRegisterIP diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index cf4e959a5..241e6d081 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" @@ -61,7 +62,11 @@ func (s *userServer) Run() { log.NewInfo("0", "listen network success, address ", address, listener) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbUser.RegisterUserServer(srv, s) diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 4eb10aebd..15c08ac04 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -4,8 +4,11 @@ import ( log "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "errors" + "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" + + promePkg "Open_IM/pkg/common/prometheus" ) type Producer struct { @@ -57,5 +60,8 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) } a, b, c := p.producer.SendMessage(kMsg) log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) + if c == nil { + promePkg.PromeInc(promePkg.SendMsgCounter) + } return a, b, utils.Wrap(c, "") } diff --git a/pkg/common/prometheus/gather.go b/pkg/common/prometheus/gather.go index b5d252e66..ccac941c2 100644 --- a/pkg/common/prometheus/gather.go +++ b/pkg/common/prometheus/gather.go @@ -6,7 +6,7 @@ import ( ) var ( - // user rpc + //auth rpc UserLoginCounter prometheus.Counter UserRegisterCounter prometheus.Counter @@ -14,6 +14,18 @@ var ( SeqGetFailedCounter prometheus.Counter SeqSetSuccessCounter prometheus.Counter SeqSetFailedCounter prometheus.Counter + + // api + ApiRequestCounter prometheus.Counter + ApiRequestSuccessCounter prometheus.Counter + ApiRequestFailedCounter prometheus.Counter + + // grpc + GrpcRequestCounter prometheus.Counter + GrpcRequestSuccessCounter prometheus.Counter + GrpcRequestFailedCounter prometheus.Counter + + SendMsgCounter prometheus.Counter ) func NewUserLoginCounter() { @@ -23,6 +35,13 @@ func NewUserLoginCounter() { }) } +func NewUserRegisterCounter() { + UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "user_register", + Help: "The number of user register", + }) +} + func NewSeqGetSuccessCounter() { SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "seq_get_success", @@ -48,3 +67,52 @@ func NewSeqSetFailedCounter() { Help: "The number of failed set seq", }) } + +func NewApiRequestCounter() { + ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request", + Help: "The number of api request", + }) +} + +func NewApiRequestSuccessCounter() { + ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request_success", + Help: "The number of api request success", + }) +} + +func NewApiRequestFailedCounter() { + ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request_failed", + Help: "The number of api request failed", + }) +} + +func NewGrpcRequestCounter() { + GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request", + Help: "The number of api request", + }) +} + +func NewGrpcRequestSuccessCounter() { + GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request_success", + Help: "The number of grpc request success", + }) +} + +func NewGrpcRequestFailedCounter() { + GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request_failed", + Help: "The number of grpc request failed", + }) +} + +func NewSendMsgCount() { + SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "send_msg", + Help: "The number of send msg", + }) +} diff --git a/pkg/common/prometheus/grpc.go b/pkg/common/prometheus/grpc.go new file mode 100644 index 000000000..d0d513b00 --- /dev/null +++ b/pkg/common/prometheus/grpc.go @@ -0,0 +1,35 @@ +package prometheus + +import ( + "context" + "encoding/json" + "time" + + "Open_IM/pkg/common/log" + + "google.golang.org/grpc" + "google.golang.org/grpc/peer" +) + +func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + remote, _ := peer.FromContext(ctx) + remoteAddr := remote.Addr.String() + + in, _ := json.Marshal(req) + inStr := string(in) + log.NewInfo("ip", remoteAddr, "access_start", info.FullMethod, "in", inStr) + + start := time.Now() + defer func() { + out, _ := json.Marshal(resp) + outStr := string(out) + duration := int64(time.Since(start) / time.Millisecond) + if duration >= 500 { + log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration) + } else { + log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration) + } + }() + resp, err = handler(ctx, req) + return +} diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go index a6b65fee0..4951823cc 100644 --- a/pkg/common/prometheus/prometheus.go +++ b/pkg/common/prometheus/prometheus.go @@ -2,6 +2,7 @@ package prometheus import ( "Open_IM/pkg/common/config" + "bytes" "net/http" "strconv" @@ -26,6 +27,28 @@ func PrometheusHandler() gin.HandlerFunc { } } +type responseBodyWriter struct { + gin.ResponseWriter + body *bytes.Buffer +} + +func (r responseBodyWriter) Write(b []byte) (int, error) { + r.body.Write(b) + return r.ResponseWriter.Write(b) +} + +func PromeTheusMiddleware(c *gin.Context) { + PromeInc(ApiRequestCounter) + w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer} + c.Writer = w + c.Next() + if c.Writer.Status() == http.StatusOK { + PromeInc(ApiRequestSuccessCounter) + } else { + PromeInc(ApiRequestFailedCounter) + } +} + func PromeInc(counter prometheus.Counter) { if config.Config.Prometheus.Enable { counter.Inc() From 07170135e6ec633bbec0f3d8f1f5f0ff2fe58fa2 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 01:28:56 +0800 Subject: [PATCH 2/9] prome --- pkg/common/prometheus/gather.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/common/prometheus/gather.go b/pkg/common/prometheus/gather.go index cbca7ee61..9f6991e81 100644 --- a/pkg/common/prometheus/gather.go +++ b/pkg/common/prometheus/gather.go @@ -46,13 +46,6 @@ func NewUserRegisterCounter() { }) } -func NewUserRegisterCounter() { - UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "user_register", - Help: "The number of user register", - }) -} - func NewSeqGetSuccessCounter() { SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "seq_get_success", From aa6b439e8c11dbff705b09adce1a30184e958b7d Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 01:35:08 +0800 Subject: [PATCH 3/9] prome --- internal/msg_gateway/gate/relay_rpc_server.go | 2 +- internal/push/logic/push_rpc_server.go | 2 +- internal/rpc/auth/auth.go | 2 +- internal/rpc/conversation/conversaion.go | 2 +- internal/rpc/friend/friend.go | 2 +- internal/rpc/group/group.go | 2 +- internal/rpc/msg/rpcChat.go | 2 +- internal/rpc/user/user.go | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index e201061e3..34aac553a 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -56,7 +56,7 @@ func (r *RPCServer) run() { defer listener.Close() var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index a95afd24e..8f0344a3e 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -45,7 +45,7 @@ func (r *RPCServer) run() { defer listener.Close() var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 46ec43f2b..cbd88d8b0 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -128,7 +128,7 @@ func (rpc *rpcAuth) Run() { log.NewInfo(operationID, "listen network success, ", address, listener) var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 19cd5ead7..b57c7c163 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -187,7 +187,7 @@ func (rpc *rpcConversation) Run() { //grpc server var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 6f174c697..0d2a8e209 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -63,7 +63,7 @@ func (s *friendServer) Run() { //grpc server var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 062765069..380d6c021 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -72,7 +72,7 @@ func (s *groupServer) Run() { } var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(options...) defer srv.GracefulStop() diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index bbd249622..f6280df37 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -69,7 +69,7 @@ func (rpc *rpcChat) Run() { var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 241e6d081..7d980d4be 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -64,7 +64,7 @@ func (s *userServer) Run() { //grpc server var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { - grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() From c091b5f3856df403a9e68337b3b6cdd98ae4f4c0 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 01:46:22 +0800 Subject: [PATCH 4/9] prome --- docker-compose_cfg/grafana.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose_cfg/grafana.ini b/docker-compose_cfg/grafana.ini index eeac17d6f..180de10af 100644 --- a/docker-compose_cfg/grafana.ini +++ b/docker-compose_cfg/grafana.ini @@ -1105,7 +1105,7 @@ disable_sanitize_html = false enable_alpha = false app_tls_skip_verify_insecure = false # Enter a comma-separated list of plugin identifiers to identify plugins to load even if they are unsigned. Plugins with modified signatures are never loaded. -allow_loading_unsigned_plugins = +allow_loading_unsigned_plugins = grafana-simple-json-backend-datasource # Enable or disable installing / uninstalling / updating plugins directly from within Grafana. plugin_admin_enabled = true plugin_admin_external_manage_enabled = false From c1a94a1b6c75de163e605aedf6c502c4e5f2aa70 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 01:48:24 +0800 Subject: [PATCH 5/9] prome --- docker-compose.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 8a903890a..96fa42353 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -157,13 +157,14 @@ services: - ./docker-compose_cfg/grafana.ini:/etc/grafana/grafana.ini # - ./docker-compose_cfg/node-exporter-full_rev1.json:/var/lib/grafana/dashboards/node-exporter-full_rev1.json # - ./components/grafana:/var/lib/grafana - - ./docker-compose_cfg/grafana.db:/var/lib/grafana/grafana.db + # - ./docker-compose_cfg/grafana.db:/var/lib/grafana/grafana.db container_name: grafana ports: - 10007:10007 depends_on: - prometheus network_mode: "host" + # -rw-r----- node-exporter: image: quay.io/prometheus/node-exporter From 144cf279f9664c26c05499aac5e2ca5653432ca2 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 08:45:10 +0800 Subject: [PATCH 6/9] prome --- internal/msg_gateway/gate/relay_rpc_server.go | 3 +++ internal/push/logic/push_rpc_server.go | 3 +++ internal/rpc/admin_cms/admin_cms.go | 11 +++++++++-- internal/rpc/auth/auth.go | 11 +++-------- internal/rpc/conversation/conversaion.go | 3 +++ internal/rpc/friend/friend.go | 3 +++ internal/rpc/group/group.go | 3 +++ internal/rpc/msg/rpcChat.go | 3 +++ internal/rpc/office/office.go | 11 +++++++++-- internal/rpc/organization/organization.go | 10 +++++++++- internal/rpc/user/user.go | 3 +++ 11 files changed, 51 insertions(+), 13 deletions(-) diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 34aac553a..35623d17d 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -56,6 +56,9 @@ func (r *RPCServer) run() { defer listener.Close() var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 8f0344a3e..44269d504 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -45,6 +45,9 @@ func (r *RPCServer) run() { defer listener.Close() var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) diff --git a/internal/rpc/admin_cms/admin_cms.go b/internal/rpc/admin_cms/admin_cms.go index 6c477af16..bde6d421b 100644 --- a/internal/rpc/admin_cms/admin_cms.go +++ b/internal/rpc/admin_cms/admin_cms.go @@ -6,6 +6,7 @@ import ( "Open_IM/pkg/common/db" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbAdminCMS "Open_IM/pkg/proto/admin_cms" @@ -57,8 +58,14 @@ func (s *adminCMSServer) Run() { } log.NewInfo("0", "listen network success, ", address, listener) defer listener.Close() - //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbAdminCMS.RegisterAdminCMSServer(srv, s) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index cbd88d8b0..a23b7d3f4 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -22,11 +22,6 @@ import ( "google.golang.org/grpc" ) -func (rpc *rpcAuth) initPrometheus() { - promePkg.NewUserLoginCounter() - promePkg.NewUserRegisterCounter() -} - func (rpc *rpcAuth) UserRegister(_ context.Context, req *pbAuth.UserRegisterReq) (*pbAuth.UserRegisterResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String()) var user db.User @@ -128,6 +123,9 @@ func (rpc *rpcAuth) Run() { log.NewInfo(operationID, "listen network success, ", address, listener) var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) @@ -152,9 +150,6 @@ func (rpc *rpcAuth) Run() { } log.NewInfo(operationID, "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) - if config.Config.Prometheus.Enable { - rpc.initPrometheus() - } err = srv.Serve(listener) if err != nil { log.NewError(operationID, "Serve failed ", err.Error()) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index b57c7c163..ee04b1974 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -187,6 +187,9 @@ func (rpc *rpcConversation) Run() { //grpc server var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 0d2a8e209..696217cb8 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -63,6 +63,9 @@ func (s *friendServer) Run() { //grpc server var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 380d6c021..2193cd89c 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -72,6 +72,9 @@ func (s *groupServer) Run() { } var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(options...) diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index f6280df37..22bd3840e 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -69,6 +69,9 @@ func (rpc *rpcChat) Run() { var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 741938ca3..4d12267e5 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/common/db/mysql_model/im_mysql_model" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbCache "Open_IM/pkg/proto/cache" pbOffice "Open_IM/pkg/proto/office" @@ -63,11 +64,17 @@ func (s *officeServer) Run() { //grpc server recvSize := 1024 * 1024 * 30 sendSize := 1024 * 1024 * 30 - var options = []grpc.ServerOption{ + var grpcOpts = []grpc.ServerOption{ grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), } - srv := grpc.NewServer(options...) + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbOffice.RegisterOfficeServiceServer(srv, s) diff --git a/internal/rpc/organization/organization.go b/internal/rpc/organization/organization.go index 89cd15321..327e40003 100644 --- a/internal/rpc/organization/organization.go +++ b/internal/rpc/organization/organization.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbAuth "Open_IM/pkg/proto/auth" @@ -58,7 +59,14 @@ func (s *organizationServer) Run() { log.NewInfo("", "listen network success, ", address, listener) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd rpc.RegisterOrganizationServer(srv, s) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 7d980d4be..080a50b7f 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -64,6 +64,9 @@ func (s *userServer) Run() { //grpc server var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...) From 355c672ed18ebaf978c05f304bad823f9fcf4a98 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 08:50:29 +0800 Subject: [PATCH 7/9] prome --- docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 96fa42353..8692c7ba0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -157,7 +157,7 @@ services: - ./docker-compose_cfg/grafana.ini:/etc/grafana/grafana.ini # - ./docker-compose_cfg/node-exporter-full_rev1.json:/var/lib/grafana/dashboards/node-exporter-full_rev1.json # - ./components/grafana:/var/lib/grafana - # - ./docker-compose_cfg/grafana.db:/var/lib/grafana/grafana.db + - ./docker-compose_cfg/grafana.db:/var/lib/grafana/grafana.db container_name: grafana ports: - 10007:10007 From 9afaba4382ea9b29f1bf65d96623109f40f4cc0c Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 09:16:08 +0800 Subject: [PATCH 8/9] prome --- internal/msg_transfer/logic/init.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index aa7ba0b9a..201c6ee41 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -37,7 +37,9 @@ var ( func Init() { cmdCh = make(chan Cmd2Value, 10000) w = new(sync.Mutex) - initPrometheus() + if config.Config.Prometheus.Enable { + initPrometheus() + } persistentCH.Init() // ws2mschat save mysql historyCH.Init(cmdCh) // historyMongoCH.Init() From 7f1acb45320173aa9c0471b0896a28ec7db3d29f Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 09:30:23 +0800 Subject: [PATCH 9/9] prome --- internal/rpc/auth/auth.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index a23b7d3f4..498edf609 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -126,6 +126,8 @@ func (rpc *rpcAuth) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() + promePkg.NewUserRegisterCounter() + promePkg.NewUserLoginCounter() grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) } srv := grpc.NewServer(grpcOpts...)