diff --git a/internal/common/rpc_server/a.go b/internal/common/rpc_server/a.go new file mode 100644 index 000000000..19852a1f5 --- /dev/null +++ b/internal/common/rpc_server/a.go @@ -0,0 +1,46 @@ +package rpc_server + +import ( + "Open_IM/internal/common/network" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/log" + discoveryRegistry "Open_IM/pkg/discovery_registry" + "github.com/OpenIMSDK/openKeeper" + "net" + "strconv" +) + +type RpcServer struct { + Port int + RegisterName string + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry +} + +func NewRpcServer(registerIPInConfig string, port int, registerName string, zkServers []string, zkRoot string) (*RpcServer, error) { + log.NewPrivateLog(constant.LogFileName) + s := &RpcServer{ + Port: port, + RegisterName: registerName, + } + + zkClient, err := openKeeper.NewClient(zkServers, zkRoot, 10, "", "") + if err != nil { + return nil, err + } + registerIP, err := network.GetRpcRegisterIP(registerIPInConfig) + if err != nil { + return nil, err + } + err = zkClient.Register(s.RegisterName, registerIP, s.Port) + if err != nil { + return nil, err + } + s.RegisterCenter = zkClient + return s, nil +} + +func GetTcpListen(listenIPInConfig string, port int) (net.Listener, string, error) { + address := network.GetListenIP(listenIPInConfig) + ":" + strconv.Itoa(port) + listener, err := net.Listen("tcp", address) + return listener, address, err +} diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 34712c609..b196915c5 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -2,30 +2,63 @@ package auth import ( "Open_IM/internal/common/check" - "Open_IM/internal/common/network" + "Open_IM/internal/common/rpc_server" + "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/log" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/tracelog" - discoveryRegistry "Open_IM/pkg/discovery_registry" pbAuth "Open_IM/pkg/proto/auth" pbRelay "Open_IM/pkg/proto/relay" "Open_IM/pkg/utils" "context" - "github.com/OpenIMSDK/openKeeper" - "net" - "strconv" - "strings" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - - "Open_IM/pkg/common/config" - "google.golang.org/grpc" ) +func NewRpcAuthServer(port int) *rpcAuth { + r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) + if err != nil { + panic(err) + } + return &rpcAuth{ + RpcServer: r, + } +} + +func (s *rpcAuth) Run() { + operationID := utils.OperationIDGenerator() + log.NewInfo(operationID, "rpc auth start...") + listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) + if err != nil { + panic(err) + } + log.NewInfo(operationID, "listen network success ", listener, address) + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + promePkg.NewUserRegisterCounter() + promePkg.NewUserLoginCounter() + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) + } + srv := grpc.NewServer(grpcOpts...) + defer srv.GracefulStop() + pbAuth.RegisterAuthServer(srv, s) + err = srv.Serve(listener) + if err != nil { + panic(err) + } + log.NewInfo(operationID, "rpc auth ok") +} + func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) { resp := pbAuth.UserTokenResp{} if _, err := check.GetUsersInfo(ctx, req.UserID); err != nil { @@ -87,7 +120,7 @@ func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) ( } func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { - grpcCons, err := s.registerCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName) + grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName) if err != nil { return err } @@ -102,68 +135,6 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in } type rpcAuth struct { - rpcPort int - rpcRegisterName string - etcdSchema string - etcdAddr []string + *rpc_server.RpcServer controller.AuthInterface - registerCenter discoveryRegistry.SvcDiscoveryRegistry -} - -func NewRpcAuthServer(port int) *rpcAuth { - log.NewPrivateLog(constant.LogFileName) - s := &rpcAuth{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImAuthName, - } - - zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") - if err != nil { - panic(err.Error()) - } - registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) - err = zkClient.Register(s.rpcRegisterName, registerIP, s.rpcPort) - if err != nil { - panic(err.Error()) - } - s.registerCenter = zkClient - return s - -} - -func (s *rpcAuth) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "rpc auth start...") - address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) - listener, err := net.Listen("tcp", address) - if err != nil { - panic("listening err:" + err.Error() + s.rpcRegisterName) - } - log.NewInfo(operationID, "listen network success, ", address, listener) - var grpcOpts []grpc.ServerOption - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - promePkg.NewUserRegisterCounter() - promePkg.NewUserLoginCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - - //service registers with etcd - pbAuth.RegisterAuthServer(srv, s) - - log.NewInfo(operationID, "RegisterAuthServer ok ", s.etcdSchema, strings.Join(s.etcdAddr, ","), registerIP, s.rpcPort, s.rpcRegisterName) - err = srv.Serve(listener) - if err != nil { - log.NewError(operationID, "Serve failed ", err.Error()) - return - } - log.NewInfo(operationID, "rpc auth ok") } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 3a4860237..64a1a830f 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -4,6 +4,7 @@ import ( "Open_IM/internal/common/check" "Open_IM/internal/common/convert" "Open_IM/internal/common/network" + "Open_IM/internal/common/rpc_server" chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" @@ -27,34 +28,17 @@ import ( ) type friendServer struct { - rpcPort int - rpcRegisterName string - etcdSchema string - etcdAddr []string + *rpc_server.RpcServer + controller.FriendInterface controller.BlackInterface - - registerCenter discoveryRegistry.SvcDiscoveryRegistry } func NewFriendServer(port int) *friendServer { - log.NewPrivateLog(constant.LogFileName) - f := friendServer{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImFriendName, - } - - zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") + r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) if err != nil { - panic(err.Error()) + panic(err) } - registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) - err = zkClient.Register(f.rpcRegisterName, registerIP, f.rpcPort) - if err != nil { - panic(err.Error()) - } - f.registerCenter = zkClient - //mysql init var mysql relation.Mysql var model relation.FriendGorm @@ -66,7 +50,6 @@ func NewFriendServer(port int) *friendServer { if err != nil { panic("db init err:" + err.Error()) } - err = mysql.InitConn().AutoMigrateModel(&relationTb.BlackModel{}) if err != nil { panic("db init err:" + err.Error()) @@ -76,21 +59,22 @@ func NewFriendServer(port int) *friendServer { } else { panic("db init err:" + "conn is nil") } - f.FriendInterface = controller.NewFriendController(model.DB) - f.BlackInterface = controller.NewBlackController(model.DB) - return &f + return &friendServer{ + RpcServer: r, + FriendInterface : controller.NewFriendController(model.DB), + BlackInterface : controller.NewBlackController(model.DB) + } } func (s *friendServer) Run() { operationID := utils.OperationIDGenerator() log.NewInfo(operationID, "friendServer run...") - address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) - - //listener network - listener, err := net.Listen("tcp", address) + listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) if err != nil { - panic("listening err:" + err.Error() + s.rpcRegisterName) + panic(err) } + + log.NewInfo(operationID, "listen ok ", address) defer listener.Close() //grpc server diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index c5e48dfd7..802dc5048 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -58,7 +58,7 @@ func NewUserServer(port int) *userServer { //mysql init var mysql relation.Mysql var model relation.UserGorm - err := mysql.InitConn().AutoMigrateModel(&model) + err = mysql.InitConn().AutoMigrateModel(&model) if err != nil { panic("db init err:" + err.Error()) }