mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-05-25 14:39:20 +08:00
Push message to client through message queue
This commit is contained in:
parent
09b830bcf9
commit
80d74f6f38
@ -1 +1 @@
|
|||||||
Subproject commit c5e258fa82dd11c9ca1e553a56920bad3f1a4b73
|
Subproject commit 3b9c0d84d43d45c2a73b56a3c3510f86b67ff9bb
|
@ -107,32 +107,24 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|||||||
singleMsgSuccessCount += uint64(len(storageMsgList))
|
singleMsgSuccessCount += uint64(len(storageMsgList))
|
||||||
singleMsgSuccessCountMutex.Unlock()
|
singleMsgSuccessCountMutex.Unlock()
|
||||||
och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
|
och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
|
||||||
go func(push, storage []*pbMsg.MsgDataToMQ) {
|
for _, v := range storageMsgList {
|
||||||
for _, v := range storage {
|
sendMessageToPushMQ(v, msgChannelValue.aggregationID)
|
||||||
sendMessageToPush(v, msgChannelValue.aggregationID)
|
|
||||||
}
|
}
|
||||||
for _, x := range push {
|
for _, x := range notStoragePushMsgList {
|
||||||
sendMessageToPush(x, msgChannelValue.aggregationID)
|
sendMessageToPushMQ(x, msgChannelValue.aggregationID)
|
||||||
}
|
}
|
||||||
|
|
||||||
}(notStoragePushMsgList, storageMsgList)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
go func(push []*pbMsg.MsgDataToMQ) {
|
for _, x := range notStoragePushMsgList {
|
||||||
for _, x := range push {
|
sendMessageToPushMQ(x, msgChannelValue.aggregationID)
|
||||||
sendMessageToPush(x, msgChannelValue.aggregationID)
|
}
|
||||||
}
|
|
||||||
}(notStoragePushMsgList)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
|
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
|
||||||
if len(messages) > 0 {
|
if len(messages) > 0 {
|
||||||
pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
|
pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
|
||||||
@ -522,6 +514,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
|
|||||||
// }
|
// }
|
||||||
// return nil
|
// return nil
|
||||||
//}
|
//}
|
||||||
|
|
||||||
func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||||
log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String())
|
log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String())
|
||||||
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
|
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
|
||||||
@ -549,6 +542,17 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||||
|
log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID)
|
||||||
|
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
|
||||||
|
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
|
||||||
|
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// String hashes a string to a unique hashcode.
|
// String hashes a string to a unique hashcode.
|
||||||
//
|
//
|
||||||
// crc32 returns a uint32, but for our use we need
|
// crc32 returns a uint32, but for our use we need
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
log2 "Open_IM/pkg/common/log"
|
log "Open_IM/pkg/common/log"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
@ -36,26 +36,26 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) {
|
func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) {
|
||||||
log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
|
log.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
|
||||||
kMsg := &sarama.ProducerMessage{}
|
kMsg := &sarama.ProducerMessage{}
|
||||||
kMsg.Topic = p.topic
|
kMsg.Topic = p.topic
|
||||||
kMsg.Key = sarama.StringEncoder(key)
|
kMsg.Key = sarama.StringEncoder(key)
|
||||||
bMsg, err := proto.Marshal(m)
|
bMsg, err := proto.Marshal(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log2.Error(operationID, "", "proto marshal err = %s", err.Error())
|
log.Error(operationID, "", "proto marshal err = %s", err.Error())
|
||||||
return -1, -1, err
|
return -1, -1, err
|
||||||
}
|
}
|
||||||
if len(bMsg) == 0 {
|
if len(bMsg) == 0 {
|
||||||
log2.Error(operationID, "len(bMsg) == 0 ")
|
log.Error(operationID, "len(bMsg) == 0 ")
|
||||||
return 0, 0, errors.New("len(bMsg) == 0 ")
|
return 0, 0, errors.New("len(bMsg) == 0 ")
|
||||||
}
|
}
|
||||||
kMsg.Value = sarama.ByteEncoder(bMsg)
|
kMsg.Value = sarama.ByteEncoder(bMsg)
|
||||||
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length())
|
log.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length())
|
||||||
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||||
log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg)
|
log.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg)
|
||||||
return -1, -1, errors.New("key or value == 0")
|
return -1, -1, errors.New("key or value == 0")
|
||||||
}
|
}
|
||||||
a, b, c := p.producer.SendMessage(kMsg)
|
a, b, c := p.producer.SendMessage(kMsg)
|
||||||
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
|
log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
|
||||||
return a, b, utils.Wrap(c, "")
|
return a, b, utils.Wrap(c, "")
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user