diff --git a/go.mod b/go.mod index 6bb3ab4c0..6a51cfc08 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.1.0 github.com/golang/protobuf v1.5.2 github.com/gorilla/websocket v1.4.2 + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/jinzhu/copier v0.3.4 github.com/jinzhu/gorm v1.9.16 github.com/jonboulle/clockwork v0.2.2 // indirect diff --git a/go.sum b/go.sum index 945c71ab3..e1f5a8e7f 100644 --- a/go.sum +++ b/go.sum @@ -344,6 +344,7 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 35623d17d..f0ba76a3a 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -18,6 +18,7 @@ import ( "strings" "github.com/golang/protobuf/proto" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/gorilla/websocket" "google.golang.org/grpc" @@ -59,7 +60,11 @@ func (r *RPCServer) run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 44269d504..99962559d 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -10,9 +10,11 @@ import ( "Open_IM/pkg/utils" "context" "net" + "strconv" "strings" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" ) @@ -48,7 +50,11 @@ func (r *RPCServer) run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/admin_cms/admin_cms.go b/internal/rpc/admin_cms/admin_cms.go index bde6d421b..aa82c0e74 100644 --- a/internal/rpc/admin_cms/admin_cms.go +++ b/internal/rpc/admin_cms/admin_cms.go @@ -11,6 +11,9 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" pbAdminCMS "Open_IM/pkg/proto/admin_cms" server_api_params "Open_IM/pkg/proto/sdk_ws" + + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "Open_IM/pkg/utils" "context" "errors" @@ -63,7 +66,11 @@ func (s *adminCMSServer) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 498edf609..16ac9c117 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -17,6 +17,8 @@ import ( "strconv" "strings" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "Open_IM/pkg/common/config" "google.golang.org/grpc" @@ -128,7 +130,11 @@ func (rpc *rpcAuth) Run() { promePkg.NewGrpcRequestSuccessCounter() promePkg.NewUserRegisterCounter() promePkg.NewUserLoginCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/cache/cache.go b/internal/rpc/cache/cache.go index 117f3384c..9f0ae8022 100644 --- a/internal/rpc/cache/cache.go +++ b/internal/rpc/cache/cache.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/constant" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbCache "Open_IM/pkg/proto/cache" "Open_IM/pkg/utils" @@ -13,6 +14,7 @@ import ( "strconv" "strings" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" ) @@ -50,8 +52,18 @@ func (s *cacheServer) Run() { log.NewInfo("0", "listen network success, ", address, listener) defer listener.Close() //grpc server - - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() pbCache.RegisterCacheServer(srv, s) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index ee04b1974..b9fbc9452 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -16,6 +16,8 @@ import ( "strconv" "strings" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "Open_IM/pkg/common/config" "google.golang.org/grpc" @@ -190,7 +192,11 @@ func (rpc *rpcConversation) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 696217cb8..379f29eda 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -22,6 +22,8 @@ import ( "strings" "time" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" ) @@ -66,7 +68,11 @@ func (s *friendServer) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 2193cd89c..70006b5a2 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -26,6 +26,8 @@ import ( "strings" "time" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" "gorm.io/gorm" ) @@ -75,7 +77,11 @@ func (s *groupServer) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(options...) defer srv.GracefulStop() diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 22bd3840e..cf50b3f9a 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -14,6 +14,8 @@ import ( "strconv" "strings" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" ) @@ -72,7 +74,11 @@ func (rpc *rpcChat) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 4d12267e5..f0692f3cc 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -22,6 +22,8 @@ import ( "time" "unsafe" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" ) @@ -72,7 +74,11 @@ func (s *officeServer) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/organization/organization.go b/internal/rpc/organization/organization.go index 327e40003..ea78ad85b 100644 --- a/internal/rpc/organization/organization.go +++ b/internal/rpc/organization/organization.go @@ -22,6 +22,8 @@ import ( "strings" "time" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" ) @@ -64,7 +66,11 @@ func (s *organizationServer) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 080a50b7f..0ae46e5e4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -22,6 +22,8 @@ import ( "strconv" "strings" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" "gorm.io/gorm" ) @@ -67,7 +69,11 @@ func (s *userServer) Run() { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) } srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop()