From eebff88a29a8412330d446ed23db69b990b29224 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 28 Jul 2025 16:57:58 +0800 Subject: [PATCH 1/3] fix: performance issues with Kafka caused by encapsulating the MQ interface --- go.mod | 2 +- go.sum | 4 ++-- internal/msgtransfer/init.go | 6 +++--- internal/msgtransfer/online_history_msg_handler.go | 13 ++++++++++--- internal/msgtransfer/online_msg_to_mongo_handler.go | 10 +++++++--- internal/push/push.go | 9 +++++---- 6 files changed, 28 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index c06451aaa..775765706 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.96 + github.com/openimsdk/tools v0.0.50-alpha.97 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index d29eb0f3f..329a916ec 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FA github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.96 h1:U44Fq2jHiEvGi9zuYAnTRNx3Xd9T7P/kBAZLHvQ8xg4= -github.com/openimsdk/tools v0.0.50-alpha.96/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= +github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index bbec3f9a2..35026c79a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) + historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config) msgTransfer := &MsgTransfer{ historyConsumer: historyConsumer, @@ -161,8 +161,8 @@ func (m *MsgTransfer) Start(ctx context.Context) error { }() go func() { - fn := func(ctx context.Context, key string, value []byte) error { - m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value) + fn := func(msg mq.Message) error { + m.historyMongoHandler.HandleChatWs2Mongo(msg) return nil } for { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 05775a1e6..8b212774a 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/mq" "sync" "time" @@ -77,6 +78,7 @@ type ConsumerMessage struct { Ctx context.Context Key string Value []byte + Raw mq.Message } func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { @@ -113,6 +115,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. b.Do = och.do och.redisMessageBatches = b + och.redisMessageBatches.OnComplete = func(lastMessage *ConsumerMessage, totalCount int) { + lastMessage.Raw.Mark() + lastMessage.Raw.Commit() + } + return &och, nil } func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) { @@ -388,10 +395,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte return mcontext.SetOperationID(ctx, allMessageOperationID) } -func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group - err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value}) +func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(msg mq.Message) error { // a instance in the consumer group + err := och.redisMessageBatches.Put(msg.Context(), &ConsumerMessage{Ctx: msg.Context(), Key: msg.Key(), Value: msg.Value(), Raw: msg}) if err != nil { - log.ZWarn(ctx, "put msg to error", err, "key", key, "value", value) + log.ZWarn(msg.Context(), "put msg to error", err, "key", msg.Key(), "value", msg.Value()) } return nil } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 6c1498f82..8611af7ea 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -15,12 +15,12 @@ package msgtransfer import ( - "context" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mq" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" @@ -40,7 +40,10 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabas } } -func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) { +func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) { + ctx := val.Context() + key := val.Key() + msg := val.Value() msgFromMQ := pbmsg.MsgDataToMongoByMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { @@ -58,6 +61,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont prommetrics.MsgInsertMongoFailedCounter.Inc() } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() + val.Mark() } for _, msgData := range msgFromMQ.MsgData { diff --git a/internal/push/push.go b/internal/push/push.go index 1d6f8cb30..bf95b6acc 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -2,6 +2,7 @@ package push import ( "context" + "github.com/openimsdk/tools/mq" "math/rand" "strconv" @@ -106,8 +107,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg go func() { pushHandler.WaitCache() - fn := func(ctx context.Context, key string, value []byte) error { - pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(ctx), value) + fn := func(msg mq.Message) error { + pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(msg.Context()), msg.Value()) return nil } consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) @@ -121,8 +122,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg }() go func() { - fn := func(ctx context.Context, key string, value []byte) error { - offlineHandler.HandleMsg2OfflinePush(ctx, value) + fn := func(msg mq.Message) error { + offlineHandler.HandleMsg2OfflinePush(msg.Context(), msg.Value()) return nil } consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) From d9c3504afd6bb4f8a93154215995d8acbde01d7f Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 1 Aug 2025 10:13:42 +0800 Subject: [PATCH 2/3] fix: admin token in standalone mode --- internal/api/router.go | 5 +++++ pkg/common/storage/cache/redis/token.go | 9 ++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/api/router.go b/internal/api/router.go index 8a4199581..1d3a92dd7 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -97,6 +97,11 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf case BestSpeed: r.Use(gzip.Gzip(gzip.BestSpeed)) } + if config.Standalone() { + r.Use(func(c *gin.Context) { + c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs) + }) + } r.Use(api.GinLogger(), prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn)), setGinIsAdmin(cfg.Share.IMAdminUser.UserIDs)) diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index b3870daee..c74ccce66 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -165,16 +165,15 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t } func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error { - key := cachekey.GetTokenKey(userID, platformID) - if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { - return errs.Wrap(err) - } for _, f := range fields { k := cachekey.GetTemporaryTokenKey(userID, platformID, f) if err := c.rdb.Set(ctx, k, "", time.Minute*5).Err(); err != nil { return errs.Wrap(err) } } - + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { + return errs.Wrap(err) + } return nil } From 9a1d2a85cdb555d7eef4e79d2c866dacdfbd86ae Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 15 Oct 2025 10:10:29 +0800 Subject: [PATCH 3/3] fix: full id version --- internal/rpc/conversation/sync.go | 2 +- internal/rpc/group/sync.go | 4 ++-- internal/rpc/relation/sync.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/rpc/conversation/sync.go b/internal/rpc/conversation/sync.go index a24dd85c6..85128f719 100644 --- a/internal/rpc/conversation/sync.go +++ b/internal/rpc/conversation/sync.go @@ -27,7 +27,7 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re conversationIDs = nil } return &conversation.GetFullOwnerConversationIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, ConversationIDs: conversationIDs, diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index b864fbf53..92c7a60ce 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -34,7 +34,7 @@ func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgrou userIDs = nil } return &pbgroup.GetFullGroupMemberUserIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, UserIDs: userIDs, @@ -58,7 +58,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF groupIDs = nil } return &pbgroup.GetFullJoinGroupIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, GroupIDs: groupIDs, diff --git a/internal/rpc/relation/sync.go b/internal/rpc/relation/sync.go index 79fa0858c..187f6238d 100644 --- a/internal/rpc/relation/sync.go +++ b/internal/rpc/relation/sync.go @@ -56,7 +56,7 @@ func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.G userIDs = nil } return &relation.GetFullFriendUserIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, UserIDs: userIDs,