diff --git a/docker-compose.yaml b/docker-compose.yaml index f189c8816..f831a2963 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -54,7 +54,7 @@ services: zookeeper: - image: wurstmeister/zookeeper + image: wurstmeister/zookeeper:latest ports: - 2181:2181 container_name: zookeeper diff --git a/pkg/common/log/zk_logger.go b/pkg/common/log/zk_logger.go new file mode 100644 index 000000000..629cee2b9 --- /dev/null +++ b/pkg/common/log/zk_logger.go @@ -0,0 +1,12 @@ +package log + +import ( + "context" + "fmt" +) + +type ZkLogger struct{} + +func (l *ZkLogger) Printf(format string, a ...interface{}) { + ZInfo(context.Background(), fmt.Sprintf(format, a...)) +} diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index 3f5a5df42..2142bde21 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -18,24 +18,26 @@ var ErrConnIsNil = errors.New("conn is nil") var ErrConnIsNilButLocalNotNil = errors.New("conn is nil, but local is not nil") func (s *ZkClient) watch(wg *sync.WaitGroup) { - wg.Done() for { event := <-s.eventChan switch event.Type { case zk.EventSession: + log.ZDebug(context.Background(), "zk session event", "event", event) case zk.EventNodeCreated: case zk.EventNodeChildrenChanged: log.ZDebug(context.Background(), "zk event", "event", event.Path) l := strings.Split(event.Path, "/") - s.lock.Lock() if len(l) > 1 { + s.lock.Lock() rpcName := l[len(l)-1] s.flushResolver(rpcName) if len(s.localConns[rpcName]) != 0 { delete(s.localConns, rpcName) } + s.lock.Unlock() + } - s.lock.Unlock() + log.ZDebug(context.Background(), "zk event handle success", "event", event.Path) case zk.EventNodeDataChanged: case zk.EventNodeDeleted: case zk.EventNotWatching: @@ -72,11 +74,13 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address, } func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + log.ZDebug(ctx, "get conns from client", "serviceName", serviceName) s.lock.Lock() opts = append(s.options, opts...) conns := s.localConns[serviceName] if len(conns) == 0 { var err error + log.ZDebug(ctx, "get conns from zk remote", "serviceName", serviceName) conns, err = s.GetConnsRemote(serviceName) if err != nil { return nil, err @@ -85,6 +89,7 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp } s.lock.Unlock() var ret []*grpc.ClientConn + log.ZDebug(ctx, "get conns from zk success", "conns", conns) for _, conn := range conns { c, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...) if err != nil { @@ -92,6 +97,7 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp } ret = append(ret, c) } + log.ZDebug(ctx, "dial ctx success", "conns", ret) return ret, nil } @@ -107,6 +113,7 @@ func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc if len(conns) == 0 { return nil, ErrConnIsNil } + log.ZDebug(ctx, "get conn from conns", "conns", conns) return s.getConnBalance(conns) } diff --git a/pkg/discoveryregistry/zookeeper/zk.go b/pkg/discoveryregistry/zookeeper/zk.go index e3232c706..13ae21932 100644 --- a/pkg/discoveryregistry/zookeeper/zk.go +++ b/pkg/discoveryregistry/zookeeper/zk.go @@ -87,7 +87,7 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien for _, option := range options { option(client) } - conn, eventChan, err := zk.Connect(zkServers, time.Duration(client.timeout)*time.Second, zk.WithLogInfo(false)) + conn, eventChan, err := zk.Connect(zkServers, time.Duration(client.timeout)*time.Second, zk.WithLogInfo(true), zk.WithLogger(&log.ZkLogger{})) if err != nil { return nil, err } @@ -105,10 +105,8 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien } // resolver.Register(client) var wg sync.WaitGroup - wg.Add(2) go client.refresh(&wg) go client.watch(&wg) - wg.Wait() return client, nil } @@ -131,7 +129,6 @@ func (s *ZkClient) ensureAndCreate(node string) error { } func (s *ZkClient) refresh(wg *sync.WaitGroup) { - wg.Done() for range s.ticker.C { log.ZDebug(context.Background(), "refresh local conns") s.lock.Lock()