mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Add etcd as a service discovery mechanism
This commit is contained in:
parent
3d4e95d7c8
commit
73824cc5f2
@ -2,7 +2,6 @@ package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/openimsdk/protocol/msggateway"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
@ -87,8 +86,6 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M
|
||||
|
||||
// Online push message
|
||||
for _, conn := range conns {
|
||||
fmt.Println(ctx, "get gateway conn detail ", "conn ", *conn)
|
||||
|
||||
conn := conn // loop var safe
|
||||
ctx := ctx
|
||||
wg.Go(func() error {
|
||||
|
@ -97,10 +97,8 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context,
|
||||
// GetConns returns gRPC client connections for a given service name
|
||||
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
||||
fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)
|
||||
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
fmt.Printf("all conns ", serviceName, r.connMap[fullServiceKey])
|
||||
return r.connMap[fullServiceKey], nil
|
||||
}
|
||||
|
||||
@ -122,9 +120,7 @@ func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
|
||||
|
||||
// CloseConn closes a given gRPC client connection
|
||||
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
|
||||
if err := conn.Close(); err != nil {
|
||||
fmt.Printf("Failed to close connection: %v\n", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Register registers a new service endpoint with etcd
|
||||
@ -136,7 +132,7 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
|
||||
}
|
||||
r.endpointMgr = em
|
||||
|
||||
leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time
|
||||
leaseResp, err := r.client.Grant(context.Background(), 30) //
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -158,14 +154,11 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
|
||||
func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
|
||||
ch, err := r.client.KeepAlive(context.Background(), leaseID)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to keep lease alive: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
for ka := range ch {
|
||||
if ka != nil {
|
||||
} else {
|
||||
fmt.Printf("Lease keep-alive response channel closed\n")
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -175,10 +168,14 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
|
||||
func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() {
|
||||
watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix())
|
||||
for watchResp := range watchChan {
|
||||
updatedPrefixes := make(map[string]struct{}) // Create a set to track updated prefixes
|
||||
|
||||
for _, event := range watchResp.Events {
|
||||
prefix, _ := r.splitEndpoint(string(event.Kv.Key))
|
||||
fmt.Printf("Change detected for prefix: %s\n", prefix)
|
||||
r.refreshConnMap(prefix)
|
||||
if _, alreadyUpdated := updatedPrefixes[prefix]; !alreadyUpdated {
|
||||
updatedPrefixes[prefix] = struct{}{} // Mark this prefix as updated
|
||||
r.refreshConnMap(prefix)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -191,7 +188,6 @@ func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
|
||||
fullPrefix := fmt.Sprintf("%s/", prefix)
|
||||
resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to get endpoints: %v\n", err)
|
||||
return
|
||||
}
|
||||
r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections
|
||||
@ -199,7 +195,6 @@ func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
|
||||
_, addr := r.splitEndpoint(string(kv.Key))
|
||||
conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to dial new endpoint: %v\n", err)
|
||||
continue
|
||||
}
|
||||
r.connMap[prefix] = append(r.connMap[prefix], conn)
|
||||
@ -244,11 +239,9 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: etcdServers,
|
||||
}
|
||||
|
||||
for _, opt := range options {
|
||||
opt(&cfg)
|
||||
}
|
||||
|
||||
client, err := clientv3.New(cfg)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to connect to etcd")
|
||||
@ -279,7 +272,6 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
|
||||
return errors.Wrap(err, "failed to create lease in etcd")
|
||||
}
|
||||
}
|
||||
|
||||
putOpts := []clientv3.OpOption{}
|
||||
if leaseResp != nil {
|
||||
putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID))
|
||||
@ -289,13 +281,9 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create the root node in etcd")
|
||||
}
|
||||
fmt.Printf("Root node %s did not exist, but has been created.\n", etcdRoot)
|
||||
} else {
|
||||
return fmt.Errorf("root node %s does not exist in etcd", etcdRoot)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("Etcd is running and the root node %s exists.\n", etcdRoot)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user