From ec6c840165ea622f8e7ffb74e99bca50064a88ba Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 10 Feb 2025 16:21:13 +0800 Subject: [PATCH] 1 --- internal/rpc/conversation/conversation.go | 2 +- internal/rpc/relation/friend.go | 4 +- pkg/common/storage/cache/redis/batch.go | 63 +++++-- .../storage/cache/redis/batch_handler.go | 79 +++------ pkg/common/storage/cache/redis/black.go | 31 +--- .../storage/cache/redis/conversation.go | 35 +--- pkg/common/storage/cache/redis/doc.go | 15 -- pkg/common/storage/cache/redis/friend.go | 29 +--- pkg/common/storage/cache/redis/group.go | 38 +--- pkg/common/storage/cache/redis/minio.go | 59 +++++++ pkg/common/storage/cache/redis/msg.go | 20 +-- .../cache/redis/redis_shard_manager.go | 164 +++++++++--------- pkg/common/storage/cache/redis/s3.go | 86 ++------- .../storage/cache/redis/seq_conversation.go | 32 ++-- pkg/common/storage/cache/redis/seq_user.go | 20 +-- pkg/common/storage/cache/redis/third.go | 17 +- pkg/common/storage/cache/redis/todo.go | 3 + pkg/common/storage/cache/redis/user.go | 28 +-- pkg/common/storage/controller/group.go | 2 +- 19 files changed, 298 insertions(+), 429 deletions(-) delete mode 100644 pkg/common/storage/cache/redis/doc.go create mode 100644 pkg/common/storage/cache/redis/minio.go create mode 100644 pkg/common/storage/cache/redis/todo.go diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index c7958f021..359c28303 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -96,7 +96,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr pbconversation.RegisterConversationServer(server, &conversationServer{ conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient), conversationDatabase: controller.NewConversationDatabase(conversationDB, - redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()), + redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()), userClient: rpcli.NewUserClient(userConn), groupClient: rpcli.NewGroupClient(groupConn), msgClient: msgClient, diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 464275f75..aaf767508 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -115,12 +115,12 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr db: controller.NewFriendDatabase( friendMongoDB, friendRequestMongoDB, - redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, redis.GetRocksCacheOptions()), + redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB), mgocli.GetTx(), ), blackDatabase: controller.NewBlackDatabase( blackMongoDB, - redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()), + redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB), ), notificationSender: notificationSender, RegisterCenter: client, diff --git a/pkg/common/storage/cache/redis/batch.go b/pkg/common/storage/cache/redis/batch.go index 1810ac993..7f9205bab 100644 --- a/pkg/common/storage/cache/redis/batch.go +++ b/pkg/common/storage/cache/redis/batch.go @@ -3,28 +3,65 @@ package redis import ( "context" "encoding/json" + "time" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" - "golang.org/x/sync/singleflight" - "time" - "unsafe" ) -func getRocksCacheRedisClient(cli *rockscache.Client) redis.UniversalClient { - type Client struct { - rdb redis.UniversalClient - _ rockscache.Options - _ singleflight.Group - } - return (*Client)(unsafe.Pointer(cli)).rdb +// GetRocksCacheOptions returns the default configuration options for RocksCache. +func GetRocksCacheOptions() *rockscache.Options { + opts := rockscache.NewDefaultOptions() + opts.LockExpire = rocksCacheTimeout + opts.WaitReplicasTimeout = rocksCacheTimeout + opts.StrongConsistency = true + opts.RandomExpireAdjustment = 0.2 + + return &opts } -func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, ids []K, idKey func(id K) string, vId func(v *V) K, fn func(ctx context.Context, ids []K) ([]*V, error)) ([]*V, error) { +func newRocksCacheClient(rdb redis.UniversalClient) *rocksCacheClient { + if rdb == nil { + return &rocksCacheClient{} + } + rc := &rocksCacheClient{ + rdb: rdb, + client: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + } + return rc +} + +type rocksCacheClient struct { + rdb redis.UniversalClient + client *rockscache.Client +} + +func (x *rocksCacheClient) GetClient() *rockscache.Client { + return x.client +} + +func (x *rocksCacheClient) Disable() bool { + return x.client == nil +} + +func (x *rocksCacheClient) GetRedis() redis.UniversalClient { + return x.rdb +} + +func (x *rocksCacheClient) GetBatchDeleter(topics ...string) cache.BatchDeleter { + return NewBatchDeleterRedis(x, topics) +} + +func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCacheClient, expire time.Duration, ids []K, idKey func(id K) string, vId func(v *V) K, fn func(ctx context.Context, ids []K) ([]*V, error)) ([]*V, error) { if len(ids) == 0 { return nil, nil } + if rcClient.Disable() { + return fn(ctx, ids) + } findKeys := make([]string, 0, len(ids)) keyId := make(map[string]K) for _, id := range ids { @@ -35,13 +72,13 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac keyId[key] = id findKeys = append(findKeys, key) } - slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), findKeys) + slotKeys, err := groupKeysBySlot(ctx, rcClient.GetRedis(), findKeys) if err != nil { return nil, err } result := make([]*V, 0, len(findKeys)) for _, keys := range slotKeys { - indexCache, err := rcClient.FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) { + indexCache, err := rcClient.GetClient().FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) { queryIds := make([]K, 0, len(idx)) idIndex := make(map[K]int) for _, index := range idx { diff --git a/pkg/common/storage/cache/redis/batch_handler.go b/pkg/common/storage/cache/redis/batch_handler.go index 420ebdf77..893ba8abb 100644 --- a/pkg/common/storage/cache/redis/batch_handler.go +++ b/pkg/common/storage/cache/redis/batch_handler.go @@ -1,23 +1,11 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( "context" "encoding/json" "fmt" + "time" + "github.com/dtm-labs/rockscache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/localcache" @@ -25,7 +13,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" - "time" ) const ( @@ -41,10 +28,10 @@ type BatchDeleterRedis struct { } // NewBatchDeleterRedis creates a new BatchDeleterRedis instance. -func NewBatchDeleterRedis(redisClient redis.UniversalClient, options *rockscache.Options, redisPubTopics []string) *BatchDeleterRedis { +func NewBatchDeleterRedis(rcClient *rocksCacheClient, redisPubTopics []string) *BatchDeleterRedis { return &BatchDeleterRedis{ - redisClient: redisClient, - rocksClient: rockscache.NewClient(redisClient, *options), + redisClient: rcClient.GetRedis(), + rocksClient: rcClient.GetClient(), redisPubTopics: redisPubTopics, } } @@ -107,21 +94,29 @@ func (c *BatchDeleterRedis) AddKeys(keys ...string) { c.keys = append(c.keys, keys...) } -// GetRocksCacheOptions returns the default configuration options for RocksCache. -func GetRocksCacheOptions() *rockscache.Options { - opts := rockscache.NewDefaultOptions() - opts.LockExpire = rocksCacheTimeout - opts.WaitReplicasTimeout = rocksCacheTimeout - opts.StrongConsistency = true - opts.RandomExpireAdjustment = 0.2 +type disableBatchDeleter struct{} - return &opts +func (x disableBatchDeleter) ChainExecDel(ctx context.Context) error { + return nil } -func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { +func (x disableBatchDeleter) ExecDelWithKeys(ctx context.Context, keys []string) error { + return nil +} + +func (x disableBatchDeleter) Clone() cache.BatchDeleter { + return x +} + +func (x disableBatchDeleter) AddKeys(keys ...string) {} + +func getCache[T any](ctx context.Context, rcClient *rocksCacheClient, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { + if rcClient.Disable() { + return fn(ctx) + } var t T var write bool - v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { + v, err := rcClient.GetClient().Fetch2(ctx, key, expire, func() (s string, err error) { t, err = fn(ctx) if err != nil { //log.ZError(ctx, "getCache query database failed", err, "key", key) @@ -152,31 +147,3 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin return t, nil } - -//func batchGetCache[T any, K comparable]( -// ctx context.Context, -// rcClient *rockscache.Client, -// expire time.Duration, -// keys []K, -// keyFn func(key K) string, -// fns func(ctx context.Context, key K) (T, error), -//) ([]T, error) { -// if len(keys) == 0 { -// return nil, nil -// } -// res := make([]T, 0, len(keys)) -// for _, key := range keys { -// val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) { -// return fns(ctx, key) -// }) -// if err != nil { -// if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) { -// continue -// } -// return nil, errs.Wrap(err) -// } -// res = append(res, val) -// } -// -// return res, nil -//} diff --git a/pkg/common/storage/cache/redis/black.go b/pkg/common/storage/cache/redis/black.go index fac6dbe6f..f83399ee1 100644 --- a/pkg/common/storage/cache/redis/black.go +++ b/pkg/common/storage/cache/redis/black.go @@ -1,29 +1,14 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( "context" - "github.com/dtm-labs/rockscache" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" - "time" ) const ( @@ -33,18 +18,16 @@ const ( type BlackCacheRedis struct { cache.BatchDeleter expireTime time.Duration - rcClient *rockscache.Client + rcClient *rocksCacheClient blackDB database.Black } -func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB database.Black, options *rockscache.Options) cache.BlackCache { - batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic}) - b := localCache.Friend - log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable()) +func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB database.Black) cache.BlackCache { + rc := newRocksCacheClient(rdb) return &BlackCacheRedis{ - BatchDeleter: batchHandler, + BatchDeleter: rc.GetBatchDeleter(localCache.Friend.Topic), expireTime: blackExpireTime, - rcClient: rockscache.NewClient(rdb, *options), + rcClient: rc, blackDB: blackDB, } } diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 91d8ed69d..3453b570d 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -1,47 +1,30 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( "context" - "github.com/dtm-labs/rockscache" + "math/big" + "strings" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/encrypt" "github.com/redis/go-redis/v9" - "math/big" - "strings" - "time" ) const ( conversationExpireTime = time.Second * 60 * 60 * 12 ) -func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, opts *rockscache.Options, db database.Conversation) cache.ConversationCache { - batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Conversation.Topic}) - c := localCache.Conversation - log.ZDebug(context.Background(), "conversation local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable()) +func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, db database.Conversation) cache.ConversationCache { + rc := newRocksCacheClient(rdb) return &ConversationRedisCache{ - BatchDeleter: batchHandler, - rcClient: rockscache.NewClient(rdb, *opts), + BatchDeleter: rc.GetBatchDeleter(localCache.Conversation.Topic), + rcClient: rc, conversationDB: db, expireTime: conversationExpireTime, } @@ -49,7 +32,7 @@ func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCac type ConversationRedisCache struct { cache.BatchDeleter - rcClient *rockscache.Client + rcClient *rocksCacheClient conversationDB database.Conversation expireTime time.Duration } diff --git a/pkg/common/storage/cache/redis/doc.go b/pkg/common/storage/cache/redis/doc.go deleted file mode 100644 index 4c2fcacd1..000000000 --- a/pkg/common/storage/cache/redis/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redis diff --git a/pkg/common/storage/cache/redis/friend.go b/pkg/common/storage/cache/redis/friend.go index be4687794..7618f361f 100644 --- a/pkg/common/storage/cache/redis/friend.go +++ b/pkg/common/storage/cache/redis/friend.go @@ -1,30 +1,14 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( "context" "time" - "github.com/dtm-labs/rockscache" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" ) @@ -38,21 +22,18 @@ type FriendCacheRedis struct { cache.BatchDeleter friendDB database.Friend expireTime time.Duration - rcClient *rockscache.Client + rcClient *rocksCacheClient syncCount int } // NewFriendCacheRedis creates a new instance of FriendCacheRedis. -func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB database.Friend, - options *rockscache.Options) cache.FriendCache { - batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic}) - f := localCache.Friend - log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable()) +func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB database.Friend) cache.FriendCache { + rc := newRocksCacheClient(rdb) return &FriendCacheRedis{ - BatchDeleter: batchHandler, + BatchDeleter: rc.GetBatchDeleter(localCache.Friend.Topic), friendDB: friendDB, expireTime: friendExpireTime, - rcClient: rockscache.NewClient(rdb, *options), + rcClient: rc, } } diff --git a/pkg/common/storage/cache/redis/group.go b/pkg/common/storage/cache/redis/group.go index 736111df3..d66716404 100644 --- a/pkg/common/storage/cache/redis/group.go +++ b/pkg/common/storage/cache/redis/group.go @@ -1,17 +1,3 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( @@ -19,7 +5,6 @@ import ( "fmt" "time" - "github.com/dtm-labs/rockscache" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" @@ -36,34 +21,21 @@ const ( groupExpireTime = time.Second * 60 * 60 * 12 ) -var errIndex = errs.New("err index") - type GroupCacheRedis struct { cache.BatchDeleter groupDB database.Group groupMemberDB database.GroupMember groupRequestDB database.GroupRequest expireTime time.Duration - rcClient *rockscache.Client + rcClient *rocksCacheClient groupHash cache.GroupHash } -func NewGroupCacheRedis( - rdb redis.UniversalClient, - localCache *config.LocalCache, - groupDB database.Group, - groupMemberDB database.GroupMember, - groupRequestDB database.GroupRequest, - hashCode cache.GroupHash, - opts *rockscache.Options, -) cache.GroupCache { - batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Group.Topic}) - g := localCache.Group - log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable()) - +func NewGroupCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, groupDB database.Group, groupMemberDB database.GroupMember, groupRequestDB database.GroupRequest, hashCode cache.GroupHash) cache.GroupCache { + rc := newRocksCacheClient(rdb) return &GroupCacheRedis{ - BatchDeleter: batchHandler, - rcClient: rockscache.NewClient(rdb, *opts), + BatchDeleter: rc.GetBatchDeleter(localCache.Group.Topic), + rcClient: rc, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, diff --git a/pkg/common/storage/cache/redis/minio.go b/pkg/common/storage/cache/redis/minio.go new file mode 100644 index 000000000..17bd6ec03 --- /dev/null +++ b/pkg/common/storage/cache/redis/minio.go @@ -0,0 +1,59 @@ +package redis + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/tools/s3/minio" + "github.com/redis/go-redis/v9" +) + +func NewMinioCache(rdb redis.UniversalClient) minio.Cache { + rc := newRocksCacheClient(rdb) + return &minioCacheRedis{ + BatchDeleter: rc.GetBatchDeleter(), + rcClient: rc, + expireTime: time.Hour * 24 * 7, + } +} + +type minioCacheRedis struct { + cache.BatchDeleter + rcClient *rocksCacheClient + expireTime time.Duration +} + +func (g *minioCacheRedis) getObjectImageInfoKey(key string) string { + return cachekey.GetObjectImageInfoKey(key) +} + +func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string { + return cachekey.GetMinioImageThumbnailKey(key, format, width, height) +} + +func (g *minioCacheRedis) DelObjectImageInfoKey(ctx context.Context, keys ...string) error { + ks := make([]string, 0, len(keys)) + for _, key := range keys { + ks = append(ks, g.getObjectImageInfoKey(key)) + } + return g.BatchDeleter.ExecDelWithKeys(ctx, ks) +} + +func (g *minioCacheRedis) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error { + return g.BatchDeleter.ExecDelWithKeys(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)}) + +} + +func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) { + info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn) + if err != nil { + return nil, err + } + return info, nil +} + +func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) { + return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache) +} diff --git a/pkg/common/storage/cache/redis/msg.go b/pkg/common/storage/cache/redis/msg.go index 0651f0283..dfe6ca04d 100644 --- a/pkg/common/storage/cache/redis/msg.go +++ b/pkg/common/storage/cache/redis/msg.go @@ -3,7 +3,8 @@ package redis import ( "context" "encoding/json" - "github.com/dtm-labs/rockscache" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" @@ -11,7 +12,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" - "time" ) // // msgCacheTimeout is expiration time of message cache, 86400 seconds @@ -19,15 +19,13 @@ const msgCacheTimeout = time.Hour * 24 func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache { return &msgCache{ - rdb: client, - rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()), + rcClient: newRocksCacheClient(client), msgDocDatabase: db, } } type msgCache struct { - rdb redis.UniversalClient - rcClient *rockscache.Client + rcClient *rocksCacheClient msgDocDatabase database.Msg } @@ -36,11 +34,11 @@ func (c *msgCache) getSendMsgKey(id string) string { } func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { - return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err()) + return errs.Wrap(c.rcClient.GetRedis().Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err()) } func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { - result, err := c.rdb.Get(ctx, c.getSendMsgKey(id)).Int() + result, err := c.rcClient.GetRedis().Get(ctx, c.getSendMsgKey(id)).Int() return int32(result), errs.Wrap(err) } @@ -67,12 +65,12 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, keys := datautil.Slice(seqs, func(seq int64) string { return cachekey.GetMsgCacheKey(conversationID, seq) }) - slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys) + slotKeys, err := groupKeysBySlot(ctx, c.rcClient.GetRedis(), keys) if err != nil { return err } for _, keys := range slotKeys { - if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil { + if err := c.rcClient.GetClient().TagAsDeletedBatch2(ctx, keys); err != nil { return err } } @@ -88,7 +86,7 @@ func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, if err != nil { return err } - if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil { + if err := c.rcClient.GetClient().RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil { return err } } diff --git a/pkg/common/storage/cache/redis/redis_shard_manager.go b/pkg/common/storage/cache/redis/redis_shard_manager.go index 17e5fecf6..0a0263892 100644 --- a/pkg/common/storage/cache/redis/redis_shard_manager.go +++ b/pkg/common/storage/cache/redis/redis_shard_manager.go @@ -2,7 +2,7 @@ package redis import ( "context" - "github.com/dtm-labs/rockscache" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" @@ -28,83 +28,83 @@ type Config struct { // Option is a function type for configuring Config type Option func(c *Config) -// NewRedisShardManager creates a new RedisShardManager instance -func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager { - config := &Config{ - batchSize: defaultBatchSize, // Default batch size is 50 keys - continueOnError: false, - concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3 - } - for _, opt := range opts { - opt(config) - } - rsm := &RedisShardManager{ - redisClient: redisClient, - config: config, - } - return rsm -} - -// WithBatchSize sets the number of keys to process per batch -func WithBatchSize(size int) Option { - return func(c *Config) { - c.batchSize = size - } -} - -// WithContinueOnError sets whether to continue processing on error -func WithContinueOnError(continueOnError bool) Option { - return func(c *Config) { - c.continueOnError = continueOnError - } -} - -// WithConcurrentLimit sets the concurrency limit -func WithConcurrentLimit(limit int) Option { - return func(c *Config) { - c.concurrentLimit = limit - } -} - -// ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function. -func (rsm *RedisShardManager) ProcessKeysBySlot( - ctx context.Context, - keys []string, - processFunc func(ctx context.Context, slot int64, keys []string) error, -) error { - - // Group keys by slot - slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys) - if err != nil { - return err - } - - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(rsm.config.concurrentLimit) - - // Process keys in each slot using the provided function - for slot, singleSlotKeys := range slots { - batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize) - for _, batch := range batches { - slot, batch := slot, batch // Avoid closure capture issue - g.Go(func() error { - err := processFunc(ctx, slot, batch) - if err != nil { - log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch) - if !rsm.config.continueOnError { - return err - } - } - return nil - }) - } - } - - if err := g.Wait(); err != nil { - return err - } - return nil -} +//// NewRedisShardManager creates a new RedisShardManager instance +//func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager { +// config := &Config{ +// batchSize: defaultBatchSize, // Default batch size is 50 keys +// continueOnError: false, +// concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3 +// } +// for _, opt := range opts { +// opt(config) +// } +// rsm := &RedisShardManager{ +// redisClient: redisClient, +// config: config, +// } +// return rsm +//} +// +//// WithBatchSize sets the number of keys to process per batch +//func WithBatchSize(size int) Option { +// return func(c *Config) { +// c.batchSize = size +// } +//} +// +//// WithContinueOnError sets whether to continue processing on error +//func WithContinueOnError(continueOnError bool) Option { +// return func(c *Config) { +// c.continueOnError = continueOnError +// } +//} +// +//// WithConcurrentLimit sets the concurrency limit +//func WithConcurrentLimit(limit int) Option { +// return func(c *Config) { +// c.concurrentLimit = limit +// } +//} +// +//// ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function. +//func (rsm *RedisShardManager) ProcessKeysBySlot( +// ctx context.Context, +// keys []string, +// processFunc func(ctx context.Context, slot int64, keys []string) error, +//) error { +// +// // Group keys by slot +// slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys) +// if err != nil { +// return err +// } +// +// g, ctx := errgroup.WithContext(ctx) +// g.SetLimit(rsm.config.concurrentLimit) +// +// // Process keys in each slot using the provided function +// for slot, singleSlotKeys := range slots { +// batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize) +// for _, batch := range batches { +// slot, batch := slot, batch // Avoid closure capture issue +// g.Go(func() error { +// err := processFunc(ctx, slot, batch) +// if err != nil { +// log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch) +// if !rsm.config.continueOnError { +// return err +// } +// } +// return nil +// }) +// } +// } +// +// if err := g.Wait(); err != nil { +// return err +// } +// return nil +//} // groupKeysBySlot groups keys by their Redis cluster hash slots. func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) { @@ -197,15 +197,15 @@ func ProcessKeysBySlot( return nil } -func DeleteCacheBySlot(ctx context.Context, rcClient *rockscache.Client, keys []string) error { +func DeleteCacheBySlot(ctx context.Context, rcClient *rocksCacheClient, keys []string) error { switch len(keys) { case 0: return nil case 1: - return rcClient.TagAsDeletedBatch2(ctx, keys) + return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys) default: - return ProcessKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), keys, func(ctx context.Context, slot int64, keys []string) error { - return rcClient.TagAsDeletedBatch2(ctx, keys) + return ProcessKeysBySlot(ctx, rcClient.GetRedis(), keys, func(ctx context.Context, slot int64, keys []string) error { + return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys) }) } } diff --git a/pkg/common/storage/cache/redis/s3.go b/pkg/common/storage/cache/redis/s3.go index 954557aca..37f80a477 100644 --- a/pkg/common/storage/cache/redis/s3.go +++ b/pkg/common/storage/cache/redis/s3.go @@ -1,39 +1,23 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( "context" - "github.com/dtm-labs/rockscache" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/s3" "github.com/openimsdk/tools/s3/cont" - "github.com/openimsdk/tools/s3/minio" "github.com/redis/go-redis/v9" - "time" ) func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) cache.ObjectCache { - opts := rockscache.NewDefaultOptions() - batchHandler := NewBatchDeleterRedis(rdb, &opts, nil) + rc := newRocksCacheClient(rdb) return &objectCacheRedis{ - BatchDeleter: batchHandler, - rcClient: rockscache.NewClient(rdb, opts), + BatchDeleter: rc.GetBatchDeleter(), + rcClient: rc, expireTime: time.Hour * 12, objDB: objDB, } @@ -42,7 +26,7 @@ func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) c type objectCacheRedis struct { cache.BatchDeleter objDB database.ObjectInfo - rcClient *rockscache.Client + rcClient *rocksCacheClient expireTime time.Duration } @@ -76,11 +60,10 @@ func (g *objectCacheRedis) GetName(ctx context.Context, engine string, name stri } func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache { - opts := rockscache.NewDefaultOptions() - batchHandler := NewBatchDeleterRedis(rdb, &opts, nil) + rc := newRocksCacheClient(rdb) return &s3CacheRedis{ - BatchDeleter: batchHandler, - rcClient: rockscache.NewClient(rdb, opts), + BatchDeleter: rc.GetBatchDeleter(), + rcClient: rc, expireTime: time.Hour * 12, s3: s3, } @@ -89,7 +72,7 @@ func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache { type s3CacheRedis struct { cache.BatchDeleter s3 s3.Interface - rcClient *rockscache.Client + rcClient *rocksCacheClient expireTime time.Duration } @@ -110,52 +93,3 @@ func (g *s3CacheRedis) GetKey(ctx context.Context, engine string, name string) ( return g.s3.StatObject(ctx, name) }) } - -func NewMinioCache(rdb redis.UniversalClient) minio.Cache { - opts := rockscache.NewDefaultOptions() - batchHandler := NewBatchDeleterRedis(rdb, &opts, nil) - return &minioCacheRedis{ - BatchDeleter: batchHandler, - rcClient: rockscache.NewClient(rdb, opts), - expireTime: time.Hour * 24 * 7, - } -} - -type minioCacheRedis struct { - cache.BatchDeleter - rcClient *rockscache.Client - expireTime time.Duration -} - -func (g *minioCacheRedis) getObjectImageInfoKey(key string) string { - return cachekey.GetObjectImageInfoKey(key) -} - -func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string { - return cachekey.GetMinioImageThumbnailKey(key, format, width, height) -} - -func (g *minioCacheRedis) DelObjectImageInfoKey(ctx context.Context, keys ...string) error { - ks := make([]string, 0, len(keys)) - for _, key := range keys { - ks = append(ks, g.getObjectImageInfoKey(key)) - } - return g.BatchDeleter.ExecDelWithKeys(ctx, ks) -} - -func (g *minioCacheRedis) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error { - return g.BatchDeleter.ExecDelWithKeys(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)}) - -} - -func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) { - info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn) - if err != nil { - return nil, err - } - return info, nil -} - -func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) { - return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache) -} diff --git a/pkg/common/storage/cache/redis/seq_conversation.go b/pkg/common/storage/cache/redis/seq_conversation.go index 71705cef7..2ba69a7d6 100644 --- a/pkg/common/storage/cache/redis/seq_conversation.go +++ b/pkg/common/storage/cache/redis/seq_conversation.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" - "github.com/dtm-labs/rockscache" + "strconv" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" @@ -12,25 +14,21 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" - "strconv" - "time" ) func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache { return &seqConversationCacheRedis{ - rdb: rdb, mgo: mgo, lockTime: time.Second * 3, dataTime: time.Hour * 24 * 365, minSeqExpireTime: time.Hour, - rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + rcClient: newRocksCacheClient(rdb), } } type seqConversationCacheRedis struct { - rdb redis.UniversalClient mgo database.SeqConversation - rocks *rockscache.Client + rcClient *rocksCacheClient lockTime time.Duration dataTime time.Duration minSeqExpireTime time.Duration @@ -45,7 +43,7 @@ func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationI } func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) { + return getCache(ctx, s.rcClient, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) { return s.mgo.GetMinSeq(ctx, conversationID) }) } @@ -68,7 +66,7 @@ func (s *seqConversationCacheRedis) getSingleMaxSeqWithTime(ctx context.Context, func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]int64) error { result := make([]*redis.StringCmd, len(keys)) - pipe := s.rdb.Pipeline() + pipe := s.rcClient.GetRedis().Pipeline() for i, key := range keys { result[i] = pipe.HGet(ctx, key, "CURR") } @@ -99,7 +97,7 @@ func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []s func (s *seqConversationCacheRedis) batchGetMaxSeqWithTime(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]database.SeqTime) error { result := make([]*redis.SliceCmd, len(keys)) - pipe := s.rdb.Pipeline() + pipe := s.rcClient.GetRedis().Pipeline() for i, key := range keys { result[i] = pipe.HMGet(ctx, key, "CURR", "TIME") } @@ -157,7 +155,7 @@ func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversation if len(keys) == 1 { return s.getSingleMaxSeq(ctx, conversationIDs[0]) } - slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) + slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys) if err != nil { return nil, err } @@ -190,7 +188,7 @@ func (s *seqConversationCacheRedis) GetMaxSeqsWithTime(ctx context.Context, conv if len(keys) == 1 { return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0]) } - slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) + slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys) if err != nil { return nil, err } @@ -234,7 +232,7 @@ redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime) redis.call("EXPIRE", key, dataSecond) return 0 ` - result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64() + result, err := s.rcClient.GetRedis().Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64() if err != nil { return 0, errs.Wrap(err) } @@ -305,7 +303,7 @@ table.insert(result, last_seq) table.insert(result, mallocTime) return result ` - result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice() + result, err := s.rcClient.GetRedis().Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice() if err != nil { return nil, errs.Wrap(err) } @@ -438,7 +436,7 @@ func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[str return err } } - return DeleteCacheBySlot(ctx, s.rocks, keys) + return DeleteCacheBySlot(ctx, s.rcClient, keys) } // GetCacheMaxSeqWithTime only get the existing cache, if there is no cache, no cache will be generated @@ -456,7 +454,7 @@ func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context, key2conversationID[key] = conversationID keys = append(keys, key) } - slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) + slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys) if err != nil { return nil, err } @@ -465,7 +463,7 @@ func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context, if len(keys) == 0 { continue } - pipe := s.rdb.Pipeline() + pipe := s.rcClient.GetRedis().Pipeline() cmds := make([]*redis.SliceCmd, 0, len(keys)) for _, key := range keys { cmds = append(cmds, pipe.HMGet(ctx, key, "CURR", "TIME")) diff --git a/pkg/common/storage/cache/redis/seq_user.go b/pkg/common/storage/cache/redis/seq_user.go index 0cedfeee1..ad289be07 100644 --- a/pkg/common/storage/cache/redis/seq_user.go +++ b/pkg/common/storage/cache/redis/seq_user.go @@ -2,31 +2,29 @@ package redis import ( "context" - "github.com/dtm-labs/rockscache" + "strconv" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/tools/errs" "github.com/redis/go-redis/v9" - "strconv" - "time" ) func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser { return &seqUserCacheRedis{ - rdb: rdb, mgo: mgo, readSeqWriteRatio: 100, expireTime: time.Hour * 24 * 7, readExpireTime: time.Hour * 24 * 30, - rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + rocks: newRocksCacheClient(rdb), } } type seqUserCacheRedis struct { - rdb redis.UniversalClient mgo database.SeqUser - rocks *rockscache.Client + rocks *rocksCacheClient expireTime time.Duration readExpireTime time.Duration readSeqWriteRatio int64 @@ -54,7 +52,7 @@ func (s *seqUserCacheRedis) SetUserMaxSeq(ctx context.Context, conversationID st if err := s.mgo.SetUserMaxSeq(ctx, conversationID, userID, seq); err != nil { return err } - return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID)) + return s.rocks.GetClient().TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID)) } func (s *seqUserCacheRedis) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { @@ -79,7 +77,7 @@ func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID s return err } if dbSeq < seq { - if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { + if err := s.rocks.GetClient().RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { return errs.Wrap(err) } } @@ -109,12 +107,12 @@ func (s *seqUserCacheRedis) setUserRedisReadSeqs(ctx context.Context, userID str keys = append(keys, key) keySeq[key] = seq } - slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) + slotKeys, err := groupKeysBySlot(ctx, s.rocks.GetRedis(), keys) if err != nil { return err } for _, keys := range slotKeys { - pipe := s.rdb.Pipeline() + pipe := s.rocks.GetRedis().Pipeline() for _, key := range keys { pipe.HSet(ctx, key, "value", strconv.FormatInt(keySeq[key], 10)) pipe.Expire(ctx, key, s.readExpireTime) diff --git a/pkg/common/storage/cache/redis/third.go b/pkg/common/storage/cache/redis/third.go index 3288cecb8..1ee6576c7 100644 --- a/pkg/common/storage/cache/redis/third.go +++ b/pkg/common/storage/cache/redis/third.go @@ -1,26 +1,13 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/errs" "github.com/redis/go-redis/v9" - "time" ) func NewThirdCache(rdb redis.UniversalClient) cache.ThirdCache { diff --git a/pkg/common/storage/cache/redis/todo.go b/pkg/common/storage/cache/redis/todo.go new file mode 100644 index 000000000..13438d6d4 --- /dev/null +++ b/pkg/common/storage/cache/redis/todo.go @@ -0,0 +1,3 @@ +package redis + +// todo: token online third minio diff --git a/pkg/common/storage/cache/redis/user.go b/pkg/common/storage/cache/redis/user.go index f6b490730..6729650b0 100644 --- a/pkg/common/storage/cache/redis/user.go +++ b/pkg/common/storage/cache/redis/user.go @@ -1,30 +1,16 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( "context" + "time" + "github.com/dtm-labs/rockscache" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" - "time" ) const ( @@ -38,19 +24,17 @@ type UserCacheRedis struct { rdb redis.UniversalClient userDB database.User expireTime time.Duration - rcClient *rockscache.Client + rcClient *rocksCacheClient } func NewUserCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, userDB database.User, options *rockscache.Options) cache.UserCache { - batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.User.Topic}) - u := localCache.User - log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable()) + rc := newRocksCacheClient(rdb) return &UserCacheRedis{ - BatchDeleter: batchHandler, + BatchDeleter: rc.GetBatchDeleter(localCache.User.Topic), rdb: rdb, userDB: userDB, expireTime: userExpireTime, - rcClient: rockscache.NewClient(rdb, *options), + rcClient: rc, } } diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 072429ed0..6de0432a3 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -140,7 +140,7 @@ func NewGroupDatabase( groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, ctxTx: ctxTx, - cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()), + cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash), } }