diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index cf51e9628..f409da44a 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -23,6 +23,7 @@ type MsgChannelValue struct { userID string triggerID string msgList []*pbMsg.MsgDataToMQ + lastSeq uint64 } type TriggerChannelValue struct { triggerID string @@ -54,7 +55,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { go och.Run(i) } for i := 0; i < ChannelNum; i++ { - och.chMongoArrays[i] = make(chan Cmd2Value, 1000) + och.chMongoArrays[i] = make(chan Cmd2Value, 10000) go och.MongoMessageRun(i) } if config.Config.ReliableStorage { @@ -128,14 +129,15 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // return //} log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList)) - err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) + err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) - log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList) + log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList) } else { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() + och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq) go func(push, storage []*pbMsg.MsgDataToMQ) { for _, v := range storage { sendMessageToPush(v, msgChannelValue.userID) @@ -151,24 +153,28 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { } } } -func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ) { +func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { hashCode := getHashCode(userID) channelID := hashCode % ChannelNum log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}} + och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}} } func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { for { select { case cmd := <-och.chArrays[channelID]: switch cmd.Cmd { - case MongoMessages: msgChannelValue := cmd.Value.(MsgChannelValue) msgList := msgChannelValue.msgList triggerID := msgChannelValue.triggerID userID := msgChannelValue.userID + lastSeq := msgChannelValue.lastSeq + err := db.DB.BatchInsertChat2DB(userID, msgList, triggerID, lastSeq) + if err != nil { + log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList) + } } } }