mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-07 11:40:01 +08:00
grpc register etcd
This commit is contained in:
parent
f893391200
commit
63da57fb90
254
src/grpc-etcdv3/getcdv3/pool.go
Normal file
254
src/grpc-etcdv3/getcdv3/pool.go
Normal file
@ -0,0 +1,254 @@
|
|||||||
|
// Package grpcpool provides a pool of grpc clients
|
||||||
|
package getcdv3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrClosed is the error when the client pool is closed
|
||||||
|
ErrClosed = errors.New("grpc pool: client pool is closed")
|
||||||
|
// ErrTimeout is the error when the client pool timed out
|
||||||
|
ErrTimeout = errors.New("grpc pool: client pool timed out")
|
||||||
|
// ErrAlreadyClosed is the error when the client conn was already closed
|
||||||
|
ErrAlreadyClosed = errors.New("grpc pool: the connection was already closed")
|
||||||
|
// ErrFullPool is the error when the pool is already full
|
||||||
|
ErrFullPool = errors.New("grpc pool: closing a ClientConn into a full pool")
|
||||||
|
)
|
||||||
|
|
||||||
|
// Factory is a function type creating a grpc client
|
||||||
|
type Factory func(schema, etcdaddr, servicename string) (*grpc.ClientConn, error)
|
||||||
|
|
||||||
|
// FactoryWithContext is a function type creating a grpc client
|
||||||
|
// that accepts the context parameter that could be passed from
|
||||||
|
// Get or NewWithContext method.
|
||||||
|
type FactoryWithContext func(context.Context) (*grpc.ClientConn, error)
|
||||||
|
|
||||||
|
// Pool is the grpc client pool
|
||||||
|
type Pool struct {
|
||||||
|
clients chan ClientConn
|
||||||
|
factory FactoryWithContext
|
||||||
|
idleTimeout time.Duration
|
||||||
|
maxLifeDuration time.Duration
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClientConn is the wrapper for a grpc client conn
|
||||||
|
type ClientConn struct {
|
||||||
|
*grpc.ClientConn
|
||||||
|
pool *Pool
|
||||||
|
timeUsed time.Time
|
||||||
|
timeInitiated time.Time
|
||||||
|
unhealthy bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new clients pool with the given initial and maximum capacity,
|
||||||
|
// and the timeout for the idle clients. Returns an error if the initial
|
||||||
|
// clients could not be created
|
||||||
|
func New(factory Factory, schema, etcdaddr, servicename string, init, capacity int, idleTimeout time.Duration,
|
||||||
|
maxLifeDuration ...time.Duration) (*Pool, error) {
|
||||||
|
return NewWithContext(context.Background(), func(ctx context.Context) (*grpc.ClientConn, error) { return factory(schema, etcdaddr, servicename) },
|
||||||
|
init, capacity, idleTimeout, maxLifeDuration...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWithContext creates a new clients pool with the given initial and maximum
|
||||||
|
// capacity, and the timeout for the idle clients. The context parameter would
|
||||||
|
// be passed to the factory method during initialization. Returns an error if the
|
||||||
|
// initial clients could not be created.
|
||||||
|
func NewWithContext(ctx context.Context, factory FactoryWithContext, init, capacity int, idleTimeout time.Duration,
|
||||||
|
maxLifeDuration ...time.Duration) (*Pool, error) {
|
||||||
|
|
||||||
|
if capacity <= 0 {
|
||||||
|
capacity = 1
|
||||||
|
}
|
||||||
|
if init < 0 {
|
||||||
|
init = 0
|
||||||
|
}
|
||||||
|
if init > capacity {
|
||||||
|
init = capacity
|
||||||
|
}
|
||||||
|
p := &Pool{
|
||||||
|
clients: make(chan ClientConn, capacity),
|
||||||
|
factory: factory,
|
||||||
|
idleTimeout: idleTimeout,
|
||||||
|
}
|
||||||
|
if len(maxLifeDuration) > 0 {
|
||||||
|
p.maxLifeDuration = maxLifeDuration[0]
|
||||||
|
}
|
||||||
|
for i := 0; i < init; i++ {
|
||||||
|
c, err := factory(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.clients <- ClientConn{
|
||||||
|
ClientConn: c,
|
||||||
|
pool: p,
|
||||||
|
timeUsed: time.Now(),
|
||||||
|
timeInitiated: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Fill the rest of the pool with empty clients
|
||||||
|
for i := 0; i < capacity-init; i++ {
|
||||||
|
p.clients <- ClientConn{
|
||||||
|
pool: p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool) getClients() chan ClientConn {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
|
|
||||||
|
return p.clients
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close empties the pool calling Close on all its clients.
|
||||||
|
// You can call Close while there are outstanding clients.
|
||||||
|
// The pool channel is then closed, and Get will not be allowed anymore
|
||||||
|
func (p *Pool) Close() {
|
||||||
|
p.mu.Lock()
|
||||||
|
clients := p.clients
|
||||||
|
p.clients = nil
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
if clients == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
close(clients)
|
||||||
|
for client := range clients {
|
||||||
|
if client.ClientConn == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
client.ClientConn.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsClosed returns true if the client pool is closed.
|
||||||
|
func (p *Pool) IsClosed() bool {
|
||||||
|
return p == nil || p.getClients() == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get will return the next available client. If capacity
|
||||||
|
// has not been reached, it will create a new one using the factory. Otherwise,
|
||||||
|
// it will wait till the next client becomes available or a timeout.
|
||||||
|
// A timeout of 0 is an indefinite wait
|
||||||
|
func (p *Pool) Get(ctx context.Context) (*ClientConn, error) {
|
||||||
|
clients := p.getClients()
|
||||||
|
if clients == nil {
|
||||||
|
return nil, ErrClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
wrapper := ClientConn{
|
||||||
|
pool: p,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case wrapper = <-clients:
|
||||||
|
// All good
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ErrTimeout // it would better returns ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the wrapper was idle too long, close the connection and create a new
|
||||||
|
// one. It's safe to assume that there isn't any newer client as the client
|
||||||
|
// we fetched is the first in the channel
|
||||||
|
idleTimeout := p.idleTimeout
|
||||||
|
if wrapper.ClientConn != nil && idleTimeout > 0 &&
|
||||||
|
wrapper.timeUsed.Add(idleTimeout).Before(time.Now()) {
|
||||||
|
|
||||||
|
wrapper.ClientConn.Close()
|
||||||
|
wrapper.ClientConn = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if wrapper.ClientConn == nil {
|
||||||
|
wrapper.ClientConn, err = p.factory(ctx)
|
||||||
|
if err != nil {
|
||||||
|
// If there was an error, we want to put back a placeholder
|
||||||
|
// client in the channel
|
||||||
|
clients <- ClientConn{
|
||||||
|
pool: p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// This is a new connection, reset its initiated time
|
||||||
|
wrapper.timeInitiated = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
return &wrapper, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unhealthy marks the client conn as unhealthy, so that the connection
|
||||||
|
// gets reset when closed
|
||||||
|
func (c *ClientConn) Unhealthy() {
|
||||||
|
c.unhealthy = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close returns a ClientConn to the pool. It is safe to call multiple time,
|
||||||
|
// but will return an error after first time
|
||||||
|
func (c *ClientConn) Close() error {
|
||||||
|
if c == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if c.ClientConn == nil {
|
||||||
|
return ErrAlreadyClosed
|
||||||
|
}
|
||||||
|
if c.pool.IsClosed() {
|
||||||
|
return ErrClosed
|
||||||
|
}
|
||||||
|
// If the wrapper connection has become too old, we want to recycle it. To
|
||||||
|
// clarify the logic: if the sum of the initialization time and the max
|
||||||
|
// duration is before Now(), it means the initialization is so old adding
|
||||||
|
// the maximum duration couldn't put in the future. This sum therefore
|
||||||
|
// corresponds to the cut-off point: if it's in the future we still have
|
||||||
|
// time, if it's in the past it's too old
|
||||||
|
maxDuration := c.pool.maxLifeDuration
|
||||||
|
if maxDuration > 0 && c.timeInitiated.Add(maxDuration).Before(time.Now()) {
|
||||||
|
c.Unhealthy()
|
||||||
|
}
|
||||||
|
|
||||||
|
// We're cloning the wrapper so we can set ClientConn to nil in the one
|
||||||
|
// used by the user
|
||||||
|
wrapper := ClientConn{
|
||||||
|
pool: c.pool,
|
||||||
|
ClientConn: c.ClientConn,
|
||||||
|
timeUsed: time.Now(),
|
||||||
|
}
|
||||||
|
if c.unhealthy {
|
||||||
|
wrapper.ClientConn.Close()
|
||||||
|
wrapper.ClientConn = nil
|
||||||
|
} else {
|
||||||
|
wrapper.timeInitiated = c.timeInitiated
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case c.pool.clients <- wrapper:
|
||||||
|
// All good
|
||||||
|
default:
|
||||||
|
return ErrFullPool
|
||||||
|
}
|
||||||
|
|
||||||
|
c.ClientConn = nil // Mark as closed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity returns the capacity
|
||||||
|
func (p *Pool) Capacity() int {
|
||||||
|
if p.IsClosed() {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return cap(p.clients)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Available returns the number of currently unused clients
|
||||||
|
func (p *Pool) Available() int {
|
||||||
|
if p.IsClosed() {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return len(p.clients)
|
||||||
|
}
|
95
src/grpc-etcdv3/getcdv3/register.go
Normal file
95
src/grpc-etcdv3/getcdv3/register.go
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
package getcdv3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"go.etcd.io/etcd/clientv3"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RegEtcd struct {
|
||||||
|
cli *clientv3.Client
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
key string
|
||||||
|
}
|
||||||
|
|
||||||
|
var rEtcd *RegEtcd
|
||||||
|
|
||||||
|
// "%s:///%s/"
|
||||||
|
func GetPrefix(schema, serviceName string) string {
|
||||||
|
return fmt.Sprintf("%s:///%s/", schema, serviceName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// "%s:///%s"
|
||||||
|
func GetPrefix4Unique(schema, serviceName string) string {
|
||||||
|
return fmt.Sprintf("%s:///%s", schema, serviceName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// "%s:///%s/" -> "%s:///%s:ip:port"
|
||||||
|
func RegisterEtcd4Unique(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error {
|
||||||
|
serviceName = serviceName + ":" + net.JoinHostPort(myHost, strconv.Itoa(myPort))
|
||||||
|
return RegisterEtcd(schema, etcdAddr, myHost, myPort, serviceName, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
//etcdAddr separated by commas
|
||||||
|
func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error {
|
||||||
|
cli, err := clientv3.New(clientv3.Config{
|
||||||
|
Endpoints: strings.Split(etcdAddr, ","),
|
||||||
|
})
|
||||||
|
fmt.Println("RegisterEtcd")
|
||||||
|
if err != nil {
|
||||||
|
// return fmt.Errorf("grpclb: create clientv3 client failed: %v", err)
|
||||||
|
return fmt.Errorf("create etcd clientv3 client failed, errmsg:%v, etcd addr:%s", err, etcdAddr)
|
||||||
|
}
|
||||||
|
|
||||||
|
//lease
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
resp, err := cli.Grant(ctx, int64(ttl))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("grant failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// schema:///serviceName/ip:port ->ip:port
|
||||||
|
serviceValue := net.JoinHostPort(myHost, strconv.Itoa(myPort))
|
||||||
|
serviceKey := GetPrefix(schema, serviceName) + serviceValue
|
||||||
|
|
||||||
|
//set key->value
|
||||||
|
if _, err := cli.Put(ctx, serviceKey, serviceValue, clientv3.WithLease(resp.ID)); err != nil {
|
||||||
|
return fmt.Errorf("put failed, errmsg:%v, key:%s, value:%s", err, serviceKey, serviceValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
//keepalive
|
||||||
|
kresp, err := cli.KeepAlive(ctx, resp.ID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("keepalive faild, errmsg:%v, lease id:%d", err, resp.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
FLOOP:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case _, ok := <-kresp:
|
||||||
|
if ok == true {
|
||||||
|
} else {
|
||||||
|
break FLOOP
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
rEtcd = &RegEtcd{ctx: ctx,
|
||||||
|
cli: cli,
|
||||||
|
cancel: cancel,
|
||||||
|
key: serviceKey}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnRegisterEtcd() {
|
||||||
|
//delete
|
||||||
|
rEtcd.cancel()
|
||||||
|
rEtcd.cli.Delete(rEtcd.ctx, rEtcd.key)
|
||||||
|
}
|
291
src/grpc-etcdv3/getcdv3/resolver.go
Normal file
291
src/grpc-etcdv3/getcdv3/resolver.go
Normal file
@ -0,0 +1,291 @@
|
|||||||
|
package getcdv3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"go.etcd.io/etcd/clientv3"
|
||||||
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
|
//"google.golang.org/genproto/googleapis/ads/googleads/v1/services"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/balancer/roundrobin"
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Resolver struct {
|
||||||
|
cc resolver.ClientConn
|
||||||
|
serviceName string
|
||||||
|
grpcClientConn *grpc.ClientConn
|
||||||
|
cli *clientv3.Client
|
||||||
|
schema string
|
||||||
|
etcdAddr string
|
||||||
|
watchStartRevision int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
nameResolver = make(map[string]*Resolver)
|
||||||
|
rwNameResolverMutex sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewResolver(schema, etcdAddr, serviceName string) (*Resolver, error) {
|
||||||
|
etcdCli, err := clientv3.New(clientv3.Config{
|
||||||
|
Endpoints: strings.Split(etcdAddr, ","),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var r Resolver
|
||||||
|
r.serviceName = serviceName
|
||||||
|
r.cli = etcdCli
|
||||||
|
r.schema = schema
|
||||||
|
r.etcdAddr = etcdAddr
|
||||||
|
resolver.Register(&r)
|
||||||
|
|
||||||
|
conn, err := grpc.Dial(
|
||||||
|
GetPrefix(schema, serviceName),
|
||||||
|
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
|
||||||
|
grpc.WithInsecure(),
|
||||||
|
grpc.WithTimeout(time.Duration(5)*time.Second),
|
||||||
|
)
|
||||||
|
if err == nil {
|
||||||
|
r.grpcClientConn = conn
|
||||||
|
}
|
||||||
|
return &r, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r1 *Resolver) ResolveNow(rn resolver.ResolveNowOptions) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r1 *Resolver) Close() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetConn(schema, etcdaddr, serviceName string) *grpc.ClientConn {
|
||||||
|
rwNameResolverMutex.RLock()
|
||||||
|
r, ok := nameResolver[schema+serviceName]
|
||||||
|
rwNameResolverMutex.RUnlock()
|
||||||
|
if ok {
|
||||||
|
return r.grpcClientConn
|
||||||
|
}
|
||||||
|
|
||||||
|
rwNameResolverMutex.Lock()
|
||||||
|
r, ok = nameResolver[schema+serviceName]
|
||||||
|
if ok {
|
||||||
|
rwNameResolverMutex.Unlock()
|
||||||
|
return r.grpcClientConn
|
||||||
|
}
|
||||||
|
|
||||||
|
r, err := NewResolver(schema, etcdaddr, serviceName)
|
||||||
|
if err != nil {
|
||||||
|
rwNameResolverMutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nameResolver[schema+serviceName] = r
|
||||||
|
rwNameResolverMutex.Unlock()
|
||||||
|
return r.grpcClientConn
|
||||||
|
|
||||||
|
var ipAdr string
|
||||||
|
switch serviceName {
|
||||||
|
case "User":
|
||||||
|
ipAdr = "1.14.194.38:10100"
|
||||||
|
case "Friend":
|
||||||
|
ipAdr = "1.14.194.38:10200"
|
||||||
|
case "OfflineMessage":
|
||||||
|
ipAdr = "1.14.194.38:10300"
|
||||||
|
case "Push":
|
||||||
|
ipAdr = "1.14.194.38:10700"
|
||||||
|
case "OnlineMessageRelay":
|
||||||
|
ipAdr = "1.14.194.38:10400"
|
||||||
|
case "Group":
|
||||||
|
ipAdr = "1.14.194.38:10500"
|
||||||
|
case "Auth":
|
||||||
|
ipAdr = "1.14.194.38:10600"
|
||||||
|
}
|
||||||
|
conn, err := grpc.Dial(ipAdr,
|
||||||
|
grpc.WithChainStreamInterceptor(),
|
||||||
|
grpc.WithInsecure(),
|
||||||
|
grpc.WithBlock(),
|
||||||
|
grpc.WithDisableRetry(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
||||||
|
if r.cli == nil {
|
||||||
|
return nil, fmt.Errorf("etcd clientv3 client failed, etcd:%s", target)
|
||||||
|
}
|
||||||
|
r.cc = cc
|
||||||
|
|
||||||
|
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
// "%s:///%s"
|
||||||
|
prefix := GetPrefix(r.schema, r.serviceName)
|
||||||
|
// get key first
|
||||||
|
resp, err := r.cli.Get(ctx, prefix, clientv3.WithPrefix())
|
||||||
|
if err == nil {
|
||||||
|
var addrList []resolver.Address
|
||||||
|
for i := range resp.Kvs {
|
||||||
|
fmt.Println("init addr: ", string(resp.Kvs[i].Value))
|
||||||
|
addrList = append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)})
|
||||||
|
}
|
||||||
|
r.cc.UpdateState(resolver.State{Addresses: addrList})
|
||||||
|
r.watchStartRevision = resp.Header.Revision + 1
|
||||||
|
go r.watch(prefix, addrList)
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("etcd get failed, prefix: %s", prefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Resolver) Scheme() string {
|
||||||
|
return r.schema
|
||||||
|
}
|
||||||
|
|
||||||
|
func exists(addrList []resolver.Address, addr string) bool {
|
||||||
|
for _, v := range addrList {
|
||||||
|
if v.Addr == addr {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
|
||||||
|
for i := range s {
|
||||||
|
if s[i].Addr == addr {
|
||||||
|
s[i] = s[len(s)-1]
|
||||||
|
return s[:len(s)-1], true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Resolver) watch(prefix string, addrList []resolver.Address) {
|
||||||
|
rch := r.cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrefix())
|
||||||
|
for n := range rch {
|
||||||
|
flag := 0
|
||||||
|
for _, ev := range n.Events {
|
||||||
|
switch ev.Type {
|
||||||
|
case mvccpb.PUT:
|
||||||
|
if !exists(addrList, string(ev.Kv.Value)) {
|
||||||
|
flag = 1
|
||||||
|
addrList = append(addrList, resolver.Address{Addr: string(ev.Kv.Value)})
|
||||||
|
fmt.Println("after add, new list: ", addrList)
|
||||||
|
}
|
||||||
|
case mvccpb.DELETE:
|
||||||
|
fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value))
|
||||||
|
i := strings.LastIndexAny(string(ev.Kv.Key), "/")
|
||||||
|
if i < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t := string(ev.Kv.Key)[i+1:]
|
||||||
|
fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t)
|
||||||
|
if s, ok := remove(addrList, t); ok {
|
||||||
|
flag = 1
|
||||||
|
addrList = s
|
||||||
|
fmt.Println("after remove, new list: ", addrList)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if flag == 1 {
|
||||||
|
r.cc.UpdateState(resolver.State{Addresses: addrList})
|
||||||
|
fmt.Println("update: ", addrList)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn {
|
||||||
|
gEtcdCli, err := clientv3.New(clientv3.Config{Endpoints: strings.Split(etcdaddr, ",")})
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("eeeeeeeeeeeee", err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
// "%s:///%s"
|
||||||
|
prefix := GetPrefix4Unique(schema, servicename)
|
||||||
|
|
||||||
|
resp, err := gEtcdCli.Get(ctx, prefix, clientv3.WithPrefix())
|
||||||
|
// "%s:///%s:ip:port" -> %s:ip:port
|
||||||
|
allService := make([]string, 0)
|
||||||
|
if err == nil {
|
||||||
|
for i := range resp.Kvs {
|
||||||
|
k := string(resp.Kvs[i].Key)
|
||||||
|
|
||||||
|
b := strings.LastIndex(k, "///")
|
||||||
|
k1 := k[b+len("///"):]
|
||||||
|
|
||||||
|
e := strings.Index(k1, "/")
|
||||||
|
k2 := k1[:e]
|
||||||
|
allService = append(allService, k2)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
gEtcdCli.Close()
|
||||||
|
fmt.Println("rrrrrrrrrrr", err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
gEtcdCli.Close()
|
||||||
|
|
||||||
|
allConn := make([]*grpc.ClientConn, 0)
|
||||||
|
for _, v := range allService {
|
||||||
|
r := GetConn(schema, etcdaddr, v)
|
||||||
|
allConn = append(allConn, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
return allConn
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
service2pool = make(map[string]*Pool)
|
||||||
|
service2poolMu sync.Mutex
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetconnFactory(schema, etcdaddr, servicename string) (*grpc.ClientConn, error) {
|
||||||
|
c := GetConn(schema, etcdaddr, servicename)
|
||||||
|
if c != nil {
|
||||||
|
return c, nil
|
||||||
|
} else {
|
||||||
|
return c, fmt.Errorf("GetConn failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetConnPool(schema, etcdaddr, servicename string) (*ClientConn, error) {
|
||||||
|
//get pool
|
||||||
|
p := NewPool(schema, etcdaddr, servicename)
|
||||||
|
//poo->get
|
||||||
|
|
||||||
|
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond))
|
||||||
|
|
||||||
|
c, err := p.Get(ctx)
|
||||||
|
fmt.Println(err)
|
||||||
|
return c, err
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPool(schema, etcdaddr, servicename string) *Pool {
|
||||||
|
|
||||||
|
if _, ok := service2pool[schema+servicename]; !ok {
|
||||||
|
//
|
||||||
|
service2poolMu.Lock()
|
||||||
|
if _, ok1 := service2pool[schema+servicename]; !ok1 {
|
||||||
|
p, err := New(GetconnFactory, schema, etcdaddr, servicename, 5, 10, 1)
|
||||||
|
if err == nil {
|
||||||
|
service2pool[schema+servicename] = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
service2poolMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
return service2pool[schema+servicename]
|
||||||
|
}
|
||||||
|
func GetGrpcConn(schema, etcdaddr, servicename string) *grpc.ClientConn {
|
||||||
|
return nameResolver[schema+servicename].grpcClientConn
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user