From 4cb56a132653ffa10b05f9f5ec57b13545c48d72 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 23 Sep 2024 18:39:23 +0800 Subject: [PATCH] fix: redis support acquisition time --- .../storage/cache/redis/seq_conversation.go | 44 ++++++++++++++----- .../cache/redis/seq_conversation_test.go | 14 +++++- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/pkg/common/storage/cache/redis/seq_conversation.go b/pkg/common/storage/cache/redis/seq_conversation.go index 7fe849193..1ba565b47 100644 --- a/pkg/common/storage/cache/redis/seq_conversation.go +++ b/pkg/common/storage/cache/redis/seq_conversation.go @@ -169,6 +169,7 @@ local key = KEYS[1] local size = tonumber(ARGV[1]) local lockSecond = ARGV[2] local dataSecond = ARGV[3] +local mallocTime = ARGV[4] local result = {} if redis.call("EXISTS", key) == 0 then local lockValue = math.random(0, 999999999) @@ -189,6 +190,12 @@ if size == 0 then table.insert(result, 0) table.insert(result, curr_seq) table.insert(result, last_seq) + local setTime = redis.call("HGET", key, "TIME") + if setTime then + table.insert(result, setTime) + else + table.insert(result, 0) + end return result end local max_seq = curr_seq + size @@ -196,21 +203,25 @@ if max_seq > last_seq then local lockValue = math.random(0, 999999999) redis.call("HSET", key, "LOCK", lockValue) redis.call("HSET", key, "CURR", last_seq) + redis.call("HSET", key, "TIME", mallocTime) redis.call("EXPIRE", key, lockSecond) table.insert(result, 3) table.insert(result, curr_seq) table.insert(result, last_seq) table.insert(result, lockValue) + table.insert(result, mallocTime) return result end redis.call("HSET", key, "CURR", max_seq) +redis.call("HSET", key, "TIME", ARGV[4]) redis.call("EXPIRE", key, dataSecond) table.insert(result, 0) table.insert(result, curr_seq) table.insert(result, last_seq) +table.insert(result, mallocTime) return result ` - result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second)).Int64Slice() + result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice() if err != nil { return nil, errs.Wrap(err) } @@ -267,29 +278,34 @@ func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size in } func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { + seq, _, err := s.mallocTime(ctx, conversationID, size) + return seq, err +} + +func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversationID string, size int64) (int64, int64, error) { if size < 0 { - return 0, errs.New("size must be greater than 0") + return 0, 0, errs.New("size must be greater than 0") } key := s.getSeqMallocKey(conversationID) for i := 0; i < 10; i++ { states, err := s.malloc(ctx, key, size) if err != nil { - return 0, err + return 0, 0, err } switch states[0] { case 0: // success - return states[1], nil + return states[1], states[3], nil case 1: // not found mallocSize := s.getMallocSize(conversationID, size) seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) if err != nil { - return 0, err + return 0, 0, err } s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) - return seq, nil + return seq, 0, nil case 2: // locked if err := s.wait(ctx); err != nil { - return 0, err + return 0, 0, err } continue case 3: // exceeded cache max value @@ -298,23 +314,23 @@ func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID s mallocSize := s.getMallocSize(conversationID, size) seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) if err != nil { - return 0, err + return 0, 0, err } if lastSeq == seq { s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize) - return currSeq, nil + return currSeq, 0, nil } else { log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq) s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize) - return seq, nil + return seq, 0, nil } default: log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size) - return 0, errs.New(fmt.Sprintf("unknown state: %d", states[0])) + return 0, 0, errs.New(fmt.Sprintf("unknown state: %d", states[0])) } } log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size) - return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) + return 0, 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) } func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { @@ -331,3 +347,7 @@ func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[str } return DeleteCacheBySlot(ctx, s.rocks, keys) } + +func (s *seqConversationCacheRedis) GetMaxSeqWithTime(ctx context.Context, conversationID string) (int64, int64, error) { + return s.mallocTime(ctx, conversationID, 0) +} diff --git a/pkg/common/storage/cache/redis/seq_conversation_test.go b/pkg/common/storage/cache/redis/seq_conversation_test.go index 1a40624b8..56898e5f7 100644 --- a/pkg/common/storage/cache/redis/seq_conversation_test.go +++ b/pkg/common/storage/cache/redis/seq_conversation_test.go @@ -14,7 +14,7 @@ import ( ) func newTestSeq() *seqConversationCacheRedis { - mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) + mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@127.0.0.1:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) if err != nil { panic(err) } @@ -23,7 +23,7 @@ func newTestSeq() *seqConversationCacheRedis { panic(err) } opt := &redis.Options{ - Addr: "172.16.8.48:16379", + Addr: "127.0.0.1:16379", Password: "openIM123", DB: 1, } @@ -107,3 +107,13 @@ func TestMinSeq(t *testing.T) { ts := newTestSeq() t.Log(ts.GetMinSeq(context.Background(), "10000000")) } + +func TestMalloc(t *testing.T) { + ts := newTestSeq() + t.Log(ts.Malloc(context.Background(), "10000000", 100)) +} + +func TestGetMaxSeqWithTime(t *testing.T) { + ts := newTestSeq() + t.Log(ts.GetMaxSeqWithTime(context.Background(), "10000000")) +}