From cdf09aa1bdee9e0e10841ee6983c7b6b71c78cbf Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 17 Aug 2022 11:35:33 +0800 Subject: [PATCH] get grpc conn from config --- config/config.yaml | 4 +- pkg/grpc-etcdv3/getcdv3/resolver.go | 113 ++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 58cc41814..9c14726ae 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -73,7 +73,7 @@ kafka: serverip: 0.0.0.0 #作为rpc时,注册到etcd的地址,单机默认即可,如果是集群部署,需要修改(具体使用内网地址还是外网地址,要依情况而定,目的是api/gateway能访问到) -rpcRegisterIP: 127.0.0.1 +rpcRegisterIP: #默认即可 listenIP: 0.0.0.0 @@ -149,7 +149,7 @@ rpcport: #rpc服务端口 默认即可 openImOfficePort: [ 10210 ] openImOrganizationPort: [ 10220 ] openImConversationPort: [ 10230 ] - openImCachePort: [10240] + openImCachePort: [ 10240 ] c2c: callbackBeforeSendMsg: switch: false diff --git a/pkg/grpc-etcdv3/getcdv3/resolver.go b/pkg/grpc-etcdv3/getcdv3/resolver.go index b72df5fc9..92a3b5a64 100644 --- a/pkg/grpc-etcdv3/getcdv3/resolver.go +++ b/pkg/grpc-etcdv3/getcdv3/resolver.go @@ -1,6 +1,7 @@ package getcdv3 import ( + "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "context" @@ -96,6 +97,118 @@ func GetConn(schema, etcdaddr, serviceName string, operationID string) *grpc.Cli return r.grpcClientConn } +func GetConfigConn(serviceName string, operationID string) *grpc.ClientConn { + rpcRegisterIP := config.Config.RpcRegisterIP + var err error + if config.Config.RpcRegisterIP == "" { + rpcRegisterIP, err = utils.GetLocalIP() + if err != nil { + log.Error("", "GetLocalIP failed ", err.Error()) + return nil + } + } + + var configPortList []int + //1 + if config.Config.RpcRegisterName.OpenImUserName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //2 + if config.Config.RpcRegisterName.OpenImFriendName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //3 + if config.Config.RpcRegisterName.OpenImMsgName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //4 + if config.Config.RpcRegisterName.OpenImPushName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //5 + if config.Config.RpcRegisterName.OpenImRelayName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //6 + if config.Config.RpcRegisterName.OpenImGroupName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //7 + if config.Config.RpcRegisterName.OpenImAuthName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //8 + if config.Config.RpcRegisterName.OpenImMessageCMSName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //9 + if config.Config.RpcRegisterName.OpenImAdminCMSName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //10 + if config.Config.RpcRegisterName.OpenImOfficeName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //11 + if config.Config.RpcRegisterName.OpenImOrganizationName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //12 + if config.Config.RpcRegisterName.OpenImConversationName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //13 + if config.Config.RpcRegisterName.OpenImCacheName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + //14 + if config.Config.RpcRegisterName.OpenImRealTimeCommName == serviceName { + configPortList = config.Config.RpcPort.OpenImAuthPort + } + if len(configPortList) == 0 { + log.Error("", "len(configPortList) == 0 ") + return nil + } + target := rpcRegisterIP + ":" + utils.Int32ToString(int32(configPortList[0])) + log.Info("", "rpcRegisterIP ", rpcRegisterIP, "port ", configPortList, "grpc target: ", target, "serviceName: ", serviceName) + conn, err := grpc.Dial(target, grpc.WithInsecure()) + if err != nil { + log.Error("", "grpc.Dail failed ", err.Error()) + return nil + } + return conn +} + +func GetDefaultConn(schema, etcdaddr, serviceName string, operationID string) *grpc.ClientConn { + rwNameResolverMutex.RLock() + r, ok := nameResolver[schema+serviceName] + rwNameResolverMutex.RUnlock() + if ok { + log.Debug(operationID, "etcd key ", schema+serviceName, "value ", *r.grpcClientConn, *r) + return r.grpcClientConn + } + + rwNameResolverMutex.Lock() + r, ok = nameResolver[schema+serviceName] + if ok { + rwNameResolverMutex.Unlock() + log.Debug(operationID, "etcd key ", schema+serviceName, "value ", *r.grpcClientConn, *r) + return r.grpcClientConn + } + + r, err := NewResolver(schema, etcdaddr, serviceName, operationID) + if err != nil { + log.Error(operationID, "etcd failed ", schema, etcdaddr, serviceName, err.Error()) + rwNameResolverMutex.Unlock() + return nil + } + + log.Debug(operationID, "etcd key ", schema+serviceName, "value ", *r.grpcClientConn, *r) + nameResolver[schema+serviceName] = r + rwNameResolverMutex.Unlock() + return r.grpcClientConn +} + func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { if r.cli == nil { return nil, fmt.Errorf("etcd clientv3 client failed, etcd:%s", target)