This commit is contained in:
withchao 2025-02-10 16:21:13 +08:00
parent 3d4ad4602c
commit ec6c840165
19 changed files with 298 additions and 429 deletions

View File

@ -96,7 +96,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
pbconversation.RegisterConversationServer(server, &conversationServer{ pbconversation.RegisterConversationServer(server, &conversationServer{
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient), conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
conversationDatabase: controller.NewConversationDatabase(conversationDB, conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()), redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()),
userClient: rpcli.NewUserClient(userConn), userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn), groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient, msgClient: msgClient,

View File

@ -115,12 +115,12 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
db: controller.NewFriendDatabase( db: controller.NewFriendDatabase(
friendMongoDB, friendMongoDB,
friendRequestMongoDB, friendRequestMongoDB,
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, redis.GetRocksCacheOptions()), redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB),
mgocli.GetTx(), mgocli.GetTx(),
), ),
blackDatabase: controller.NewBlackDatabase( blackDatabase: controller.NewBlackDatabase(
blackMongoDB, blackMongoDB,
redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()), redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB),
), ),
notificationSender: notificationSender, notificationSender: notificationSender,
RegisterCenter: client, RegisterCenter: client,

View File

@ -3,28 +3,65 @@ package redis
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"golang.org/x/sync/singleflight"
"time"
"unsafe"
) )
func getRocksCacheRedisClient(cli *rockscache.Client) redis.UniversalClient { // GetRocksCacheOptions returns the default configuration options for RocksCache.
type Client struct { func GetRocksCacheOptions() *rockscache.Options {
rdb redis.UniversalClient opts := rockscache.NewDefaultOptions()
_ rockscache.Options opts.LockExpire = rocksCacheTimeout
_ singleflight.Group opts.WaitReplicasTimeout = rocksCacheTimeout
} opts.StrongConsistency = true
return (*Client)(unsafe.Pointer(cli)).rdb opts.RandomExpireAdjustment = 0.2
return &opts
} }
func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscache.Client, expire time.Duration, ids []K, idKey func(id K) string, vId func(v *V) K, fn func(ctx context.Context, ids []K) ([]*V, error)) ([]*V, error) { func newRocksCacheClient(rdb redis.UniversalClient) *rocksCacheClient {
if rdb == nil {
return &rocksCacheClient{}
}
rc := &rocksCacheClient{
rdb: rdb,
client: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
}
return rc
}
type rocksCacheClient struct {
rdb redis.UniversalClient
client *rockscache.Client
}
func (x *rocksCacheClient) GetClient() *rockscache.Client {
return x.client
}
func (x *rocksCacheClient) Disable() bool {
return x.client == nil
}
func (x *rocksCacheClient) GetRedis() redis.UniversalClient {
return x.rdb
}
func (x *rocksCacheClient) GetBatchDeleter(topics ...string) cache.BatchDeleter {
return NewBatchDeleterRedis(x, topics)
}
func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCacheClient, expire time.Duration, ids []K, idKey func(id K) string, vId func(v *V) K, fn func(ctx context.Context, ids []K) ([]*V, error)) ([]*V, error) {
if len(ids) == 0 { if len(ids) == 0 {
return nil, nil return nil, nil
} }
if rcClient.Disable() {
return fn(ctx, ids)
}
findKeys := make([]string, 0, len(ids)) findKeys := make([]string, 0, len(ids))
keyId := make(map[string]K) keyId := make(map[string]K)
for _, id := range ids { for _, id := range ids {
@ -35,13 +72,13 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac
keyId[key] = id keyId[key] = id
findKeys = append(findKeys, key) findKeys = append(findKeys, key)
} }
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), findKeys) slotKeys, err := groupKeysBySlot(ctx, rcClient.GetRedis(), findKeys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result := make([]*V, 0, len(findKeys)) result := make([]*V, 0, len(findKeys))
for _, keys := range slotKeys { for _, keys := range slotKeys {
indexCache, err := rcClient.FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) { indexCache, err := rcClient.GetClient().FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) {
queryIds := make([]K, 0, len(idx)) queryIds := make([]K, 0, len(idx))
idIndex := make(map[K]int) idIndex := make(map[K]int)
for _, index := range idx { for _, index := range idx {

View File

@ -1,23 +1,11 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache"
@ -25,7 +13,6 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time"
) )
const ( const (
@ -41,10 +28,10 @@ type BatchDeleterRedis struct {
} }
// NewBatchDeleterRedis creates a new BatchDeleterRedis instance. // NewBatchDeleterRedis creates a new BatchDeleterRedis instance.
func NewBatchDeleterRedis(redisClient redis.UniversalClient, options *rockscache.Options, redisPubTopics []string) *BatchDeleterRedis { func NewBatchDeleterRedis(rcClient *rocksCacheClient, redisPubTopics []string) *BatchDeleterRedis {
return &BatchDeleterRedis{ return &BatchDeleterRedis{
redisClient: redisClient, redisClient: rcClient.GetRedis(),
rocksClient: rockscache.NewClient(redisClient, *options), rocksClient: rcClient.GetClient(),
redisPubTopics: redisPubTopics, redisPubTopics: redisPubTopics,
} }
} }
@ -107,21 +94,29 @@ func (c *BatchDeleterRedis) AddKeys(keys ...string) {
c.keys = append(c.keys, keys...) c.keys = append(c.keys, keys...)
} }
// GetRocksCacheOptions returns the default configuration options for RocksCache. type disableBatchDeleter struct{}
func GetRocksCacheOptions() *rockscache.Options {
opts := rockscache.NewDefaultOptions()
opts.LockExpire = rocksCacheTimeout
opts.WaitReplicasTimeout = rocksCacheTimeout
opts.StrongConsistency = true
opts.RandomExpireAdjustment = 0.2
return &opts func (x disableBatchDeleter) ChainExecDel(ctx context.Context) error {
return nil
} }
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { func (x disableBatchDeleter) ExecDelWithKeys(ctx context.Context, keys []string) error {
return nil
}
func (x disableBatchDeleter) Clone() cache.BatchDeleter {
return x
}
func (x disableBatchDeleter) AddKeys(keys ...string) {}
func getCache[T any](ctx context.Context, rcClient *rocksCacheClient, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
if rcClient.Disable() {
return fn(ctx)
}
var t T var t T
var write bool var write bool
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { v, err := rcClient.GetClient().Fetch2(ctx, key, expire, func() (s string, err error) {
t, err = fn(ctx) t, err = fn(ctx)
if err != nil { if err != nil {
//log.ZError(ctx, "getCache query database failed", err, "key", key) //log.ZError(ctx, "getCache query database failed", err, "key", key)
@ -152,31 +147,3 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
return t, nil return t, nil
} }
//func batchGetCache[T any, K comparable](
// ctx context.Context,
// rcClient *rockscache.Client,
// expire time.Duration,
// keys []K,
// keyFn func(key K) string,
// fns func(ctx context.Context, key K) (T, error),
//) ([]T, error) {
// if len(keys) == 0 {
// return nil, nil
// }
// res := make([]T, 0, len(keys))
// for _, key := range keys {
// val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
// return fns(ctx, key)
// })
// if err != nil {
// if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) {
// continue
// }
// return nil, errs.Wrap(err)
// }
// res = append(res, val)
// }
//
// return res, nil
//}

View File

@ -1,29 +1,14 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
"context" "context"
"github.com/dtm-labs/rockscache" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time"
) )
const ( const (
@ -33,18 +18,16 @@ const (
type BlackCacheRedis struct { type BlackCacheRedis struct {
cache.BatchDeleter cache.BatchDeleter
expireTime time.Duration expireTime time.Duration
rcClient *rockscache.Client rcClient *rocksCacheClient
blackDB database.Black blackDB database.Black
} }
func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB database.Black, options *rockscache.Options) cache.BlackCache { func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB database.Black) cache.BlackCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic}) rc := newRocksCacheClient(rdb)
b := localCache.Friend
log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable())
return &BlackCacheRedis{ return &BlackCacheRedis{
BatchDeleter: batchHandler, BatchDeleter: rc.GetBatchDeleter(localCache.Friend.Topic),
expireTime: blackExpireTime, expireTime: blackExpireTime,
rcClient: rockscache.NewClient(rdb, *options), rcClient: rc,
blackDB: blackDB, blackDB: blackDB,
} }
} }

View File

@ -1,47 +1,30 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
"context" "context"
"github.com/dtm-labs/rockscache" "math/big"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/encrypt" "github.com/openimsdk/tools/utils/encrypt"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"math/big"
"strings"
"time"
) )
const ( const (
conversationExpireTime = time.Second * 60 * 60 * 12 conversationExpireTime = time.Second * 60 * 60 * 12
) )
func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, opts *rockscache.Options, db database.Conversation) cache.ConversationCache { func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, db database.Conversation) cache.ConversationCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Conversation.Topic}) rc := newRocksCacheClient(rdb)
c := localCache.Conversation
log.ZDebug(context.Background(), "conversation local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable())
return &ConversationRedisCache{ return &ConversationRedisCache{
BatchDeleter: batchHandler, BatchDeleter: rc.GetBatchDeleter(localCache.Conversation.Topic),
rcClient: rockscache.NewClient(rdb, *opts), rcClient: rc,
conversationDB: db, conversationDB: db,
expireTime: conversationExpireTime, expireTime: conversationExpireTime,
} }
@ -49,7 +32,7 @@ func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCac
type ConversationRedisCache struct { type ConversationRedisCache struct {
cache.BatchDeleter cache.BatchDeleter
rcClient *rockscache.Client rcClient *rocksCacheClient
conversationDB database.Conversation conversationDB database.Conversation
expireTime time.Duration expireTime time.Duration
} }

View File

@ -1,15 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis

View File

@ -1,30 +1,14 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
"context" "context"
"time" "time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
@ -38,21 +22,18 @@ type FriendCacheRedis struct {
cache.BatchDeleter cache.BatchDeleter
friendDB database.Friend friendDB database.Friend
expireTime time.Duration expireTime time.Duration
rcClient *rockscache.Client rcClient *rocksCacheClient
syncCount int syncCount int
} }
// NewFriendCacheRedis creates a new instance of FriendCacheRedis. // NewFriendCacheRedis creates a new instance of FriendCacheRedis.
func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB database.Friend, func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB database.Friend) cache.FriendCache {
options *rockscache.Options) cache.FriendCache { rc := newRocksCacheClient(rdb)
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.Friend.Topic})
f := localCache.Friend
log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable())
return &FriendCacheRedis{ return &FriendCacheRedis{
BatchDeleter: batchHandler, BatchDeleter: rc.GetBatchDeleter(localCache.Friend.Topic),
friendDB: friendDB, friendDB: friendDB,
expireTime: friendExpireTime, expireTime: friendExpireTime,
rcClient: rockscache.NewClient(rdb, *options), rcClient: rc,
} }
} }

View File

@ -1,17 +1,3 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
@ -19,7 +5,6 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
@ -36,34 +21,21 @@ const (
groupExpireTime = time.Second * 60 * 60 * 12 groupExpireTime = time.Second * 60 * 60 * 12
) )
var errIndex = errs.New("err index")
type GroupCacheRedis struct { type GroupCacheRedis struct {
cache.BatchDeleter cache.BatchDeleter
groupDB database.Group groupDB database.Group
groupMemberDB database.GroupMember groupMemberDB database.GroupMember
groupRequestDB database.GroupRequest groupRequestDB database.GroupRequest
expireTime time.Duration expireTime time.Duration
rcClient *rockscache.Client rcClient *rocksCacheClient
groupHash cache.GroupHash groupHash cache.GroupHash
} }
func NewGroupCacheRedis( func NewGroupCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, groupDB database.Group, groupMemberDB database.GroupMember, groupRequestDB database.GroupRequest, hashCode cache.GroupHash) cache.GroupCache {
rdb redis.UniversalClient, rc := newRocksCacheClient(rdb)
localCache *config.LocalCache,
groupDB database.Group,
groupMemberDB database.GroupMember,
groupRequestDB database.GroupRequest,
hashCode cache.GroupHash,
opts *rockscache.Options,
) cache.GroupCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Group.Topic})
g := localCache.Group
log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable())
return &GroupCacheRedis{ return &GroupCacheRedis{
BatchDeleter: batchHandler, BatchDeleter: rc.GetBatchDeleter(localCache.Group.Topic),
rcClient: rockscache.NewClient(rdb, *opts), rcClient: rc,
expireTime: groupExpireTime, expireTime: groupExpireTime,
groupDB: groupDB, groupDB: groupDB,
groupMemberDB: groupMemberDB, groupMemberDB: groupMemberDB,

59
pkg/common/storage/cache/redis/minio.go vendored Normal file
View File

@ -0,0 +1,59 @@
package redis
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/s3/minio"
"github.com/redis/go-redis/v9"
)
func NewMinioCache(rdb redis.UniversalClient) minio.Cache {
rc := newRocksCacheClient(rdb)
return &minioCacheRedis{
BatchDeleter: rc.GetBatchDeleter(),
rcClient: rc,
expireTime: time.Hour * 24 * 7,
}
}
type minioCacheRedis struct {
cache.BatchDeleter
rcClient *rocksCacheClient
expireTime time.Duration
}
func (g *minioCacheRedis) getObjectImageInfoKey(key string) string {
return cachekey.GetObjectImageInfoKey(key)
}
func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
return cachekey.GetMinioImageThumbnailKey(key, format, width, height)
}
func (g *minioCacheRedis) DelObjectImageInfoKey(ctx context.Context, keys ...string) error {
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getObjectImageInfoKey(key))
}
return g.BatchDeleter.ExecDelWithKeys(ctx, ks)
}
func (g *minioCacheRedis) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error {
return g.BatchDeleter.ExecDelWithKeys(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)})
}
func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) {
info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn)
if err != nil {
return nil, err
}
return info, nil
}
func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
}

View File

@ -3,7 +3,8 @@ package redis
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/dtm-labs/rockscache" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
@ -11,7 +12,6 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time"
) // ) //
// msgCacheTimeout is expiration time of message cache, 86400 seconds // msgCacheTimeout is expiration time of message cache, 86400 seconds
@ -19,15 +19,13 @@ const msgCacheTimeout = time.Hour * 24
func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache { func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache {
return &msgCache{ return &msgCache{
rdb: client, rcClient: newRocksCacheClient(client),
rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()),
msgDocDatabase: db, msgDocDatabase: db,
} }
} }
type msgCache struct { type msgCache struct {
rdb redis.UniversalClient rcClient *rocksCacheClient
rcClient *rockscache.Client
msgDocDatabase database.Msg msgDocDatabase database.Msg
} }
@ -36,11 +34,11 @@ func (c *msgCache) getSendMsgKey(id string) string {
} }
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err()) return errs.Wrap(c.rcClient.GetRedis().Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
} }
func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := c.rdb.Get(ctx, c.getSendMsgKey(id)).Int() result, err := c.rcClient.GetRedis().Get(ctx, c.getSendMsgKey(id)).Int()
return int32(result), errs.Wrap(err) return int32(result), errs.Wrap(err)
} }
@ -67,12 +65,12 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string,
keys := datautil.Slice(seqs, func(seq int64) string { keys := datautil.Slice(seqs, func(seq int64) string {
return cachekey.GetMsgCacheKey(conversationID, seq) return cachekey.GetMsgCacheKey(conversationID, seq)
}) })
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys) slotKeys, err := groupKeysBySlot(ctx, c.rcClient.GetRedis(), keys)
if err != nil { if err != nil {
return err return err
} }
for _, keys := range slotKeys { for _, keys := range slotKeys {
if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil { if err := c.rcClient.GetClient().TagAsDeletedBatch2(ctx, keys); err != nil {
return err return err
} }
} }
@ -88,7 +86,7 @@ func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string,
if err != nil { if err != nil {
return err return err
} }
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil { if err := c.rcClient.GetClient().RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil {
return err return err
} }
} }

View File

@ -2,7 +2,7 @@ package redis
import ( import (
"context" "context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
@ -28,83 +28,83 @@ type Config struct {
// Option is a function type for configuring Config // Option is a function type for configuring Config
type Option func(c *Config) type Option func(c *Config)
// NewRedisShardManager creates a new RedisShardManager instance //// NewRedisShardManager creates a new RedisShardManager instance
func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager { //func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager {
config := &Config{ // config := &Config{
batchSize: defaultBatchSize, // Default batch size is 50 keys // batchSize: defaultBatchSize, // Default batch size is 50 keys
continueOnError: false, // continueOnError: false,
concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3 // concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3
} // }
for _, opt := range opts { // for _, opt := range opts {
opt(config) // opt(config)
} // }
rsm := &RedisShardManager{ // rsm := &RedisShardManager{
redisClient: redisClient, // redisClient: redisClient,
config: config, // config: config,
} // }
return rsm // return rsm
} //}
//
// WithBatchSize sets the number of keys to process per batch //// WithBatchSize sets the number of keys to process per batch
func WithBatchSize(size int) Option { //func WithBatchSize(size int) Option {
return func(c *Config) { // return func(c *Config) {
c.batchSize = size // c.batchSize = size
} // }
} //}
//
// WithContinueOnError sets whether to continue processing on error //// WithContinueOnError sets whether to continue processing on error
func WithContinueOnError(continueOnError bool) Option { //func WithContinueOnError(continueOnError bool) Option {
return func(c *Config) { // return func(c *Config) {
c.continueOnError = continueOnError // c.continueOnError = continueOnError
} // }
} //}
//
// WithConcurrentLimit sets the concurrency limit //// WithConcurrentLimit sets the concurrency limit
func WithConcurrentLimit(limit int) Option { //func WithConcurrentLimit(limit int) Option {
return func(c *Config) { // return func(c *Config) {
c.concurrentLimit = limit // c.concurrentLimit = limit
} // }
} //}
//
// ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function. //// ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function.
func (rsm *RedisShardManager) ProcessKeysBySlot( //func (rsm *RedisShardManager) ProcessKeysBySlot(
ctx context.Context, // ctx context.Context,
keys []string, // keys []string,
processFunc func(ctx context.Context, slot int64, keys []string) error, // processFunc func(ctx context.Context, slot int64, keys []string) error,
) error { //) error {
//
// Group keys by slot // // Group keys by slot
slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys) // slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys)
if err != nil { // if err != nil {
return err // return err
} // }
//
g, ctx := errgroup.WithContext(ctx) // g, ctx := errgroup.WithContext(ctx)
g.SetLimit(rsm.config.concurrentLimit) // g.SetLimit(rsm.config.concurrentLimit)
//
// Process keys in each slot using the provided function // // Process keys in each slot using the provided function
for slot, singleSlotKeys := range slots { // for slot, singleSlotKeys := range slots {
batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize) // batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize)
for _, batch := range batches { // for _, batch := range batches {
slot, batch := slot, batch // Avoid closure capture issue // slot, batch := slot, batch // Avoid closure capture issue
g.Go(func() error { // g.Go(func() error {
err := processFunc(ctx, slot, batch) // err := processFunc(ctx, slot, batch)
if err != nil { // if err != nil {
log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch) // log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch)
if !rsm.config.continueOnError { // if !rsm.config.continueOnError {
return err // return err
} // }
} // }
return nil // return nil
}) // })
} // }
} // }
//
if err := g.Wait(); err != nil { // if err := g.Wait(); err != nil {
return err // return err
} // }
return nil // return nil
} //}
// groupKeysBySlot groups keys by their Redis cluster hash slots. // groupKeysBySlot groups keys by their Redis cluster hash slots.
func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) { func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
@ -197,15 +197,15 @@ func ProcessKeysBySlot(
return nil return nil
} }
func DeleteCacheBySlot(ctx context.Context, rcClient *rockscache.Client, keys []string) error { func DeleteCacheBySlot(ctx context.Context, rcClient *rocksCacheClient, keys []string) error {
switch len(keys) { switch len(keys) {
case 0: case 0:
return nil return nil
case 1: case 1:
return rcClient.TagAsDeletedBatch2(ctx, keys) return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys)
default: default:
return ProcessKeysBySlot(ctx, getRocksCacheRedisClient(rcClient), keys, func(ctx context.Context, slot int64, keys []string) error { return ProcessKeysBySlot(ctx, rcClient.GetRedis(), keys, func(ctx context.Context, slot int64, keys []string) error {
return rcClient.TagAsDeletedBatch2(ctx, keys) return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys)
}) })
} }
} }

View File

@ -1,39 +1,23 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
"context" "context"
"github.com/dtm-labs/rockscache" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/s3" "github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/cont" "github.com/openimsdk/tools/s3/cont"
"github.com/openimsdk/tools/s3/minio"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time"
) )
func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) cache.ObjectCache { func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) cache.ObjectCache {
opts := rockscache.NewDefaultOptions() rc := newRocksCacheClient(rdb)
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
return &objectCacheRedis{ return &objectCacheRedis{
BatchDeleter: batchHandler, BatchDeleter: rc.GetBatchDeleter(),
rcClient: rockscache.NewClient(rdb, opts), rcClient: rc,
expireTime: time.Hour * 12, expireTime: time.Hour * 12,
objDB: objDB, objDB: objDB,
} }
@ -42,7 +26,7 @@ func NewObjectCacheRedis(rdb redis.UniversalClient, objDB database.ObjectInfo) c
type objectCacheRedis struct { type objectCacheRedis struct {
cache.BatchDeleter cache.BatchDeleter
objDB database.ObjectInfo objDB database.ObjectInfo
rcClient *rockscache.Client rcClient *rocksCacheClient
expireTime time.Duration expireTime time.Duration
} }
@ -76,11 +60,10 @@ func (g *objectCacheRedis) GetName(ctx context.Context, engine string, name stri
} }
func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache { func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache {
opts := rockscache.NewDefaultOptions() rc := newRocksCacheClient(rdb)
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
return &s3CacheRedis{ return &s3CacheRedis{
BatchDeleter: batchHandler, BatchDeleter: rc.GetBatchDeleter(),
rcClient: rockscache.NewClient(rdb, opts), rcClient: rc,
expireTime: time.Hour * 12, expireTime: time.Hour * 12,
s3: s3, s3: s3,
} }
@ -89,7 +72,7 @@ func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) cont.S3Cache {
type s3CacheRedis struct { type s3CacheRedis struct {
cache.BatchDeleter cache.BatchDeleter
s3 s3.Interface s3 s3.Interface
rcClient *rockscache.Client rcClient *rocksCacheClient
expireTime time.Duration expireTime time.Duration
} }
@ -110,52 +93,3 @@ func (g *s3CacheRedis) GetKey(ctx context.Context, engine string, name string) (
return g.s3.StatObject(ctx, name) return g.s3.StatObject(ctx, name)
}) })
} }
func NewMinioCache(rdb redis.UniversalClient) minio.Cache {
opts := rockscache.NewDefaultOptions()
batchHandler := NewBatchDeleterRedis(rdb, &opts, nil)
return &minioCacheRedis{
BatchDeleter: batchHandler,
rcClient: rockscache.NewClient(rdb, opts),
expireTime: time.Hour * 24 * 7,
}
}
type minioCacheRedis struct {
cache.BatchDeleter
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *minioCacheRedis) getObjectImageInfoKey(key string) string {
return cachekey.GetObjectImageInfoKey(key)
}
func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
return cachekey.GetMinioImageThumbnailKey(key, format, width, height)
}
func (g *minioCacheRedis) DelObjectImageInfoKey(ctx context.Context, keys ...string) error {
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getObjectImageInfoKey(key))
}
return g.BatchDeleter.ExecDelWithKeys(ctx, ks)
}
func (g *minioCacheRedis) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error {
return g.BatchDeleter.ExecDelWithKeys(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)})
}
func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) {
info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn)
if err != nil {
return nil, err
}
return info, nil
}
func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
}

View File

@ -4,7 +4,9 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/dtm-labs/rockscache" "strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
@ -12,25 +14,21 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"strconv"
"time"
) )
func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache { func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache {
return &seqConversationCacheRedis{ return &seqConversationCacheRedis{
rdb: rdb,
mgo: mgo, mgo: mgo,
lockTime: time.Second * 3, lockTime: time.Second * 3,
dataTime: time.Hour * 24 * 365, dataTime: time.Hour * 24 * 365,
minSeqExpireTime: time.Hour, minSeqExpireTime: time.Hour,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), rcClient: newRocksCacheClient(rdb),
} }
} }
type seqConversationCacheRedis struct { type seqConversationCacheRedis struct {
rdb redis.UniversalClient
mgo database.SeqConversation mgo database.SeqConversation
rocks *rockscache.Client rcClient *rocksCacheClient
lockTime time.Duration lockTime time.Duration
dataTime time.Duration dataTime time.Duration
minSeqExpireTime time.Duration minSeqExpireTime time.Duration
@ -45,7 +43,7 @@ func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationI
} }
func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) { return getCache(ctx, s.rcClient, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMinSeq(ctx, conversationID) return s.mgo.GetMinSeq(ctx, conversationID)
}) })
} }
@ -68,7 +66,7 @@ func (s *seqConversationCacheRedis) getSingleMaxSeqWithTime(ctx context.Context,
func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]int64) error { func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]int64) error {
result := make([]*redis.StringCmd, len(keys)) result := make([]*redis.StringCmd, len(keys))
pipe := s.rdb.Pipeline() pipe := s.rcClient.GetRedis().Pipeline()
for i, key := range keys { for i, key := range keys {
result[i] = pipe.HGet(ctx, key, "CURR") result[i] = pipe.HGet(ctx, key, "CURR")
} }
@ -99,7 +97,7 @@ func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []s
func (s *seqConversationCacheRedis) batchGetMaxSeqWithTime(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]database.SeqTime) error { func (s *seqConversationCacheRedis) batchGetMaxSeqWithTime(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]database.SeqTime) error {
result := make([]*redis.SliceCmd, len(keys)) result := make([]*redis.SliceCmd, len(keys))
pipe := s.rdb.Pipeline() pipe := s.rcClient.GetRedis().Pipeline()
for i, key := range keys { for i, key := range keys {
result[i] = pipe.HMGet(ctx, key, "CURR", "TIME") result[i] = pipe.HMGet(ctx, key, "CURR", "TIME")
} }
@ -157,7 +155,7 @@ func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversation
if len(keys) == 1 { if len(keys) == 1 {
return s.getSingleMaxSeq(ctx, conversationIDs[0]) return s.getSingleMaxSeq(ctx, conversationIDs[0])
} }
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -190,7 +188,7 @@ func (s *seqConversationCacheRedis) GetMaxSeqsWithTime(ctx context.Context, conv
if len(keys) == 1 { if len(keys) == 1 {
return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0]) return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0])
} }
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -234,7 +232,7 @@ redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime)
redis.call("EXPIRE", key, dataSecond) redis.call("EXPIRE", key, dataSecond)
return 0 return 0
` `
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64() result, err := s.rcClient.GetRedis().Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64()
if err != nil { if err != nil {
return 0, errs.Wrap(err) return 0, errs.Wrap(err)
} }
@ -305,7 +303,7 @@ table.insert(result, last_seq)
table.insert(result, mallocTime) table.insert(result, mallocTime)
return result return result
` `
result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice() result, err := s.rcClient.GetRedis().Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice()
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
@ -438,7 +436,7 @@ func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[str
return err return err
} }
} }
return DeleteCacheBySlot(ctx, s.rocks, keys) return DeleteCacheBySlot(ctx, s.rcClient, keys)
} }
// GetCacheMaxSeqWithTime only get the existing cache, if there is no cache, no cache will be generated // GetCacheMaxSeqWithTime only get the existing cache, if there is no cache, no cache will be generated
@ -456,7 +454,7 @@ func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context,
key2conversationID[key] = conversationID key2conversationID[key] = conversationID
keys = append(keys, key) keys = append(keys, key)
} }
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) slotKeys, err := groupKeysBySlot(ctx, s.rcClient.GetRedis(), keys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -465,7 +463,7 @@ func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context,
if len(keys) == 0 { if len(keys) == 0 {
continue continue
} }
pipe := s.rdb.Pipeline() pipe := s.rcClient.GetRedis().Pipeline()
cmds := make([]*redis.SliceCmd, 0, len(keys)) cmds := make([]*redis.SliceCmd, 0, len(keys))
for _, key := range keys { for _, key := range keys {
cmds = append(cmds, pipe.HMGet(ctx, key, "CURR", "TIME")) cmds = append(cmds, pipe.HMGet(ctx, key, "CURR", "TIME"))

View File

@ -2,31 +2,29 @@ package redis
import ( import (
"context" "context"
"github.com/dtm-labs/rockscache" "strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"strconv"
"time"
) )
func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser { func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser {
return &seqUserCacheRedis{ return &seqUserCacheRedis{
rdb: rdb,
mgo: mgo, mgo: mgo,
readSeqWriteRatio: 100, readSeqWriteRatio: 100,
expireTime: time.Hour * 24 * 7, expireTime: time.Hour * 24 * 7,
readExpireTime: time.Hour * 24 * 30, readExpireTime: time.Hour * 24 * 30,
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()), rocks: newRocksCacheClient(rdb),
} }
} }
type seqUserCacheRedis struct { type seqUserCacheRedis struct {
rdb redis.UniversalClient
mgo database.SeqUser mgo database.SeqUser
rocks *rockscache.Client rocks *rocksCacheClient
expireTime time.Duration expireTime time.Duration
readExpireTime time.Duration readExpireTime time.Duration
readSeqWriteRatio int64 readSeqWriteRatio int64
@ -54,7 +52,7 @@ func (s *seqUserCacheRedis) SetUserMaxSeq(ctx context.Context, conversationID st
if err := s.mgo.SetUserMaxSeq(ctx, conversationID, userID, seq); err != nil { if err := s.mgo.SetUserMaxSeq(ctx, conversationID, userID, seq); err != nil {
return err return err
} }
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID)) return s.rocks.GetClient().TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
} }
func (s *seqUserCacheRedis) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (s *seqUserCacheRedis) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
@ -79,7 +77,7 @@ func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID s
return err return err
} }
if dbSeq < seq { if dbSeq < seq {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { if err := s.rocks.GetClient().RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
@ -109,12 +107,12 @@ func (s *seqUserCacheRedis) setUserRedisReadSeqs(ctx context.Context, userID str
keys = append(keys, key) keys = append(keys, key)
keySeq[key] = seq keySeq[key] = seq
} }
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) slotKeys, err := groupKeysBySlot(ctx, s.rocks.GetRedis(), keys)
if err != nil { if err != nil {
return err return err
} }
for _, keys := range slotKeys { for _, keys := range slotKeys {
pipe := s.rdb.Pipeline() pipe := s.rocks.GetRedis().Pipeline()
for _, key := range keys { for _, key := range keys {
pipe.HSet(ctx, key, "value", strconv.FormatInt(keySeq[key], 10)) pipe.HSet(ctx, key, "value", strconv.FormatInt(keySeq[key], 10))
pipe.Expire(ctx, key, s.readExpireTime) pipe.Expire(ctx, key, s.readExpireTime)

View File

@ -1,26 +1,13 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
"context" "context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time"
) )
func NewThirdCache(rdb redis.UniversalClient) cache.ThirdCache { func NewThirdCache(rdb redis.UniversalClient) cache.ThirdCache {

View File

@ -0,0 +1,3 @@
package redis
// todo: token online third minio

View File

@ -1,30 +1,16 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis package redis
import ( import (
"context" "context"
"time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time"
) )
const ( const (
@ -38,19 +24,17 @@ type UserCacheRedis struct {
rdb redis.UniversalClient rdb redis.UniversalClient
userDB database.User userDB database.User
expireTime time.Duration expireTime time.Duration
rcClient *rockscache.Client rcClient *rocksCacheClient
} }
func NewUserCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, userDB database.User, options *rockscache.Options) cache.UserCache { func NewUserCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, userDB database.User, options *rockscache.Options) cache.UserCache {
batchHandler := NewBatchDeleterRedis(rdb, options, []string{localCache.User.Topic}) rc := newRocksCacheClient(rdb)
u := localCache.User
log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable())
return &UserCacheRedis{ return &UserCacheRedis{
BatchDeleter: batchHandler, BatchDeleter: rc.GetBatchDeleter(localCache.User.Topic),
rdb: rdb, rdb: rdb,
userDB: userDB, userDB: userDB,
expireTime: userExpireTime, expireTime: userExpireTime,
rcClient: rockscache.NewClient(rdb, *options), rcClient: rc,
} }
} }

View File

@ -140,7 +140,7 @@ func NewGroupDatabase(
groupMemberDB: groupMemberDB, groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB, groupRequestDB: groupRequestDB,
ctxTx: ctxTx, ctxTx: ctxTx,
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()), cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash),
} }
} }