From eebff88a29a8412330d446ed23db69b990b29224 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 28 Jul 2025 16:57:58 +0800 Subject: [PATCH 1/4] fix: performance issues with Kafka caused by encapsulating the MQ interface --- go.mod | 2 +- go.sum | 4 ++-- internal/msgtransfer/init.go | 6 +++--- internal/msgtransfer/online_history_msg_handler.go | 13 ++++++++++--- internal/msgtransfer/online_msg_to_mongo_handler.go | 10 +++++++--- internal/push/push.go | 9 +++++---- 6 files changed, 28 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index c06451aaa..775765706 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.96 + github.com/openimsdk/tools v0.0.50-alpha.97 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index d29eb0f3f..329a916ec 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FA github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.96 h1:U44Fq2jHiEvGi9zuYAnTRNx3Xd9T7P/kBAZLHvQ8xg4= -github.com/openimsdk/tools v0.0.50-alpha.96/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= +github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index bbec3f9a2..35026c79a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) + historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config) msgTransfer := &MsgTransfer{ historyConsumer: historyConsumer, @@ -161,8 +161,8 @@ func (m *MsgTransfer) Start(ctx context.Context) error { }() go func() { - fn := func(ctx context.Context, key string, value []byte) error { - m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value) + fn := func(msg mq.Message) error { + m.historyMongoHandler.HandleChatWs2Mongo(msg) return nil } for { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 05775a1e6..8b212774a 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/mq" "sync" "time" @@ -77,6 +78,7 @@ type ConsumerMessage struct { Ctx context.Context Key string Value []byte + Raw mq.Message } func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { @@ -113,6 +115,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. b.Do = och.do och.redisMessageBatches = b + och.redisMessageBatches.OnComplete = func(lastMessage *ConsumerMessage, totalCount int) { + lastMessage.Raw.Mark() + lastMessage.Raw.Commit() + } + return &och, nil } func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) { @@ -388,10 +395,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte return mcontext.SetOperationID(ctx, allMessageOperationID) } -func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group - err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value}) +func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(msg mq.Message) error { // a instance in the consumer group + err := och.redisMessageBatches.Put(msg.Context(), &ConsumerMessage{Ctx: msg.Context(), Key: msg.Key(), Value: msg.Value(), Raw: msg}) if err != nil { - log.ZWarn(ctx, "put msg to error", err, "key", key, "value", value) + log.ZWarn(msg.Context(), "put msg to error", err, "key", msg.Key(), "value", msg.Value()) } return nil } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 6c1498f82..8611af7ea 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -15,12 +15,12 @@ package msgtransfer import ( - "context" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mq" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" @@ -40,7 +40,10 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabas } } -func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) { +func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) { + ctx := val.Context() + key := val.Key() + msg := val.Value() msgFromMQ := pbmsg.MsgDataToMongoByMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { @@ -58,6 +61,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont prommetrics.MsgInsertMongoFailedCounter.Inc() } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() + val.Mark() } for _, msgData := range msgFromMQ.MsgData { diff --git a/internal/push/push.go b/internal/push/push.go index 1d6f8cb30..bf95b6acc 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -2,6 +2,7 @@ package push import ( "context" + "github.com/openimsdk/tools/mq" "math/rand" "strconv" @@ -106,8 +107,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg go func() { pushHandler.WaitCache() - fn := func(ctx context.Context, key string, value []byte) error { - pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(ctx), value) + fn := func(msg mq.Message) error { + pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(msg.Context()), msg.Value()) return nil } consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) @@ -121,8 +122,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg }() go func() { - fn := func(ctx context.Context, key string, value []byte) error { - offlineHandler.HandleMsg2OfflinePush(ctx, value) + fn := func(msg mq.Message) error { + offlineHandler.HandleMsg2OfflinePush(msg.Context(), msg.Value()) return nil } consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) From d9c3504afd6bb4f8a93154215995d8acbde01d7f Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 1 Aug 2025 10:13:42 +0800 Subject: [PATCH 2/4] fix: admin token in standalone mode --- internal/api/router.go | 5 +++++ pkg/common/storage/cache/redis/token.go | 9 ++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/api/router.go b/internal/api/router.go index 8a4199581..1d3a92dd7 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -97,6 +97,11 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf case BestSpeed: r.Use(gzip.Gzip(gzip.BestSpeed)) } + if config.Standalone() { + r.Use(func(c *gin.Context) { + c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs) + }) + } r.Use(api.GinLogger(), prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn)), setGinIsAdmin(cfg.Share.IMAdminUser.UserIDs)) diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index b3870daee..c74ccce66 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -165,16 +165,15 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t } func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error { - key := cachekey.GetTokenKey(userID, platformID) - if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { - return errs.Wrap(err) - } for _, f := range fields { k := cachekey.GetTemporaryTokenKey(userID, platformID, f) if err := c.rdb.Set(ctx, k, "", time.Minute*5).Err(); err != nil { return errs.Wrap(err) } } - + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { + return errs.Wrap(err) + } return nil } From 9a1d2a85cdb555d7eef4e79d2c866dacdfbd86ae Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 15 Oct 2025 10:10:29 +0800 Subject: [PATCH 3/4] fix: full id version --- internal/rpc/conversation/sync.go | 2 +- internal/rpc/group/sync.go | 4 ++-- internal/rpc/relation/sync.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/rpc/conversation/sync.go b/internal/rpc/conversation/sync.go index a24dd85c6..85128f719 100644 --- a/internal/rpc/conversation/sync.go +++ b/internal/rpc/conversation/sync.go @@ -27,7 +27,7 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re conversationIDs = nil } return &conversation.GetFullOwnerConversationIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, ConversationIDs: conversationIDs, diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index b864fbf53..92c7a60ce 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -34,7 +34,7 @@ func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgrou userIDs = nil } return &pbgroup.GetFullGroupMemberUserIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, UserIDs: userIDs, @@ -58,7 +58,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF groupIDs = nil } return &pbgroup.GetFullJoinGroupIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, GroupIDs: groupIDs, diff --git a/internal/rpc/relation/sync.go b/internal/rpc/relation/sync.go index 79fa0858c..187f6238d 100644 --- a/internal/rpc/relation/sync.go +++ b/internal/rpc/relation/sync.go @@ -56,7 +56,7 @@ func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.G userIDs = nil } return &relation.GetFullFriendUserIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, UserIDs: userIDs, From ebda95fb11abdddd52ee645dcb2d0697ac9b8e35 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 12 Dec 2025 15:23:45 +0800 Subject: [PATCH 4/4] fix: resolve deadlock in cache eviction and improve GetBatch implementation --- pkg/localcache/cache.go | 27 ++++++----- pkg/localcache/cache_test.go | 67 ++++++++++++++++++++++++++++ pkg/localcache/init.go | 4 -- pkg/localcache/lru/lru_expiration.go | 49 +++++++++++++++++++- pkg/localcache/lru/lru_slot.go | 6 +-- 5 files changed, 133 insertions(+), 20 deletions(-) diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go index 07d36cf46..b2376d6f1 100644 --- a/pkg/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] { if opt.localSlotNum > 0 && opt.localSlotSize > 0 { createSimpleLRU := func() lru.LRU[string, V] { if opt.expirationEvict { - return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } else { - return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } } if opt.localSlotNum == 1 { c.local = createSimpleLRU() } else { - c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU) + c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU) } if opt.linkSlotNum > 0 { c.link = link.New(opt.linkSlotNum) @@ -71,14 +71,19 @@ type cache[V any] struct { } func (c *cache[V]) onEvict(key string, value V) { - _ = value - if c.link != nil { - lks := c.link.Del(key) - for k := range lks { - if key != k { // prevent deadlock - c.local.Del(k) - } + // Do not delete other keys while the underlying LRU still holds its lock; + // defer linked deletions to avoid re-entering the same slot and deadlocking. + if lks := c.link.Del(key); len(lks) > 0 { + go c.delLinked(key, lks) + } + } +} + +func (c *cache[V]) delLinked(src string, keys map[string]struct{}) { + for k := range keys { + if src != k { + c.local.Del(k) } } } @@ -105,7 +110,7 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) { if c.local != nil { return c.local.Get(key, func() (V, error) { - if len(link) > 0 { + if len(link) > 0 && c.link != nil { c.link.Link(key, link...) } return fetch(ctx) diff --git a/pkg/localcache/cache_test.go b/pkg/localcache/cache_test.go index c206e6799..13eb20797 100644 --- a/pkg/localcache/cache_test.go +++ b/pkg/localcache/cache_test.go @@ -22,6 +22,8 @@ import ( "sync/atomic" "testing" "time" + + "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" ) func TestName(t *testing.T) { @@ -91,3 +93,68 @@ func TestName(t *testing.T) { t.Log("del", del.Load()) // 137.35s } + +// Test deadlock scenario when eviction callback deletes a linked key that hashes to the same slot. +func TestCacheEvictDeadlock(t *testing.T) { + ctx := context.Background() + c := New[string](WithLocalSlotNum(1), WithLocalSlotSize(1), WithLazy()) + + if _, err := c.GetLink(ctx, "k1", func(ctx context.Context) (string, error) { + return "v1", nil + }, "k2"); err != nil { + t.Fatalf("seed cache failed: %v", err) + } + + done := make(chan struct{}) + go func() { + defer close(done) + _, _ = c.GetLink(ctx, "k2", func(ctx context.Context) (string, error) { + return "v2", nil + }, "k1") + }() + + select { + case <-done: + // expected to finish quickly; current implementation deadlocks here. + case <-time.After(time.Second): + t.Fatal("GetLink deadlocked during eviction of linked key") + } +} + +func TestExpirationLRUGetBatch(t *testing.T) { + l := lru.NewExpirationLRU[string, string](2, time.Minute, time.Second*5, EmptyTarget{}, nil) + + keys := []string{"a", "b"} + values, err := l.GetBatch(keys, func(keys []string) (map[string]string, error) { + res := make(map[string]string) + for _, k := range keys { + res[k] = k + "_v" + } + return res, nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(values) != len(keys) { + t.Fatalf("expected %d values, got %d", len(keys), len(values)) + } + for _, k := range keys { + if v, ok := values[k]; !ok || v != k+"_v" { + t.Fatalf("unexpected value for %s: %q, ok=%v", k, v, ok) + } + } + + // second batch should hit cache + values, err = l.GetBatch(keys, func(keys []string) (map[string]string, error) { + t.Fatalf("should not fetch on cache hit") + return nil, nil + }) + if err != nil { + t.Fatalf("unexpected error on cache hit: %v", err) + } + for _, k := range keys { + if v, ok := values[k]; !ok || v != k+"_v" { + t.Fatalf("unexpected cached value for %s: %q, ok=%v", k, v, ok) + } + } +} diff --git a/pkg/localcache/init.go b/pkg/localcache/init.go index ad339da7c..d0bccaa7e 100644 --- a/pkg/localcache/init.go +++ b/pkg/localcache/init.go @@ -33,10 +33,6 @@ func InitLocalCache(localCache *config.LocalCache) { Local config.CacheConfig Keys []string }{ - { - Local: localCache.Auth, - Keys: []string{cachekey.UidPidToken}, - }, { Local: localCache.User, Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, diff --git a/pkg/localcache/lru/lru_expiration.go b/pkg/localcache/lru/lru_expiration.go index df6bacbf4..4197cacec 100644 --- a/pkg/localcache/lru/lru_expiration.go +++ b/pkg/localcache/lru/lru_expiration.go @@ -52,8 +52,53 @@ type ExpirationLRU[K comparable, V any] struct { } func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { - //TODO implement me - panic("implement me") + var ( + err error + results = make(map[K]V) + misses = make([]K, 0, len(keys)) + ) + + for _, key := range keys { + x.lock.Lock() + v, ok := x.core.Get(key) + x.lock.Unlock() + if ok { + x.target.IncrGetHit() + v.lock.RLock() + results[key] = v.value + if v.err != nil && err == nil { + err = v.err + } + v.lock.RUnlock() + continue + } + misses = append(misses, key) + } + + if len(misses) == 0 { + return results, err + } + + fetchValues, fetchErr := fetch(misses) + if fetchErr != nil && err == nil { + err = fetchErr + } + + for key, val := range fetchValues { + results[key] = val + if fetchErr != nil { + x.target.IncrGetFailed() + continue + } + x.target.IncrGetSuccess() + item := &expirationLruItem[V]{value: val} + x.lock.Lock() + x.core.Add(key, item) + x.lock.Unlock() + } + + // any keys not returned from fetch remain absent (no cache write) + return results, err } func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go index 14ee3b50f..077219b75 100644 --- a/pkg/localcache/lru/lru_slot.go +++ b/pkg/localcache/lru/lru_slot.go @@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct { func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { var ( slotKeys = make(map[uint64][]K) - kVs = make(map[K]V) + vs = make(map[K]V) ) for _, k := range keys { @@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error) return nil, err } for key, value := range batches { - kVs[key] = value + vs[key] = value } } - return kVs, nil + return vs, nil } func (x *slotLRU[K, V]) getIndex(k K) uint64 {