mark message

This commit is contained in:
Gordon 2022-08-12 19:24:30 +08:00
parent e4bbd57c41
commit 9c7ae3b0ef

View File

@ -59,6 +59,7 @@ func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
ms.msgHandle[msg.Topic](msg.Value)
sess.MarkMessage(msg, "")
}
return nil
}