diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index 200120800..f137c09d6 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -65,6 +65,7 @@ func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) ms.msgHandle[msg.Topic](msg.Value) + sess.MarkMessage(msg, "") } return nil }