mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-10 04:59:49 +08:00
prometheus for statistics
This commit is contained in:
parent
a110b4c26b
commit
375fc0c89e
@ -37,6 +37,7 @@ var (
|
|||||||
func Init() {
|
func Init() {
|
||||||
cmdCh = make(chan Cmd2Value, 10000)
|
cmdCh = make(chan Cmd2Value, 10000)
|
||||||
w = new(sync.Mutex)
|
w = new(sync.Mutex)
|
||||||
|
initPrometheus()
|
||||||
persistentCH.Init() // ws2mschat save mysql
|
persistentCH.Init() // ws2mschat save mysql
|
||||||
historyCH.Init(cmdCh) //
|
historyCH.Init(cmdCh) //
|
||||||
historyMongoCH.Init()
|
historyMongoCH.Init()
|
||||||
|
@ -32,16 +32,18 @@ func (pc *PersistentConsumerHandler) Init() {
|
|||||||
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
|
||||||
pc.initPrometheus()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pc *PersistentConsumerHandler) initPrometheus() {
|
func initPrometheus() {
|
||||||
promePkg.NewSeqGetSuccessCounter()
|
promePkg.NewSeqGetSuccessCounter()
|
||||||
promePkg.NewSeqGetFailedCounter()
|
promePkg.NewSeqGetFailedCounter()
|
||||||
promePkg.NewSeqSetSuccessCounter()
|
promePkg.NewSeqSetSuccessCounter()
|
||||||
promePkg.NewSeqSetFailedCounter()
|
promePkg.NewSeqSetFailedCounter()
|
||||||
promePkg.NewMsgInsertRedisSuccessCounter()
|
promePkg.NewMsgInsertRedisSuccessCounter()
|
||||||
promePkg.NewMsgInsertRedisFailedCounter()
|
promePkg.NewMsgInsertRedisFailedCounter()
|
||||||
|
promePkg.NewMsgInsertMongoSuccessCounter()
|
||||||
|
promePkg.NewMsgInsertMongoFailedCounter()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||||
|
@ -15,10 +15,6 @@ import (
|
|||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *DataBases) BatchDeleteChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error {
|
func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error {
|
||||||
newTime := getCurrentTimestampByMill()
|
newTime := getCurrentTimestampByMill()
|
||||||
if len(msgList) > GetSingleGocMsgNum() {
|
if len(msgList) > GetSingleGocMsgNum() {
|
||||||
@ -86,10 +82,13 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
|||||||
sChat.Msg = msgListToMongo
|
sChat.Msg = msgListToMongo
|
||||||
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
|
log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
|
||||||
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
||||||
|
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
|
||||||
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||||
return utils.Wrap(err, "")
|
return utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
|
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
|
||||||
} else {
|
} else {
|
||||||
|
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
|
||||||
log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
||||||
return utils.Wrap(err, "")
|
return utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
@ -102,9 +101,11 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo
|
|||||||
sChat.Msg = msgListToMongoNext
|
sChat.Msg = msgListToMongoNext
|
||||||
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
||||||
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
||||||
|
promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter)
|
||||||
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
||||||
return utils.Wrap(err, "")
|
return utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
|
promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter)
|
||||||
}
|
}
|
||||||
log.Debug(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
log.Debug(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
||||||
return nil
|
return nil
|
||||||
@ -280,9 +281,3 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD
|
|||||||
//func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) {
|
//func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) {
|
||||||
//
|
//
|
||||||
//}
|
//}
|
||||||
|
|
||||||
func (d *DataBases) GetFromCacheAndInsertDB(msgUserIDPrefix string) {
|
|
||||||
//get value from redis
|
|
||||||
|
|
||||||
//batch insert to db
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user