From f5c4e755eb83928b2c97fd4b640c7046bf4c4809 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 14 Feb 2023 11:37:19 +0800 Subject: [PATCH] rpc start --- cmd/rpc/group/main.go | 17 +----- internal/rpc/group/group.go | 103 +----------------------------------- internal/startrpc/start.go | 23 ++++---- 3 files changed, 16 insertions(+), 127 deletions(-) diff --git a/cmd/rpc/group/main.go b/cmd/rpc/group/main.go index ad9ba151c..2ccae9e6b 100644 --- a/cmd/rpc/group/main.go +++ b/cmd/rpc/group/main.go @@ -7,20 +7,5 @@ import ( ) func main() { - //defaultPorts := config.Config.RpcPort.OpenImGroupPort - //rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port") - //prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port") - //flag.Parse() - //fmt.Println("start group rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - //rpcServer := group.NewGroupServer(*rpcPort) - //go func() { - // err := promePkg.StartPromeSrv(*prometheusPort) - // if err != nil { - // panic(err) - // } - //}() - //rpcServer.Run() - - startrpc.StartRpc(config.Config.RpcPort.OpenImGroupPort[0], config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort[0], group.Start) - + startrpc.Start(config.Config.RpcPort.OpenImGroupPort[0], config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort[0], group.Start) } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index bd964b5b0..7ee2e93f1 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -26,121 +26,22 @@ import ( "time" ) -func Start(server *grpc.Server) { - //err := mysql.InitConn().AutoMigrateModel(&groupModel) - //if err != nil { - // panic("db init err:" + err.Error()) - //} - //mongo.InitMongo() - //redis.InitRedis() +func Start(server *grpc.Server) error { pbGroup.RegisterGroupServer(server, &groupServer{ GroupInterface: controller.NewGroupInterface(nil, cache.NewRedis().GetClient(), unrelation.NewMongo().GetClient()), registerCenter: nil, user: check.NewUserCheck(nil), }) + return nil } type groupServer struct { - //rpcPort int - //rpcRegisterName string - //schema string - //zkAddr []string GroupInterface controller.GroupInterface registerCenter discoveryRegistry.SvcDiscoveryRegistry user *check.UserCheck notification *notification.Check } -// -//type groupServer struct { -// rpcPort int -// rpcRegisterName string -// schema string -// zkAddr []string -// GroupInterface controller.GroupInterface -// registerCenter discoveryRegistry.SvcDiscoveryRegistry -// user *check.UserCheck -//} -// -//func NewGroupServer(port int) *groupServer { -// log.NewPrivateLog(constant.LogFileName) -// g := groupServer{ -// rpcPort: port, -// rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName, -// schema: config.Config.Zookeeper.Schema, -// zkAddr: config.Config.Zookeeper.ZkAddr, -// } -// //mysql init -// var mysql relation.Mysql -// var mongo unrelation.Mongo -// var groupModel relationTb.GroupModel -// var redis cache.RedisClient -// err := mysql.InitConn().AutoMigrateModel(&groupModel) -// if err != nil { -// panic("db init err:" + err.Error()) -// } -// mongo.InitMongo() -// redis.InitRedis() -// mongo.CreateSuperGroupIndex() -// zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") -// if err != nil { -// panic(err.Error()) -// } -// registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) -// g.registerCenter = zkClient -// err = g.registerCenter.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, port) -// if err != nil { -// panic(err.Error()) -// } -// -// //conns, err := g.registerCenter.GetConns(config.Config.RpcRegisterName.OpenImConversationName) -// g.GroupInterface = controller.NewGroupInterface(mysql.GormConn(), redis.GetClient(), mongo.GetClient()) -// g.user = check.NewUserCheck() -// return &g -//} -// -//func (s *groupServer) Run() { -// operationID := utils.OperationIDGenerator() -// log.NewInfo(operationID, "group rpc start ") -// address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) -// //listener network -// listener, err := net.Listen("tcp", address) -// if err != nil { -// panic("listening err:" + err.Error() + s.rpcRegisterName) -// } -// log.NewInfo(operationID, "listen network success, ", address, listener) -// -// defer listener.Close() -// //grpc server -// recvSize := 1024 * 1024 * constant.GroupRPCRecvSize -// sendSize := 1024 * 1024 * constant.GroupRPCSendSize -// var grpcOpts = []grpc.ServerOption{ -// grpc.MaxRecvMsgSize(recvSize), -// grpc.MaxSendMsgSize(sendSize), -// grpc.UnaryInterceptor(middleware.RpcServerInterceptor), -// } -// if config.Config.Prometheus.Enable { -// promePkg.NewGrpcRequestCounter() -// promePkg.NewGrpcRequestFailedCounter() -// promePkg.NewGrpcRequestSuccessCounter() -// grpcOpts = append(grpcOpts, []grpc.ServerOption{ -// // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), -// grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), -// grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), -// }...) -// } -// srv := grpc.NewServer(grpcOpts...) -// defer srv.GracefulStop() -// //Service registers with etcd -// pbGroup.RegisterGroupServer(srv, s) -// err = srv.Serve(listener) -// if err != nil { -// log.NewError(operationID, "Serve failed ", err.Error()) -// return -// } -// log.NewInfo(operationID, "group rpc success") -//} - func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error { if !tokenverify.IsAppManagerUid(ctx) { groupMember, err := s.GroupInterface.TakeGroupMember(ctx, groupID, tracelog.GetOpUserID(ctx)) diff --git a/internal/startrpc/start.go b/internal/startrpc/start.go index 841ee3cbc..ba4b6a5b6 100644 --- a/internal/startrpc/start.go +++ b/internal/startrpc/start.go @@ -15,7 +15,7 @@ import ( "net" ) -func StartRpc(rpcPort int, rpcRegisterName string, prometheusPort int, fn func(server *grpc.Server), options ...grpc.ServerOption) { +func start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(server *grpc.Server) error, options []grpc.ServerOption) error { flagRpcPort := flag.Int("port", rpcPort, "get RpcGroupPort from cmd,default 16000 as port") flagPrometheusPort := flag.Int("prometheus_port", prometheusPort, "groupPrometheusPort default listen port") flag.Parse() @@ -25,16 +25,17 @@ func StartRpc(rpcPort int, rpcRegisterName string, prometheusPort int, fn func(s log.NewPrivateLog(constant.LogFileName) listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, rpcPort)) if err != nil { - panic(err) + return err } defer listener.Close() zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") if err != nil { - panic(err.Error()) + return err } + defer zkClient.Close() registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) if err != nil { - panic(err) + return err } options = append(options, grpc.UnaryInterceptor(middleware.RpcServerInterceptor)) // ctx 中间件 if config.Config.Prometheus.Enable { @@ -49,18 +50,20 @@ func StartRpc(rpcPort int, rpcRegisterName string, prometheusPort int, fn func(s } srv := grpc.NewServer(options...) defer srv.GracefulStop() - fn(srv) err = zkClient.Register(rpcRegisterName, registerIP, rpcPort) if err != nil { - panic(err.Error()) + return err } if config.Config.Prometheus.Enable { err := promePkg.StartPromeSrv(prometheusPort) if err != nil { - panic(err) + return err } } - if err := srv.Serve(listener); err != nil { - panic(err) - } + return rpcFn(srv) +} + +func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(server *grpc.Server) error, options ...grpc.ServerOption) { + err := start(rpcPort, rpcRegisterName, prometheusPort, rpcFn, options) + fmt.Println("end", err) }