mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-04 19:45:41 +08:00
feature:add redis io retry logic
This commit is contained in:
parent
5a9adf4654
commit
c5c04c4878
31
pkg/common/db/cache/msg.go
vendored
31
pkg/common/db/cache/msg.go
vendored
@ -173,7 +173,20 @@ func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
|
func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
|
||||||
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
|
var retErr error
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errs.Wrap(retErr, "SetMaxSeq redis retry too many amount")
|
||||||
|
default:
|
||||||
|
retErr = c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
|
||||||
|
if retErr != nil {
|
||||||
|
time.Sleep(time.Second * 2)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
|
func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
|
||||||
@ -181,7 +194,21 @@ func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||||
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
|
var retErr error
|
||||||
|
var retData int64
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return -1, errs.Wrap(retErr, "GetMaxSeq redis retry too many amount")
|
||||||
|
default:
|
||||||
|
retData, retErr = c.getSeq(ctx, conversationID, c.getMaxSeqKey)
|
||||||
|
if retErr != nil && errs.Unwrap(retErr) != redis.Nil {
|
||||||
|
time.Sleep(time.Second * 2)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return retData, retErr
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
|
func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
|
||||||
|
|||||||
@ -357,7 +357,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
||||||
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
|
cancelCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
currentMaxSeq, err := db.cache.GetMaxSeq(cancelCtx, conversationID)
|
||||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||||
log.ZError(ctx, "db.cache.GetMaxSeq", err)
|
log.ZError(ctx, "db.cache.GetMaxSeq", err)
|
||||||
return 0, false, err
|
return 0, false, err
|
||||||
@ -386,7 +388,9 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
|
|||||||
} else {
|
} else {
|
||||||
prommetrics.MsgInsertRedisSuccessCounter.Add(float64(len(msgs)))
|
prommetrics.MsgInsertRedisSuccessCounter.Add(float64(len(msgs)))
|
||||||
}
|
}
|
||||||
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
|
cancelCtx, cancel = context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
err = db.cache.SetMaxSeq(cancelCtx, conversationID, currentMaxSeq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID)
|
log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID)
|
||||||
prommetrics.SeqSetFailedCounter.Inc()
|
prommetrics.SeqSetFailedCounter.Inc()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user