mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 02:16:16 +08:00
consumer update
This commit is contained in:
parent
ed3a1296d8
commit
7fc6b61f21
@ -38,7 +38,7 @@ func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
|
|||||||
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability
|
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability
|
||||||
|
|
||||||
}
|
}
|
||||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic},
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic},
|
||||||
config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline)
|
config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline)
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ type PersistentConsumerHandler struct {
|
|||||||
func (pc *PersistentConsumerHandler) Init() {
|
func (pc *PersistentConsumerHandler) Init() {
|
||||||
pc.msgHandle = make(map[string]fcb)
|
pc.msgHandle = make(map[string]fcb)
|
||||||
pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql
|
pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql
|
||||||
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ type PushConsumerHandler struct {
|
|||||||
func (ms *PushConsumerHandler) Init() {
|
func (ms *PushConsumerHandler) Init() {
|
||||||
ms.msgHandle = make(map[string]fcb)
|
ms.msgHandle = make(map[string]fcb)
|
||||||
ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat
|
ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat
|
||||||
ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
|
||||||
config.Config.Kafka.ConsumerGroupID.MsgToPush)
|
config.Config.Kafka.ConsumerGroupID.MsgToPush)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user