From 375fc0c89e19879043ed287aff3d8c1219d37d25 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 14 Sep 2022 21:11:40 +0800 Subject: [PATCH] prometheus for statistics --- internal/msg_transfer/logic/init.go | 1 + .../msg_transfer/logic/persistent_msg_handler.go | 6 ++++-- pkg/common/db/batch_insert_chat.go | 15 +++++---------- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index b98f0e8a4..aa7ba0b9a 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -37,6 +37,7 @@ var ( func Init() { cmdCh = make(chan Cmd2Value, 10000) w = new(sync.Mutex) + initPrometheus() persistentCH.Init() // ws2mschat save mysql historyCH.Init(cmdCh) // historyMongoCH.Init() diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 3c89820ec..3bfa99255 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -32,16 +32,18 @@ func (pc *PersistentConsumerHandler) Init() { pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) - pc.initPrometheus() + } -func (pc *PersistentConsumerHandler) initPrometheus() { +func initPrometheus() { promePkg.NewSeqGetSuccessCounter() promePkg.NewSeqGetFailedCounter() promePkg.NewSeqSetSuccessCounter() promePkg.NewSeqSetFailedCounter() promePkg.NewMsgInsertRedisSuccessCounter() promePkg.NewMsgInsertRedisFailedCounter() + promePkg.NewMsgInsertMongoSuccessCounter() + promePkg.NewMsgInsertMongoFailedCounter() } func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/batch_insert_chat.go index 51d573899..9be8f0874 100644 --- a/pkg/common/db/batch_insert_chat.go +++ b/pkg/common/db/batch_insert_chat.go @@ -15,10 +15,6 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) -func (d *DataBases) BatchDeleteChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) { - -} - func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string, currentMaxSeq uint64) error { newTime := getCurrentTimestampByMill() if len(msgList) > GetSingleGocMsgNum() { @@ -86,10 +82,13 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo sChat.Msg = msgListToMongo log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo) if _, err = c.InsertOne(ctx, &sChat); err != nil { + promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter) log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) return utils.Wrap(err, "") } + promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter) } else { + promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter) log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter) return utils.Wrap(err, "") } @@ -102,9 +101,11 @@ func (d *DataBases) BatchInsertChat2DB(userID string, msgList []*pbMsg.MsgDataTo sChat.Msg = msgListToMongoNext log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) if _, err = c.InsertOne(ctx, &sChat); err != nil { + promePkg.PromeInc(promePkg.MsgInsertMongoFailedCounter) log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) return utils.Wrap(err, "") } + promePkg.PromeInc(promePkg.MsgInsertMongoSuccessCounter) } log.Debug(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList)) return nil @@ -280,9 +281,3 @@ func (d *DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.MsgD //func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) { // //} - -func (d *DataBases) GetFromCacheAndInsertDB(msgUserIDPrefix string) { - //get value from redis - - //batch insert to db -}