diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 7494016c5..556bd6327 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -90,7 +90,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { msgChannelValue := cmd.Value.(MsgChannelValue) ctxMsgList := msgChannelValue.ctxMsgList ctx := msgChannelValue.ctx - log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey) + log.ZDebug(withAggregationCtx(ctx, ctxMsgList), "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) @@ -234,7 +234,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) if err != nil { log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) - return + continue } var arr []string for i, header := range consumerMessages[i].Headers { @@ -243,7 +243,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", ")) ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) ctxMsg.message = &msgFromMQ - log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) + log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key)) //aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg) if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, ctxMsg) @@ -259,7 +259,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { if len(v) >= 0 { hashCode := utils.GetHashCode(uniqueKey) channelID := hashCode % ChannelNum - log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey) + log.ZDebug(withAggregationCtx(ctx, v), "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey) och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: ctx}} } } @@ -267,6 +267,15 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } } } +func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context { + var allMessageOperationID string + for _, v := range values { + if opid := mcontext.GetOperationID(v.ctx); opid != "" { + allMessageOperationID += "$" + opid + } + } + return mcontext.SetOperationID(ctx, allMessageOperationID) +} func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { @@ -300,8 +309,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG cMsg = make([]*sarama.ConsumerMessage, 0, 1000) rwLock.Unlock() split := 1000 - ctx := mcontext.NewCtx(utils.OperationIDGenerator()) - ctx = mcontext.WithTriggerIDContext(ctx, utils.OperationIDGenerator()) + ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) for i := 0; i < len(ccMsg)/split; i++ { //log.Debug()