From d36f4cf6f862fe104d47658c4fa6c96f27d77085 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 19:44:08 +0800 Subject: [PATCH] test --- .../logic/offline_history_msg_handler.go | 34 +++++++++---------- .../logic/online_history_msg_handler.go | 4 +-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index a62bba4e2..7a63965ff 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -98,23 +98,23 @@ func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") - //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - } - //select { - //case cmd := <-mc.cmdCh: - // if cmd.Cmd == OnlineTopicVacancy { - // for msg := range claim.Messages() { - // if GetOnlineTopicStatus() == OnlineTopicVacancy { - // log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - // mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - // sess.MarkMessage(msg, "") - // } - // } - // } - // + //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + //for msg := range claim.Messages() { + // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") + // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) //} + select { + case cmd := <-mc.cmdCh: + if cmd.Cmd == OnlineTopicVacancy { + for msg := range claim.Messages() { + if GetOnlineTopicStatus() == OnlineTopicVacancy { + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + sess.MarkMessage(msg, "") + } + } + } + + } return nil } diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 755ca4433..179de1c79 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -134,9 +134,9 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - och.TriggerCmd(OnlineTopicBusy) - SetOnlineTopicStatus(OnlineTopicBusy) for msg := range claim.Messages() { + och.TriggerCmd(OnlineTopicBusy) + SetOnlineTopicStatus(OnlineTopicBusy) log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online") och.msgHandle[msg.Topic](msg.Value, string(msg.Key)) sess.MarkMessage(msg, "")