This commit is contained in:
wangchuxiao 2023-05-19 19:50:52 +08:00
parent 76fd103a82
commit 73c74a929b

View File

@ -230,8 +230,8 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages)) log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
for i := 0; i < len(consumerMessages); i++ { for i := 0; i < len(consumerMessages); i++ {
ctxMsg := &ContextMsg{} ctxMsg := &ContextMsg{}
var msgFromMQ sdkws.MsgData msgFromMQ := &sdkws.MsgData{}
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
if err != nil { if err != nil {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
continue continue
@ -242,7 +242,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
} }
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", ")) log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", "))
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = &msgFromMQ ctxMsg.message = msgFromMQ
log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key)) log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key))
//aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg) //aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {