From 80d74f6f386b50eff5039c3f8c44df1d49815d9b Mon Sep 17 00:00:00 2001 From: wenxu12345 <44203734@qq.com> Date: Tue, 26 Jul 2022 15:52:38 +0800 Subject: [PATCH] Push message to client through message queue --- cmd/Open-IM-SDK-Core | 2 +- .../logic/online_history_msg_handler.go | 40 ++++++++++--------- pkg/common/kafka/producer.go | 14 +++---- 3 files changed, 30 insertions(+), 26 deletions(-) diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index c5e258fa8..3b9c0d84d 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit c5e258fa82dd11c9ca1e553a56920bad3f1a4b73 +Subproject commit 3b9c0d84d43d45c2a73b56a3c3510f86b67ff9bb diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 7d74d4c22..34f950320 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -107,32 +107,24 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) - go func(push, storage []*pbMsg.MsgDataToMQ) { - for _, v := range storage { - sendMessageToPush(v, msgChannelValue.aggregationID) - } - for _, x := range push { - sendMessageToPush(x, msgChannelValue.aggregationID) - } - - }(notStoragePushMsgList, storageMsgList) - + for _, v := range storageMsgList { + sendMessageToPushMQ(v, msgChannelValue.aggregationID) + } + for _, x := range notStoragePushMsgList { + sendMessageToPushMQ(x, msgChannelValue.aggregationID) + } } } else { - go func(push []*pbMsg.MsgDataToMQ) { - for _, x := range push { - sendMessageToPush(x, msgChannelValue.aggregationID) - } - }(notStoragePushMsgList) - + for _, x := range notStoragePushMsgList { + sendMessageToPushMQ(x, msgChannelValue.aggregationID) + } } } - } } - } + func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { if len(messages) > 0 { pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID) @@ -522,6 +514,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG // } // return nil //} + func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} @@ -549,6 +542,17 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { } } +func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) { + log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID) + rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} + mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} + pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) + if err != nil { + log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) + } + return +} + // String hashes a string to a unique hashcode. // // crc32 returns a uint32, but for our use we need diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 1ebaaab10..4eb10aebd 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -1,7 +1,7 @@ package kafka import ( - log2 "Open_IM/pkg/common/log" + log "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "errors" "github.com/Shopify/sarama" @@ -36,26 +36,26 @@ func NewKafkaProducer(addr []string, topic string) *Producer { } func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) { - log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer) + log.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer) kMsg := &sarama.ProducerMessage{} kMsg.Topic = p.topic kMsg.Key = sarama.StringEncoder(key) bMsg, err := proto.Marshal(m) if err != nil { - log2.Error(operationID, "", "proto marshal err = %s", err.Error()) + log.Error(operationID, "", "proto marshal err = %s", err.Error()) return -1, -1, err } if len(bMsg) == 0 { - log2.Error(operationID, "len(bMsg) == 0 ") + log.Error(operationID, "len(bMsg) == 0 ") return 0, 0, errors.New("len(bMsg) == 0 ") } kMsg.Value = sarama.ByteEncoder(bMsg) - log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length()) + log.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length()) if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { - log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg) + log.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg) return -1, -1, errors.New("key or value == 0") } a, b, c := p.producer.SendMessage(kMsg) - log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) + log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) return a, b, utils.Wrap(c, "") }