diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go index 42efdc428..58575f849 100644 --- a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go +++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go @@ -36,7 +36,12 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con } err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq) if err != nil { - log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList) + log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID) + } else { + err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID()) + if err != nil { + log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID) + } } for _, v := range msgFromMQ.MessageList { if v.MsgData.ContentType == constant.DeleteMessageNotification { diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index 217c4c5c5..163202270 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -221,6 +221,19 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, _, err := pipe.Exec(ctx) return err } +func (d *DataBases) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { + ctx := context.Background() + var keys []string + for _, msg := range msgList { + key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) + keys = append(keys, key) + } + err := d.RDB.Del(ctx, keys...).Err() + if err != nil { + log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", keys, uid, err.Error(), msgList) + } + return err +} func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error { ctx := context.Background()