mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 20:52:11 +08:00 
			
		
		
		
	fix: prometheus discovery
This commit is contained in:
		
							parent
							
								
									812c1e4127
								
							
						
					
					
						commit
						1a5c094e13
					
				
							
								
								
									
										17
									
								
								cmd/main.go
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								cmd/main.go
									
									
									
									
									
								
							| @ -3,7 +3,6 @@ package main | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"flag" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| @ -39,7 +38,6 @@ import ( | ||||
| 	"github.com/openimsdk/tools/log" | ||||
| 	"github.com/openimsdk/tools/system/program" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	"github.com/openimsdk/tools/utils/network" | ||||
| 	"github.com/spf13/viper" | ||||
| 	"google.golang.org/grpc" | ||||
| ) | ||||
| @ -250,23 +248,12 @@ func (x *cmds) run(ctx context.Context) error { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 		ip, err := network.GetLocalIP() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("prometheus listen %d error %w", port, err) | ||||
| 		} | ||||
| 		defer listener.Close() | ||||
| 		log.ZDebug(ctx, "prometheus start", "addr", listener.Addr()) | ||||
| 		target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port)) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		go func() { | ||||
| 			err := prommetrics.Start(listener) | ||||
| 			if err == nil { | ||||
| @ -342,7 +329,7 @@ func (x *cmds) run(ctx context.Context) error { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { | ||||
| func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error) { | ||||
| 	name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) | ||||
| 	if index := strings.Index(name, "."); index >= 0 { | ||||
| 		name = name[:index] | ||||
| @ -352,7 +339,7 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C | ||||
| 		if err := cmd.parseConf(&conf); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) | ||||
| 		return fn(ctx, &conf, standalone.GetSvcDiscoveryRegistry(), standalone.GetServiceRegistrar()) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -13,7 +13,7 @@ require ( | ||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||
| 	github.com/mitchellh/mapstructure v1.5.0 | ||||
| 	github.com/openimsdk/protocol v0.0.73-alpha.12 | ||||
| 	github.com/openimsdk/tools v0.0.50-alpha.84 | ||||
| 	github.com/openimsdk/tools v0.0.50-alpha.85 | ||||
| 	github.com/pkg/errors v0.9.1 // indirect | ||||
| 	github.com/prometheus/client_golang v1.18.0 | ||||
| 	github.com/stretchr/testify v1.9.0 | ||||
|  | ||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b | ||||
| github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||
| github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= | ||||
| github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.85 h1:OqTUYx6r7Zp/eH8FKB08XeNjPV405TUIG9QT6QQ+F+s= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.85/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | ||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||
| github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= | ||||
| github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= | ||||
|  | ||||
| @ -39,7 +39,7 @@ type Config struct { | ||||
| 	Index      conf.Index | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { | ||||
| 	apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index)) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
|  | ||||
| @ -6,35 +6,29 @@ import ( | ||||
| 	"net/http" | ||||
| 
 | ||||
| 	"github.com/gin-gonic/gin" | ||||
| 	conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/tools/apiresp" | ||||
| 	"github.com/openimsdk/tools/discovery" | ||||
| 	"github.com/openimsdk/tools/discovery/etcd" | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| 	clientv3 "go.etcd.io/etcd/client/v3" | ||||
| ) | ||||
| 
 | ||||
| type PrometheusDiscoveryApi struct { | ||||
| 	config *Config | ||||
| 	client *clientv3.Client | ||||
| 	kv     discovery.KeyValue | ||||
| } | ||||
| 
 | ||||
| func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi { | ||||
| func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { | ||||
| 	api := &PrometheusDiscoveryApi{ | ||||
| 		config: config, | ||||
| 	} | ||||
| 	if config.Discovery.Enable == conf.ETCD { | ||||
| 		api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() | ||||
| 		kv:     client, | ||||
| 	} | ||||
| 	return api | ||||
| } | ||||
| 
 | ||||
| func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { | ||||
| 	value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key)) | ||||
| 	value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key)) | ||||
| 	if err != nil { | ||||
| 		if errors.Is(err, discovery.ErrNotSupportedKeyValue) { | ||||
| 		if errors.Is(err, discovery.ErrNotSupported) { | ||||
| 			c.JSON(http.StatusOK, []struct{}{}) | ||||
| 			return | ||||
| 		} | ||||
| @ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { | ||||
| 		return | ||||
| 	} | ||||
| 	var resp prommetrics.RespTarget | ||||
| 	if err := json.Unmarshal(value, &resp); err != nil { | ||||
| 	for i := range value { | ||||
| 		var tmp prommetrics.Target | ||||
| 		if err = json.Unmarshal(value[i], &tmp); err != nil { | ||||
| 			apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		resp.Targets = append(resp.Targets, tmp.Target) | ||||
| 		resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget | ||||
| 	} | ||||
| 
 | ||||
| 	c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -53,7 +53,7 @@ func prommetricsGin() gin.HandlerFunc { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) { | ||||
| func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { | ||||
| 	authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
|  | ||||
| @ -39,7 +39,7 @@ type Config struct { | ||||
| } | ||||
| 
 | ||||
| // Start run ws server. | ||||
| func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), | ||||
| 		"rpcPorts", conf.MsgGateway.RPC.Ports, | ||||
| 		"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) | ||||
|  | ||||
| @ -58,7 +58,7 @@ type Config struct { | ||||
| 	Index          conf.Index | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	builder := mqbuild.NewBuilder(&config.KafkaConfig) | ||||
| 
 | ||||
| 	log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", | ||||
|  | ||||
| @ -50,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context, | ||||
| 	return &pbpush.DelUserPushTokenResp{}, nil | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) | ||||
| 	rdb, err := dbb.Redis(ctx) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -59,7 +59,7 @@ type Config struct { | ||||
| 	Discovery   config.Discovery | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) | ||||
| 	rdb, err := dbb.Redis(ctx) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -69,7 +69,7 @@ type Config struct { | ||||
| 	Discovery          config.Discovery | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) | ||||
| 	mgocli, err := dbb.Mongo(ctx) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -76,7 +76,7 @@ type Config struct { | ||||
| 	Discovery          config.Discovery | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) | ||||
| 	mgocli, err := dbb.Mongo(ctx) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	builder := mqbuild.NewBuilder(&config.KafkaConfig) | ||||
| 	redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -66,7 +66,7 @@ type Config struct { | ||||
| 	Discovery          config.Discovery | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) | ||||
| 	mgocli, err := dbb.Mongo(ctx) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -64,7 +64,7 @@ type Config struct { | ||||
| 	Discovery          config.Discovery | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) | ||||
| 	mgocli, err := dbb.Mongo(ctx) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -79,7 +79,7 @@ type Config struct { | ||||
| 	Discovery          config.Discovery | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||
| 	dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) | ||||
| 	mgocli, err := dbb.Mongo(ctx) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -25,7 +25,7 @@ type Config struct { | ||||
| 	Discovery config.Discovery | ||||
| } | ||||
| 
 | ||||
| func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { | ||||
| func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { | ||||
| 	log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) | ||||
| 	if conf.CronTask.RetainChatRecords < 1 { | ||||
| 		log.ZInfo(ctx, "disable cron") | ||||
|  | ||||
| @ -19,6 +19,7 @@ import ( | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/api" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" | ||||
| 	"github.com/openimsdk/open-im-server/v3/version" | ||||
| 	"github.com/openimsdk/tools/system/program" | ||||
| @ -84,7 +85,7 @@ func (a *ApiCmd) runE() error { | ||||
| 		a.apiConfig.API.Api.ListenIP, "", | ||||
| 		a.apiConfig.API.Prometheus.AutoSetPorts, | ||||
| 		nil, int(a.apiConfig.Index), | ||||
| 		a.apiConfig.Discovery.RpcService.MessageGateway, | ||||
| 		prommetrics.APIKeyName, | ||||
| 		&a.apiConfig.Notification, | ||||
| 		a.apiConfig, | ||||
| 		[]string{}, | ||||
|  | ||||
| @ -19,6 +19,7 @@ import ( | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/msgtransfer" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" | ||||
| 	"github.com/openimsdk/open-im-server/v3/version" | ||||
| 	"github.com/openimsdk/tools/system/program" | ||||
| @ -65,7 +66,7 @@ func (m *MsgTransferCmd) runE() error { | ||||
| 		"", "", | ||||
| 		true, | ||||
| 		nil, int(m.msgTransferConfig.Index), | ||||
| 		"", | ||||
| 		prommetrics.MessageTransferKeyName, | ||||
| 		nil, | ||||
| 		m.msgTransferConfig, | ||||
| 		[]string{}, | ||||
|  | ||||
| @ -85,6 +85,8 @@ func Start(listener net.Listener) error { | ||||
| const ( | ||||
| 	APIKeyName             = "api" | ||||
| 	MessageTransferKeyName = "message-transfer" | ||||
| 
 | ||||
| 	TTL = 300 | ||||
| ) | ||||
| 
 | ||||
| type Target struct { | ||||
| @ -97,10 +99,14 @@ type RespTarget struct { | ||||
| 	Labels  map[string]string `json:"labels"` | ||||
| } | ||||
| 
 | ||||
| func BuildDiscoveryKey(name string) string { | ||||
| func BuildDiscoveryKeyPrefix(name string) string { | ||||
| 	return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) | ||||
| } | ||||
| 
 | ||||
| func BuildDiscoveryKey(name string, index int) string { | ||||
| 	return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index) | ||||
| } | ||||
| 
 | ||||
| func BuildDefaultTarget(host string, ip int) Target { | ||||
| 	return Target{ | ||||
| 		Target: fmt.Sprintf("%s:%d", host, ip), | ||||
|  | ||||
| @ -50,7 +50,7 @@ func init() { | ||||
| func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, | ||||
| 	registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, | ||||
| 	watchConfigNames []string, watchServiceNames []string, | ||||
| 	rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, | ||||
| 	rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, | ||||
| 	options ...grpc.ServerOption) error { | ||||
| 
 | ||||
| 	if notification != nil { | ||||
| @ -148,11 +148,13 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { | ||||
| 			if !errors.Is(err, discovery.ErrNotSupportedKeyValue) { | ||||
| 		if autoSetPorts { | ||||
| 			if err = client.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL); err != nil { | ||||
| 				if !errors.Is(err, discovery.ErrNotSupported) { | ||||
| 					return err | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		go func() { | ||||
| 			err := prommetrics.Start(prometheusListener) | ||||
| 			if err == nil { | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user