From 573937e1e5d240ecddeb1ba037ded028b1e5461f Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Fri, 17 Jun 2022 10:37:43 +0800 Subject: [PATCH] msg transfer handle change --- internal/msg_transfer/logic/init.go | 3 ++- internal/msg_transfer/logic/online_msg_to_mongo_handler.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 168bfe785..38d0e5b66 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -38,6 +38,7 @@ func Init() { w = new(sync.Mutex) persistentCH.Init() historyCH.Init(cmdCh) + historyMongoCH.Init() onlineTopicStatus = OnlineTopicVacancy //offlineHistoryCH.Init(cmdCh) statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) @@ -53,7 +54,7 @@ func Run() { fmt.Println("not start mysql consumer") } go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) - //go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH) + go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH) //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) } func SetOnlineTopicStatus(status int) { diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go index 1d71b8a1c..dae6571f8 100644 --- a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go +++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go @@ -18,7 +18,7 @@ type OnlineHistoryMongoConsumerHandler struct { historyConsumerGroup *kfk.MConsumerGroup } -func (och *OnlineHistoryMongoConsumerHandler) Init(cmdCh chan Cmd2Value) { +func (och *OnlineHistoryMongoConsumerHandler) Init() { och.msgHandle = make(map[string]fcb) och.msgHandle[config.Config.Kafka.MsgToMongo.Topic] = och.handleChatWs2Mongo och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,