From 4c17c64403c379ce2c7eda77dd2181e7aaa8937e Mon Sep 17 00:00:00 2001
From: wangchuxiao <wangchuxiao97@outlook.com>
Date: Tue, 30 May 2023 19:18:12 +0800
Subject: [PATCH] builder

---
 cmd/api/main.go                             |   2 +-
 internal/msgtransfer/init.go                |   2 +-
 pkg/discoveryregistry/zookeeper/conf.go     |  27 +++
 pkg/discoveryregistry/zookeeper/discover.go | 110 ++++++++++++
 pkg/discoveryregistry/zookeeper/register.go |  39 ++++
 pkg/discoveryregistry/zookeeper/resolver.go |  51 ++++++
 pkg/discoveryregistry/zookeeper/zk.go       | 187 ++++++++++++++++++++
 pkg/startrpc/start.go                       |   2 +-
 8 files changed, 417 insertions(+), 3 deletions(-)
 create mode 100644 pkg/discoveryregistry/zookeeper/conf.go
 create mode 100644 pkg/discoveryregistry/zookeeper/discover.go
 create mode 100644 pkg/discoveryregistry/zookeeper/register.go
 create mode 100644 pkg/discoveryregistry/zookeeper/resolver.go
 create mode 100644 pkg/discoveryregistry/zookeeper/zk.go

diff --git a/cmd/api/main.go b/cmd/api/main.go
index e559445ab..48ff83206 100644
--- a/cmd/api/main.go
+++ b/cmd/api/main.go
@@ -17,7 +17,7 @@ import (
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
-	"github.com/OpenIMSDK/openKeeper"
+	openKeeper "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper"
 )
 
 func main() {
diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go
index e7595bc99..82ee5842b 100644
--- a/internal/msgtransfer/init.go
+++ b/internal/msgtransfer/init.go
@@ -14,8 +14,8 @@ import (
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
+	openKeeper "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
-	"github.com/OpenIMSDK/openKeeper"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 )
diff --git a/pkg/discoveryregistry/zookeeper/conf.go b/pkg/discoveryregistry/zookeeper/conf.go
new file mode 100644
index 000000000..04ee6b179
--- /dev/null
+++ b/pkg/discoveryregistry/zookeeper/conf.go
@@ -0,0 +1,27 @@
+package openKeeper
+
+import (
+	"github.com/go-zookeeper/zk"
+)
+
+func (s *ZkClient) RegisterConf2Registry(key string, conf []byte) error {
+	exists, _, err := s.conn.Exists(s.getPath(key))
+	if err != nil {
+		return err
+	}
+	if exists {
+		if err := s.conn.Delete(s.getPath(key), 0); err != nil {
+			return err
+		}
+	}
+	_, err = s.conn.Create(s.getPath(key), conf, 0, zk.WorldACL(zk.PermAll))
+	if err != zk.ErrNodeExists {
+		return err
+	}
+	return nil
+}
+
+func (s *ZkClient) GetConfFromRegistry(key string) ([]byte, error) {
+	bytes, _, err := s.conn.Get(s.getPath(key))
+	return bytes, err
+}
diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go
new file mode 100644
index 000000000..aa0f74266
--- /dev/null
+++ b/pkg/discoveryregistry/zookeeper/discover.go
@@ -0,0 +1,110 @@
+package openKeeper
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/pkg/errors"
+
+	"github.com/go-zookeeper/zk"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/resolver"
+)
+
+var ErrConnIsNil = errors.New("conn is nil")
+var ErrConnIsNilButLocalNotNil = errors.New("conn is nil, but local is not nil")
+
+func (s *ZkClient) watch(wg *sync.WaitGroup) {
+	wg.Done()
+	for {
+		event := <-s.eventChan
+		switch event.Type {
+		case zk.EventSession:
+		case zk.EventNodeCreated:
+		case zk.EventNodeChildrenChanged:
+			l := strings.Split(event.Path, "/")
+			s.lock.Lock()
+			if len(l) > 1 {
+				rpcName := l[len(l)-1]
+				s.flushResolver(rpcName)
+				if len(s.localConns[rpcName]) != 0 {
+					delete(s.localConns, rpcName)
+				}
+			}
+			s.lock.Unlock()
+		case zk.EventNodeDataChanged:
+		case zk.EventNodeDeleted:
+		case zk.EventNotWatching:
+		}
+	}
+
+}
+
+func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address, err error) {
+	path := s.getPath(serviceName)
+	childNodes, _, err := s.conn.Children(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "get children error")
+	}
+	for _, child := range childNodes {
+		fullPath := path + "/" + child
+		data, _, err := s.conn.Get(fullPath)
+		if err != nil {
+			if err == zk.ErrNoNode {
+				return nil, errors.Wrap(err, "this is zk ErrNoNode")
+			}
+			return nil, errors.Wrap(err, "get children error")
+		}
+		conns = append(conns, resolver.Address{Addr: string(data), ServerName: serviceName})
+	}
+	_, _, _, err = s.conn.ChildrenW(s.getPath(serviceName))
+	if err != nil {
+		return nil, errors.Wrap(err, "children watch error")
+	}
+	if len(conns) == 0 {
+		return nil, fmt.Errorf("no conn for service %s, grpc server may not exist, local conn is %v, please check zookeeper server %v, path: %s", serviceName, s.localConns, s.zkServers, s.zkRoot)
+	}
+	return conns, nil
+}
+
+func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	opts = append(s.options, opts...)
+	conns := s.localConns[serviceName]
+	if len(conns) == 0 {
+		var err error
+		conns, err = s.GetConnsRemote(serviceName)
+		if err != nil {
+			return nil, err
+		}
+		s.localConns[serviceName] = conns
+	}
+	var ret []*grpc.ClientConn
+	for _, conn := range conns {
+		c, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...)
+		if err != nil {
+			return nil, errors.Wrap(err, fmt.Sprintf("conns dialContext error, conn: %s", conn.Addr))
+		}
+		ret = append(ret, c)
+	}
+	return ret, nil
+}
+
+func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+	newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName)))
+	return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...)
+}
+
+func (s *ZkClient) GetFirstConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+	conns, err := s.GetConns(ctx, serviceName, opts...)
+	if err != nil {
+		return nil, err
+	}
+	if len(conns) == 0 {
+		return nil, ErrConnIsNil
+	}
+	return conns[0], nil
+}
diff --git a/pkg/discoveryregistry/zookeeper/register.go b/pkg/discoveryregistry/zookeeper/register.go
new file mode 100644
index 000000000..b27b5bd28
--- /dev/null
+++ b/pkg/discoveryregistry/zookeeper/register.go
@@ -0,0 +1,39 @@
+package openKeeper
+
+import (
+	"github.com/go-zookeeper/zk"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/resolver"
+)
+
+func (s *ZkClient) Register(rpcRegisterName, host string, port int, opts ...grpc.DialOption) error {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	if err := s.ensureName(rpcRegisterName); err != nil {
+		return err
+	}
+	addr := s.getAddr(host, port)
+	_, err := grpc.Dial(addr, opts...)
+	if err != nil {
+		return err
+	}
+	node, err := s.conn.CreateProtectedEphemeralSequential(s.getPath(rpcRegisterName)+"/"+addr+"_", []byte(addr), zk.WorldACL(zk.PermAll))
+	if err != nil {
+		return err
+	}
+	s.node = node
+	return nil
+}
+
+func (s *ZkClient) UnRegister() error {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	err := s.conn.Delete(s.node, -1)
+	if err != nil {
+		return err
+	}
+	s.node = ""
+	s.localConns = make(map[string][]resolver.Address)
+	s.resolvers = make(map[string]*Resolver)
+	return nil
+}
diff --git a/pkg/discoveryregistry/zookeeper/resolver.go b/pkg/discoveryregistry/zookeeper/resolver.go
new file mode 100644
index 000000000..38925e6b0
--- /dev/null
+++ b/pkg/discoveryregistry/zookeeper/resolver.go
@@ -0,0 +1,51 @@
+package openKeeper
+
+import (
+	"context"
+	"strings"
+	"sync"
+
+	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
+	"google.golang.org/grpc/resolver"
+)
+
+type Resolver struct {
+	target resolver.Target
+	cc     resolver.ClientConn
+	addrs  []resolver.Address
+	lock   sync.RWMutex
+
+	getConnsRemote func(serviceName string) (conns []resolver.Address, err error)
+}
+
+func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {
+	log.ZDebug(context.Background(), "start resolve now", "target", r.target)
+	newConns, err := r.getConnsRemote(strings.TrimLeft(r.target.URL.Path, "/"))
+	if err != nil {
+		return
+	}
+	log.ZDebug(context.Background(), "resolve now", "newConns", newConns, "target", r.target)
+	r.lock.Lock()
+	r.addrs = newConns
+	r.lock.Unlock()
+	r.cc.UpdateState(resolver.State{Addresses: r.addrs})
+	log.ZDebug(context.Background(), "resolve now ok", "newConns", newConns, "target", r.target)
+}
+
+func (s *Resolver) Close() {}
+
+func (s *ZkClient) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+	log.ZDebug(context.Background(), "build resolver", "target", target, "opts", opts, "cc", cc)
+	r := &Resolver{}
+	r.target = target
+	r.cc = cc
+	r.getConnsRemote = s.GetConnsRemote
+	r.ResolveNow(resolver.ResolveNowOptions{})
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	s.resolvers[strings.TrimLeft(target.URL.Path, "/")] = r
+	log.ZDebug(context.Background(), "build resolver ok", "target", target)
+	return r, nil
+}
+
+func (s *ZkClient) Scheme() string { return s.scheme }
diff --git a/pkg/discoveryregistry/zookeeper/zk.go b/pkg/discoveryregistry/zookeeper/zk.go
new file mode 100644
index 000000000..de110410f
--- /dev/null
+++ b/pkg/discoveryregistry/zookeeper/zk.go
@@ -0,0 +1,187 @@
+package openKeeper
+
+import (
+	"net"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/go-zookeeper/zk"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/resolver"
+)
+
+const (
+	defaultFreq = time.Minute * 30
+	timeout     = 5
+)
+
+type ZkClient struct {
+	zkServers []string
+	zkRoot    string
+	userName  string
+	password  string
+
+	scheme string
+
+	timeout   int
+	conn      *zk.Conn
+	eventChan <-chan zk.Event
+	node      string
+	ticker    *time.Ticker
+
+	lock    sync.RWMutex
+	options []grpc.DialOption
+
+	resolvers    map[string]*Resolver
+	localConns   map[string][]resolver.Address
+	balancerName string
+}
+
+type ZkOption func(*ZkClient)
+
+func WithRoundRobin() ZkOption {
+	return func(client *ZkClient) {
+		client.balancerName = "round_robin"
+	}
+}
+
+func WithUserNameAndPassword(userName, password string) ZkOption {
+	return func(client *ZkClient) {
+		client.userName = userName
+		client.password = password
+	}
+}
+
+func WithOptions(opts ...grpc.DialOption) ZkOption {
+	return func(client *ZkClient) {
+		client.options = opts
+	}
+}
+
+func WithFreq(freq time.Duration) ZkOption {
+	return func(client *ZkClient) {
+		client.ticker = time.NewTicker(freq)
+	}
+}
+
+func WithTimeout(timeout int) ZkOption {
+	return func(client *ZkClient) {
+		client.timeout = timeout
+	}
+}
+
+func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClient, error) {
+	client := &ZkClient{
+		zkServers:  zkServers,
+		zkRoot:     "/",
+		scheme:     zkRoot,
+		timeout:    timeout,
+		localConns: make(map[string][]resolver.Address),
+		resolvers:  make(map[string]*Resolver),
+	}
+	client.ticker = time.NewTicker(defaultFreq)
+	for _, option := range options {
+		option(client)
+	}
+	conn, eventChan, err := zk.Connect(zkServers, time.Duration(client.timeout)*time.Second, zk.WithLogInfo(false))
+	if err != nil {
+		return nil, err
+	}
+	if client.userName != "" && client.password != "" {
+		if err := conn.AddAuth("digest", []byte(client.userName+":"+client.password)); err != nil {
+			return nil, err
+		}
+	}
+	client.zkRoot += zkRoot
+	client.eventChan = eventChan
+	client.conn = conn
+	if err := client.ensureRoot(); err != nil {
+		client.CloseZK()
+		return nil, err
+	}
+	resolver.Register(client)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go client.refresh(&wg)
+	go client.watch(&wg)
+	wg.Wait()
+	return client, nil
+}
+
+func (s *ZkClient) CloseZK() {
+	s.conn.Close()
+}
+
+func (s *ZkClient) ensureAndCreate(node string) error {
+	exists, _, err := s.conn.Exists(node)
+	if err != nil {
+		return err
+	}
+	if !exists {
+		_, err := s.conn.Create(node, []byte(""), 0, zk.WorldACL(zk.PermAll))
+		if err != nil && err != zk.ErrNodeExists {
+			return err
+		}
+	}
+	return nil
+}
+
+func (s *ZkClient) refresh(wg *sync.WaitGroup) {
+	wg.Done()
+	for range s.ticker.C {
+		s.lock.Lock()
+		for rpcName := range s.resolvers {
+			s.flushResolver(rpcName)
+		}
+		for rpcName := range s.localConns {
+			delete(s.localConns, rpcName)
+		}
+		s.lock.Unlock()
+	}
+
+}
+
+func (s *ZkClient) flushResolver(serviceName string) {
+	r, ok := s.resolvers[serviceName]
+	if ok {
+		r.ResolveNow(resolver.ResolveNowOptions{})
+		s.resolvers[serviceName] = r
+	}
+}
+
+func (s *ZkClient) GetZkConn() *zk.Conn {
+	return s.conn
+}
+
+func (s *ZkClient) GetRootPath() string {
+	return s.zkRoot
+}
+
+func (s *ZkClient) GetNode() string {
+	return s.node
+}
+
+func (s *ZkClient) ensureRoot() error {
+	return s.ensureAndCreate(s.zkRoot)
+}
+
+func (s *ZkClient) ensureName(rpcRegisterName string) error {
+	return s.ensureAndCreate(s.getPath(rpcRegisterName))
+}
+
+func (s *ZkClient) getPath(rpcRegisterName string) string {
+	return s.zkRoot + "/" + rpcRegisterName
+}
+
+func (s *ZkClient) getAddr(host string, port int) string {
+	return net.JoinHostPort(host, strconv.Itoa(port))
+}
+
+func (s *ZkClient) AddOption(opts ...grpc.DialOption) {
+	s.options = append(s.options, opts...)
+}
+
+func (s *ZkClient) GetClientLocalConns() map[string][]resolver.Address {
+	return s.localConns
+}
diff --git a/pkg/startrpc/start.go b/pkg/startrpc/start.go
index ba46da86e..0d830c6f8 100644
--- a/pkg/startrpc/start.go
+++ b/pkg/startrpc/start.go
@@ -11,8 +11,8 @@ import (
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/network"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
+	openKeeper "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper"
 	"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
-	"github.com/OpenIMSDK/openKeeper"
 	grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"