mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
perf: redis block with keys command (#1423)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
This commit is contained in:
parent
1f7dfa33d7
commit
403cfb6055
36
pkg/common/db/cache/msg.go
vendored
36
pkg/common/db/cache/msg.go
vendored
@ -645,19 +645,35 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
|
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
|
||||||
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result()
|
var (
|
||||||
if errors.Is(err, redis.Nil) {
|
cursor uint64
|
||||||
return nil
|
keys []string
|
||||||
}
|
err error
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
key = c.allMessageCacheKey(conversationID)
|
||||||
}
|
)
|
||||||
for _, v := range vals {
|
|
||||||
if err := c.rdb.Del(ctx, v).Err(); err != nil {
|
for {
|
||||||
|
// scan up to 10000 at a time, the count (10000) param refers to the number of scans on redis server.
|
||||||
|
// if the count is too small, needs to be run scan on redis frequently.
|
||||||
|
var limit int64 = 10000
|
||||||
|
keys, cursor, err = c.rdb.Scan(ctx, cursor, key, limit).Result()
|
||||||
|
if err != nil {
|
||||||
return errs.Wrap(err)
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
err := c.rdb.Del(ctx, key).Err()
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// scan end
|
||||||
|
if cursor == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
|
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
|
||||||
|
47
pkg/common/db/cache/msg_test.go
vendored
47
pkg/common/db/cache/msg_test.go
vendored
@ -385,3 +385,50 @@ func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, input
|
|||||||
assert.EqualValues(t, 1, val) // exists
|
assert.EqualValues(t, 1, val) // exists
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCleanUpOneConversationAllMsg(t *testing.T) {
|
||||||
|
rdb := redis.NewClient(&redis.Options{})
|
||||||
|
defer rdb.Close()
|
||||||
|
|
||||||
|
cacher := msgCache{rdb: rdb}
|
||||||
|
count := 1000
|
||||||
|
prefix := fmt.Sprintf("%v", rand.Int63())
|
||||||
|
|
||||||
|
ids := []string{}
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
id := fmt.Sprintf("%v-cid-%v", prefix, rand.Int63())
|
||||||
|
ids = append(ids, id)
|
||||||
|
|
||||||
|
key := cacher.allMessageCacheKey(id)
|
||||||
|
rdb.Set(context.Background(), key, "openim", 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete 100 keys with scan.
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
pickedKey := ids[i]
|
||||||
|
err := cacher.CleanUpOneConversationAllMsg(context.Background(), pickedKey)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
ls, err := rdb.Keys(context.Background(), pickedKey).Result()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 0, len(ls))
|
||||||
|
|
||||||
|
rcode, err := rdb.Exists(context.Background(), pickedKey).Result()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.EqualValues(t, 0, rcode) // non-exists
|
||||||
|
}
|
||||||
|
|
||||||
|
sid := fmt.Sprintf("%v-cid-*", prefix)
|
||||||
|
ls, err := rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, count-100, len(ls))
|
||||||
|
|
||||||
|
// delete fuzzy matching keys.
|
||||||
|
err = cacher.CleanUpOneConversationAllMsg(context.Background(), sid)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
// don't contains keys matched `{prefix}-cid-{random}` on redis
|
||||||
|
ls, err = rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 0, len(ls))
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user