From b7ca3bd95f580fc16d6de1caf8d9708e97c42dc3 Mon Sep 17 00:00:00 2001 From: HonQi <14954524+HonQii@users.noreply.github.com> Date: Wed, 4 Jun 2025 10:46:20 +0800 Subject: [PATCH 1/4] fix redis config db field (#3395) --- pkg/common/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index d5ae68ec0..cd57a11cf 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -328,7 +328,7 @@ type Redis struct { Username string `yaml:"username"` Password string `yaml:"password"` ClusterMode bool `yaml:"clusterMode"` - DB int `yaml:"storage"` + DB int `yaml:"db"` MaxRetry int `yaml:"maxRetry"` PoolSize int `yaml:"poolSize"` } From 04ee509b68b4ad1579d948cbaf82b37d2a2e518c Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 6 Jun 2025 11:29:45 +0800 Subject: [PATCH 2/4] fix: solve incorrect when sendMsg webhook callback after. (#3409) * fix: solve incorrect when sendMsg webhook callback after. * remove print. --- internal/msgtransfer/callback.go | 124 ++++++++++++++++++ internal/msgtransfer/init.go | 2 +- .../online_msg_to_mongo_handler.go | 18 ++- internal/rpc/msg/send.go | 6 +- 4 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 internal/msgtransfer/callback.go diff --git a/internal/msgtransfer/callback.go b/internal/msgtransfer/callback.go new file mode 100644 index 000000000..ea51c2839 --- /dev/null +++ b/internal/msgtransfer/callback.go @@ -0,0 +1,124 @@ +package msgtransfer + +import ( + "context" + "encoding/base64" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/stringutil" + "google.golang.org/protobuf/proto" + + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" +) + +func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq { + return cbapi.CommonCallbackReq{ + SendID: msg.SendID, + ServerMsgID: msg.ServerMsgID, + CallbackCommand: command, + ClientMsgID: msg.ClientMsgID, + OperationID: mcontext.GetOperationID(ctx), + SenderPlatformID: msg.SenderPlatformID, + SenderNickname: msg.SenderNickname, + SessionType: msg.SessionType, + MsgFrom: msg.MsgFrom, + ContentType: msg.ContentType, + Status: msg.Status, + SendTime: msg.SendTime, + CreateTime: msg.CreateTime, + AtUserIDList: msg.AtUserIDList, + SenderFaceURL: msg.SenderFaceURL, + Content: GetContent(msg), + Seq: uint32(msg.Seq), + Ex: msg.Ex, + } +} + +func GetContent(msg *sdkws.MsgData) string { + if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { + var tips sdkws.TipsComm + _ = proto.Unmarshal(msg.Content, &tips) + content := tips.JsonDetail + return content + } else { + return string(msg.Content) + } +} + +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { + if msg.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), + RecvID: msg.RecvID, + } + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) +} + +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { + if msg.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), + GroupID: msg.GroupID, + } + + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) +} + +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } + + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} + +func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool { + return filterMsg(msg, after.AttentionIds, after.DeniedTypes) +} + +func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool { + // According to the attentionIds configuration, only some users are sent + if len(attentionIds) != 0 && !datautil.Contain(msg.RecvID, attentionIds...) { + return false + } + + if defaultDeniedTypes(msg.ContentType) { + return false + } + + if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) { + return false + } + + return true +} + +func defaultDeniedTypes(contentType int32) bool { + if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd { + return true + } + if contentType == constant.Typing { + return true + } + return false +} diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 175813552..b07ec6f1d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } - historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase) + historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) msgTransfer := &MsgTransfer{ historyConsumer: historyConsumer, diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a895bb9c4..6c1498f82 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -19,6 +19,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" @@ -26,11 +28,15 @@ import ( type OnlineHistoryMongoConsumerHandler struct { msgTransferDatabase controller.MsgTransferDatabase + config *Config + webhookClient *webhook.Client } -func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler { +func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler { return &OnlineHistoryMongoConsumerHandler{ msgTransferDatabase: database, + config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), } } @@ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() } + + for _, msgData := range msgFromMQ.MsgData { + switch msgData.SessionType { + case constant.SingleChatType: + mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) + case constant.ReadGroupChatType: + mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) + } + } + //var seqs []int64 //for _, msg := range msgFromMQ.MsgData { // seqs = append(seqs, msg.Seq) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index f13b80708..d97905bff 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, go m.setConversationAtInfo(ctx, req.MsgData) } - m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + // m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} resp.SendTime = req.MsgData.SendTime @@ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } - m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) + + // m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return &pbmsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID, From 75367545ea330bdb67a6e4242cb31e5652ce44dc Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 6 Jun 2025 14:25:58 +0800 Subject: [PATCH 3/4] feat: support distributed lock in crontask. (#3401) * feat: support distributed lock in crontask. * remove space. * remove comment. * remove log. * Update logic. * Update contents. --- internal/tools/cron/cron_task.go | 21 ++++++-- internal/tools/cron/dist_look.go | 89 ++++++++++++++++++++++++++++++++ start-config.yml | 2 +- 3 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 internal/tools/cron/dist_look.go diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go index 7ae314193..a4de309d4 100644 --- a/internal/tools/cron/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -58,6 +58,11 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp cm.Watch(ctx) } + locker, err := NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()) + if err != nil { + return err + } + srv := &cronServer{ ctx: ctx, config: conf, @@ -65,6 +70,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp msgClient: msg.NewMsgClient(msgConn), conversationClient: pbconversation.NewConversationClient(conversationConn), thirdClient: third.NewThirdClient(thirdConn), + locker: locker, } if err := srv.registerClearS3(); err != nil { @@ -81,6 +87,8 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp log.ZDebug(ctx, "cron task server is running") <-ctx.Done() log.ZDebug(ctx, "cron task server is shutting down") + srv.cron.Stop() + return nil } @@ -91,6 +99,7 @@ type cronServer struct { msgClient msg.MsgClient conversationClient pbconversation.ConversationClient thirdClient third.ThirdClient + locker *EtcdLocker } func (c *cronServer) registerClearS3() error { @@ -98,7 +107,9 @@ func (c *cronServer) registerClearS3() error { log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType) return nil } - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3) + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() { + c.locker.ExecuteWithLock(c.ctx, "clearS3", c.clearS3) + }) return errs.WrapMsg(err, "failed to register clear s3 cron task") } @@ -107,11 +118,15 @@ func (c *cronServer) registerDeleteMsg() error { log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords) return nil } - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg) + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() { + c.locker.ExecuteWithLock(c.ctx, "deleteMsg", c.deleteMsg) + }) return errs.WrapMsg(err, "failed to register delete msg cron task") } func (c *cronServer) registerClearUserMsg() error { - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg) + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() { + c.locker.ExecuteWithLock(c.ctx, "clearUserMsg", c.clearUserMsg) + }) return errs.WrapMsg(err, "failed to register clear user msg cron task") } diff --git a/internal/tools/cron/dist_look.go b/internal/tools/cron/dist_look.go new file mode 100644 index 000000000..d1d2d1cb0 --- /dev/null +++ b/internal/tools/cron/dist_look.go @@ -0,0 +1,89 @@ +package cron + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/openimsdk/tools/log" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" +) + +const ( + lockLeaseTTL = 300 +) + +type EtcdLocker struct { + client *clientv3.Client + instanceID string +} + +// NewEtcdLocker creates a new etcd distributed lock +func NewEtcdLocker(client *clientv3.Client) (*EtcdLocker, error) { + hostname, _ := os.Hostname() + pid := os.Getpid() + instanceID := fmt.Sprintf("%s-pid-%d-%d", hostname, pid, time.Now().UnixNano()) + + locker := &EtcdLocker{ + client: client, + instanceID: instanceID, + } + + return locker, nil +} + +func (e *EtcdLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) { + session, err := concurrency.NewSession(e.client, concurrency.WithTTL(lockLeaseTTL)) + if err != nil { + log.ZWarn(ctx, "Failed to create etcd session", err, + "taskName", taskName, + "instanceID", e.instanceID) + return + } + defer session.Close() + + lockKey := fmt.Sprintf("openim/crontask/%s", taskName) + mutex := concurrency.NewMutex(session, lockKey) + + ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + + err = mutex.TryLock(ctxWithTimeout) + if err != nil { + if err == context.DeadlineExceeded { + log.ZDebug(ctx, "Task is being executed by another instance, skipping", + "taskName", taskName, + "instanceID", e.instanceID) + } else { + log.ZWarn(ctx, "Failed to acquire task lock", err, + "taskName", taskName, + "instanceID", e.instanceID) + } + return + } + + defer func() { + if err := mutex.Unlock(ctx); err != nil { + log.ZWarn(ctx, "Failed to release task lock", err, + "taskName", taskName, + "instanceID", e.instanceID) + } else { + log.ZInfo(ctx, "Successfully released task lock", + "taskName", taskName, + "instanceID", e.instanceID) + } + }() + + log.ZInfo(ctx, "Successfully acquired task lock, starting execution", + "taskName", taskName, + "instanceID", e.instanceID, + "sessionID", session.Lease()) + + task() + + log.ZInfo(ctx, "Task execution completed", + "taskName", taskName, + "instanceID", e.instanceID) +} diff --git a/start-config.yml b/start-config.yml index 1231b5d0d..da959044d 100644 --- a/start-config.yml +++ b/start-config.yml @@ -1,6 +1,6 @@ serviceBinaries: openim-api: 1 - openim-crontask: 1 + openim-crontask: 4 openim-rpc-user: 1 openim-msggateway: 1 openim-push: 8 From d156e1e5199cbc923a919ef422a589c9ca73b385 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Wed, 11 Jun 2025 16:29:44 +0800 Subject: [PATCH 4/4] fix: prometheus discovery (#3408) --- cmd/main.go | 17 ++------------ go.mod | 2 +- go.sum | 4 ++-- internal/api/init.go | 2 +- internal/api/prometheus_discovery.go | 27 ++++++++++++----------- internal/api/router.go | 2 +- internal/msggateway/init.go | 2 +- internal/msgtransfer/init.go | 2 +- internal/push/push.go | 2 +- internal/rpc/auth/auth.go | 2 +- internal/rpc/conversation/conversation.go | 2 +- internal/rpc/group/group.go | 2 +- internal/rpc/msg/server.go | 2 +- internal/rpc/relation/friend.go | 2 +- internal/rpc/third/third.go | 2 +- internal/rpc/user/user.go | 2 +- internal/tools/cron/cron_task.go | 2 +- pkg/common/cmd/api.go | 3 ++- pkg/common/cmd/msg_transfer.go | 3 ++- pkg/common/prommetrics/prommetrics.go | 8 ++++++- pkg/common/startrpc/start.go | 10 +++++---- 21 files changed, 49 insertions(+), 51 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 1d0b82be8..7e19f1c98 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,7 +3,6 @@ package main import ( "bytes" "context" - "encoding/json" "flag" "fmt" "net" @@ -39,7 +38,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/network" "github.com/spf13/viper" "google.golang.org/grpc" ) @@ -250,23 +248,12 @@ func (x *cmds) run(ctx context.Context) error { return err } } - ip, err := network.GetLocalIP() - if err != nil { - return err - } listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return fmt.Errorf("prometheus listen %d error %w", port, err) } defer listener.Close() log.ZDebug(ctx, "prometheus start", "addr", listener.Addr()) - target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port)) - if err != nil { - return err - } - if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { - return err - } go func() { err := prommetrics.Start(listener) if err == nil { @@ -342,7 +329,7 @@ func (x *cmds) run(ctx context.Context) error { } } -func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { +func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error) { name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) if index := strings.Index(name, "."); index >= 0 { name = name[:index] @@ -352,7 +339,7 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C if err := cmd.parseConf(&conf); err != nil { return err } - return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) + return fn(ctx, &conf, standalone.GetSvcDiscoveryRegistry(), standalone.GetServiceRegistrar()) }) } diff --git a/go.mod b/go.mod index 221e28b72..845f75bb2 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.84 + github.com/openimsdk/tools v0.0.50-alpha.85 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6298f98c9..b775a1056 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8= -github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.85 h1:OqTUYx6r7Zp/eH8FKB08XeNjPV405TUIG9QT6QQ+F+s= +github.com/openimsdk/tools v0.0.50-alpha.85/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index 378f03eda..f3548e29a 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -39,7 +39,7 @@ type Config struct { Index conf.Index } -func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index)) if err != nil { return err diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index bdcca4e26..c861003a0 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -6,35 +6,29 @@ import ( "net/http" "github.com/gin-gonic/gin" - conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - clientv3 "go.etcd.io/etcd/client/v3" ) type PrometheusDiscoveryApi struct { config *Config - client *clientv3.Client kv discovery.KeyValue } -func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi { +func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { api := &PrometheusDiscoveryApi{ config: config, - } - if config.Discovery.Enable == conf.ETCD { - api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + kv: client, } return api } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key)) + value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { - if errors.Is(err, discovery.ErrNotSupportedKeyValue) { + if errors.Is(err, discovery.ErrNotSupported) { c.JSON(http.StatusOK, []struct{}{}) return } @@ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { return } var resp prommetrics.RespTarget - if err := json.Unmarshal(value, &resp); err != nil { - apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) - return + for i := range value { + var tmp prommetrics.Target + if err = json.Unmarshal(value[i], &tmp); err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) + return + } + + resp.Targets = append(resp.Targets, tmp.Target) + resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget } + c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) } diff --git a/internal/api/router.go b/internal/api/router.go index 5e5f35a60..fcad104b8 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -53,7 +53,7 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) { +func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) if err != nil { return nil, err diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 8772693cc..40a57b1da 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -39,7 +39,7 @@ type Config struct { } // Start run ws server. -func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index b07ec6f1d..bbec3f9a2 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -58,7 +58,7 @@ type Config struct { Index conf.Index } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { builder := mqbuild.NewBuilder(&config.KafkaConfig) log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", diff --git a/internal/push/push.go b/internal/push/push.go index f720a52ac..1d6f8cb30 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -50,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context, return &pbpush.DelUserPushTokenResp{}, nil } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 2c2691d1d..5ed9cdf12 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -59,7 +59,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index ba9e7746b..cf5a2b9c6 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -69,7 +69,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 5219546b7..ce8a2c7aa 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -76,7 +76,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index cfc750c5b..9d5391cc9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { builder := mqbuild.NewBuilder(&config.KafkaConfig) redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic) if err != nil { diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 8c7c40536..d05cf7d77 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -66,7 +66,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index c6dcb2ea4..cea6a8522 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -64,7 +64,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 5639baab9..28461ae0a 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -79,7 +79,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go index a4de309d4..abe596192 100644 --- a/internal/tools/cron/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -25,7 +25,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { +func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) if conf.CronTask.RetainChatRecords < 1 { log.ZInfo(ctx, "disable cron") diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 484467798..7b7dbc89b 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/api" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -84,7 +85,7 @@ func (a *ApiCmd) runE() error { a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Prometheus.AutoSetPorts, nil, int(a.apiConfig.Index), - a.apiConfig.Discovery.RpcService.MessageGateway, + prommetrics.APIKeyName, &a.apiConfig.Notification, a.apiConfig, []string{}, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 9411a2cd0..fe6c27e54 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -65,7 +66,7 @@ func (m *MsgTransferCmd) runE() error { "", "", true, nil, int(m.msgTransferConfig.Index), - "", + prommetrics.MessageTransferKeyName, nil, m.msgTransferConfig, []string{}, diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 153314bbb..3f683a50e 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -85,6 +85,8 @@ func Start(listener net.Listener) error { const ( APIKeyName = "api" MessageTransferKeyName = "message-transfer" + + TTL = 300 ) type Target struct { @@ -97,10 +99,14 @@ type RespTarget struct { Labels map[string]string `json:"labels"` } -func BuildDiscoveryKey(name string) string { +func BuildDiscoveryKeyPrefix(name string) string { return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) } +func BuildDiscoveryKey(name string, index int) string { + return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index) +} + func BuildDefaultTarget(host string, ip int) Target { return Target{ Target: fmt.Sprintf("%s:%d", host, ip), diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index b99d32db1..06e19d8d2 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -50,7 +50,7 @@ func init() { func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, - rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, + rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, options ...grpc.ServerOption) error { if notification != nil { @@ -148,9 +148,11 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c if err != nil { return err } - if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { - if !errors.Is(err, discovery.ErrNotSupportedKeyValue) { - return err + if autoSetPorts { + if err = client.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL); err != nil { + if !errors.Is(err, discovery.ErrNotSupported) { + return err + } } } go func() {