diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 357cd0938..570b38049 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -2,6 +2,7 @@ package msgtransfer import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "sync" "time" @@ -164,7 +165,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, con if len(storageList) > 0 { lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList) - if err != nil && err != redis.Nil { + if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) och.singleMsgFailedCountMutex.Lock() och.singleMsgFailedCount += uint64(len(storageList)) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 0443f4ab9..c9e55734b 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -272,7 +272,7 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us // conversationID := utils.GetConversationIDBySessionType(conversationID, sessionType) singleOpt, err := m.Conversation.GetSingleConversationRecvMsgOpt(ctx, userID, conversationID) if errs.ErrRecordNotFound.Is(err) { - return false, nil + return true, nil } else if err != nil { return false, err } @@ -326,7 +326,7 @@ func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []strin req.MsgData.Options = options conversationID := utils.GetConversationIDBySessionType(constant.GroupChatType, req.MsgData.GroupID) isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, v, conversationID, constant.GroupChatType, req) - if err != nil && (!errs.ErrRecordNotFound.Is(err)) { + if err != nil { return err } if isSend { diff --git a/pkg/common/db/controller/common_msg.go b/pkg/common/db/controller/common_msg.go index 3a567202e..aafe51a85 100644 --- a/pkg/common/db/controller/common_msg.go +++ b/pkg/common/db/controller/common_msg.go @@ -126,8 +126,9 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, conversationID str partition, offset, err := db.producerToPush.SendMessage(ctx, conversationID, &pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) if err != nil { log.ZError(ctx, "MsgToPushMQ", err, "key", conversationID, "msg2mq", msg2mq) + return 0, 0, err } - return partition, offset, err + return partition, offset, nil } func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { @@ -218,7 +219,7 @@ func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, convers func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) - if err != nil && err != redis.Nil { + if err != nil && errs.Unwrap(err) != redis.Nil { prome.Inc(prome.SeqGetFailedCounter) return 0, false, err } @@ -230,7 +231,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa if lenList < 1 { return 0, false, errors.New("too short as 0") } - if err == redis.Nil { + if errs.Unwrap(err) == redis.Nil { isNew = true } lastMaxSeq := currentMaxSeq