From a16b95b25f2aaaab5490582ea27fc181440083d3 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 13 Feb 2025 16:35:11 +0800 Subject: [PATCH] 1 --- cmd/main.go | 23 +++++++------ internal/api/prometheus_discovery.go | 48 ++++++++++------------------ internal/api/router.go | 2 +- internal/tools/cron_task.go | 6 ++-- pkg/common/config/config.go | 2 +- 5 files changed, 35 insertions(+), 46 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index a8daa4acd..75bb39eed 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,12 +8,14 @@ import ( "fmt" "net" "os" + "os/signal" "path" "path/filepath" "reflect" "runtime" "strings" "sync" + "syscall" "time" "github.com/mitchellh/mapstructure" @@ -250,16 +252,17 @@ func (x *cmds) run(ctx context.Context) error { }() } - //go func() { - // sigs := make(chan os.Signal, 1) - // signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) - // select { - // case <-ctx.Done(): - // return - // case val := <-sigs: - // cancel(fmt.Errorf("signal %s", val.String())) - // } - //}() + go func() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + select { + case <-ctx.Done(): + return + case val := <-sigs: + log.ZDebug(ctx, "recv signal", "signal", val.String()) + cancel(fmt.Errorf("signal %s", val.String())) + } + }() for i := range x.cmds { cmd := x.cmds[i] diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 2516d0d14..65282cfde 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -2,6 +2,7 @@ package api import ( "encoding/json" + "errors" "net/http" "github.com/gin-gonic/gin" @@ -11,13 +12,13 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" clientv3 "go.etcd.io/etcd/client/v3" ) type PrometheusDiscoveryApi struct { config *Config client *clientv3.Client + kv discovery.KeyValue } func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi { @@ -30,43 +31,26 @@ func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *Prometheu return api } -func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { - if p.config.Discovery.Enable != conf.ETCD { - c.JSON(http.StatusOK, []struct{}{}) - c.Abort() - } -} - func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key)) if err != nil { - // Log and respond with an error if preparation fails. - apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) + if errors.Is(err, discovery.ErrNotSupported) { + c.JSON(http.StatusOK, []struct{}{}) + return + } + apiresp.GinError(c, errs.WrapMsg(err, "get key value")) return } - if len(eResp.Kvs) == 0 { - c.JSON(http.StatusOK, []*prommetrics.Target{}) + if len(value) == 0 { + c.JSON(http.StatusOK, []*prommetrics.RespTarget{}) + return } - - var ( - resp = &prommetrics.RespTarget{ - Targets: make([]string, 0, len(eResp.Kvs)), - } - ) - - for i := range eResp.Kvs { - var target prommetrics.Target - err = json.Unmarshal(eResp.Kvs[i].Value, &target) - if err != nil { - log.ZError(c, "prometheus unmarshal err", errs.Wrap(err)) - } - resp.Targets = append(resp.Targets, target.Target) - if resp.Labels == nil { - resp.Labels = target.Labels - } + var resp prommetrics.RespTarget + if err := json.Unmarshal(value, &resp); err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) + return } - - c.JSON(200, []*prommetrics.RespTarget{resp}) + c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) } func (p *PrometheusDiscoveryApi) Api(c *gin.Context) { diff --git a/internal/api/router.go b/internal/api/router.go index e8c1173a5..ebaff019c 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -283,7 +283,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin } { pd := NewPrometheusDiscoveryApi(cfg, client) - proDiscoveryGroup := r.Group("/prometheus_discovery", pd.Enable) + proDiscoveryGroup := r.Group("/prometheus_discovery") proDiscoveryGroup.GET("/api", pd.Api) proDiscoveryGroup.GET("/user", pd.User) proDiscoveryGroup.GET("/group", pd.Group) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 7fb7b4abc..fcf00ba41 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -25,11 +25,11 @@ type Config struct { Discovery config.Discovery } -// func Start(ctx context.Context, conf *CronTaskConfig) error { func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { - log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) if conf.CronTask.RetainChatRecords < 1 { + log.ZInfo(ctx, "disable cron") + <-ctx.Done() return nil } ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0]) @@ -78,7 +78,9 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp } log.ZDebug(ctx, "start cron task", "CronExecuteTime", conf.CronTask.CronExecuteTime) srv.cron.Start() + log.ZDebug(ctx, "cron task server is running") <-ctx.Done() + log.ZDebug(ctx, "cron task server is shutting down") return nil } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 2cc6979b2..c7a7b2d95 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -323,7 +323,7 @@ type RPC struct { } type Redis struct { - Disable bool `yaml:"disable"` + Disable bool `yaml:"-"` Address []string `yaml:"address"` Username string `yaml:"username"` Password string `yaml:"password"`