diff --git a/docker-compose.yaml b/docker-compose.yaml index 5b2c15712..81697ebb6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -115,7 +115,7 @@ services: open_im_server: - image: openim/open_im_server:v2.3.1 + image: openim/open_im_server:v2.3.2 container_name: open_im_server volumes: - ./logs:/Open-IM-Server/logs diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index e5ad296ff..6b26ade63 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -40,9 +40,8 @@ var ( func Init() { cmdCh = make(chan Cmd2Value, 10000) w = new(sync.Mutex) - persistentCH.Init() // 订阅ws2mschat 消费到 mysql - historyCH.Init(cmdCh) // 订阅ws2mschat 如果可靠性存储 消费到 incrseq 再存入mongo 再push || 非可靠性 直接incr再push 初始化ws2mschat - historyMongoCH.Init() + persistentCH.Init() // ws2mschat save mysql + historyCH.Init(cmdCh) // onlineTopicStatus = OnlineTopicVacancy //offlineHistoryCH.Init(cmdCh) statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index d40ee497b..c02c02739 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -23,11 +23,7 @@ import ( ) var ( - // msgInsertMysqlProcessed perometheus.Countr - msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{ - Name: "insert_mysql_msg_total", - Help: "The total number of msg insert mysql events", - }) + msgInsertMysqlProcessed prometheus.Counter ) type PersistentConsumerHandler struct { @@ -41,12 +37,12 @@ func (pc *PersistentConsumerHandler) Init() { pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) - // if config.Config.Prometheus.Enable { - // msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{ - // Name: "insert_mysql_msg_total", - // Help: "The total number of msg insert mysql events", - // }) - // } + if config.Config.Prometheus.Enable { + msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "insert_mysql_msg_total", + Help: "The total number of msg insert mysql events", + }) + } } func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {