From 63da57fb901613062f7d738b8bbab0211575851f Mon Sep 17 00:00:00 2001 From: away <1819625428@qq.com> Date: Thu, 15 Jul 2021 11:08:41 +0800 Subject: [PATCH] grpc register etcd --- src/grpc-etcdv3/getcdv3/pool.go | 254 ++++++++++++++++++++++++ src/grpc-etcdv3/getcdv3/register.go | 95 +++++++++ src/grpc-etcdv3/getcdv3/resolver.go | 291 ++++++++++++++++++++++++++++ 3 files changed, 640 insertions(+) create mode 100644 src/grpc-etcdv3/getcdv3/pool.go create mode 100644 src/grpc-etcdv3/getcdv3/register.go create mode 100644 src/grpc-etcdv3/getcdv3/resolver.go diff --git a/src/grpc-etcdv3/getcdv3/pool.go b/src/grpc-etcdv3/getcdv3/pool.go new file mode 100644 index 000000000..943f4bd3f --- /dev/null +++ b/src/grpc-etcdv3/getcdv3/pool.go @@ -0,0 +1,254 @@ +// Package grpcpool provides a pool of grpc clients +package getcdv3 + +import ( + "context" + "errors" + "sync" + "time" + + "google.golang.org/grpc" +) + +var ( + // ErrClosed is the error when the client pool is closed + ErrClosed = errors.New("grpc pool: client pool is closed") + // ErrTimeout is the error when the client pool timed out + ErrTimeout = errors.New("grpc pool: client pool timed out") + // ErrAlreadyClosed is the error when the client conn was already closed + ErrAlreadyClosed = errors.New("grpc pool: the connection was already closed") + // ErrFullPool is the error when the pool is already full + ErrFullPool = errors.New("grpc pool: closing a ClientConn into a full pool") +) + +// Factory is a function type creating a grpc client +type Factory func(schema, etcdaddr, servicename string) (*grpc.ClientConn, error) + +// FactoryWithContext is a function type creating a grpc client +// that accepts the context parameter that could be passed from +// Get or NewWithContext method. +type FactoryWithContext func(context.Context) (*grpc.ClientConn, error) + +// Pool is the grpc client pool +type Pool struct { + clients chan ClientConn + factory FactoryWithContext + idleTimeout time.Duration + maxLifeDuration time.Duration + mu sync.RWMutex +} + +// ClientConn is the wrapper for a grpc client conn +type ClientConn struct { + *grpc.ClientConn + pool *Pool + timeUsed time.Time + timeInitiated time.Time + unhealthy bool +} + +// New creates a new clients pool with the given initial and maximum capacity, +// and the timeout for the idle clients. Returns an error if the initial +// clients could not be created +func New(factory Factory, schema, etcdaddr, servicename string, init, capacity int, idleTimeout time.Duration, + maxLifeDuration ...time.Duration) (*Pool, error) { + return NewWithContext(context.Background(), func(ctx context.Context) (*grpc.ClientConn, error) { return factory(schema, etcdaddr, servicename) }, + init, capacity, idleTimeout, maxLifeDuration...) +} + +// NewWithContext creates a new clients pool with the given initial and maximum +// capacity, and the timeout for the idle clients. The context parameter would +// be passed to the factory method during initialization. Returns an error if the +// initial clients could not be created. +func NewWithContext(ctx context.Context, factory FactoryWithContext, init, capacity int, idleTimeout time.Duration, + maxLifeDuration ...time.Duration) (*Pool, error) { + + if capacity <= 0 { + capacity = 1 + } + if init < 0 { + init = 0 + } + if init > capacity { + init = capacity + } + p := &Pool{ + clients: make(chan ClientConn, capacity), + factory: factory, + idleTimeout: idleTimeout, + } + if len(maxLifeDuration) > 0 { + p.maxLifeDuration = maxLifeDuration[0] + } + for i := 0; i < init; i++ { + c, err := factory(ctx) + if err != nil { + return nil, err + } + + p.clients <- ClientConn{ + ClientConn: c, + pool: p, + timeUsed: time.Now(), + timeInitiated: time.Now(), + } + } + // Fill the rest of the pool with empty clients + for i := 0; i < capacity-init; i++ { + p.clients <- ClientConn{ + pool: p, + } + } + return p, nil +} + +func (p *Pool) getClients() chan ClientConn { + p.mu.RLock() + defer p.mu.RUnlock() + + return p.clients +} + +// Close empties the pool calling Close on all its clients. +// You can call Close while there are outstanding clients. +// The pool channel is then closed, and Get will not be allowed anymore +func (p *Pool) Close() { + p.mu.Lock() + clients := p.clients + p.clients = nil + p.mu.Unlock() + + if clients == nil { + return + } + + close(clients) + for client := range clients { + if client.ClientConn == nil { + continue + } + client.ClientConn.Close() + } +} + +// IsClosed returns true if the client pool is closed. +func (p *Pool) IsClosed() bool { + return p == nil || p.getClients() == nil +} + +// Get will return the next available client. If capacity +// has not been reached, it will create a new one using the factory. Otherwise, +// it will wait till the next client becomes available or a timeout. +// A timeout of 0 is an indefinite wait +func (p *Pool) Get(ctx context.Context) (*ClientConn, error) { + clients := p.getClients() + if clients == nil { + return nil, ErrClosed + } + + wrapper := ClientConn{ + pool: p, + } + select { + case wrapper = <-clients: + // All good + case <-ctx.Done(): + return nil, ErrTimeout // it would better returns ctx.Err() + } + + // If the wrapper was idle too long, close the connection and create a new + // one. It's safe to assume that there isn't any newer client as the client + // we fetched is the first in the channel + idleTimeout := p.idleTimeout + if wrapper.ClientConn != nil && idleTimeout > 0 && + wrapper.timeUsed.Add(idleTimeout).Before(time.Now()) { + + wrapper.ClientConn.Close() + wrapper.ClientConn = nil + } + + var err error + if wrapper.ClientConn == nil { + wrapper.ClientConn, err = p.factory(ctx) + if err != nil { + // If there was an error, we want to put back a placeholder + // client in the channel + clients <- ClientConn{ + pool: p, + } + } + // This is a new connection, reset its initiated time + wrapper.timeInitiated = time.Now() + } + + return &wrapper, err +} + +// Unhealthy marks the client conn as unhealthy, so that the connection +// gets reset when closed +func (c *ClientConn) Unhealthy() { + c.unhealthy = true +} + +// Close returns a ClientConn to the pool. It is safe to call multiple time, +// but will return an error after first time +func (c *ClientConn) Close() error { + if c == nil { + return nil + } + if c.ClientConn == nil { + return ErrAlreadyClosed + } + if c.pool.IsClosed() { + return ErrClosed + } + // If the wrapper connection has become too old, we want to recycle it. To + // clarify the logic: if the sum of the initialization time and the max + // duration is before Now(), it means the initialization is so old adding + // the maximum duration couldn't put in the future. This sum therefore + // corresponds to the cut-off point: if it's in the future we still have + // time, if it's in the past it's too old + maxDuration := c.pool.maxLifeDuration + if maxDuration > 0 && c.timeInitiated.Add(maxDuration).Before(time.Now()) { + c.Unhealthy() + } + + // We're cloning the wrapper so we can set ClientConn to nil in the one + // used by the user + wrapper := ClientConn{ + pool: c.pool, + ClientConn: c.ClientConn, + timeUsed: time.Now(), + } + if c.unhealthy { + wrapper.ClientConn.Close() + wrapper.ClientConn = nil + } else { + wrapper.timeInitiated = c.timeInitiated + } + select { + case c.pool.clients <- wrapper: + // All good + default: + return ErrFullPool + } + + c.ClientConn = nil // Mark as closed + return nil +} + +// Capacity returns the capacity +func (p *Pool) Capacity() int { + if p.IsClosed() { + return 0 + } + return cap(p.clients) +} + +// Available returns the number of currently unused clients +func (p *Pool) Available() int { + if p.IsClosed() { + return 0 + } + return len(p.clients) +} diff --git a/src/grpc-etcdv3/getcdv3/register.go b/src/grpc-etcdv3/getcdv3/register.go new file mode 100644 index 000000000..e1a78dd61 --- /dev/null +++ b/src/grpc-etcdv3/getcdv3/register.go @@ -0,0 +1,95 @@ +package getcdv3 + +import ( + "context" + "fmt" + "go.etcd.io/etcd/clientv3" + "net" + "strconv" + "strings" +) + +type RegEtcd struct { + cli *clientv3.Client + ctx context.Context + cancel context.CancelFunc + key string +} + +var rEtcd *RegEtcd + +// "%s:///%s/" +func GetPrefix(schema, serviceName string) string { + return fmt.Sprintf("%s:///%s/", schema, serviceName) +} + +// "%s:///%s" +func GetPrefix4Unique(schema, serviceName string) string { + return fmt.Sprintf("%s:///%s", schema, serviceName) +} + +// "%s:///%s/" -> "%s:///%s:ip:port" +func RegisterEtcd4Unique(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error { + serviceName = serviceName + ":" + net.JoinHostPort(myHost, strconv.Itoa(myPort)) + return RegisterEtcd(schema, etcdAddr, myHost, myPort, serviceName, ttl) +} + +//etcdAddr separated by commas +func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: strings.Split(etcdAddr, ","), + }) + fmt.Println("RegisterEtcd") + if err != nil { + // return fmt.Errorf("grpclb: create clientv3 client failed: %v", err) + return fmt.Errorf("create etcd clientv3 client failed, errmsg:%v, etcd addr:%s", err, etcdAddr) + } + + //lease + ctx, cancel := context.WithCancel(context.Background()) + resp, err := cli.Grant(ctx, int64(ttl)) + if err != nil { + return fmt.Errorf("grant failed") + } + + // schema:///serviceName/ip:port ->ip:port + serviceValue := net.JoinHostPort(myHost, strconv.Itoa(myPort)) + serviceKey := GetPrefix(schema, serviceName) + serviceValue + + //set key->value + if _, err := cli.Put(ctx, serviceKey, serviceValue, clientv3.WithLease(resp.ID)); err != nil { + return fmt.Errorf("put failed, errmsg:%v, key:%s, value:%s", err, serviceKey, serviceValue) + } + + //keepalive + kresp, err := cli.KeepAlive(ctx, resp.ID) + if err != nil { + return fmt.Errorf("keepalive faild, errmsg:%v, lease id:%d", err, resp.ID) + } + + go func() { + FLOOP: + for { + select { + case _, ok := <-kresp: + if ok == true { + } else { + break FLOOP + } + } + } + }() + + rEtcd = &RegEtcd{ctx: ctx, + cli: cli, + cancel: cancel, + key: serviceKey} + + return nil +} + +func UnRegisterEtcd() { + //delete + rEtcd.cancel() + rEtcd.cli.Delete(rEtcd.ctx, rEtcd.key) +} diff --git a/src/grpc-etcdv3/getcdv3/resolver.go b/src/grpc-etcdv3/getcdv3/resolver.go new file mode 100644 index 000000000..39b1ad052 --- /dev/null +++ b/src/grpc-etcdv3/getcdv3/resolver.go @@ -0,0 +1,291 @@ +package getcdv3 + +import ( + "context" + "fmt" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" + //"google.golang.org/genproto/googleapis/ads/googleads/v1/services" + "google.golang.org/grpc" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/resolver" + "strings" + "sync" + "time" +) + +type Resolver struct { + cc resolver.ClientConn + serviceName string + grpcClientConn *grpc.ClientConn + cli *clientv3.Client + schema string + etcdAddr string + watchStartRevision int64 +} + +var ( + nameResolver = make(map[string]*Resolver) + rwNameResolverMutex sync.RWMutex +) + +func NewResolver(schema, etcdAddr, serviceName string) (*Resolver, error) { + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: strings.Split(etcdAddr, ","), + }) + if err != nil { + return nil, err + } + + var r Resolver + r.serviceName = serviceName + r.cli = etcdCli + r.schema = schema + r.etcdAddr = etcdAddr + resolver.Register(&r) + + conn, err := grpc.Dial( + GetPrefix(schema, serviceName), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), + grpc.WithInsecure(), + grpc.WithTimeout(time.Duration(5)*time.Second), + ) + if err == nil { + r.grpcClientConn = conn + } + return &r, err +} + +func (r1 *Resolver) ResolveNow(rn resolver.ResolveNowOptions) { +} + +func (r1 *Resolver) Close() { +} + +func GetConn(schema, etcdaddr, serviceName string) *grpc.ClientConn { + rwNameResolverMutex.RLock() + r, ok := nameResolver[schema+serviceName] + rwNameResolverMutex.RUnlock() + if ok { + return r.grpcClientConn + } + + rwNameResolverMutex.Lock() + r, ok = nameResolver[schema+serviceName] + if ok { + rwNameResolverMutex.Unlock() + return r.grpcClientConn + } + + r, err := NewResolver(schema, etcdaddr, serviceName) + if err != nil { + rwNameResolverMutex.Unlock() + return nil + } + + nameResolver[schema+serviceName] = r + rwNameResolverMutex.Unlock() + return r.grpcClientConn + + var ipAdr string + switch serviceName { + case "User": + ipAdr = "1.14.194.38:10100" + case "Friend": + ipAdr = "1.14.194.38:10200" + case "OfflineMessage": + ipAdr = "1.14.194.38:10300" + case "Push": + ipAdr = "1.14.194.38:10700" + case "OnlineMessageRelay": + ipAdr = "1.14.194.38:10400" + case "Group": + ipAdr = "1.14.194.38:10500" + case "Auth": + ipAdr = "1.14.194.38:10600" + } + conn, err := grpc.Dial(ipAdr, + grpc.WithChainStreamInterceptor(), + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithDisableRetry(), + ) + if err != nil { + return nil + } + + return conn +} + +func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + if r.cli == nil { + return nil, fmt.Errorf("etcd clientv3 client failed, etcd:%s", target) + } + r.cc = cc + + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + // "%s:///%s" + prefix := GetPrefix(r.schema, r.serviceName) + // get key first + resp, err := r.cli.Get(ctx, prefix, clientv3.WithPrefix()) + if err == nil { + var addrList []resolver.Address + for i := range resp.Kvs { + fmt.Println("init addr: ", string(resp.Kvs[i].Value)) + addrList = append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)}) + } + r.cc.UpdateState(resolver.State{Addresses: addrList}) + r.watchStartRevision = resp.Header.Revision + 1 + go r.watch(prefix, addrList) + } else { + return nil, fmt.Errorf("etcd get failed, prefix: %s", prefix) + } + + return r, nil +} + +func (r *Resolver) Scheme() string { + return r.schema +} + +func exists(addrList []resolver.Address, addr string) bool { + for _, v := range addrList { + if v.Addr == addr { + return true + } + } + return false +} + +func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { + for i := range s { + if s[i].Addr == addr { + s[i] = s[len(s)-1] + return s[:len(s)-1], true + } + } + return nil, false +} + +func (r *Resolver) watch(prefix string, addrList []resolver.Address) { + rch := r.cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrefix()) + for n := range rch { + flag := 0 + for _, ev := range n.Events { + switch ev.Type { + case mvccpb.PUT: + if !exists(addrList, string(ev.Kv.Value)) { + flag = 1 + addrList = append(addrList, resolver.Address{Addr: string(ev.Kv.Value)}) + fmt.Println("after add, new list: ", addrList) + } + case mvccpb.DELETE: + fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value)) + i := strings.LastIndexAny(string(ev.Kv.Key), "/") + if i < 0 { + return + } + t := string(ev.Kv.Key)[i+1:] + fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t) + if s, ok := remove(addrList, t); ok { + flag = 1 + addrList = s + fmt.Println("after remove, new list: ", addrList) + } + } + } + + if flag == 1 { + r.cc.UpdateState(resolver.State{Addresses: addrList}) + fmt.Println("update: ", addrList) + } + } +} + +func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn { + gEtcdCli, err := clientv3.New(clientv3.Config{Endpoints: strings.Split(etcdaddr, ",")}) + if err != nil { + fmt.Println("eeeeeeeeeeeee", err.Error()) + return nil + } + + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + // "%s:///%s" + prefix := GetPrefix4Unique(schema, servicename) + + resp, err := gEtcdCli.Get(ctx, prefix, clientv3.WithPrefix()) + // "%s:///%s:ip:port" -> %s:ip:port + allService := make([]string, 0) + if err == nil { + for i := range resp.Kvs { + k := string(resp.Kvs[i].Key) + + b := strings.LastIndex(k, "///") + k1 := k[b+len("///"):] + + e := strings.Index(k1, "/") + k2 := k1[:e] + allService = append(allService, k2) + } + } else { + gEtcdCli.Close() + fmt.Println("rrrrrrrrrrr", err.Error()) + return nil + } + gEtcdCli.Close() + + allConn := make([]*grpc.ClientConn, 0) + for _, v := range allService { + r := GetConn(schema, etcdaddr, v) + allConn = append(allConn, r) + } + + return allConn +} + +var ( + service2pool = make(map[string]*Pool) + service2poolMu sync.Mutex +) + +func GetconnFactory(schema, etcdaddr, servicename string) (*grpc.ClientConn, error) { + c := GetConn(schema, etcdaddr, servicename) + if c != nil { + return c, nil + } else { + return c, fmt.Errorf("GetConn failed") + } +} + +func GetConnPool(schema, etcdaddr, servicename string) (*ClientConn, error) { + //get pool + p := NewPool(schema, etcdaddr, servicename) + //poo->get + + ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond)) + + c, err := p.Get(ctx) + fmt.Println(err) + return c, err + +} + +func NewPool(schema, etcdaddr, servicename string) *Pool { + + if _, ok := service2pool[schema+servicename]; !ok { + // + service2poolMu.Lock() + if _, ok1 := service2pool[schema+servicename]; !ok1 { + p, err := New(GetconnFactory, schema, etcdaddr, servicename, 5, 10, 1) + if err == nil { + service2pool[schema+servicename] = p + } + } + service2poolMu.Unlock() + } + + return service2pool[schema+servicename] +} +func GetGrpcConn(schema, etcdaddr, servicename string) *grpc.ClientConn { + return nameResolver[schema+servicename].grpcClientConn +}