From 6ce26f93cc1f8ab2beb67e9dceede134d148be5b Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Fri, 19 May 2023 17:10:00 +0800 Subject: [PATCH] refactor: ctx message update --- .../msgtransfer/online_history_msg_handler.go | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index f97efdf15..770066127 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -90,7 +90,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { msgChannelValue := cmd.Value.(MsgChannelValue) ctxMsgList := msgChannelValue.ctxMsgList ctx := msgChannelValue.ctx - log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID) + log.ZDebug(withAggregationCtx(ctx, ctxMsgList), "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) @@ -232,7 +232,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) if err != nil { log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) - return + continue } var arr []string for i, header := range consumerMessages[i].Headers { @@ -241,7 +241,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", ")) ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) ctxMsg.message = &msgFromMQ - log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) + log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key)) //aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg) if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, ctxMsg) @@ -257,7 +257,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { if len(v) >= 0 { hashCode := utils.GetHashCode(conversationID) channelID := hashCode % ChannelNum - log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "conversationID", conversationID) + log.ZDebug(withAggregationCtx(ctx, v), "generate channelID", "hashCode", hashCode, "channelID", channelID, "conversationID", conversationID) och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{conversationID: conversationID, ctxMsgList: v, ctx: ctx}} } } @@ -265,6 +265,15 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } } } +func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context { + var allMessageOperationID string + for _, v := range values { + if opid := mcontext.GetOperationID(v.ctx); opid != "" { + allMessageOperationID += "$" + opid + } + } + return mcontext.SetOperationID(ctx, allMessageOperationID) +} func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { @@ -298,8 +307,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG cMsg = make([]*sarama.ConsumerMessage, 0, 1000) rwLock.Unlock() split := 1000 - ctx := mcontext.NewCtx(utils.OperationIDGenerator()) - ctx = mcontext.WithTriggerIDContext(ctx, utils.OperationIDGenerator()) + ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) for i := 0; i < len(ccMsg)/split; i++ { //log.Debug()