diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 2be9d9743..e34e92982 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -58,16 +58,16 @@ func StartTransfer(prometheusPort int) error { extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient())) msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel) conversationRpcClient := rpcclient.NewConversationClient(client) - - msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient) + groupRpcClient := rpcclient.NewGroupClient(client) + msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient, groupRpcClient) msgTransfer.initPrometheus() return msgTransfer.Start(prometheusPort) } func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase, - conversationRpcClient *rpcclient.ConversationClient) *MsgTransfer { - return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient), + conversationRpcClient *rpcclient.ConversationClient, groupRpcClient *rpcclient.GroupClient) *MsgTransfer { + return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient), historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)} } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8984d7ded..a126bc9d0 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -63,7 +63,7 @@ type OnlineHistoryRedisConsumerHandler struct { groupRpcClient *rpcclient.GroupClient } -func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient) *OnlineHistoryRedisConsumerHandler { +func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient, groupRpcClient *rpcclient.GroupClient) *OnlineHistoryRedisConsumerHandler { var och OnlineHistoryRedisConsumerHandler och.msgDatabase = database och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel @@ -73,6 +73,7 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, go och.Run(i) } och.conversationRpcClient = conversationRpcClient + och.groupRpcClient = groupRpcClient och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index b82b7d81a..526b80c7b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -168,7 +168,6 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag continue } resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs} - } } return resp, nil