From 979100850cc8ba476b94b6bd373ab01988a6788d Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Thu, 15 Sep 2022 12:07:28 +0800 Subject: [PATCH] prometheus for statistics --- internal/msg_gateway/gate/init.go | 3 +- internal/msg_gateway/gate/logic.go | 4 + internal/msg_gateway/gate/relay_rpc_server.go | 12 ++ internal/push/logic/init.go | 5 + internal/push/logic/push_to_client.go | 6 +- internal/rpc/msg/pull_message.go | 15 +- internal/rpc/msg/rpcChat.go | 46 +++-- internal/rpc/msg/send_msg.go | 16 ++ pkg/common/db/RedisModel.go | 1 - pkg/common/prometheus/gather.go | 165 +++++++++++++++++- pkg/common/prometheus/prometheus.go | 1 - 11 files changed, 244 insertions(+), 30 deletions(-) diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 8269a03b0..88ef3d12d 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -27,14 +27,13 @@ var ( ) func Init(rpcPort, wsPort int) { - //log initialization - rwLock = new(sync.RWMutex) validate = validator.New() statistics.NewStatistics(&sendMsgAllCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) ws.onInit(wsPort) rpcSvr.onInit(rpcPort) + initPrometheus() } func Run(promethuesPort int) { diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 2ee9be712..7b77840a7 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/msg" pbRtc "Open_IM/pkg/proto/rtc" @@ -43,15 +44,18 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { case constant.WSGetNewestSeq: log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.getSeqReq(conn, &m) + promePkg.PromeInc(promePkg.GetNewestSeqTotalCounter) case constant.WSSendMsg: log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendMsgReq(conn, &m) + promePkg.PromeInc(promePkg.MsgRecvTotalCounter) case constant.WSSendSignalMsg: log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendSignalMsgReq(conn, &m) case constant.WSPullMsgBySeqList: log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.pullMsgBySeqListReq(conn, &m) + promePkg.PromeInc(promePkg.PullMsgBySeqListTotalCounter) case constant.WsLogoutMsg: log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier) // conn.Close() diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 406cb991e..35636a762 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbRelay "Open_IM/pkg/proto/relay" @@ -31,6 +32,16 @@ type RPCServer struct { target string } +func initPrometheus() { + promePkg.NewMsgRecvTotalCounter() + promePkg.NewGetNewestSeqTotalCounter() + promePkg.NewPullMsgBySeqListTotalCounter() + promePkg.NewMsgOnlinePushSuccessCounter() + //promePkg.NewSingleChatMsgRecvSuccessCounter() + //promePkg.NewGroupChatMsgRecvSuccessCounter() + //promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() +} + func (r *RPCServer) onInit(rpcPort int) { r.rpcPort = rpcPort r.rpcRegisterName = config.Config.RpcRegisterName.OpenImRelayName @@ -174,6 +185,7 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { tempT.OnlinePush = true + promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) temp := &pbRelay.SingleMsgToUserPlatform{ ResultCode: resultCode, diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index ce996d3c5..81ed48087 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -47,6 +47,11 @@ func init() { } } +func initPrometheus() { + promePkg.NewMsgOfflinePushSuccessCounter() + promePkg.NewMsgOfflinePushFailedCounter() +} + func Run(promethuesPort int) { go rpcServer.run() go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index f129ad385..634d2e008 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -21,6 +21,7 @@ import ( "context" "strings" + promePkg "Open_IM/pkg/common/prometheus" "github.com/golang/protobuf/proto" ) @@ -140,8 +141,10 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } pushResult, err := offlinePusher.Push(UIDList, title, detailContent, pushMsg.OperationID, opts) if err != nil { + promePkg.PromeInc(promePkg.MsgOfflinePushFailedCounter) log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) } else { + promePkg.PromeInc(promePkg.MsgOfflinePushSuccessCounter) log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) } } @@ -261,12 +264,13 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { } pushResult, err := offlinePusher.Push(needOfflinePushUserIDList, title, detailContent, pushMsg.OperationID, opts) if err != nil { + promePkg.PromeInc(promePkg.MsgOfflinePushFailedCounter) log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) } else { + promePkg.PromeInc(promePkg.MsgOfflinePushSuccessCounter) log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) } } - } } diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index 4fcf9f9f7..1a99a14e6 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -8,6 +8,8 @@ import ( commonDB "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" open_im_sdk "Open_IM/pkg/proto/sdk_ws" + + promePkg "Open_IM/pkg/common/prometheus" ) func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *open_im_sdk.GetMaxAndMinSeqReq) (*open_im_sdk.GetMaxAndMinSeqResp, error) { @@ -48,57 +50,62 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) resp := new(open_im_sdk.PullMessageBySeqListResp) m := make(map[string]*open_im_sdk.MsgDataList) - //msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID) redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID) if err != nil { if err != go_redis.Nil { + promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) } else { log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) } msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(in.UserID, failedSeqList, in.OperationID) if err1 != nil { + promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) resp.ErrCode = 201 resp.ErrMsg = err.Error() return resp, nil } else { + promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) redisMsgList = append(redisMsgList, msgList...) resp.List = redisMsgList } } else { + promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) resp.List = redisMsgList } + for k, v := range in.GroupSeqList { x := new(open_im_sdk.MsgDataList) redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, in.OperationID) if err != nil { if err != go_redis.Nil { + promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) } else { log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) } msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) if err1 != nil { + promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) resp.ErrCode = 201 resp.ErrMsg = err.Error() return resp, nil } else { + promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) redisMsgList = append(redisMsgList, msgList...) x.MsgDataList = redisMsgList m[k] = x } } else { + promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) x.MsgDataList = redisMsgList m[k] = x } } resp.GroupMsgDataList = m - //respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID) - //respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat) return resp, nil - } type MsgFormats []*open_im_sdk.MsgData diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 2c4c6270c..536a5897b 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -13,15 +13,14 @@ import ( "strconv" "strings" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + promePkg "Open_IM/pkg/common/prometheus" "google.golang.org/grpc" ) -var ( - sendMsgSuccessCounter prometheus.Counter - sendMsgFailedCounter prometheus.Counter -) +//var ( +// sendMsgSuccessCounter prometheus.Counter +// sendMsgFailedCounter prometheus.Counter +//) type rpcChat struct { rpcPort int @@ -55,14 +54,29 @@ func NewRpcChatServer(port int) *rpcChat { } func (rpc *rpcChat) initPrometheus() { - sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "send_msg_success", - Help: "The number of send msg success", - }) - sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "send_msg_failed", - Help: "The number of send msg failed", - }) + //sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + // Name: "send_msg_success", + // Help: "The number of send msg success", + //}) + //sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + // Name: "send_msg_failed", + // Help: "The number of send msg failed", + //}) + promePkg.NewMsgPullFromRedisSuccessCounter() + promePkg.NewMsgPullFromRedisFailedCounter() + promePkg.NewMsgPullFromMongoSuccessCounter() + promePkg.NewMsgPullFromMongoFailedCounter() + + promePkg.NewSingleChatMsgRecvSuccessCounter() + promePkg.NewGroupChatMsgRecvSuccessCounter() + promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() + + promePkg.NewSingleChatMsgProcessSuccessCounter() + promePkg.NewSingleChatMsgProcessFailedCounter() + promePkg.NewGroupChatMsgProcessSuccessCounter() + promePkg.NewGroupChatMsgProcessFailedCounter() + promePkg.NewWorkSuperGroupChatMsgProcessSuccessCounter() + promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter() } func (rpc *rpcChat) Run() { @@ -97,9 +111,7 @@ func (rpc *rpcChat) Run() { panic(utils.Wrap(err, "register chat module rpc to etcd err")) } go rpc.runCh() - if config.Config.Prometheus.Enable { - rpc.initPrometheus() - } + rpc.initPrometheus() err = srv.Serve(listener) if err != nil { log.Error("", "rpc rpcChat failed ", err.Error()) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 876f2cda7..dbc1ec6eb 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -23,6 +23,7 @@ import ( "sync" "time" + promePkg "Open_IM/pkg/common/prometheus" go_redis "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" ) @@ -270,6 +271,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } switch pb.MsgData.SessionType { case constant.SingleChatType: + promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter) // callback t1 = time.Now() callbackResp := callbackBeforeSendSingleMsg(pb) @@ -282,6 +284,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S callbackResp.ErrCode = 201 } log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) + promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } t1 = time.Now() @@ -295,6 +298,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error()) + promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } @@ -304,6 +308,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1)) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) + promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } @@ -315,9 +320,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp) } log.Debug(pb.OperationID, "send msg cost time all: ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.GroupChatType: // callback + promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter) callbackResp := callbackBeforeSendGroupMsg(pb) if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg resp:", callbackResp) @@ -327,10 +334,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S callbackResp.ErrCode = 201 } log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) + promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } var memberUserIDList []string if flag, errCode, errMsg, memberUserIDList = messageVerification(pb); !flag { + promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, errCode, errMsg, "", 0) } log.Debug(pb.OperationID, "GetGroupAllMember userID list", memberUserIDList, "len: ", len(memberUserIDList)) @@ -395,6 +404,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } if !sendTag { log.NewWarn(pb.OperationID, "send tag is ", sendTag) + promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } else { if pb.MsgData.ContentType == constant.AtText { @@ -459,6 +469,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S }() } log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) } case constant.NotificationChatType: @@ -481,6 +492,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.SuperGroupChatType: + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) // callback callbackResp := callbackBeforeSendGroupMsg(pb) if callbackResp.ErrCode != 0 { @@ -490,10 +502,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if callbackResp.ErrCode == 0 { callbackResp.ErrCode = 201 } + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } if flag, errCode, errMsg, _ = messageVerification(pb); !flag { + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, errCode, errMsg, "", 0) } msgToMQSingle.MsgData = pb.MsgData @@ -501,6 +515,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } // callback @@ -508,6 +523,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp) } + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) default: diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index 63be03533..d36202367 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -182,7 +182,6 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) - result, err := d.RDB.Get(context.Background(), key).Result() if err != nil { errResult = err diff --git a/pkg/common/prometheus/gather.go b/pkg/common/prometheus/gather.go index 534a27d68..471de8d0f 100644 --- a/pkg/common/prometheus/gather.go +++ b/pkg/common/prometheus/gather.go @@ -9,16 +9,43 @@ var ( UserLoginCounter prometheus.Counter UserRegisterCounter prometheus.Counter + //seg SeqGetSuccessCounter prometheus.Counter SeqGetFailedCounter prometheus.Counter SeqSetSuccessCounter prometheus.Counter SeqSetFailedCounter prometheus.Counter - MsgInsertRedisSuccessCounter prometheus.Counter - MsgInsertRedisFailedCounter prometheus.Counter + //msg-db + MsgInsertRedisSuccessCounter prometheus.Counter + MsgInsertRedisFailedCounter prometheus.Counter + MsgInsertMongoSuccessCounter prometheus.Counter + MsgInsertMongoFailedCounter prometheus.Counter + MsgPullFromRedisSuccessCounter prometheus.Counter + MsgPullFromRedisFailedCounter prometheus.Counter + MsgPullFromMongoSuccessCounter prometheus.Counter + MsgPullFromMongoFailedCounter prometheus.Counter - MsgInsertMongoSuccessCounter prometheus.Counter - MsgInsertMongoFailedCounter prometheus.Counter + //msg-ws + MsgRecvTotalCounter prometheus.Counter + GetNewestSeqTotalCounter prometheus.Counter + PullMsgBySeqListTotalCounter prometheus.Counter + + SingleChatMsgRecvSuccessCounter prometheus.Counter + GroupChatMsgRecvSuccessCounter prometheus.Counter + WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter + + //msg-msg + SingleChatMsgProcessSuccessCounter prometheus.Counter + SingleChatMsgProcessFailedCounter prometheus.Counter + GroupChatMsgProcessSuccessCounter prometheus.Counter + GroupChatMsgProcessFailedCounter prometheus.Counter + WorkSuperGroupChatMsgProcessSuccessCounter prometheus.Counter + WorkSuperGroupChatMsgProcessFailedCounter prometheus.Counter + + //msg-push + MsgOnlinePushSuccessCounter prometheus.Counter + MsgOfflinePushSuccessCounter prometheus.Counter + MsgOfflinePushFailedCounter prometheus.Counter ) func NewUserLoginCounter() { @@ -87,3 +114,133 @@ func NewMsgInsertMongoFailedCounter() { Help: "The number of failed insert msg to mongo", }) } + +func NewMsgPullFromRedisSuccessCounter() { + MsgPullFromRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_redis_success", + Help: "The number of successful pull msg from redis", + }) +} + +func NewMsgPullFromRedisFailedCounter() { + MsgPullFromRedisFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_redis_failed", + Help: "The number of failed pull msg from redis", + }) +} + +func NewMsgPullFromMongoSuccessCounter() { + MsgPullFromMongoSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_redis_success", + Help: "The number of successful pull msg from mongo", + }) +} + +func NewMsgPullFromMongoFailedCounter() { + MsgPullFromMongoFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_pull_from_mongo_failed", + Help: "The number of failed pull msg from mongo", + }) +} + +func NewMsgRecvTotalCounter() { + MsgRecvTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_recv_total", + Help: "The number of msg received", + }) +} + +func NewGetNewestSeqTotalCounter() { + GetNewestSeqTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "get_newest_seq_total", + Help: "the number of get newest seq", + }) +} +func NewPullMsgBySeqListTotalCounter() { + PullMsgBySeqListTotalCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pull_msg_by_seq_list_total", + Help: "The number of pull msg by seq list", + }) +} + +func NewSingleChatMsgRecvSuccessCounter() { + SingleChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_recv_success", + Help: "The number of single chat msg successful received ", + }) +} + +func NewGroupChatMsgRecvSuccessCounter() { + GroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_recv_success", + Help: "The number of group chat msg successful received", + }) +} + +func NewWorkSuperGroupChatMsgRecvSuccessCounter() { + WorkSuperGroupChatMsgRecvSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "work_super_group_chat_msg_recv_success", + Help: "The number of work/super group chat msg successful received", + }) +} + +func NewSingleChatMsgProcessSuccessCounter() { + SingleChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_process_success", + Help: "The number of single chat msg successful processed", + }) +} + +func NewSingleChatMsgProcessFailedCounter() { + SingleChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "single_chat_msg_process_failed", + Help: "The number of single chat msg failed processed", + }) +} + +func NewGroupChatMsgProcessSuccessCounter() { + GroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_process_success", + Help: "The number of group chat msg successful processed", + }) +} + +func NewGroupChatMsgProcessFailedCounter() { + GroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "group_chat_msg_process_failed", + Help: "The number of group chat msg failed processed", + }) +} + +func NewWorkSuperGroupChatMsgProcessSuccessCounter() { + WorkSuperGroupChatMsgProcessSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "work_super_group_chat_msg_process_success", + Help: "The number of work/super group chat msg successful processed", + }) +} +func NewWorkSuperGroupChatMsgProcessFailedCounter() { + WorkSuperGroupChatMsgProcessFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "work_super_group_chat_msg_process_failed", + Help: "The number of work/super group chat msg failed processed", + }) +} + +func NewMsgOnlinePushSuccessCounter() { + MsgOnlinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_online_push_success", + Help: "The number of msg successful online pushed", + }) +} + +func NewMsgOfflinePushSuccessCounter() { + MsgOfflinePushSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_offline_push_success", + Help: "The number of msg successful offline pushed", + }) +} +func NewMsgOfflinePushFailedCounter() { + MsgOfflinePushFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "msg_offline_push_failed", + Help: "The number of msg failed offline pushed", + }) +} diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go index 2a0d9fe1c..603ac9564 100644 --- a/pkg/common/prometheus/prometheus.go +++ b/pkg/common/prometheus/prometheus.go @@ -31,7 +31,6 @@ func PromeInc(counter prometheus.Counter) { if counter != nil { counter.Inc() } - } }