From ceb669dfb8ca68c5dfa295cf09b3c247dd5dfc2f Mon Sep 17 00:00:00 2001 From: xuexihuang <1339326187@qq.com> Date: Wed, 29 Nov 2023 10:41:11 +0800 Subject: [PATCH] Feature middleware (#1476) * fix:fix error values&logs * modify: add logs * feature:add redis io retry logic * feature:add redis error alert rule * test:for test alert * fix:fix prometheus rules * del:del test code --------- Co-authored-by: lin.huang --- config/instance-down-rules.yml | 13 +++- internal/msgtransfer/init.go | 7 +-- .../msgtransfer/online_history_msg_handler.go | 16 ++--- internal/rpc/msg/sync_msg.go | 12 +--- pkg/common/db/cache/msg.go | 31 +++++++++- pkg/common/db/controller/msg.go | 60 +++++++++---------- 6 files changed, 82 insertions(+), 57 deletions(-) diff --git a/config/instance-down-rules.yml b/config/instance-down-rules.yml index 72b1f5aa3..5541d2c54 100644 --- a/config/instance-down-rules.yml +++ b/config/instance-down-rules.yml @@ -8,4 +8,15 @@ groups: severity: critical annotations: summary: "Instance {{ $labels.instance }} down" - description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 1 minutes." \ No newline at end of file + description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 1 minutes." + + - name: database_insert_failure_alerts + rules: + - alert: DatabaseInsertFailed + expr: (increase(msg_insert_redis_failed_total[5m]) > 0) or (increase(msg_insert_mongo_failed_total[5m]) > 0) + for: 1m + labels: + severity: critical + annotations: + summary: "Increase in MsgInsertRedisFailedCounter or MsgInsertMongoFailedCounter detected" + description: "Either MsgInsertRedisFailedCounter or MsgInsertMongoFailedCounter has increased in the last 5 minutes, indicating failures in message insert operations to Redis or MongoDB,maybe the redis or mongodb is crash." diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 4ce015543..8436317ee 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -17,16 +17,15 @@ package msgtransfer import ( "errors" "fmt" - "log" - "net/http" - "sync" - "github.com/OpenIMSDK/tools/mw" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "log" + "net/http" + "sync" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b019b0120..eb8e500fe 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -252,7 +252,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification( return } log.ZDebug(ctx, "success to next topic", "conversationID", conversationID) - och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + if err != nil { + log.ZError(ctx, "MsgToMongoMQ error", err) + } och.toPushTopic(ctx, key, conversationID, storageList) } } @@ -277,9 +280,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg( lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList) if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) - och.singleMsgFailedCountMutex.Lock() - och.singleMsgFailedCount += uint64(len(storageList)) - och.singleMsgFailedCountMutex.Unlock() return } if isNewConversation { @@ -311,10 +311,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg( } log.ZDebug(ctx, "success incr to next topic") - och.singleMsgSuccessCountMutex.Lock() - och.singleMsgSuccessCount += uint64(len(storageList)) - och.singleMsgSuccessCountMutex.Unlock() - och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + if err != nil { + log.ZError(ctx, "MsgToMongoMQ error", err) + } och.toPushTopic(ctx, key, conversationID, storageList) } } diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 7c67ff05f..dbd8da4d8 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -42,15 +42,8 @@ func (m *msgServer) PullMessageBySeqs( log.ZError(ctx, "GetConversation error", err, "conversationID", seq.ConversationID) continue } - minSeq, maxSeq, msgs, err := m.MsgDatabase.GetMsgBySeqsRange( - ctx, - req.UserID, - seq.ConversationID, - seq.Begin, - seq.End, - seq.Num, - conversation.MaxSeq, - ) + minSeq, maxSeq, msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, req.UserID, seq.ConversationID, + seq.Begin, seq.End, seq.Num, conversation.MaxSeq) if err != nil { log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq) continue @@ -64,7 +57,6 @@ func (m *msgServer) PullMessageBySeqs( } if len(msgs) == 0 { log.ZWarn(ctx, "not have msgs", nil, "conversationID", seq.ConversationID, "seq", seq) - continue } resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs, IsEnd: isEnd} diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 282d1d1c1..f86b44d9b 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -173,7 +173,20 @@ func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s st } func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) + var retErr error + for { + select { + case <-ctx.Done(): + return errs.Wrap(retErr, "SetMaxSeq redis retry too many amount") + default: + retErr = c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) + if retErr != nil { + time.Sleep(time.Second * 2) + continue + } + return nil + } + } } func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { @@ -181,7 +194,21 @@ func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m } func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMaxSeqKey) + var retErr error + var retData int64 + for { + select { + case <-ctx.Done(): + return -1, errs.Wrap(retErr, "GetMaxSeq redis retry too many amount") + default: + retData, retErr = c.getSeq(ctx, conversationID, c.getMaxSeqKey) + if retErr != nil && errs.Unwrap(retErr) != redis.Nil { + time.Sleep(time.Second * 2) + continue + } + return retData, retErr + } + } } func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index fb0a9c702..cba0a6bbd 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -357,7 +357,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { - currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) + cancelCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + currentMaxSeq, err := db.cache.GetMaxSeq(cancelCtx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "db.cache.GetMaxSeq", err) return 0, false, err @@ -384,19 +386,21 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) } else { - prommetrics.MsgInsertRedisSuccessCounter.Inc() + prommetrics.MsgInsertRedisSuccessCounter.Add(float64(len(msgs))) } - err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) + cancelCtx, cancel = context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + err = db.cache.SetMaxSeq(cancelCtx, conversationID, currentMaxSeq) if err != nil { log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() } err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) - if err != nil { + if err2 != nil { log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() } - return lastMaxSeq, isNew, utils.Wrap(err, "") + return lastMaxSeq, isNew, errs.Wrap(err, "redis SetMaxSeq error") } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { @@ -654,16 +658,26 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) - if err != nil && errs.Unwrap(err) != redis.Nil { - return 0, 0, nil, err + if err != nil { + log.ZError(ctx, "cache.GetConversationUserMinSeq error", err) + if errs.Unwrap(err) != redis.Nil { + return 0, 0, nil, err + } } minSeq, err := db.cache.GetMinSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { - return 0, 0, nil, err + if err != nil { + log.ZError(ctx, "cache.GetMinSeq error", err) + if errs.Unwrap(err) != redis.Nil { + return 0, 0, nil, err + } } maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { - return 0, 0, nil, err + if err != nil { + log.ZError(ctx, "cache.GetMaxSeq error", err) + if errs.Unwrap(err) != redis.Nil { + return 0, 0, nil, err + } + } if userMinSeq < minSeq { minSeq = userMinSeq @@ -676,34 +690,16 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co } successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs) if err != nil { - if err != redis.Nil { - log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) - } + log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) } - log.ZInfo( - ctx, - "db.cache.GetMessagesBySeq", - "userID", - userID, - "conversationID", - conversationID, - "seqs", - seqs, - "successMsgs", - len(successMsgs), - "failedSeqs", - failedSeqs, - "conversationID", - conversationID, - ) + log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs", + len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID) if len(failedSeqs) > 0 { mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) if err != nil { - return 0, 0, nil, err } - successMsgs = append(successMsgs, mongoMsgs...) } return minSeq, maxSeq, successMsgs, nil