diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b1e9c3d59..468371512 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -90,7 +90,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { msgChannelValue := cmd.Value.(MsgChannelValue) ctxMsgList := msgChannelValue.ctxMsgList ctx := msgChannelValue.ctx - log.ZDebug(withAggregationCtx(ctx, ctxMsgList), "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey) + log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) @@ -259,8 +259,9 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { if len(v) >= 0 { hashCode := utils.GetHashCode(uniqueKey) channelID := hashCode % ChannelNum - log.ZDebug(withAggregationCtx(ctx, v), "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey) - och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: ctx}} + newCtx := withAggregationCtx(ctx, v) + log.ZDebug(newCtx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey) + och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: newCtx}} } } } @@ -269,9 +270,14 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context { var allMessageOperationID string - for _, v := range values { + for i, v := range values { if opid := mcontext.GetOperationID(v.ctx); opid != "" { - allMessageOperationID += "$" + opid + if i == 0 { + allMessageOperationID += opid + + } else { + allMessageOperationID += "$" + opid + } } } return mcontext.SetOperationID(ctx, allMessageOperationID)