From b864f9c6a9c4c7bad2610661a38c5de62117a0b6 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Sat, 21 May 2022 19:17:31 +0800 Subject: [PATCH] message --- internal/msg_transfer/logic/init.go | 4 ++-- internal/rpc/msg/send_msg.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index d36a3d8d5..53fcea474 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -15,7 +15,7 @@ const OnlineTopicVacancy = 0 const Msg = 2 const ConsumerMsgs = 3 const UserMessages = 4 -const ChannelNum = 11 +const ChannelNum = 100 var ( persistentCH PersistentConsumerHandler @@ -52,7 +52,7 @@ func Run() { fmt.Println("not start mysql consumer") } go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) - go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) + //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) } func SetOnlineTopicStatus(status int) { w.Lock() diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 587f56998..7e4ab3953 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -377,7 +377,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str } return err case constant.OfflineStatus: - pid, offset, err := rpc.offlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) }