diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 48f190dde..7434dfd5d 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -395,28 +395,6 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string - for msg := range claim.Messages() { - //msgFromMQ := pbMsg.MsgDataToMQ{} - //err := proto.Unmarshal(msg.Value, &msgFromMQ) - //if err != nil { - // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) - //} - //userID := string(msg.Key) - //hashCode := getHashCode(userID) - //channelID := hashCode % ChannelNum - //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) - ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} - //sess.MarkMessage(msg, "") - rwLock.Lock() - cMsg = append(cMsg, msg) - rwLock.Unlock() - sess.MarkMessage(msg, "") - //och.TriggerCmd(OnlineTopicBusy) - - //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) - - } go func() { for { select { @@ -468,6 +446,28 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } }() + for msg := range claim.Messages() { + //msgFromMQ := pbMsg.MsgDataToMQ{} + //err := proto.Unmarshal(msg.Value, &msgFromMQ) + //if err != nil { + // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) + //} + //userID := string(msg.Key) + //hashCode := getHashCode(userID) + //channelID := hashCode % ChannelNum + //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} + //sess.MarkMessage(msg, "") + rwLock.Lock() + cMsg = append(cMsg, msg) + rwLock.Unlock() + sess.MarkMessage(msg, "") + //och.TriggerCmd(OnlineTopicBusy) + + //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + + } return nil }