refactor: ctx message update

This commit is contained in:
Gordon 2023-05-19 17:10:00 +08:00
parent e7797167cb
commit 6ce26f93cc

View File

@ -90,7 +90,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
ctxMsgList := msgChannelValue.ctxMsgList ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx ctx := msgChannelValue.ctx
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID) log.ZDebug(withAggregationCtx(ctx, ctxMsgList), "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
@ -232,7 +232,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil { if err != nil {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
return continue
} }
var arr []string var arr []string
for i, header := range consumerMessages[i].Headers { for i, header := range consumerMessages[i].Headers {
@ -241,7 +241,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", ")) log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", "))
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = &msgFromMQ ctxMsg.message = &msgFromMQ
log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key))
//aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg) //aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, ctxMsg) oldM = append(oldM, ctxMsg)
@ -257,7 +257,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
if len(v) >= 0 { if len(v) >= 0 {
hashCode := utils.GetHashCode(conversationID) hashCode := utils.GetHashCode(conversationID)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "conversationID", conversationID) log.ZDebug(withAggregationCtx(ctx, v), "generate channelID", "hashCode", hashCode, "channelID", channelID, "conversationID", conversationID)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{conversationID: conversationID, ctxMsgList: v, ctx: ctx}} och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{conversationID: conversationID, ctxMsgList: v, ctx: ctx}}
} }
} }
@ -265,6 +265,15 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
} }
} }
} }
func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context {
var allMessageOperationID string
for _, v := range values {
if opid := mcontext.GetOperationID(v.ctx); opid != "" {
allMessageOperationID += "$" + opid
}
}
return mcontext.SetOperationID(ctx, allMessageOperationID)
}
func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
@ -298,8 +307,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
cMsg = make([]*sarama.ConsumerMessage, 0, 1000) cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
rwLock.Unlock() rwLock.Unlock()
split := 1000 split := 1000
ctx := mcontext.NewCtx(utils.OperationIDGenerator()) ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
ctx = mcontext.WithTriggerIDContext(ctx, utils.OperationIDGenerator())
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
for i := 0; i < len(ccMsg)/split; i++ { for i := 0; i < len(ccMsg)/split; i++ {
//log.Debug() //log.Debug()