redis add get message

This commit is contained in:
Gordon 2022-05-24 21:42:12 +08:00
parent 1246f3aa21
commit 25c316165e

View File

@ -23,6 +23,7 @@ type MsgChannelValue struct {
userID string userID string
triggerID string triggerID string
msgList []*pbMsg.MsgDataToMQ msgList []*pbMsg.MsgDataToMQ
lastSeq uint64
} }
type TriggerChannelValue struct { type TriggerChannelValue struct {
triggerID string triggerID string
@ -54,7 +55,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
go och.Run(i) go och.Run(i)
} }
for i := 0; i < ChannelNum; i++ { for i := 0; i < ChannelNum; i++ {
och.chMongoArrays[i] = make(chan Cmd2Value, 1000) och.chMongoArrays[i] = make(chan Cmd2Value, 10000)
go och.MongoMessageRun(i) go och.MongoMessageRun(i)
} }
if config.Config.ReliableStorage { if config.Config.ReliableStorage {
@ -128,14 +129,15 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
// return // return
//} //}
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList)) log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList))
err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
if err != nil { if err != nil {
singleMsgFailedCount += uint64(len(storageMsgList)) singleMsgFailedCount += uint64(len(storageMsgList))
log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList) log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList)
} else { } else {
singleMsgSuccessCountMutex.Lock() singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCount += uint64(len(storageMsgList))
singleMsgSuccessCountMutex.Unlock() singleMsgSuccessCountMutex.Unlock()
och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq)
go func(push, storage []*pbMsg.MsgDataToMQ) { go func(push, storage []*pbMsg.MsgDataToMQ) {
for _, v := range storage { for _, v := range storage {
sendMessageToPush(v, msgChannelValue.userID) sendMessageToPush(v, msgChannelValue.userID)
@ -151,24 +153,28 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
} }
} }
} }
func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ) { func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
hashCode := getHashCode(userID) hashCode := getHashCode(userID)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}} och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
} }
func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
for { for {
select { select {
case cmd := <-och.chArrays[channelID]: case cmd := <-och.chArrays[channelID]:
switch cmd.Cmd { switch cmd.Cmd {
case MongoMessages: case MongoMessages:
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
msgList := msgChannelValue.msgList msgList := msgChannelValue.msgList
triggerID := msgChannelValue.triggerID triggerID := msgChannelValue.triggerID
userID := msgChannelValue.userID userID := msgChannelValue.userID
lastSeq := msgChannelValue.lastSeq
err := db.DB.BatchInsertChat2DB(userID, msgList, triggerID, lastSeq)
if err != nil {
log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
}
} }
} }
} }