From 20afd0a51faed4bc0e2c62b788e8c2dca6f9c13a Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 10 May 2023 15:21:08 +0800 Subject: [PATCH] fix bug --- internal/msgtransfer/online_history_msg_handler.go | 10 ++++++++++ pkg/common/kafka/producer.go | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 570b38049..3159c118b 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -2,7 +2,11 @@ package msgtransfer import ( "context" + "fmt" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + "runtime/debug" + "strconv" + "strings" "sync" "time" @@ -155,6 +159,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con } func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) { + fmt.Printf("toPushTopic Stack:\n%s\n", debug.Stack()) for _, v := range msgs { och.msgDatabase.MsgToPushMQ(ctx, conversationID, v) } @@ -219,6 +224,11 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) return } + var arr []string + for i, header := range consumerMessages[i].Headers { + arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value)) + } + log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", ")) ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) ctxMsg.message = &msgFromMQ log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index b0542aef4..9da96d430 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -3,6 +3,8 @@ package kafka import ( "context" "errors" + "strconv" + "strings" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" @@ -87,6 +89,11 @@ func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) if err != nil { return 0, 0, utils.Wrap(err, "") } + var arr []string + for i, header := range header { + arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value)) + } + log.ZInfo(ctx, "producer.kafka.GetContextWithMQHeader", "len", len(header), "header", strings.Join(arr, ", ")) kMsg.Headers = header partition, offset, err := p.producer.SendMessage(kMsg) log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())