mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge branch 'v2.3.0release' of github.com:OpenIMSDK/Open-IM-Server into v2.3.0release
This commit is contained in:
commit
5a058d9b97
@ -34,6 +34,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
|
|||||||
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.Info(msgFromMQ.TriggerID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
|
||||||
err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
|
err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||||
|
@ -26,12 +26,15 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
|||||||
isInit := false
|
isInit := false
|
||||||
var remain uint64
|
var remain uint64
|
||||||
blk0 := uint64(GetSingleGocMsgNum() - 1)
|
blk0 := uint64(GetSingleGocMsgNum() - 1)
|
||||||
|
//currentMaxSeq 4998
|
||||||
if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
||||||
remain = blk0 - currentMaxSeq
|
remain = blk0 - currentMaxSeq //1
|
||||||
} else {
|
} else {
|
||||||
excludeBlk0 := currentMaxSeq - blk0
|
excludeBlk0 := currentMaxSeq - blk0 //=1
|
||||||
|
//(5000-1)%5000 == 4999
|
||||||
remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum())
|
remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum())
|
||||||
}
|
}
|
||||||
|
//remain=1
|
||||||
insertCounter := uint64(0)
|
insertCounter := uint64(0)
|
||||||
msgListToMongo := make([]MsgInfo, 0)
|
msgListToMongo := make([]MsgInfo, 0)
|
||||||
msgListToMongoNext := make([]MsgInfo, 0)
|
msgListToMongoNext := make([]MsgInfo, 0)
|
||||||
@ -45,6 +48,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
|||||||
sMsg := MsgInfo{}
|
sMsg := MsgInfo{}
|
||||||
sMsg.SendTime = m.MsgData.SendTime
|
sMsg.SendTime = m.MsgData.SendTime
|
||||||
m.MsgData.Seq = uint32(currentMaxSeq)
|
m.MsgData.Seq = uint32(currentMaxSeq)
|
||||||
|
log.Debug(operationID, "mongo msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", userID, "seq: ", currentMaxSeq)
|
||||||
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
|
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
|
||||||
return utils.Wrap(err, "")
|
return utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
@ -58,11 +62,11 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
|||||||
msgListToMongo = append(msgListToMongo, sMsg)
|
msgListToMongo = append(msgListToMongo, sMsg)
|
||||||
insertCounter++
|
insertCounter++
|
||||||
seqUid = getSeqUid(userID, uint32(currentMaxSeq))
|
seqUid = getSeqUid(userID, uint32(currentMaxSeq))
|
||||||
log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
||||||
} else {
|
} else {
|
||||||
msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
||||||
seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
||||||
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,7 +75,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
|||||||
|
|
||||||
if seqUid != "" {
|
if seqUid != "" {
|
||||||
filter := bson.M{"uid": seqUid}
|
filter := bson.M{"uid": seqUid}
|
||||||
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
|
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
|
||||||
err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
|
err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == mongo.ErrNoDocuments {
|
if err == mongo.ErrNoDocuments {
|
||||||
@ -95,7 +99,7 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
|||||||
sChat := UserChat{}
|
sChat := UserChat{}
|
||||||
sChat.UID = seqUidNext
|
sChat.UID = seqUidNext
|
||||||
sChat.Msg = msgListToMongoNext
|
sChat.Msg = msgListToMongoNext
|
||||||
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext)
|
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
||||||
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
||||||
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||||
return utils.Wrap(err, "")
|
return utils.Wrap(err, "")
|
||||||
@ -119,8 +123,10 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
|
|||||||
var err error
|
var err error
|
||||||
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
||||||
currentMaxSeq, err = d.GetGroupMaxSeq(insertID)
|
currentMaxSeq, err = d.GetGroupMaxSeq(insertID)
|
||||||
|
log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", insertID, err)
|
||||||
} else {
|
} else {
|
||||||
currentMaxSeq, err = d.GetUserMaxSeq(insertID)
|
currentMaxSeq, err = d.GetUserMaxSeq(insertID)
|
||||||
|
log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", insertID, err)
|
||||||
}
|
}
|
||||||
if err != nil && err != go_redis.Nil {
|
if err != nil && err != go_redis.Nil {
|
||||||
return utils.Wrap(err, ""), 0
|
return utils.Wrap(err, ""), 0
|
||||||
@ -128,11 +134,12 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
|
|||||||
|
|
||||||
lastMaxSeq := currentMaxSeq
|
lastMaxSeq := currentMaxSeq
|
||||||
for _, m := range msgList {
|
for _, m := range msgList {
|
||||||
log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
|
|
||||||
currentMaxSeq++
|
currentMaxSeq++
|
||||||
sMsg := MsgInfo{}
|
sMsg := MsgInfo{}
|
||||||
sMsg.SendTime = m.MsgData.SendTime
|
sMsg.SendTime = m.MsgData.SendTime
|
||||||
m.MsgData.Seq = uint32(currentMaxSeq)
|
m.MsgData.Seq = uint32(currentMaxSeq)
|
||||||
|
log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", insertID, "seq: ", currentMaxSeq)
|
||||||
}
|
}
|
||||||
log.Debug(operationID, "SetMessageToCache ", insertID, len(msgList))
|
log.Debug(operationID, "SetMessageToCache ", insertID, len(msgList))
|
||||||
err = d.SetMessageToCache(msgList, insertID, operationID)
|
err = d.SetMessageToCache(msgList, insertID, operationID)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user