Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao 2023-05-19 20:45:40 +08:00
commit 3039cd5fc4

View File

@ -90,7 +90,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
ctxMsgList := msgChannelValue.ctxMsgList ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx ctx := msgChannelValue.ctx
log.ZDebug(withAggregationCtx(ctx, ctxMsgList), "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey) log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
@ -259,8 +259,9 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
if len(v) >= 0 { if len(v) >= 0 {
hashCode := utils.GetHashCode(uniqueKey) hashCode := utils.GetHashCode(uniqueKey)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.ZDebug(withAggregationCtx(ctx, v), "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey) newCtx := withAggregationCtx(ctx, v)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: ctx}} log.ZDebug(newCtx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: newCtx}}
} }
} }
} }
@ -269,9 +270,14 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
} }
func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context { func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context {
var allMessageOperationID string var allMessageOperationID string
for _, v := range values { for i, v := range values {
if opid := mcontext.GetOperationID(v.ctx); opid != "" { if opid := mcontext.GetOperationID(v.ctx); opid != "" {
allMessageOperationID += "$" + opid if i == 0 {
allMessageOperationID += opid
} else {
allMessageOperationID += "$" + opid
}
} }
} }
return mcontext.SetOperationID(ctx, allMessageOperationID) return mcontext.SetOperationID(ctx, allMessageOperationID)