diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index beeb48d3e..948f43df6 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -40,12 +40,12 @@ func (ModifyMsgConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { r func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) + ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg) + log.ZDebug(ctx, "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg) mmc.ModifyMsg(ctx, msg, string(msg.Key), sess) } else { - log.Error("", "msg get from kafka but is nil", msg.Key) + log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key) } sess.MarkMessage(msg, "") } diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index b0095d6e9..6d9b64a13 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -14,7 +14,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -38,7 +37,6 @@ func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *Persiste func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { msg := cMsg.Value - operationID := mcontext.GetOperationID(ctx) var tag bool msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -46,6 +44,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs log.ZError(ctx, "msg_transfer Unmarshal msg err", err) return } + return log.ZDebug(ctx, "handleChatWs2Mysql", "msg", msgFromMQ.MsgData) //Control whether to store history messages (mysql) isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) @@ -64,9 +63,9 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs tag = true } if tag { - log.NewInfo(operationID, "msg_transfer msg persisting", string(msg)) + log.ZInfo(ctx, "msg_transfer msg persisting", "msg", string(msg)) if err = pc.chatLogDatabase.CreateChatLog(&msgFromMQ); err != nil { - log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String()) + log.ZError(ctx, "Message insert failed", err, "msg", msgFromMQ.String()) return } } @@ -76,12 +75,12 @@ func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) + ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg) + log.ZDebug(ctx, "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg) pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess) } else { - log.Error("", "msg get from kafka but is nil", msg.Key) + log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key) } sess.MarkMessage(msg, "") }