mirror of
https://github.com/gogf/gf.git
synced 2025-04-05 11:18:50 +08:00
feat: improve watch for polaris registrar (#2788)
This commit is contained in:
parent
83fa3593b1
commit
2c22f4e17d
@ -42,6 +42,7 @@ func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]gsvc.Serv
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serviceInstances := instancesToServiceInstances(instancesResponse.GetInstances())
|
||||
// Service filter.
|
||||
filteredServices := make([]gsvc.Service, 0)
|
||||
@ -78,26 +79,29 @@ func instancesToServiceInstances(instances []model.Instance) []gsvc.Service {
|
||||
serviceInstances = make([]gsvc.Service, 0, len(instances))
|
||||
endpointStr bytes.Buffer
|
||||
)
|
||||
|
||||
for _, instance := range instances {
|
||||
if instance.IsHealthy() {
|
||||
endpointStr.WriteString(fmt.Sprintf("%s:%d%s", instance.GetHost(), instance.GetPort(), gsvc.EndpointsDelimiter))
|
||||
}
|
||||
}
|
||||
|
||||
for _, instance := range instances {
|
||||
if instance.IsHealthy() {
|
||||
serviceInstances = append(serviceInstances, instanceToServiceInstance(instance, gstr.TrimRight(endpointStr.String(), gsvc.EndpointsDelimiter)))
|
||||
if endpointStr.Len() > 0 {
|
||||
for _, instance := range instances {
|
||||
if instance.IsHealthy() {
|
||||
serviceInstances = append(serviceInstances, instanceToServiceInstance(instance, gstr.TrimRight(endpointStr.String(), gsvc.EndpointsDelimiter), ""))
|
||||
}
|
||||
}
|
||||
}
|
||||
return serviceInstances
|
||||
}
|
||||
|
||||
func instanceToServiceInstance(instance model.Instance, endpointStr string) gsvc.Service {
|
||||
// instanceToServiceInstance converts the instance to service instance.
|
||||
// instanceID Must be null when creating and adding, and non-null when updating and deleting
|
||||
func instanceToServiceInstance(instance model.Instance, endpointStr, instanceID string) gsvc.Service {
|
||||
var (
|
||||
s *gsvc.LocalService
|
||||
metadata = instance.GetMetadata()
|
||||
names = strings.Split(instance.GetService(), instanceIDSeparator)
|
||||
// endpoints = gsvc.NewEndpoints(fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()))
|
||||
s *gsvc.LocalService
|
||||
metadata = instance.GetMetadata()
|
||||
names = strings.Split(instance.GetService(), instanceIDSeparator)
|
||||
endpoints = gsvc.NewEndpoints(endpointStr)
|
||||
)
|
||||
if names != nil && len(names) > 4 {
|
||||
@ -126,9 +130,16 @@ func instanceToServiceInstance(instance model.Instance, endpointStr string) gsvc
|
||||
Endpoints: endpoints,
|
||||
}
|
||||
}
|
||||
return &Service{
|
||||
service := &Service{
|
||||
Service: s,
|
||||
}
|
||||
if instance.GetId() != "" {
|
||||
service.ID = instance.GetId()
|
||||
}
|
||||
if gstr.Trim(instanceID) != "" {
|
||||
service.ID = instanceID
|
||||
}
|
||||
return service
|
||||
}
|
||||
|
||||
// trimAndReplace trims the prefix and suffix separator and replaces the separator in the middle.
|
||||
|
@ -68,10 +68,12 @@ func (w *Watcher) Proceed() ([]gsvc.Service, error) {
|
||||
}
|
||||
// handle DeleteEvent
|
||||
if instanceEvent.DeleteEvent != nil {
|
||||
var endpointStr bytes.Buffer
|
||||
for _, instance := range instanceEvent.DeleteEvent.Instances {
|
||||
// Iterate through existing service instances, deleting them if they exist
|
||||
for i, serviceInstance := range w.ServiceInstances {
|
||||
if serviceInstance.(*Service).ID == instance.GetId() {
|
||||
// remove equal
|
||||
endpointStr.WriteString(fmt.Sprintf("%s:%d%s", instance.GetHost(), instance.GetPort(), gsvc.EndpointsDelimiter))
|
||||
if len(w.ServiceInstances) <= 1 {
|
||||
w.ServiceInstances = w.ServiceInstances[0:0]
|
||||
continue
|
||||
@ -80,32 +82,92 @@ func (w *Watcher) Proceed() ([]gsvc.Service, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if endpointStr.Len() > 0 && len(w.ServiceInstances) > 0 {
|
||||
var (
|
||||
newEndpointStr bytes.Buffer
|
||||
serviceEndpointStr = w.ServiceInstances[0].(*Service).GetEndpoints().String()
|
||||
)
|
||||
for _, address := range gstr.SplitAndTrim(serviceEndpointStr, gsvc.EndpointsDelimiter) {
|
||||
if !gstr.Contains(endpointStr.String(), address) {
|
||||
newEndpointStr.WriteString(fmt.Sprintf("%s%s", address, gsvc.EndpointsDelimiter))
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(w.ServiceInstances); i++ {
|
||||
w.ServiceInstances[i] = instanceToServiceInstance(instanceEvent.DeleteEvent.Instances[0], gstr.TrimRight(newEndpointStr.String(), gsvc.EndpointsDelimiter), w.ServiceInstances[i].(*Service).ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
// handle UpdateEvent
|
||||
if instanceEvent.UpdateEvent != nil {
|
||||
for i, serviceInstance := range w.ServiceInstances {
|
||||
var endpointStr bytes.Buffer
|
||||
var (
|
||||
updateEndpointStr bytes.Buffer
|
||||
newEndpointStr bytes.Buffer
|
||||
)
|
||||
for _, serviceInstance := range w.ServiceInstances {
|
||||
// update the current department or all instances
|
||||
for _, update := range instanceEvent.UpdateEvent.UpdateList {
|
||||
if serviceInstance.(*Service).ID == update.Before.GetId() {
|
||||
endpointStr.WriteString(fmt.Sprintf("%s:%d%s", update.After.GetHost(), update.After.GetPort(), gsvc.EndpointsDelimiter))
|
||||
// update equal
|
||||
if update.After.IsHealthy() {
|
||||
newEndpointStr.WriteString(fmt.Sprintf("%s:%d%s", update.After.GetHost(), update.After.GetPort(), gsvc.EndpointsDelimiter))
|
||||
}
|
||||
updateEndpointStr.WriteString(fmt.Sprintf("%s:%d%s", update.Before.GetHost(), update.Before.GetPort(), gsvc.EndpointsDelimiter))
|
||||
}
|
||||
}
|
||||
for _, update := range instanceEvent.UpdateEvent.UpdateList {
|
||||
if serviceInstance.(*Service).ID == update.Before.GetId() {
|
||||
w.ServiceInstances[i] = instanceToServiceInstance(update.After, gstr.TrimRight(endpointStr.String(), gsvc.EndpointsDelimiter))
|
||||
}
|
||||
if len(w.ServiceInstances) > 0 {
|
||||
var serviceEndpointStr = w.ServiceInstances[0].(*Service).GetEndpoints().String()
|
||||
// old instance addresses are culled
|
||||
if updateEndpointStr.Len() > 0 {
|
||||
for _, address := range gstr.SplitAndTrim(serviceEndpointStr, gsvc.EndpointsDelimiter) {
|
||||
// If the historical instance is not in the change instance, it remains
|
||||
if !gstr.Contains(updateEndpointStr.String(), address) {
|
||||
newEndpointStr.WriteString(fmt.Sprintf("%s%s", address, gsvc.EndpointsDelimiter))
|
||||
}
|
||||
}
|
||||
}
|
||||
instance := instanceEvent.UpdateEvent.UpdateList[0].After
|
||||
for i := 0; i < len(w.ServiceInstances); i++ {
|
||||
w.ServiceInstances[i] = instanceToServiceInstance(instance, gstr.TrimRight(newEndpointStr.String(), gsvc.EndpointsDelimiter), w.ServiceInstances[i].(*Service).ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
// handle AddEvent
|
||||
if instanceEvent.AddEvent != nil {
|
||||
w.ServiceInstances = append(
|
||||
w.ServiceInstances,
|
||||
instancesToServiceInstances(instanceEvent.AddEvent.Instances)...,
|
||||
var (
|
||||
newEndpointStr bytes.Buffer
|
||||
allEndpointStr string
|
||||
)
|
||||
if len(w.ServiceInstances) > 0 {
|
||||
allEndpointStr = w.ServiceInstances[0].(*Service).GetEndpoints().String()
|
||||
}
|
||||
for i := 0; i < len(instanceEvent.AddEvent.Instances); i++ {
|
||||
instance := instanceEvent.AddEvent.Instances[i]
|
||||
if instance.IsHealthy() {
|
||||
address := fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort())
|
||||
if !gstr.Contains(allEndpointStr, address) {
|
||||
newEndpointStr.WriteString(fmt.Sprintf("%s%s", address, gsvc.EndpointsDelimiter))
|
||||
}
|
||||
}
|
||||
}
|
||||
if newEndpointStr.Len() > 0 {
|
||||
allEndpointStr = fmt.Sprintf("%s%s", newEndpointStr.String(), allEndpointStr)
|
||||
}
|
||||
for i := 0; i < len(w.ServiceInstances); i++ {
|
||||
w.ServiceInstances[i] = instanceToServiceInstance(instanceEvent.AddEvent.Instances[0], gstr.TrimRight(allEndpointStr, gsvc.EndpointsDelimiter), w.ServiceInstances[i].(*Service).ID)
|
||||
}
|
||||
|
||||
for i := 0; i < len(instanceEvent.AddEvent.Instances); i++ {
|
||||
instance := instanceEvent.AddEvent.Instances[i]
|
||||
if instance.IsHealthy() {
|
||||
w.ServiceInstances = append(w.ServiceInstances, instanceToServiceInstance(instance, gstr.TrimRight(allEndpointStr, gsvc.EndpointsDelimiter), ""))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return w.ServiceInstances, nil
|
||||
}
|
||||
|
||||
|
@ -19,8 +19,8 @@ import (
|
||||
"github.com/gogf/gf/v2/text/gstr"
|
||||
)
|
||||
|
||||
// TestRegistry TestRegistryManyService
|
||||
func TestRegistry(t *testing.T) {
|
||||
// TestRegistry_Register TestRegistryManyService
|
||||
func TestRegistry_Register(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-registry/backup")
|
||||
@ -35,7 +35,39 @@ func TestRegistry(t *testing.T) {
|
||||
)
|
||||
|
||||
svc := &gsvc.LocalService{
|
||||
Name: "goframe-provider-0-tcp",
|
||||
Name: "goframe-provider-register-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
}
|
||||
|
||||
s, err := r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = r.Deregister(context.Background(), s); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegistry_Deregister TestRegistryManyService
|
||||
func TestRegistry_Deregister(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-registry/backup")
|
||||
if err := api.SetLoggersDir(os.TempDir() + "/polaris-registry/log"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := NewWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
svc := &gsvc.LocalService{
|
||||
Name: "goframe-provider-deregister-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
@ -113,8 +145,8 @@ func TestRegistryMany(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetService Test GetService
|
||||
func TestGetService(t *testing.T) {
|
||||
// TestRegistry_Search Test GetService
|
||||
func TestRegistry_Search(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-get-service/backup")
|
||||
@ -158,8 +190,8 @@ func TestGetService(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatch Test Watch
|
||||
func TestWatch(t *testing.T) {
|
||||
// TestRegistry_Watch Test Watch
|
||||
func TestRegistry_Watch(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-watch/backup")
|
||||
@ -192,6 +224,7 @@ func TestWatch(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("Register service success svc instance id:", s1.(*Service).ID)
|
||||
// watch svc
|
||||
time.Sleep(time.Second * 1)
|
||||
|
||||
@ -202,7 +235,7 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output one instance
|
||||
t.Log("Register Proceed service: ", instance)
|
||||
t.Log("Register Proceed service: ", instance.GetEndpoints().String())
|
||||
}
|
||||
|
||||
if err = r.Deregister(context.Background(), s1); err != nil {
|
||||
@ -216,7 +249,7 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output nothing
|
||||
t.Log("Deregister Proceed service: ", instance)
|
||||
t.Log("Deregister Proceed first delete service: ", instance.GetEndpoints().String(), ", instance id: ", instance.(*Service).ID)
|
||||
}
|
||||
|
||||
if err = watch.Close(); err != nil {
|
||||
@ -226,31 +259,159 @@ func TestWatch(t *testing.T) {
|
||||
// if nil, stop failed
|
||||
t.Fatal()
|
||||
}
|
||||
t.Log("Watch close success")
|
||||
}
|
||||
|
||||
// TestWatcher_Proceed Test Watch
|
||||
func TestWatcher_Proceed(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-watch/backup")
|
||||
if err := api.SetLoggersDir(os.TempDir() + "/polaris-watch/log"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := NewWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
svc := &gsvc.LocalService{
|
||||
Name: "goframe-provider-5-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
}
|
||||
|
||||
s := &Service{
|
||||
Service: svc,
|
||||
}
|
||||
svc1 := &gsvc.LocalService{
|
||||
Name: "goframe-provider-5-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9001"),
|
||||
}
|
||||
|
||||
watch, err := r.Watch(context.Background(), s.GetPrefix())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s1, err := r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("Register service success svc instance id:", s1.(*Service).ID)
|
||||
s22, err := r.Register(context.Background(), svc1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("Register service success svc1 instance id:", s22.(*Service).ID)
|
||||
// watch svc
|
||||
time.Sleep(time.Second * 1)
|
||||
|
||||
// svc register, AddEvent
|
||||
next, err := watch.Proceed()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output one instance
|
||||
t.Log("Register Proceed service: ", instance.GetEndpoints().String())
|
||||
}
|
||||
|
||||
if err = r.Deregister(context.Background(), s1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// svc deregister, DeleteEvent
|
||||
next, err = watch.Proceed()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output nothing
|
||||
t.Log("Deregister Proceed first delete service: ", instance.GetEndpoints().String(), ", instance id: ", instance.(*Service).ID)
|
||||
}
|
||||
|
||||
// ReRegister
|
||||
s1, err = r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("Register service Regin register svc instance id:", s1.(*Service).ID)
|
||||
// svc deregister, DeleteEvent
|
||||
next, err = watch.Proceed()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output nothing
|
||||
t.Log("Deregister Proceed second register service: ", instance.GetEndpoints().String(), ", instance id: ", instance.(*Service).ID)
|
||||
}
|
||||
|
||||
if err = r.Deregister(context.Background(), s22); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// svc deregister, DeleteEvent
|
||||
next, err = watch.Proceed()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output nothing
|
||||
t.Log("Deregister Proceed second delete service: ", instance.GetEndpoints().String(), ", instance id: ", instance.(*Service).ID)
|
||||
}
|
||||
|
||||
// svc register, deleteEvent Deregister s1
|
||||
if err = r.Deregister(context.Background(), s1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// svc deregister, DeleteEvent
|
||||
next, err = watch.Proceed()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, instance := range next {
|
||||
// it will output nothing
|
||||
t.Log("Deregister Proceed third delete service: ", instance.GetEndpoints().String(), ", instance id: ", instance.(*Service).ID)
|
||||
}
|
||||
|
||||
if err = watch.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err = watch.Proceed(); err == nil {
|
||||
// if nil, stop failed
|
||||
t.Fatal()
|
||||
}
|
||||
t.Log("Watch close success")
|
||||
}
|
||||
|
||||
// BenchmarkRegister
|
||||
func BenchmarkRegister(b *testing.B) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-registry/backup")
|
||||
if err := api.SetLoggersDir(os.TempDir() + "/polaris-registry/log"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
r := NewWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
svc := &gsvc.LocalService{
|
||||
Name: "goframe-provider-0-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-registry/backup")
|
||||
if err := api.SetLoggersDir(os.TempDir() + "/polaris-registry/log"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
r := NewWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
svc := &gsvc.LocalService{
|
||||
Name: "goframe-provider-0-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
}
|
||||
|
||||
s, err := r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
@ -306,22 +467,26 @@ func TestRegistryManyForEndpoints(t *testing.T) {
|
||||
Endpoints: gsvc.NewEndpoints(endpointThree),
|
||||
}
|
||||
|
||||
// svc register, AddEvent
|
||||
s0, err := r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// svc register, AddEvent
|
||||
s1, err := r.Register(context.Background(), svc1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// svc register, AddEvent
|
||||
s2, err := r.Register(context.Background(), svc2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("Register service success sleep 1s")
|
||||
time.Sleep(time.Second * 1)
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
// serviceName = "service-default-default-goframe-provider-tcp-latest"
|
||||
result, err := r.Search(context.Background(), gsvc.SearchInput{
|
||||
Name: serviceName,
|
||||
@ -355,5 +520,244 @@ func TestRegistryManyForEndpoints(t *testing.T) {
|
||||
if err = r.Deregister(context.Background(), s2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log("Deregister success")
|
||||
}
|
||||
|
||||
// TestWatcher_Close Test Close
|
||||
func TestWatcher_Close(t *testing.T) {
|
||||
conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"})
|
||||
conf.GetGlobal().GetStatReporter().SetEnable(false)
|
||||
conf.Consumer.LocalCache.SetPersistDir(os.TempDir() + "/polaris-watch/backup")
|
||||
if err := api.SetLoggersDir(os.TempDir() + "/polaris-watch/log"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := NewWithConfig(
|
||||
conf,
|
||||
WithTimeout(time.Second*10),
|
||||
WithTTL(100),
|
||||
)
|
||||
|
||||
svc := &gsvc.LocalService{
|
||||
Name: "goframe-provider-close-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
}
|
||||
|
||||
s := &Service{
|
||||
Service: svc,
|
||||
}
|
||||
|
||||
watch, err := r.Watch(context.Background(), s.GetPrefix())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s1, err := r.Register(context.Background(), svc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// watch svc
|
||||
time.Sleep(time.Second * 1)
|
||||
if err = r.Deregister(context.Background(), s1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = watch.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err = watch.Proceed(); err == nil {
|
||||
// if nil, stop failed
|
||||
t.Fatal()
|
||||
}
|
||||
t.Log("Watch close success")
|
||||
}
|
||||
|
||||
// TestGetKey Test get key
|
||||
func TestGetKey(t *testing.T) {
|
||||
svc := &gsvc.LocalService{
|
||||
Name: "goframe-provider-key-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
}
|
||||
|
||||
s := &Service{
|
||||
Service: svc,
|
||||
}
|
||||
if s.GetKey() != "service-default-default-goframe-provider-key-tcp-test-127.0.0.1:9000" {
|
||||
t.Fatal("GetKey error key:", s.GetKey())
|
||||
}
|
||||
t.Log("GetKey success ")
|
||||
}
|
||||
|
||||
// TestService_GetPrefix Test GetPrefix
|
||||
func TestService_GetPrefix(t *testing.T) {
|
||||
type fields struct {
|
||||
Service gsvc.Service
|
||||
ID string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "TestService_GetPrefix-0",
|
||||
fields: fields{
|
||||
Service: &gsvc.LocalService{
|
||||
Name: "goframe-provider-0-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
},
|
||||
ID: "test",
|
||||
},
|
||||
want: "service-default-default-goframe-provider-0-tcp-test",
|
||||
},
|
||||
{
|
||||
name: "TestService_GetPrefix-1",
|
||||
fields: fields{
|
||||
Service: &gsvc.LocalService{
|
||||
Name: "goframe-provider-1-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9001"),
|
||||
},
|
||||
ID: "test",
|
||||
},
|
||||
want: "service-default-default-goframe-provider-1-tcp-test",
|
||||
},
|
||||
{
|
||||
name: "TestService_GetPrefix-2",
|
||||
fields: fields{
|
||||
Service: &gsvc.LocalService{
|
||||
Name: "goframe-provider-2-tcp",
|
||||
Version: "latest",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9002"),
|
||||
},
|
||||
ID: "latest",
|
||||
},
|
||||
want: "service-default-default-goframe-provider-2-tcp-latest",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := &Service{
|
||||
Service: tt.fields.Service,
|
||||
ID: tt.fields.ID,
|
||||
}
|
||||
if got := s.GetPrefix(); got != tt.want {
|
||||
t.Errorf("GetPrefix() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestService_GetName Test GetName
|
||||
func TestService_GetKey(t *testing.T) {
|
||||
type fields struct {
|
||||
Service gsvc.Service
|
||||
ID string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "TestService_GetKey-0",
|
||||
fields: fields{
|
||||
Service: &gsvc.LocalService{
|
||||
Namespace: gsvc.DefaultNamespace,
|
||||
Deployment: gsvc.DefaultDeployment,
|
||||
Name: "goframe-provider-0-tcp",
|
||||
Version: "test",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9000"),
|
||||
},
|
||||
ID: "test",
|
||||
},
|
||||
want: "service-default-default-goframe-provider-0-tcp-test-127.0.0.1:9000",
|
||||
},
|
||||
{
|
||||
name: "TestService_GetKey-1",
|
||||
fields: fields{
|
||||
Service: &gsvc.LocalService{
|
||||
Namespace: gsvc.DefaultNamespace,
|
||||
Deployment: gsvc.DefaultDeployment,
|
||||
Name: "goframe-provider-1-tcp",
|
||||
Version: "latest",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9001"),
|
||||
},
|
||||
ID: "latest",
|
||||
},
|
||||
want: "service-default-default-goframe-provider-1-tcp-latest-127.0.0.1:9001",
|
||||
},
|
||||
{
|
||||
name: "TestService_GetKey-2",
|
||||
fields: fields{
|
||||
Service: &gsvc.LocalService{
|
||||
Namespace: gsvc.DefaultNamespace,
|
||||
Deployment: gsvc.DefaultDeployment,
|
||||
Name: "goframe-provider-2-tcp",
|
||||
Version: "latest",
|
||||
Metadata: map[string]interface{}{"app": "goframe", gsvc.MDProtocol: "tcp"},
|
||||
Endpoints: gsvc.NewEndpoints("127.0.0.1:9002"),
|
||||
},
|
||||
ID: "latest",
|
||||
},
|
||||
want: "service-default-default-goframe-provider-2-tcp-latest-127.0.0.1:9002",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := &Service{
|
||||
Service: tt.fields.Service,
|
||||
ID: tt.fields.ID,
|
||||
}
|
||||
if got := s.GetKey(); got != tt.want {
|
||||
t.Errorf("GetKey() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test_trimAndReplace Test trimAndReplace
|
||||
func Test_trimAndReplace(t *testing.T) {
|
||||
type args struct {
|
||||
key string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "Test_trimAndReplace-0",
|
||||
args: args{key: "/service/default/default/goframe-provider-0-tcp/latest/127.0.0.1:9000"},
|
||||
want: "service-default-default-goframe-provider-0-tcp-latest-127.0.0.1:9000",
|
||||
},
|
||||
{
|
||||
name: "Test_trimAndReplace-1",
|
||||
args: args{key: "/service/default/default/goframe-provider-1-tcp/latest/127.0.0.1:9001"},
|
||||
want: "service-default-default-goframe-provider-1-tcp-latest-127.0.0.1:9001",
|
||||
},
|
||||
{
|
||||
name: "Test_trimAndReplace-2",
|
||||
args: args{key: "/service/default/default/goframe-provider-2-tcp/latest/127.0.0.1:9002"},
|
||||
want: "service-default-default-goframe-provider-2-tcp-latest-127.0.0.1:9002",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := trimAndReplace(tt.args.key); got != tt.want {
|
||||
t.Errorf("trimAndReplace() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -91,8 +91,7 @@ func watchAndUpdateService(watchedServiceMap *gmap.StrAnyMap, watcher Watcher, s
|
||||
)
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
services, err = watcher.Proceed()
|
||||
if err != nil {
|
||||
if services, err = watcher.Proceed(); err != nil {
|
||||
intlog.Errorf(ctx, `%+v`, err)
|
||||
continue
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user