From b7ca3bd95f580fc16d6de1caf8d9708e97c42dc3 Mon Sep 17 00:00:00 2001 From: HonQi <14954524+HonQii@users.noreply.github.com> Date: Wed, 4 Jun 2025 10:46:20 +0800 Subject: [PATCH 1/9] fix redis config db field (#3395) --- pkg/common/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index d5ae68ec0..cd57a11cf 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -328,7 +328,7 @@ type Redis struct { Username string `yaml:"username"` Password string `yaml:"password"` ClusterMode bool `yaml:"clusterMode"` - DB int `yaml:"storage"` + DB int `yaml:"db"` MaxRetry int `yaml:"maxRetry"` PoolSize int `yaml:"poolSize"` } From 04ee509b68b4ad1579d948cbaf82b37d2a2e518c Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 6 Jun 2025 11:29:45 +0800 Subject: [PATCH 2/9] fix: solve incorrect when sendMsg webhook callback after. (#3409) * fix: solve incorrect when sendMsg webhook callback after. * remove print. --- internal/msgtransfer/callback.go | 124 ++++++++++++++++++ internal/msgtransfer/init.go | 2 +- .../online_msg_to_mongo_handler.go | 18 ++- internal/rpc/msg/send.go | 6 +- 4 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 internal/msgtransfer/callback.go diff --git a/internal/msgtransfer/callback.go b/internal/msgtransfer/callback.go new file mode 100644 index 000000000..ea51c2839 --- /dev/null +++ b/internal/msgtransfer/callback.go @@ -0,0 +1,124 @@ +package msgtransfer + +import ( + "context" + "encoding/base64" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/stringutil" + "google.golang.org/protobuf/proto" + + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" +) + +func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq { + return cbapi.CommonCallbackReq{ + SendID: msg.SendID, + ServerMsgID: msg.ServerMsgID, + CallbackCommand: command, + ClientMsgID: msg.ClientMsgID, + OperationID: mcontext.GetOperationID(ctx), + SenderPlatformID: msg.SenderPlatformID, + SenderNickname: msg.SenderNickname, + SessionType: msg.SessionType, + MsgFrom: msg.MsgFrom, + ContentType: msg.ContentType, + Status: msg.Status, + SendTime: msg.SendTime, + CreateTime: msg.CreateTime, + AtUserIDList: msg.AtUserIDList, + SenderFaceURL: msg.SenderFaceURL, + Content: GetContent(msg), + Seq: uint32(msg.Seq), + Ex: msg.Ex, + } +} + +func GetContent(msg *sdkws.MsgData) string { + if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { + var tips sdkws.TipsComm + _ = proto.Unmarshal(msg.Content, &tips) + content := tips.JsonDetail + return content + } else { + return string(msg.Content) + } +} + +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { + if msg.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), + RecvID: msg.RecvID, + } + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) +} + +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { + if msg.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), + GroupID: msg.GroupID, + } + + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) +} + +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } + + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} + +func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool { + return filterMsg(msg, after.AttentionIds, after.DeniedTypes) +} + +func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool { + // According to the attentionIds configuration, only some users are sent + if len(attentionIds) != 0 && !datautil.Contain(msg.RecvID, attentionIds...) { + return false + } + + if defaultDeniedTypes(msg.ContentType) { + return false + } + + if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) { + return false + } + + return true +} + +func defaultDeniedTypes(contentType int32) bool { + if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd { + return true + } + if contentType == constant.Typing { + return true + } + return false +} diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 175813552..b07ec6f1d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } - historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase) + historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) msgTransfer := &MsgTransfer{ historyConsumer: historyConsumer, diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a895bb9c4..6c1498f82 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -19,6 +19,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" @@ -26,11 +28,15 @@ import ( type OnlineHistoryMongoConsumerHandler struct { msgTransferDatabase controller.MsgTransferDatabase + config *Config + webhookClient *webhook.Client } -func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler { +func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler { return &OnlineHistoryMongoConsumerHandler{ msgTransferDatabase: database, + config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), } } @@ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() } + + for _, msgData := range msgFromMQ.MsgData { + switch msgData.SessionType { + case constant.SingleChatType: + mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) + case constant.ReadGroupChatType: + mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) + } + } + //var seqs []int64 //for _, msg := range msgFromMQ.MsgData { // seqs = append(seqs, msg.Seq) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index f13b80708..d97905bff 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, go m.setConversationAtInfo(ctx, req.MsgData) } - m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + // m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} resp.SendTime = req.MsgData.SendTime @@ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } - m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) + + // m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return &pbmsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID, From 75367545ea330bdb67a6e4242cb31e5652ce44dc Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 6 Jun 2025 14:25:58 +0800 Subject: [PATCH 3/9] feat: support distributed lock in crontask. (#3401) * feat: support distributed lock in crontask. * remove space. * remove comment. * remove log. * Update logic. * Update contents. --- internal/tools/cron/cron_task.go | 21 ++++++-- internal/tools/cron/dist_look.go | 89 ++++++++++++++++++++++++++++++++ start-config.yml | 2 +- 3 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 internal/tools/cron/dist_look.go diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go index 7ae314193..a4de309d4 100644 --- a/internal/tools/cron/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -58,6 +58,11 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp cm.Watch(ctx) } + locker, err := NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()) + if err != nil { + return err + } + srv := &cronServer{ ctx: ctx, config: conf, @@ -65,6 +70,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp msgClient: msg.NewMsgClient(msgConn), conversationClient: pbconversation.NewConversationClient(conversationConn), thirdClient: third.NewThirdClient(thirdConn), + locker: locker, } if err := srv.registerClearS3(); err != nil { @@ -81,6 +87,8 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp log.ZDebug(ctx, "cron task server is running") <-ctx.Done() log.ZDebug(ctx, "cron task server is shutting down") + srv.cron.Stop() + return nil } @@ -91,6 +99,7 @@ type cronServer struct { msgClient msg.MsgClient conversationClient pbconversation.ConversationClient thirdClient third.ThirdClient + locker *EtcdLocker } func (c *cronServer) registerClearS3() error { @@ -98,7 +107,9 @@ func (c *cronServer) registerClearS3() error { log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType) return nil } - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3) + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() { + c.locker.ExecuteWithLock(c.ctx, "clearS3", c.clearS3) + }) return errs.WrapMsg(err, "failed to register clear s3 cron task") } @@ -107,11 +118,15 @@ func (c *cronServer) registerDeleteMsg() error { log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords) return nil } - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg) + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() { + c.locker.ExecuteWithLock(c.ctx, "deleteMsg", c.deleteMsg) + }) return errs.WrapMsg(err, "failed to register delete msg cron task") } func (c *cronServer) registerClearUserMsg() error { - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg) + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() { + c.locker.ExecuteWithLock(c.ctx, "clearUserMsg", c.clearUserMsg) + }) return errs.WrapMsg(err, "failed to register clear user msg cron task") } diff --git a/internal/tools/cron/dist_look.go b/internal/tools/cron/dist_look.go new file mode 100644 index 000000000..d1d2d1cb0 --- /dev/null +++ b/internal/tools/cron/dist_look.go @@ -0,0 +1,89 @@ +package cron + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/openimsdk/tools/log" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" +) + +const ( + lockLeaseTTL = 300 +) + +type EtcdLocker struct { + client *clientv3.Client + instanceID string +} + +// NewEtcdLocker creates a new etcd distributed lock +func NewEtcdLocker(client *clientv3.Client) (*EtcdLocker, error) { + hostname, _ := os.Hostname() + pid := os.Getpid() + instanceID := fmt.Sprintf("%s-pid-%d-%d", hostname, pid, time.Now().UnixNano()) + + locker := &EtcdLocker{ + client: client, + instanceID: instanceID, + } + + return locker, nil +} + +func (e *EtcdLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) { + session, err := concurrency.NewSession(e.client, concurrency.WithTTL(lockLeaseTTL)) + if err != nil { + log.ZWarn(ctx, "Failed to create etcd session", err, + "taskName", taskName, + "instanceID", e.instanceID) + return + } + defer session.Close() + + lockKey := fmt.Sprintf("openim/crontask/%s", taskName) + mutex := concurrency.NewMutex(session, lockKey) + + ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + + err = mutex.TryLock(ctxWithTimeout) + if err != nil { + if err == context.DeadlineExceeded { + log.ZDebug(ctx, "Task is being executed by another instance, skipping", + "taskName", taskName, + "instanceID", e.instanceID) + } else { + log.ZWarn(ctx, "Failed to acquire task lock", err, + "taskName", taskName, + "instanceID", e.instanceID) + } + return + } + + defer func() { + if err := mutex.Unlock(ctx); err != nil { + log.ZWarn(ctx, "Failed to release task lock", err, + "taskName", taskName, + "instanceID", e.instanceID) + } else { + log.ZInfo(ctx, "Successfully released task lock", + "taskName", taskName, + "instanceID", e.instanceID) + } + }() + + log.ZInfo(ctx, "Successfully acquired task lock, starting execution", + "taskName", taskName, + "instanceID", e.instanceID, + "sessionID", session.Lease()) + + task() + + log.ZInfo(ctx, "Task execution completed", + "taskName", taskName, + "instanceID", e.instanceID) +} diff --git a/start-config.yml b/start-config.yml index 1231b5d0d..da959044d 100644 --- a/start-config.yml +++ b/start-config.yml @@ -1,6 +1,6 @@ serviceBinaries: openim-api: 1 - openim-crontask: 1 + openim-crontask: 4 openim-rpc-user: 1 openim-msggateway: 1 openim-push: 8 From d156e1e5199cbc923a919ef422a589c9ca73b385 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Wed, 11 Jun 2025 16:29:44 +0800 Subject: [PATCH 4/9] fix: prometheus discovery (#3408) --- cmd/main.go | 17 ++------------ go.mod | 2 +- go.sum | 4 ++-- internal/api/init.go | 2 +- internal/api/prometheus_discovery.go | 27 ++++++++++++----------- internal/api/router.go | 2 +- internal/msggateway/init.go | 2 +- internal/msgtransfer/init.go | 2 +- internal/push/push.go | 2 +- internal/rpc/auth/auth.go | 2 +- internal/rpc/conversation/conversation.go | 2 +- internal/rpc/group/group.go | 2 +- internal/rpc/msg/server.go | 2 +- internal/rpc/relation/friend.go | 2 +- internal/rpc/third/third.go | 2 +- internal/rpc/user/user.go | 2 +- internal/tools/cron/cron_task.go | 2 +- pkg/common/cmd/api.go | 3 ++- pkg/common/cmd/msg_transfer.go | 3 ++- pkg/common/prommetrics/prommetrics.go | 8 ++++++- pkg/common/startrpc/start.go | 10 +++++---- 21 files changed, 49 insertions(+), 51 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 1d0b82be8..7e19f1c98 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,7 +3,6 @@ package main import ( "bytes" "context" - "encoding/json" "flag" "fmt" "net" @@ -39,7 +38,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/network" "github.com/spf13/viper" "google.golang.org/grpc" ) @@ -250,23 +248,12 @@ func (x *cmds) run(ctx context.Context) error { return err } } - ip, err := network.GetLocalIP() - if err != nil { - return err - } listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return fmt.Errorf("prometheus listen %d error %w", port, err) } defer listener.Close() log.ZDebug(ctx, "prometheus start", "addr", listener.Addr()) - target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port)) - if err != nil { - return err - } - if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { - return err - } go func() { err := prommetrics.Start(listener) if err == nil { @@ -342,7 +329,7 @@ func (x *cmds) run(ctx context.Context) error { } } -func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { +func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error) { name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) if index := strings.Index(name, "."); index >= 0 { name = name[:index] @@ -352,7 +339,7 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C if err := cmd.parseConf(&conf); err != nil { return err } - return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) + return fn(ctx, &conf, standalone.GetSvcDiscoveryRegistry(), standalone.GetServiceRegistrar()) }) } diff --git a/go.mod b/go.mod index 221e28b72..845f75bb2 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.84 + github.com/openimsdk/tools v0.0.50-alpha.85 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6298f98c9..b775a1056 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8= -github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.85 h1:OqTUYx6r7Zp/eH8FKB08XeNjPV405TUIG9QT6QQ+F+s= +github.com/openimsdk/tools v0.0.50-alpha.85/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index 378f03eda..f3548e29a 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -39,7 +39,7 @@ type Config struct { Index conf.Index } -func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index)) if err != nil { return err diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index bdcca4e26..c861003a0 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -6,35 +6,29 @@ import ( "net/http" "github.com/gin-gonic/gin" - conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" - clientv3 "go.etcd.io/etcd/client/v3" ) type PrometheusDiscoveryApi struct { config *Config - client *clientv3.Client kv discovery.KeyValue } -func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi { +func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { api := &PrometheusDiscoveryApi{ config: config, - } - if config.Discovery.Enable == conf.ETCD { - api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + kv: client, } return api } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key)) + value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { - if errors.Is(err, discovery.ErrNotSupportedKeyValue) { + if errors.Is(err, discovery.ErrNotSupported) { c.JSON(http.StatusOK, []struct{}{}) return } @@ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { return } var resp prommetrics.RespTarget - if err := json.Unmarshal(value, &resp); err != nil { - apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) - return + for i := range value { + var tmp prommetrics.Target + if err = json.Unmarshal(value[i], &tmp); err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err")) + return + } + + resp.Targets = append(resp.Targets, tmp.Target) + resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget } + c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp}) } diff --git a/internal/api/router.go b/internal/api/router.go index 5e5f35a60..fcad104b8 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -53,7 +53,7 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) { +func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) if err != nil { return nil, err diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 8772693cc..40a57b1da 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -39,7 +39,7 @@ type Config struct { } // Start run ws server. -func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index b07ec6f1d..bbec3f9a2 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -58,7 +58,7 @@ type Config struct { Index conf.Index } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { builder := mqbuild.NewBuilder(&config.KafkaConfig) log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", diff --git a/internal/push/push.go b/internal/push/push.go index f720a52ac..1d6f8cb30 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -50,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context, return &pbpush.DelUserPushTokenResp{}, nil } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 2c2691d1d..5ed9cdf12 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -59,7 +59,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index ba9e7746b..cf5a2b9c6 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -69,7 +69,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 5219546b7..ce8a2c7aa 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -76,7 +76,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index cfc750c5b..9d5391cc9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { builder := mqbuild.NewBuilder(&config.KafkaConfig) redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic) if err != nil { diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 8c7c40536..d05cf7d77 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -66,7 +66,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index c6dcb2ea4..cea6a8522 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -64,7 +64,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 5639baab9..28461ae0a 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -79,7 +79,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) mgocli, err := dbb.Mongo(ctx) if err != nil { diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go index a4de309d4..abe596192 100644 --- a/internal/tools/cron/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -25,7 +25,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { +func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error { log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) if conf.CronTask.RetainChatRecords < 1 { log.ZInfo(ctx, "disable cron") diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 484467798..7b7dbc89b 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/api" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -84,7 +85,7 @@ func (a *ApiCmd) runE() error { a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Prometheus.AutoSetPorts, nil, int(a.apiConfig.Index), - a.apiConfig.Discovery.RpcService.MessageGateway, + prommetrics.APIKeyName, &a.apiConfig.Notification, a.apiConfig, []string{}, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 9411a2cd0..fe6c27e54 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -65,7 +66,7 @@ func (m *MsgTransferCmd) runE() error { "", "", true, nil, int(m.msgTransferConfig.Index), - "", + prommetrics.MessageTransferKeyName, nil, m.msgTransferConfig, []string{}, diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 153314bbb..3f683a50e 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -85,6 +85,8 @@ func Start(listener net.Listener) error { const ( APIKeyName = "api" MessageTransferKeyName = "message-transfer" + + TTL = 300 ) type Target struct { @@ -97,10 +99,14 @@ type RespTarget struct { Labels map[string]string `json:"labels"` } -func BuildDiscoveryKey(name string) string { +func BuildDiscoveryKeyPrefix(name string) string { return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) } +func BuildDiscoveryKey(name string, index int) string { + return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index) +} + func BuildDefaultTarget(host string, ip int) Target { return Target{ Target: fmt.Sprintf("%s:%d", host, ip), diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index b99d32db1..06e19d8d2 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -50,7 +50,7 @@ func init() { func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, - rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, + rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, options ...grpc.ServerOption) error { if notification != nil { @@ -148,9 +148,11 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c if err != nil { return err } - if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { - if !errors.Is(err, discovery.ErrNotSupportedKeyValue) { - return err + if autoSetPorts { + if err = client.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL); err != nil { + if !errors.Is(err, discovery.ErrNotSupported) { + return err + } } } go func() { From 545125884e8b1dbfe4de71a34ad1a5a5663bef7c Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Mon, 16 Jun 2025 10:46:07 +0800 Subject: [PATCH 5/9] fix: import friends send notification (#3420) --- internal/rpc/relation/friend.go | 4 ++-- internal/rpc/relation/notification.go | 16 +++++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index d05cf7d77..43909c8e3 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -192,7 +192,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr FromUserID: req.OwnerUserID, ToUserID: userID, HandleResult: constant.FriendResponseAgree, - }) + }, false) } s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req) @@ -221,7 +221,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res return nil, err } s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req) - s.notificationSender.FriendApplicationAgreedNotification(ctx, req) + s.notificationSender.FriendApplicationAgreedNotification(ctx, req, true) return resp, nil } if req.HandleResult == constant.FriendResponseRefuse { diff --git a/internal/rpc/relation/notification.go b/internal/rpc/relation/notification.go index d6a03003e..4ee45e197 100644 --- a/internal/rpc/relation/notification.go +++ b/internal/rpc/relation/notification.go @@ -171,11 +171,17 @@ func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context. f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips) } -func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq) { - request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID) - if err != nil { - log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID) - return +func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq, checkReq bool) { + var ( + request *sdkws.FriendRequest + err error + ) + if checkReq { + request, err = f.getFriendRequests(ctx, req.FromUserID, req.ToUserID) + if err != nil { + log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID) + return + } } tips := sdkws.FriendApplicationApprovedTips{ FromToUserID: &sdkws.FromToUserID{ From 1e2375faca1cb2ee3b08723f9627856db950cad4 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 17 Jun 2025 15:12:25 +0800 Subject: [PATCH 6/9] fix: improve mileston PR workflows contents. (#3382) --- .github/workflows/merge-from-milestone.yml | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/.github/workflows/merge-from-milestone.yml b/.github/workflows/merge-from-milestone.yml index 1f5762ccb..a0ec2ac16 100644 --- a/.github/workflows/merge-from-milestone.yml +++ b/.github/workflows/merge-from-milestone.yml @@ -155,11 +155,27 @@ jobs: '{title: $title, head: $head, base: $base, body: $body}')") new_pr_number=$(echo "$response" | jq -r '.number') - echo "Created PR #$new_pr_number" - curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \ + if [[ "$new_pr_number" == "null" || -z "$new_pr_number" ]]; then + echo "Failed to create PR. Response: $response" + + git checkout $TARGET_BRANCH + + git branch -D $cherry_pick_branch + + echo "Deleted branch: $cherry_pick_branch" + git push origin --delete $cherry_pick_branch + else + echo "Created PR #$new_pr_number" + + curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \ -H "Accept: application/vnd.github+json" \ -d '{"labels": ["milestone-merge"]}' \ "https://api.github.com/repos/${{ github.repository }}/issues/$new_pr_number/labels" + fi + + echo "" + echo "----------------------------------------" + echo "" fi done From 1baf9a8e0f11cc94313eec2165e273482d1bdb0d Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 17 Jun 2025 15:33:25 +0800 Subject: [PATCH 7/9] feat: Implement etcd and kafka auth. (#3394) * feat: Implement etcd and kafka auth. * Update etcd command contents. * update contents. * feat: update auth logic to compatible old version. * update comment. * update contents. --- .env | 2 +- config/discovery.yml | 10 ++-- config/kafka.yml | 20 +++---- docker-compose.yml | 128 ++++++++++++++++++++++++++++++++++++++----- 4 files changed, 131 insertions(+), 29 deletions(-) diff --git a/.env b/.env index 0ab998037..2d4dfd4c7 100644 --- a/.env +++ b/.env @@ -2,7 +2,7 @@ MONGO_IMAGE=mongo:7.0 REDIS_IMAGE=redis:7.0.0 KAFKA_IMAGE=bitnami/kafka:3.5.1 MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z -ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13 +ETCD_IMAGE=bitnami/etcd:3.5.13 PROMETHEUS_IMAGE=prom/prometheus:v2.45.6 ALERTMANAGER_IMAGE=prom/alertmanager:v0.27.0 GRAFANA_IMAGE=grafana/grafana:11.0.1 diff --git a/config/discovery.yml b/config/discovery.yml index e8d733e9f..2251dceb7 100644 --- a/config/discovery.yml +++ b/config/discovery.yml @@ -1,9 +1,11 @@ enable: etcd etcd: rootDirectory: openim - address: [ localhost:12379 ] - username: '' - password: '' + address: [localhost:12379] + ## Attention: If you set auth in etcd + ## you must also update the username and password in Chat project. + username: + password: kubernetes: namespace: default @@ -17,4 +19,4 @@ rpcService: group: group-rpc-service auth: auth-rpc-service conversation: conversation-rpc-service - third: third-rpc-service \ No newline at end of file + third: third-rpc-service diff --git a/config/kafka.yml b/config/kafka.yml index fd06ae2bb..2e9b5296c 100644 --- a/config/kafka.yml +++ b/config/kafka.yml @@ -1,13 +1,13 @@ -# Username for authentication -username: '' -# Password for authentication -password: '' +## Kafka authentication +username: +password: + # Producer acknowledgment settings -producerAck: +producerAck: # Compression type to use (e.g., none, gzip, snappy) compressType: none # List of Kafka broker addresses -address: [ localhost:19094 ] +address: [localhost:19094] # Kafka topic for Redis integration toRedisTopic: toRedis # Kafka topic for MongoDB integration @@ -29,12 +29,12 @@ tls: # Enable or disable TLS enableTLS: false # CA certificate file path - caCrt: + caCrt: # Client certificate file path - clientCrt: + clientCrt: # Client key file path - clientKey: + clientKey: # Client key password - clientKeyPwd: + clientKeyPwd: # Whether to skip TLS verification (not recommended for production) insecureSkipVerify: false diff --git a/docker-compose.yml b/docker-compose.yml index 65b4e6625..60fc865f2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -83,8 +83,83 @@ services: - ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380 - ETCD_INITIAL_CLUSTER_TOKEN=tkn - ETCD_INITIAL_CLUSTER_STATE=new + - ALLOW_NONE_AUTHENTICATION=no + + ## Optional: Enable etcd authentication by setting the following credentials + # - ETCD_ROOT_USER=root + # - ETCD_ROOT_PASSWORD=openIM123 + # - ETCD_USERNAME=openIM + # - ETCD_PASSWORD=openIM123 volumes: - "${DATA_DIR}/components/etcd:/etcd-data" + command: > + /bin/sh -c ' + etcd & + export ETCDCTL_API=3 + echo "Waiting for etcd to become healthy..." + until etcdctl --endpoints=http://127.0.0.1:2379 endpoint health &>/dev/null; do + echo "Waiting for ETCD to start..." + sleep 1 + done + + echo "etcd is healthy." + + if [ -n "$${ETCD_ROOT_USER}" ] && [ -n "$${ETCD_ROOT_PASSWORD}" ] && [ -n "$${ETCD_USERNAME}" ] && [ -n "$${ETCD_PASSWORD}" ]; then + echo "Authentication credentials provided. Setting up authentication..." + + echo "Checking authentication status..." + if ! etcdctl --endpoints=http://127.0.0.1:2379 auth status | grep -q "Authentication Status: true"; then + echo "Authentication is disabled. Creating users and enabling..." + + # Create users and setup permissions + etcdctl --endpoints=http://127.0.0.1:2379 user add $${ETCD_ROOT_USER} --new-user-password=$${ETCD_ROOT_PASSWORD} || true + etcdctl --endpoints=http://127.0.0.1:2379 user add $${ETCD_USERNAME} --new-user-password=$${ETCD_PASSWORD} || true + + etcdctl --endpoints=http://127.0.0.1:2379 role add openim-role || true + etcdctl --endpoints=http://127.0.0.1:2379 role grant-permission openim-role --prefix=true readwrite / || true + etcdctl --endpoints=http://127.0.0.1:2379 role grant-permission openim-role --prefix=true readwrite "" || true + etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_USERNAME} openim-role || true + + etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_ROOT_USER} $${ETCD_USERNAME} root || true + + echo "Enabling authentication..." + etcdctl --endpoints=http://127.0.0.1:2379 auth enable + echo "Authentication enabled successfully" + else + echo "Authentication is already enabled. Checking OpenIM user..." + + # Check if openIM user exists and can perform operations + if ! etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} put /test/auth "auth-check" &>/dev/null; then + echo "OpenIM user test failed. Recreating user with root credentials..." + + # Try to create/update the openIM user using root credentials + etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} user add $${ETCD_USERNAME} --new-user-password=$${ETCD_PASSWORD} --no-password-file || true + etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role add openim-role || true + etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role grant-permission openim-role --prefix=true readwrite / || true + etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} role grant-permission openim-role --prefix=true readwrite "" || true + etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_ROOT_USER}:$${ETCD_ROOT_PASSWORD} user grant-role $${ETCD_USERNAME} openim-role || true + etcdctl --endpoints=http://127.0.0.1:2379 user grant-role $${ETCD_ROOT_USER} $${ETCD_USERNAME} root || true + + echo "OpenIM user recreated with required permissions" + else + echo "OpenIM user exists and has correct permissions" + etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} del /test/auth &>/dev/null + fi + fi + echo "Testing authentication with OpenIM user..." + if etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} put /test/auth "auth-works"; then + echo "Authentication working properly" + etcdctl --endpoints=http://127.0.0.1:2379 --user=$${ETCD_USERNAME}:$${ETCD_PASSWORD} del /test/auth + else + echo "WARNING: Authentication test failed" + fi + else + echo "No authentication credentials provided. Running in no-auth mode." + echo "To enable authentication, set ETCD_ROOT_USER, ETCD_ROOT_PASSWORD, ETCD_USERNAME, and ETCD_PASSWORD environment variables." + fi + + tail -f /dev/null + ' restart: always networks: - openim @@ -104,12 +179,38 @@ services: KAFKA_CFG_NODE_ID: 0 KAFKA_CFG_PROCESS_ROLES: controller,broker KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093 - KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_NUM_PARTITIONS: 8 KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" + + KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094" + KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" + + # Authentication configuration variables - comment out to disable auth + # KAFKA_USERNAME: "openIM" + # KAFKA_PASSWORD: "openIM123" + command: > + /bin/sh -c ' + if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then + echo "=== Kafka SASL Authentication ENABLED ===" + echo "Username: $${KAFKA_USERNAME}" + + # Set environment variables for SASL authentication + export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094" + export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094" + export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT" + export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN" + export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN" + export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT" + export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}" + export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}" + fi + + # Start Kafka with the configured environment + exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh + ' networks: - openim @@ -148,7 +249,7 @@ services: - "11002:80" networks: - openim - + prometheus: image: ${PROMETHEUS_IMAGE} container_name: prometheus @@ -161,9 +262,9 @@ services: - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml - ${DATA_DIR}/components/prometheus/data:/prometheus command: - - '--config.file=/etc/prometheus/prometheus.yml' - - '--storage.tsdb.path=/prometheus' - - '--web.listen-address=:${PROMETHEUS_PORT}' + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--web.listen-address=:${PROMETHEUS_PORT}" network_mode: host alertmanager: @@ -176,8 +277,8 @@ services: - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml - ./config/email.tmpl:/etc/alertmanager/email.tmpl command: - - '--config.file=/etc/alertmanager/alertmanager.yml' - - '--web.listen-address=:${ALERTMANAGER_PORT}' + - "--config.file=/etc/alertmanager/alertmanager.yml" + - "--web.listen-address=:${ALERTMANAGER_PORT}" network_mode: host grafana: @@ -209,9 +310,8 @@ services: - /sys:/host/sys:ro - /:/rootfs:ro command: - - '--path.procfs=/host/proc' - - '--path.sysfs=/host/sys' - - '--path.rootfs=/rootfs' - - '--web.listen-address=:19100' + - "--path.procfs=/host/proc" + - "--path.sysfs=/host/sys" + - "--path.rootfs=/rootfs" + - "--web.listen-address=:19100" network_mode: host - From 8f7b02979d3ffb7530005e5352bce53f6101fe18 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 18 Jun 2025 14:31:09 +0800 Subject: [PATCH 8/9] feat: support redis sentinel. (#3423) * feat: support redis sentinel. * update docker compose contents. * update config contents. * revert content. * supoort redisMode. * update config. * remvove print. --- config/redis.yml | 13 ++++-- go.mod | 2 +- go.sum | 4 +- pkg/common/config/config.go | 44 ++++++++++++------- pkg/common/discovery/discoveryregister.go | 2 +- pkg/common/storage/cache/redis/online_test.go | 2 +- tools/seq/internal/seq.go | 12 +++-- 7 files changed, 52 insertions(+), 27 deletions(-) diff --git a/config/redis.yml b/config/redis.yml index 2448bcb5c..5e62719ae 100644 --- a/config/redis.yml +++ b/config/redis.yml @@ -1,7 +1,14 @@ -address: [ localhost:16379 ] -username: +address: [localhost:16379] +username: password: openIM123 -clusterMode: false +# redis Mode, including "standalone","cluster","sentinel" +redisMode: "standalone" db: 0 maxRetry: 10 poolSize: 100 +# Sentinel configuration (only used when redisMode is "sentinel") +sentinelMode: + masterName: "redis-master" + sentinelsAddrs: ["127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"] + routeByLatency: true + routeRandomly: true diff --git a/go.mod b/go.mod index 845f75bb2..0ecd3a113 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.85 + github.com/openimsdk/tools v0.0.50-alpha.91 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index b775a1056..d8eb07f80 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.85 h1:OqTUYx6r7Zp/eH8FKB08XeNjPV405TUIG9QT6QQ+F+s= -github.com/openimsdk/tools v0.0.50-alpha.85/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.91 h1:4zXtTwwCIUawet1VDvnD3C/1E4N4ostDfh+RfL5nz90= +github.com/openimsdk/tools v0.0.50-alpha.91/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index cd57a11cf..619571064 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -323,14 +323,22 @@ type RPC struct { } type Redis struct { - Disable bool `yaml:"-"` - Address []string `yaml:"address"` - Username string `yaml:"username"` - Password string `yaml:"password"` - ClusterMode bool `yaml:"clusterMode"` - DB int `yaml:"db"` - MaxRetry int `yaml:"maxRetry"` - PoolSize int `yaml:"poolSize"` + Disable bool `yaml:"-"` + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` + RedisMode string `yaml:"redisMode"` + DB int `yaml:"db"` + MaxRetry int `yaml:"maxRetry"` + PoolSize int `yaml:"poolSize"` + SentinelMode Sentinel `yaml:"sentinelMode"` +} + +type Sentinel struct { + MasterName string `yaml:"masterName"` + SentinelAddrs []string `yaml:"sentinelsAddrs"` + RouteByLatency bool `yaml:"routeByLatency"` + RouteRandomly bool `yaml:"routeRandomly"` } type BeforeConfig struct { @@ -487,13 +495,19 @@ func (m *Mongo) Build() *mongoutil.Config { func (r *Redis) Build() *redisutil.Config { return &redisutil.Config{ - ClusterMode: r.ClusterMode, - Address: r.Address, - Username: r.Username, - Password: r.Password, - DB: r.DB, - MaxRetry: r.MaxRetry, - PoolSize: r.PoolSize, + RedisMode: r.RedisMode, + Address: r.Address, + Username: r.Username, + Password: r.Password, + DB: r.DB, + MaxRetry: r.MaxRetry, + PoolSize: r.PoolSize, + Sentinel: &redisutil.Sentinel{ + MasterName: r.SentinelMode.MasterName, + SentinelAddrs: r.SentinelMode.SentinelAddrs, + RouteByLatency: r.SentinelMode.RouteByLatency, + RouteRandomly: r.SentinelMode.RouteRandomly, + }, } } diff --git a/pkg/common/discovery/discoveryregister.go b/pkg/common/discovery/discoveryregister.go index dc100be5c..87333fcac 100644 --- a/pkg/common/discovery/discoveryregister.go +++ b/pkg/common/discovery/discoveryregister.go @@ -35,7 +35,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (dis return standalone.GetSvcDiscoveryRegistry(), nil } if runtimeenv.RuntimeEnvironment() == config.KUBERNETES { - return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, + return kubernetes.NewConnManager(discovery.Kubernetes.Namespace, nil, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(1024*1024*20), ), diff --git a/pkg/common/storage/cache/redis/online_test.go b/pkg/common/storage/cache/redis/online_test.go index 0306f6f5d..d2ac283e0 100644 --- a/pkg/common/storage/cache/redis/online_test.go +++ b/pkg/common/storage/cache/redis/online_test.go @@ -26,7 +26,7 @@ func TestName111111(t *testing.T) { "172.16.8.124:7005", "172.16.8.124:7006", }, - ClusterMode: true, + RedisMode: "cluster", Password: "passwd123", //Address: []string{"localhost:16379"}, //Password: "openIM123", diff --git a/tools/seq/internal/seq.go b/tools/seq/internal/seq.go index 9fd352a96..e90c4a41b 100644 --- a/tools/seq/internal/seq.go +++ b/tools/seq/internal/seq.go @@ -28,6 +28,8 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) +const StructTagName = "yaml" + const ( MaxSeq = "MAX_SEQ:" MinSeq = "MIN_SEQ:" @@ -54,13 +56,14 @@ func readConfig[T any](dir string, name string) (*T, error) { if err := v.ReadInConfig(); err != nil { return nil, err } - fn := func(config *mapstructure.DecoderConfig) { - config.TagName = "mapstructure" - } + var conf T - if err := v.Unmarshal(&conf, fn); err != nil { + if err := v.Unmarshal(&conf, func(config *mapstructure.DecoderConfig) { + config.TagName = StructTagName + }); err != nil { return nil, err } + return &conf, nil } @@ -69,6 +72,7 @@ func Main(conf string, del time.Duration) error { if err != nil { return err } + mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName) if err != nil { return err From 53bf8acc213420eca8dcbf94f4978afefcb71153 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 18 Jun 2025 14:32:29 +0800 Subject: [PATCH 9/9] fix: solve webhook incorrect attentionID references. (#3411) --- config/webhooks.yml | 4 +- internal/msgtransfer/callback.go | 10 ++++- internal/rpc/msg/callback.go | 77 ++++++++++++++++---------------- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/config/webhooks.yml b/config/webhooks.yml index 283a23ed4..9fd3eb339 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -16,7 +16,7 @@ afterUpdateUserInfoEx: afterSendSingleMsg: enable: false timeout: 5 - # Only the recvID specified in attentionIds will send the callback + # Only the recvIDs specified in attentionIds will send the callback # if not set, all user messages will be callback attentionIds: [] # See beforeSendSingleMsg comment. @@ -36,7 +36,7 @@ beforeMsgModify: afterSendGroupMsg: enable: false timeout: 5 - # Only the recvID specified in attentionIds will send the callback + # Only the GroupIDs specified in attentionIds will send the callback # if not set, all user messages will be callback attentionIds: [] # See beforeSendSingleMsg comment. diff --git a/internal/msgtransfer/callback.go b/internal/msgtransfer/callback.go index ea51c2839..f0d439779 100644 --- a/internal/msgtransfer/callback.go +++ b/internal/msgtransfer/callback.go @@ -55,9 +55,11 @@ func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx conte if msg.ContentType == constant.Typing { return } + if !filterAfterMsg(msg, after) { return } + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.RecvID, @@ -69,9 +71,11 @@ func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx contex if msg.ContentType == constant.Typing { return } + if !filterAfterMsg(msg, after) { return } + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.GroupID, @@ -98,7 +102,11 @@ func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool { func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool { // According to the attentionIds configuration, only some users are sent - if len(attentionIds) != 0 && !datautil.Contain(msg.RecvID, attentionIds...) { + if len(attentionIds) != 0 && msg.ContentType == constant.SingleChatType && !datautil.Contain(msg.RecvID, attentionIds...) { + return false + } + + if len(attentionIds) != 0 && msg.ContentType == constant.ReadGroupChatType && !datautil.Contain(msg.GroupID, attentionIds...) { return false } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 5bc98de0c..2c00efd43 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,13 +16,10 @@ package msg import ( "context" - "encoding/base64" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/stringutil" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -89,19 +86,20 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf }) } -func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { - if msg.MsgData.ContentType == constant.Typing { - return - } - if !filterAfterMsg(msg, after) { - return - } - cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), - RecvID: msg.MsgData.RecvID, - } - m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -} +// Move to msgtransfer +// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { +// if msg.MsgData.ContentType == constant.Typing { +// return +// } +// if !filterAfterMsg(msg, after) { +// return +// } +// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ +// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), +// RecvID: msg.MsgData.RecvID, +// } +// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +// } func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -123,20 +121,21 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi }) } -func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { - if msg.MsgData.ContentType == constant.Typing { - return - } - if !filterAfterMsg(msg, after) { - return - } - cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), - GroupID: msg.MsgData.GroupID, - } +// Move to msgtransfer +// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { +// if msg.MsgData.ContentType == constant.Typing { +// return +// } +// if !filterAfterMsg(msg, after) { +// return +// } +// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ +// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), +// GroupID: msg.MsgData.GroupID, +// } - m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -} +// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +// } func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -205,14 +204,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } -func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { - keyMsgData := apistruct.KeyMsgData{ - SendID: msg.SendID, - RecvID: msg.RecvID, - GroupID: msg.GroupID, - } +// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { +// keyMsgData := apistruct.KeyMsgData{ +// SendID: msg.SendID, +// RecvID: msg.RecvID, +// GroupID: msg.GroupID, +// } - return map[string]string{ - webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), - } -} +// return map[string]string{ +// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), +// } +// }