From a1f79f45081be2cbd9bc6e19844950a653bd8df0 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 15 Sep 2022 01:22:20 +0800 Subject: [PATCH] prome --- cmd/open_im_api/main.go | 4 ++ internal/msg_gateway/gate/relay_rpc_server.go | 10 ++- internal/push/logic/push_rpc_server.go | 7 +- internal/rpc/auth/auth.go | 7 +- internal/rpc/conversation/conversaion.go | 7 +- internal/rpc/friend/friend.go | 9 ++- internal/rpc/group/group.go | 5 ++ internal/rpc/msg/rpcChat.go | 23 ++---- internal/rpc/user/user.go | 7 +- pkg/common/kafka/producer.go | 6 ++ pkg/common/prometheus/gather.go | 70 ++++++++++++++++++- pkg/common/prometheus/grpc.go | 35 ++++++++++ pkg/common/prometheus/prometheus.go | 23 ++++++ 13 files changed, 187 insertions(+), 26 deletions(-) create mode 100644 pkg/common/prometheus/grpc.go diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index a4db0e616..8f45a38c8 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -52,6 +52,10 @@ func main() { log.Info("load config: ", config.Config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) if config.Config.Prometheus.Enable { + promePkg.NewApiRequestCounter() + promePkg.NewApiRequestFailedCounter() + promePkg.NewApiRequestSuccessCounter() + r.Use(promePkg.PromeTheusMiddleware) r.GET("/metrics", promePkg.PrometheusHandler()) } // user routing group, which handles user registration and login services diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 406cb991e..e201061e3 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbRelay "Open_IM/pkg/proto/relay" @@ -12,11 +13,12 @@ import ( "bytes" "context" "encoding/gob" - "github.com/golang/protobuf/proto" "net" "strconv" "strings" + "github.com/golang/protobuf/proto" + "github.com/gorilla/websocket" "google.golang.org/grpc" ) @@ -52,7 +54,11 @@ func (r *RPCServer) run() { panic("listening err:" + err.Error() + r.rpcRegisterName) } defer listener.Close() - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() pbRelay.RegisterRelayServer(srv, r) diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 8fb99dbfb..a95afd24e 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" @@ -42,7 +43,11 @@ func (r *RPCServer) run() { panic("listening err:" + err.Error() + r.rpcRegisterName) } defer listener.Close() - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() pbPush.RegisterPushMsgServiceServer(srv, r) rpcRegisterIP := config.Config.RpcRegisterIP diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3f6a957db..46ec43f2b 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -126,8 +126,11 @@ func (rpc *rpcAuth) Run() { panic("listening err:" + err.Error() + rpc.rpcRegisterName) } log.NewInfo(operationID, "listen network success, ", address, listener) - //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //service registers with etcd diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 3a6af935e..19cd5ead7 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -7,6 +7,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" "Open_IM/pkg/utils" @@ -184,7 +185,11 @@ func (rpc *rpcConversation) Run() { } log.NewInfo("0", "listen network success, ", address, listener) //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //service registers with etcd diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 29ec99eab..6f174c697 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -6,8 +6,9 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" - "Open_IM/pkg/common/db/rocks_cache" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" cp "Open_IM/pkg/common/utils" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -60,7 +61,11 @@ func (s *friendServer) Run() { log.NewInfo("0", "listen ok ", address) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //User friend related services register to etcd pbFriend.RegisterFriendServer(srv, s) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a36547c2c..062765069 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" cp "Open_IM/pkg/common/utils" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -69,6 +70,10 @@ func (s *groupServer) Run() { grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), } + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } srv := grpc.NewServer(options...) defer srv.GracefulStop() //Service registers with etcd diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 2c4c6270c..bbd249622 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -6,6 +6,7 @@ import ( "Open_IM/pkg/common/db" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/proto/msg" "Open_IM/pkg/utils" @@ -13,16 +14,9 @@ import ( "strconv" "strings" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" ) -var ( - sendMsgSuccessCounter prometheus.Counter - sendMsgFailedCounter prometheus.Counter -) - type rpcChat struct { rpcPort int rpcRegisterName string @@ -55,14 +49,7 @@ func NewRpcChatServer(port int) *rpcChat { } func (rpc *rpcChat) initPrometheus() { - sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "send_msg_success", - Help: "The number of send msg success", - }) - sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "send_msg_failed", - Help: "The number of send msg failed", - }) + promePkg.NewSendMsgCount() } func (rpc *rpcChat) Run() { @@ -80,7 +67,11 @@ func (rpc *rpcChat) Run() { } log.Info("", "listen network success, address ", address) - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() rpcRegisterIP := config.Config.RpcRegisterIP diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index cf4e959a5..241e6d081 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" @@ -61,7 +62,11 @@ func (s *userServer) Run() { log.NewInfo("0", "listen network success, address ", address, listener) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbUser.RegisterUserServer(srv, s) diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 4eb10aebd..15c08ac04 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -4,8 +4,11 @@ import ( log "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "errors" + "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" + + promePkg "Open_IM/pkg/common/prometheus" ) type Producer struct { @@ -57,5 +60,8 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) } a, b, c := p.producer.SendMessage(kMsg) log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) + if c == nil { + promePkg.PromeInc(promePkg.SendMsgCounter) + } return a, b, utils.Wrap(c, "") } diff --git a/pkg/common/prometheus/gather.go b/pkg/common/prometheus/gather.go index b5d252e66..ccac941c2 100644 --- a/pkg/common/prometheus/gather.go +++ b/pkg/common/prometheus/gather.go @@ -6,7 +6,7 @@ import ( ) var ( - // user rpc + //auth rpc UserLoginCounter prometheus.Counter UserRegisterCounter prometheus.Counter @@ -14,6 +14,18 @@ var ( SeqGetFailedCounter prometheus.Counter SeqSetSuccessCounter prometheus.Counter SeqSetFailedCounter prometheus.Counter + + // api + ApiRequestCounter prometheus.Counter + ApiRequestSuccessCounter prometheus.Counter + ApiRequestFailedCounter prometheus.Counter + + // grpc + GrpcRequestCounter prometheus.Counter + GrpcRequestSuccessCounter prometheus.Counter + GrpcRequestFailedCounter prometheus.Counter + + SendMsgCounter prometheus.Counter ) func NewUserLoginCounter() { @@ -23,6 +35,13 @@ func NewUserLoginCounter() { }) } +func NewUserRegisterCounter() { + UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "user_register", + Help: "The number of user register", + }) +} + func NewSeqGetSuccessCounter() { SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "seq_get_success", @@ -48,3 +67,52 @@ func NewSeqSetFailedCounter() { Help: "The number of failed set seq", }) } + +func NewApiRequestCounter() { + ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request", + Help: "The number of api request", + }) +} + +func NewApiRequestSuccessCounter() { + ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request_success", + Help: "The number of api request success", + }) +} + +func NewApiRequestFailedCounter() { + ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request_failed", + Help: "The number of api request failed", + }) +} + +func NewGrpcRequestCounter() { + GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request", + Help: "The number of api request", + }) +} + +func NewGrpcRequestSuccessCounter() { + GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request_success", + Help: "The number of grpc request success", + }) +} + +func NewGrpcRequestFailedCounter() { + GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request_failed", + Help: "The number of grpc request failed", + }) +} + +func NewSendMsgCount() { + SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "send_msg", + Help: "The number of send msg", + }) +} diff --git a/pkg/common/prometheus/grpc.go b/pkg/common/prometheus/grpc.go new file mode 100644 index 000000000..d0d513b00 --- /dev/null +++ b/pkg/common/prometheus/grpc.go @@ -0,0 +1,35 @@ +package prometheus + +import ( + "context" + "encoding/json" + "time" + + "Open_IM/pkg/common/log" + + "google.golang.org/grpc" + "google.golang.org/grpc/peer" +) + +func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + remote, _ := peer.FromContext(ctx) + remoteAddr := remote.Addr.String() + + in, _ := json.Marshal(req) + inStr := string(in) + log.NewInfo("ip", remoteAddr, "access_start", info.FullMethod, "in", inStr) + + start := time.Now() + defer func() { + out, _ := json.Marshal(resp) + outStr := string(out) + duration := int64(time.Since(start) / time.Millisecond) + if duration >= 500 { + log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration) + } else { + log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration) + } + }() + resp, err = handler(ctx, req) + return +} diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go index a6b65fee0..4951823cc 100644 --- a/pkg/common/prometheus/prometheus.go +++ b/pkg/common/prometheus/prometheus.go @@ -2,6 +2,7 @@ package prometheus import ( "Open_IM/pkg/common/config" + "bytes" "net/http" "strconv" @@ -26,6 +27,28 @@ func PrometheusHandler() gin.HandlerFunc { } } +type responseBodyWriter struct { + gin.ResponseWriter + body *bytes.Buffer +} + +func (r responseBodyWriter) Write(b []byte) (int, error) { + r.body.Write(b) + return r.ResponseWriter.Write(b) +} + +func PromeTheusMiddleware(c *gin.Context) { + PromeInc(ApiRequestCounter) + w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer} + c.Writer = w + c.Next() + if c.Writer.Status() == http.StatusOK { + PromeInc(ApiRequestSuccessCounter) + } else { + PromeInc(ApiRequestFailedCounter) + } +} + func PromeInc(counter prometheus.Counter) { if config.Config.Prometheus.Enable { counter.Inc()