This commit is contained in:
withchao 2023-05-10 15:21:08 +08:00
parent ac357f93eb
commit 20afd0a51f
2 changed files with 17 additions and 0 deletions

View File

@ -2,7 +2,11 @@ package msgtransfer
import (
"context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
@ -155,6 +159,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
}
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) {
fmt.Printf("toPushTopic Stack:\n%s\n", debug.Stack())
for _, v := range msgs {
och.msgDatabase.MsgToPushMQ(ctx, conversationID, v)
}
@ -219,6 +224,11 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
return
}
var arr []string
for i, header := range consumerMessages[i].Headers {
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
}
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", "))
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = &msgFromMQ
log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))

View File

@ -3,6 +3,8 @@ package kafka
import (
"context"
"errors"
"strconv"
"strings"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
@ -87,6 +89,11 @@ func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message)
if err != nil {
return 0, 0, utils.Wrap(err, "")
}
var arr []string
for i, header := range header {
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
}
log.ZInfo(ctx, "producer.kafka.GetContextWithMQHeader", "len", len(header), "header", strings.Join(arr, ", "))
kMsg.Headers = header
partition, offset, err := p.producer.SendMessage(kMsg)
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())