From 2043abb24b8bbe4dea8ba83c6e913518d849fcf1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 May 2023 18:18:30 +0800 Subject: [PATCH] mongo consumer --- internal/msgtransfer/init.go | 1 - internal/msgtransfer/online_msg_to_mongo_handler.go | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 4249a429b..2d5fb73e1 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -46,7 +46,6 @@ func StartTransfer(prometheusPort int) error { client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName, config.Config.Zookeeper.Password), openKeeper.WithTimeout(10)) - if err != nil { return err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 24c6b52f1..5bb4579ad 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -100,14 +100,14 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", + claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) + ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) if len(msg.Value) != 0 { - ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess) } else { - log.Error("", "mongo msg get from kafka but is nil", msg.Key) + log.ZError(ctx, "mongo msg get from kafka but is nil", nil, "key", msg.Key) } sess.MarkMessage(msg, "") }