diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 556bd6327..b1e9c3d59 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -230,8 +230,8 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages)) for i := 0; i < len(consumerMessages); i++ { ctxMsg := &ContextMsg{} - var msgFromMQ sdkws.MsgData - err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) + msgFromMQ := &sdkws.MsgData{} + err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ) if err != nil { log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) continue @@ -242,7 +242,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", ")) ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) - ctxMsg.message = &msgFromMQ + ctxMsg.message = msgFromMQ log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key)) //aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg) if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {