diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index 2d3f92027..f9c0228c3 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -41,7 +41,7 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg, "modify consumer") + ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg) mmc.ModifyMsg(ctx, msg, string(msg.Key), sess) } else { log.Error("", "msg get from kafka but is nil", msg.Key) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 3dd8a6e64..d0f4ff001 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -78,7 +78,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGr for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mc.historyConsumerGroup.GetContextFromMsg(msg, "mongoDB consumer") + ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess) } else { log.Error("", "mongo msg get from kafka but is nil", msg.Key) diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index 19ec67201..ddc264a2b 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -78,7 +78,7 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg, "mysql consumer") + ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg) pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess) } else { log.Error("", "msg get from kafka but is nil", msg.Key) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 8748563d3..3beafe75d 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -58,7 +58,7 @@ func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - ctx := c.pushConsumerGroup.GetContextFromMsg(msg, "push consumer") + ctx := c.pushConsumerGroup.GetContextFromMsg(msg) c.handleMs2PsChat(ctx, msg.Value) sess.MarkMessage(msg, "") } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 7bc29fda2..c0c3823de 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -8,8 +8,6 @@ package kafka import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/Shopify/sarama" ) @@ -42,16 +40,9 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str } } -func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage, rootFuncName string) context.Context { - ctx := mcontext.NewCtx(rootFuncName) - var operationID string - for _, v := range cMsg.Headers { - if string(v.Key) == constant.OperationID { - operationID = string(v.Value) - } - } - mcontext.SetOperationID(ctx, operationID) - return ctx +func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { + return GetContextWithMQHeader(cMsg.Headers) + } func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {