rpc start

This commit is contained in:
withchao 2023-02-13 17:44:22 +08:00
parent f3d43f8578
commit 048d4b83e1
5 changed files with 207 additions and 113 deletions

View File

@ -2,25 +2,25 @@ package main
import ( import (
"Open_IM/internal/rpc/group" "Open_IM/internal/rpc/group"
"Open_IM/internal/startrpc"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
) )
func main() { func main() {
defaultPorts := config.Config.RpcPort.OpenImGroupPort //defaultPorts := config.Config.RpcPort.OpenImGroupPort
rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port") //rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port") //prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port")
flag.Parse() //flag.Parse()
fmt.Println("start group rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") //fmt.Println("start group rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
rpcServer := group.NewGroupServer(*rpcPort) //rpcServer := group.NewGroupServer(*rpcPort)
go func() { //go func() {
err := promePkg.StartPromeSrv(*prometheusPort) // err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
}() //}()
rpcServer.Run() //rpcServer.Run()
startrpc.StartRpc(config.Config.RpcPort.OpenImGroupPort[0], config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort[0], group.Start)
} }

View File

@ -2,18 +2,13 @@ package group
import ( import (
"Open_IM/internal/common/check" "Open_IM/internal/common/check"
"Open_IM/internal/common/network"
chat "Open_IM/internal/rpc/msg" chat "Open_IM/internal/rpc/msg"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation" relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/db/unrelation" "Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog" "Open_IM/pkg/common/tracelog"
discoveryRegistry "Open_IM/pkg/discoveryregistry" discoveryRegistry "Open_IM/pkg/discoveryregistry"
@ -22,107 +17,128 @@ import (
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"fmt" "fmt"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc"
"gorm.io/gorm" "gorm.io/gorm"
"math/big" "math/big"
"math/rand" "math/rand"
"net"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/OpenIMSDK/openKeeper"
"google.golang.org/grpc"
) )
func Start(server *grpc.Server) {
//err := mysql.InitConn().AutoMigrateModel(&groupModel)
//if err != nil {
// panic("db init err:" + err.Error())
//}
//mongo.InitMongo()
//redis.InitRedis()
pbGroup.RegisterGroupServer(server, &groupServer{
GroupInterface: controller.NewGroupInterface(nil, cache.NewRedis().GetClient(), unrelation.NewMongo().GetClient()),
registerCenter: nil,
user: check.NewUserCheck(),
})
}
type groupServer struct { type groupServer struct {
rpcPort int //rpcPort int
rpcRegisterName string //rpcRegisterName string
schema string //schema string
zkAddr []string //zkAddr []string
GroupInterface controller.GroupInterface GroupInterface controller.GroupInterface
registerCenter discoveryRegistry.SvcDiscoveryRegistry registerCenter discoveryRegistry.SvcDiscoveryRegistry
user *check.UserCheck user *check.UserCheck
} }
func NewGroupServer(port int) *groupServer { //
log.NewPrivateLog(constant.LogFileName) //type groupServer struct {
g := groupServer{ // rpcPort int
rpcPort: port, // rpcRegisterName string
rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName, // schema string
schema: config.Config.Zookeeper.Schema, // zkAddr []string
zkAddr: config.Config.Zookeeper.ZkAddr, // GroupInterface controller.GroupInterface
} // registerCenter discoveryRegistry.SvcDiscoveryRegistry
//mysql init // user *check.UserCheck
var mysql relation.Mysql //}
var mongo unrelation.Mongo //
var groupModel relationTb.GroupModel //func NewGroupServer(port int) *groupServer {
var redis cache.RedisClient // log.NewPrivateLog(constant.LogFileName)
err := mysql.InitConn().AutoMigrateModel(&groupModel) // g := groupServer{
if err != nil { // rpcPort: port,
panic("db init err:" + err.Error()) // rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName,
} // schema: config.Config.Zookeeper.Schema,
mongo.InitMongo() // zkAddr: config.Config.Zookeeper.ZkAddr,
redis.InitRedis() // }
mongo.CreateSuperGroupIndex() // //mysql init
zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") // var mysql relation.Mysql
if err != nil { // var mongo unrelation.Mongo
panic(err.Error()) // var groupModel relationTb.GroupModel
} // var redis cache.RedisClient
registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) // err := mysql.InitConn().AutoMigrateModel(&groupModel)
g.registerCenter = zkClient // if err != nil {
err = g.registerCenter.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, port) // panic("db init err:" + err.Error())
if err != nil { // }
panic(err.Error()) // mongo.InitMongo()
} // redis.InitRedis()
// mongo.CreateSuperGroupIndex()
//conns, err := g.registerCenter.GetConns(config.Config.RpcRegisterName.OpenImConversationName) // zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "")
g.GroupInterface = controller.NewGroupInterface(mysql.GormConn(), redis.GetClient(), mongo.GetClient()) // if err != nil {
g.user = check.NewUserCheck() // panic(err.Error())
return &g // }
} // registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP)
// g.registerCenter = zkClient
func (s *groupServer) Run() { // err = g.registerCenter.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, port)
operationID := utils.OperationIDGenerator() // if err != nil {
log.NewInfo(operationID, "group rpc start ") // panic(err.Error())
address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) // }
//listener network //
listener, err := net.Listen("tcp", address) // //conns, err := g.registerCenter.GetConns(config.Config.RpcRegisterName.OpenImConversationName)
if err != nil { // g.GroupInterface = controller.NewGroupInterface(mysql.GormConn(), redis.GetClient(), mongo.GetClient())
panic("listening err:" + err.Error() + s.rpcRegisterName) // g.user = check.NewUserCheck()
} // return &g
log.NewInfo(operationID, "listen network success, ", address, listener) //}
//
defer listener.Close() //func (s *groupServer) Run() {
//grpc server // operationID := utils.OperationIDGenerator()
recvSize := 1024 * 1024 * constant.GroupRPCRecvSize // log.NewInfo(operationID, "group rpc start ")
sendSize := 1024 * 1024 * constant.GroupRPCSendSize // address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort)
var grpcOpts = []grpc.ServerOption{ // //listener network
grpc.MaxRecvMsgSize(recvSize), // listener, err := net.Listen("tcp", address)
grpc.MaxSendMsgSize(sendSize), // if err != nil {
grpc.UnaryInterceptor(middleware.RpcServerInterceptor), // panic("listening err:" + err.Error() + s.rpcRegisterName)
} // }
if config.Config.Prometheus.Enable { // log.NewInfo(operationID, "listen network success, ", address, listener)
promePkg.NewGrpcRequestCounter() //
promePkg.NewGrpcRequestFailedCounter() // defer listener.Close()
promePkg.NewGrpcRequestSuccessCounter() // //grpc server
grpcOpts = append(grpcOpts, []grpc.ServerOption{ // recvSize := 1024 * 1024 * constant.GroupRPCRecvSize
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), // sendSize := 1024 * 1024 * constant.GroupRPCSendSize
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), // var grpcOpts = []grpc.ServerOption{
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), // grpc.MaxRecvMsgSize(recvSize),
}...) // grpc.MaxSendMsgSize(sendSize),
} // grpc.UnaryInterceptor(middleware.RpcServerInterceptor),
srv := grpc.NewServer(grpcOpts...) // }
defer srv.GracefulStop() // if config.Config.Prometheus.Enable {
//Service registers with etcd // promePkg.NewGrpcRequestCounter()
pbGroup.RegisterGroupServer(srv, s) // promePkg.NewGrpcRequestFailedCounter()
err = srv.Serve(listener) // promePkg.NewGrpcRequestSuccessCounter()
if err != nil { // grpcOpts = append(grpcOpts, []grpc.ServerOption{
log.NewError(operationID, "Serve failed ", err.Error()) // // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
return // grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
} // grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
log.NewInfo(operationID, "group rpc success") // }...)
} // }
// srv := grpc.NewServer(grpcOpts...)
// defer srv.GracefulStop()
// //Service registers with etcd
// pbGroup.RegisterGroupServer(srv, s)
// err = srv.Serve(listener)
// if err != nil {
// log.NewError(operationID, "Serve failed ", err.Error())
// return
// }
// log.NewInfo(operationID, "group rpc success")
//}
func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error { func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error {
if !tokenverify.IsAppManagerUid(ctx) { if !tokenverify.IsAppManagerUid(ctx) {
@ -145,7 +161,7 @@ func (s *groupServer) GetUsernameMap(ctx context.Context, userIDs []string, comp
if err != nil { if err != nil {
return nil, err return nil, err
} }
return utils.SliceToMapAny(users, func(e *open_im_sdk.PublicUserInfo) (string, string) { return utils.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, string) {
return e.UserID, e.Nickname return e.UserID, e.Nickname
}), nil }), nil
} }

View File

@ -0,0 +1,66 @@
package startrpc
import (
"Open_IM/internal/common/network"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
"github.com/OpenIMSDK/openKeeper"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"net"
)
func StartRpc(rpcPort int, rpcRegisterName string, prometheusPort int, fn func(server *grpc.Server), options ...grpc.ServerOption) {
flagRpcPort := flag.Int("port", rpcPort, "get RpcGroupPort from cmd,default 16000 as port")
flagPrometheusPort := flag.Int("prometheus_port", prometheusPort, "groupPrometheusPort default listen port")
flag.Parse()
rpcPort = *flagRpcPort
prometheusPort = *flagPrometheusPort
fmt.Println("start group rpc server, port: ", rpcPort, ", OpenIM version: ", constant.CurrentVersion)
log.NewPrivateLog(constant.LogFileName)
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, rpcPort))
if err != nil {
panic(err)
}
defer listener.Close()
zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "")
if err != nil {
panic(err.Error())
}
registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP)
if err != nil {
panic(err)
}
options = append(options, grpc.UnaryInterceptor(middleware.RpcServerInterceptor)) // ctx 中间件
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
options = append(options, []grpc.ServerOption{
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
}
srv := grpc.NewServer(options...)
defer srv.GracefulStop()
fn(srv)
err = zkClient.Register(rpcRegisterName, registerIP, rpcPort)
if err != nil {
panic(err.Error())
}
if config.Config.Prometheus.Enable {
err := promePkg.StartPromeSrv(prometheusPort)
if err != nil {
panic(err)
}
}
if err := srv.Serve(listener); err != nil {
panic(err)
}
}

View File

@ -50,6 +50,12 @@ type Cache interface {
// native redis operate // native redis operate
func NewRedis() *RedisClient {
o := &RedisClient{}
o.InitRedis()
return o
}
type RedisClient struct { type RedisClient struct {
rdb redis.UniversalClient rdb redis.UniversalClient
} }

View File

@ -13,6 +13,12 @@ import (
"time" "time"
) )
func NewMongo() *Mongo {
mgo := &Mongo{}
mgo.InitMongo()
return mgo
}
type Mongo struct { type Mongo struct {
db *mongo.Client db *mongo.Client
} }