diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go index 58575f849..f08b59965 100644 --- a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go +++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go @@ -34,6 +34,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) return } + log.Info(msgFromMQ.TriggerID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq) err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq) if err != nil { log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID) diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/batch_insert_chat.go index f2920813d..16096f9bb 100644 --- a/pkg/common/db/batch_insert_chat.go +++ b/pkg/common/db/batch_insert_chat.go @@ -26,12 +26,15 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo isInit := false var remain uint64 blk0 := uint64(GetSingleGocMsgNum() - 1) + //currentMaxSeq 4998 if currentMaxSeq < uint64(GetSingleGocMsgNum()) { - remain = blk0 - currentMaxSeq + remain = blk0 - currentMaxSeq //1 } else { - excludeBlk0 := currentMaxSeq - blk0 + excludeBlk0 := currentMaxSeq - blk0 //=1 + //(5000-1)%5000 == 4999 remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum()) } + //remain=1 insertCounter := uint64(0) msgListToMongo := make([]MsgInfo, 0) msgListToMongoNext := make([]MsgInfo, 0) @@ -45,6 +48,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo sMsg := MsgInfo{} sMsg.SendTime = m.MsgData.SendTime m.MsgData.Seq = uint32(currentMaxSeq) + log.Debug(operationID, "mongo msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", userID, "seq: ", currentMaxSeq) if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { return utils.Wrap(err, "") } @@ -58,11 +62,11 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo msgListToMongo = append(msgListToMongo, sMsg) insertCounter++ seqUid = getSeqUid(userID, uint32(currentMaxSeq)) - log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) + log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID) } else { msgListToMongoNext = append(msgListToMongoNext, sMsg) seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) - log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) + log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID) } } @@ -71,7 +75,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo if seqUid != "" { filter := bson.M{"uid": seqUid} - log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo) + log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID) err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err() if err != nil { if err == mongo.ErrNoDocuments { @@ -95,7 +99,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo sChat := UserChat{} sChat.UID = seqUidNext sChat.Msg = msgListToMongoNext - log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext) + log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) if _, err = c.InsertOne(ctx, &sChat); err != nil { log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) return utils.Wrap(err, "") @@ -119,8 +123,10 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD var err error if msgList[0].MsgData.SessionType == constant.SuperGroupChatType { currentMaxSeq, err = d.GetGroupMaxSeq(insertID) + log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", insertID, err) } else { currentMaxSeq, err = d.GetUserMaxSeq(insertID) + log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", insertID, err) } if err != nil && err != go_redis.Nil { return utils.Wrap(err, ""), 0 @@ -128,11 +134,12 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD lastMaxSeq := currentMaxSeq for _, m := range msgList { - log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) + currentMaxSeq++ sMsg := MsgInfo{} sMsg.SendTime = m.MsgData.SendTime m.MsgData.Seq = uint32(currentMaxSeq) + log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", insertID, "seq: ", currentMaxSeq) } log.Debug(operationID, "SetMessageToCache ", insertID, len(msgList)) err = d.SetMessageToCache(msgList, insertID, operationID)