mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-25 11:06:43 +08:00
group
This commit is contained in:
parent
b89d5ec0ab
commit
b004811809
@ -58,16 +58,16 @@ func StartTransfer(prometheusPort int) error {
|
|||||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
|
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
|
||||||
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
|
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
|
||||||
conversationRpcClient := rpcclient.NewConversationClient(client)
|
conversationRpcClient := rpcclient.NewConversationClient(client)
|
||||||
|
groupRpcClient := rpcclient.NewGroupClient(client)
|
||||||
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient)
|
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient, groupRpcClient)
|
||||||
msgTransfer.initPrometheus()
|
msgTransfer.initPrometheus()
|
||||||
return msgTransfer.Start(prometheusPort)
|
return msgTransfer.Start(prometheusPort)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
|
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
|
||||||
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase,
|
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase,
|
||||||
conversationRpcClient *rpcclient.ConversationClient) *MsgTransfer {
|
conversationRpcClient *rpcclient.ConversationClient, groupRpcClient *rpcclient.GroupClient) *MsgTransfer {
|
||||||
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient),
|
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
|
||||||
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
|
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ type OnlineHistoryRedisConsumerHandler struct {
|
|||||||
groupRpcClient *rpcclient.GroupClient
|
groupRpcClient *rpcclient.GroupClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient) *OnlineHistoryRedisConsumerHandler {
|
func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient, groupRpcClient *rpcclient.GroupClient) *OnlineHistoryRedisConsumerHandler {
|
||||||
var och OnlineHistoryRedisConsumerHandler
|
var och OnlineHistoryRedisConsumerHandler
|
||||||
och.msgDatabase = database
|
och.msgDatabase = database
|
||||||
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
||||||
@ -73,6 +73,7 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase,
|
|||||||
go och.Run(i)
|
go och.Run(i)
|
||||||
}
|
}
|
||||||
och.conversationRpcClient = conversationRpcClient
|
och.conversationRpcClient = conversationRpcClient
|
||||||
|
och.groupRpcClient = groupRpcClient
|
||||||
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
|
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
|
||||||
|
@ -168,7 +168,6 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs}
|
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user