diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 2f63f2007..a2a7f7673 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -28,14 +28,15 @@ func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { go mc.MessagesDistributionHandle() mc.cmdCh = cmdCh mc.msgCh = make(chan Cmd2Value, 1000) + for i := 0; i < ChannelNum; i++ { + mc.chArrays[i] = make(chan Cmd2Value, 1000) + go mc.Run(i) + } if config.Config.ReliableStorage { mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo } else { mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability - for i := 0; i < ChannelNum; i++ { - mc.chArrays[i] = make(chan Cmd2Value, 1000) - go mc.Run(i) - } + } mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 4773e1af3..6efef8124 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -48,14 +48,15 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { go och.MessagesDistributionHandle() och.cmdCh = cmdCh och.msgCh = make(chan Cmd2Value, 1000) + for i := 0; i < ChannelNum; i++ { + och.chArrays[i] = make(chan Cmd2Value, 1000) + go och.Run(i) + } if config.Config.ReliableStorage { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo } else { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability - for i := 0; i < ChannelNum; i++ { - och.chArrays[i] = make(chan Cmd2Value, 1000) - go och.Run(i) - } + } och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},