From ee47c6d7274abd95d33f9e913ecdcf0459a4ec9e Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 20:29:32 +0800 Subject: [PATCH] channelNum --- .../logic/online_history_msg_handler.go | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index a915fce0e..3deed6a23 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -17,6 +17,7 @@ import ( "github.com/golang/protobuf/proto" "hash/crc32" "strings" + "sync" "time" ) @@ -389,7 +390,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS break } } - + rwLock := new(sync.RWMutex) log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) @@ -407,8 +408,16 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} //sess.MarkMessage(msg, "") + rwLock.Lock() cMsg = append(cMsg, msg) + rwLock.Unlock() + sess.MarkMessage(msg, "") //och.TriggerCmd(OnlineTopicBusy) + + //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + + } + go func() { select { //case : // triggerID = utils.OperationIDGenerator() @@ -430,24 +439,26 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS case <-t.C: if len(cMsg) > 0 { + rwLock.Lock() ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) for _, v := range cMsg { ccMsg = append(ccMsg, v) } + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + rwLock.Unlock() + triggerID = utils.OperationIDGenerator() log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(ccMsg[len(cMsg)-1], "") - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + //sess.MarkMessage(ccMsg[len(cMsg)-1], "") + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } - default: } - //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + }() - } return nil }