diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index 65f59a9f0..0c04a933d 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -59,6 +59,7 @@ func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, for msg := range claim.Messages() { log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) ms.msgHandle[msg.Topic](msg.Value) + sess.MarkMessage(msg, "") } return nil }