mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-25 11:06:43 +08:00
log
This commit is contained in:
parent
6f3320ab55
commit
7f861fa5c3
@ -41,7 +41,7 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
|
|||||||
for msg := range claim.Messages() {
|
for msg := range claim.Messages() {
|
||||||
log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||||
if len(msg.Value) != 0 {
|
if len(msg.Value) != 0 {
|
||||||
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg)
|
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg, "modify consumer")
|
||||||
mmc.ModifyMsg(ctx, msg, string(msg.Key), sess)
|
mmc.ModifyMsg(ctx, msg, string(msg.Key), sess)
|
||||||
} else {
|
} else {
|
||||||
log.Error("", "msg get from kafka but is nil", msg.Key)
|
log.Error("", "msg get from kafka but is nil", msg.Key)
|
||||||
|
@ -89,7 +89,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|||||||
notStoragePushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
notStoragePushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
||||||
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.aggregationID, len(msgList))
|
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.aggregationID, len(msgList))
|
||||||
var modifyMsgList []*pbMsg.MsgDataToMQ
|
var modifyMsgList []*pbMsg.MsgDataToMQ
|
||||||
ctx := context.Background()
|
ctx := tracelog.NewCtx("redis consumer", triggerID)
|
||||||
tracelog.SetOperationID(ctx, triggerID)
|
tracelog.SetOperationID(ctx, triggerID)
|
||||||
for _, v := range msgList {
|
for _, v := range msgList {
|
||||||
log.Debug(triggerID, "msg come to storage center", v.String())
|
log.Debug(triggerID, "msg come to storage center", v.String())
|
||||||
|
@ -78,7 +78,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGr
|
|||||||
for msg := range claim.Messages() {
|
for msg := range claim.Messages() {
|
||||||
log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||||
if len(msg.Value) != 0 {
|
if len(msg.Value) != 0 {
|
||||||
ctx := mc.historyConsumerGroup.GetContextFromMsg(msg)
|
ctx := mc.historyConsumerGroup.GetContextFromMsg(msg, "mongoDB consumer")
|
||||||
mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess)
|
mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess)
|
||||||
} else {
|
} else {
|
||||||
log.Error("", "mongo msg get from kafka but is nil", msg.Key)
|
log.Error("", "mongo msg get from kafka but is nil", msg.Key)
|
||||||
|
@ -78,7 +78,7 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
|
|||||||
for msg := range claim.Messages() {
|
for msg := range claim.Messages() {
|
||||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||||
if len(msg.Value) != 0 {
|
if len(msg.Value) != 0 {
|
||||||
ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg)
|
ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg, "mysql consumer")
|
||||||
pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess)
|
pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess)
|
||||||
} else {
|
} else {
|
||||||
log.Error("", "msg get from kafka but is nil", msg.Key)
|
log.Error("", "msg get from kafka but is nil", msg.Key)
|
||||||
|
@ -68,7 +68,7 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
|||||||
claim sarama.ConsumerGroupClaim) error {
|
claim sarama.ConsumerGroupClaim) error {
|
||||||
for msg := range claim.Messages() {
|
for msg := range claim.Messages() {
|
||||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||||
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
|
ctx := c.pushConsumerGroup.GetContextFromMsg(msg, "push consumer")
|
||||||
c.handleMs2PsChat(ctx, msg.Value)
|
c.handleMs2PsChat(ctx, msg.Value)
|
||||||
sess.MarkMessage(msg, "")
|
sess.MarkMessage(msg, "")
|
||||||
}
|
}
|
||||||
|
@ -42,8 +42,8 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
|
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage, rootFuncName string) context.Context {
|
||||||
ctx := context.Background()
|
ctx := tracelog.NewCtx(rootFuncName, "")
|
||||||
var operationID string
|
var operationID string
|
||||||
for _, v := range cMsg.Headers {
|
for _, v := range cMsg.Headers {
|
||||||
if string(v.Key) == constant.OperationID {
|
if string(v.Key) == constant.OperationID {
|
||||||
|
@ -237,7 +237,7 @@ func NewWarn(OperationID string, args ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ShowLog(ctx context.Context) {
|
func ShowLog(ctx context.Context) {
|
||||||
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
|
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos)
|
||||||
OperationID := tracelog.GetOperationID(ctx)
|
OperationID := tracelog.GetOperationID(ctx)
|
||||||
for _, v := range *t.Funcs {
|
for _, v := range *t.Funcs {
|
||||||
|
|
||||||
@ -273,7 +273,7 @@ func ShowLog(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func InfoWithCtx(ctx context.Context, args ...interface{}) {
|
func InfoWithCtx(ctx context.Context, args ...interface{}) {
|
||||||
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
|
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos)
|
||||||
OperationID := tracelog.GetOperationID(ctx)
|
OperationID := tracelog.GetOperationID(ctx)
|
||||||
for _, v := range *t.Funcs {
|
for _, v := range *t.Funcs {
|
||||||
logger.WithFields(logrus.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
@ -284,7 +284,7 @@ func InfoWithCtx(ctx context.Context, args ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func DebugWithCtx(ctx context.Context, args ...interface{}) {
|
func DebugWithCtx(ctx context.Context, args ...interface{}) {
|
||||||
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
|
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos)
|
||||||
OperationID := tracelog.GetOperationID(ctx)
|
OperationID := tracelog.GetOperationID(ctx)
|
||||||
for _, v := range *t.Funcs {
|
for _, v := range *t.Funcs {
|
||||||
logger.WithFields(logrus.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
@ -295,7 +295,7 @@ func DebugWithCtx(ctx context.Context, args ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ErrorWithCtx(ctx context.Context, args ...interface{}) {
|
func ErrorWithCtx(ctx context.Context, args ...interface{}) {
|
||||||
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
|
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos)
|
||||||
OperationID := tracelog.GetOperationID(ctx)
|
OperationID := tracelog.GetOperationID(ctx)
|
||||||
for _, v := range *t.Funcs {
|
for _, v := range *t.Funcs {
|
||||||
if v.Err != nil {
|
if v.Err != nil {
|
||||||
@ -308,7 +308,7 @@ func ErrorWithCtx(ctx context.Context, args ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func WarnWithCtx(ctx context.Context, args ...interface{}) {
|
func WarnWithCtx(ctx context.Context, args ...interface{}) {
|
||||||
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo)
|
t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos)
|
||||||
OperationID := tracelog.GetOperationID(ctx)
|
OperationID := tracelog.GetOperationID(ctx)
|
||||||
for _, v := range *t.Funcs {
|
for _, v := range *t.Funcs {
|
||||||
logger.WithFields(logrus.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
|
@ -39,7 +39,7 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary
|
|||||||
if opts := md.Get(OpUserID); len(opts) == 1 {
|
if opts := md.Get(OpUserID); len(opts) == 1 {
|
||||||
opUserID = opts[0]
|
opUserID = opts[0]
|
||||||
}
|
}
|
||||||
ctx = tracelog.NewRpcCtx(ctx, funcName, operationID)
|
ctx = tracelog.SetFuncInfos(ctx, funcName, operationID)
|
||||||
defer log.ShowLog(ctx)
|
defer log.ShowLog(ctx)
|
||||||
tracelog.SetCtxInfo(ctx, funcName, err, "opUserID", opUserID, "rpcReq", rpcString(req))
|
tracelog.SetCtxInfo(ctx, funcName, err, "opUserID", opUserID, "rpcReq", rpcString(req))
|
||||||
resp, err = handler(ctx, req)
|
resp, err = handler(ctx, req)
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package tracelog
|
package tracelog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"OpenIM/pkg/common/constant"
|
|
||||||
"OpenIM/pkg/utils"
|
"OpenIM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@ -15,24 +14,27 @@ import (
|
|||||||
|
|
||||||
const TraceLogKey = "tracelog"
|
const TraceLogKey = "tracelog"
|
||||||
|
|
||||||
func NewCtx(c *gin.Context, api string) context.Context {
|
func SetFuncInfos(c context.Context, rootFuncName string, operationID string) context.Context {
|
||||||
req := &ApiInfo{ApiName: api, GinCtx: c, OperationID: c.GetHeader(constant.OperationID), Funcs: &[]FuncInfo{}}
|
req := &FuncInfos{RootFuncName: rootFuncName, Funcs: &[]FuncInfo{}}
|
||||||
return context.WithValue(c, TraceLogKey, req)
|
ctx := context.WithValue(c, TraceLogKey, req)
|
||||||
|
SetOperationID(ctx, operationID)
|
||||||
|
return ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRpcCtx(c context.Context, rpc string, operationID string) context.Context {
|
func NewCtx(rootFuncName string, operationID string) context.Context {
|
||||||
req := &ApiInfo{ApiName: rpc, Funcs: &[]FuncInfo{}}
|
c := context.Background()
|
||||||
|
req := &FuncInfos{RootFuncName: rootFuncName, Funcs: &[]FuncInfo{}}
|
||||||
ctx := context.WithValue(c, TraceLogKey, req)
|
ctx := context.WithValue(c, TraceLogKey, req)
|
||||||
SetOperationID(ctx, operationID)
|
SetOperationID(ctx, operationID)
|
||||||
return ctx
|
return ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetOperationID(ctx context.Context, operationID string) {
|
func SetOperationID(ctx context.Context, operationID string) {
|
||||||
ctx.Value(TraceLogKey).(*ApiInfo).OperationID = operationID
|
ctx.Value(TraceLogKey).(*FuncInfos).OperationID = operationID
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetOperationID(ctx context.Context) string {
|
func GetOperationID(ctx context.Context) string {
|
||||||
return ctx.Value(TraceLogKey).(*ApiInfo).OperationID
|
return ctx.Value(TraceLogKey).(*FuncInfos).OperationID
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetOpUserID(ctx context.Context) string {
|
func GetOpUserID(ctx context.Context) string {
|
||||||
@ -53,8 +55,8 @@ func Unwrap(err error) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type ApiInfo struct {
|
type FuncInfos struct {
|
||||||
ApiName string
|
RootFuncName string
|
||||||
OperationID string
|
OperationID string
|
||||||
Funcs *[]FuncInfo
|
Funcs *[]FuncInfo
|
||||||
GinCtx *gin.Context
|
GinCtx *gin.Context
|
||||||
@ -99,7 +101,7 @@ func SetCtxWarn(ctx context.Context, funcName string, err error, args ...interfa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SetContextInfo(ctx context.Context, funcName string, logLevel logrus.Level, err error, args ...interface{}) {
|
func SetContextInfo(ctx context.Context, funcName string, logLevel logrus.Level, err error, args ...interface{}) {
|
||||||
t := ctx.Value(TraceLogKey).(*ApiInfo)
|
t := ctx.Value(TraceLogKey).(*FuncInfos)
|
||||||
var funcInfo FuncInfo
|
var funcInfo FuncInfo
|
||||||
funcInfo.Args = make(map[string]interface{})
|
funcInfo.Args = make(map[string]interface{})
|
||||||
argsHandle(args, funcInfo.Args)
|
argsHandle(args, funcInfo.Args)
|
||||||
@ -117,7 +119,7 @@ func SetContextInfo(ctx context.Context, funcName string, logLevel logrus.Level,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SetRpcReqInfo(ctx context.Context, funcName string, req string) {
|
func SetRpcReqInfo(ctx context.Context, funcName string, req string) {
|
||||||
t := ctx.Value(TraceLogKey).(*ApiInfo)
|
t := ctx.Value(TraceLogKey).(*FuncInfos)
|
||||||
var funcInfo FuncInfo
|
var funcInfo FuncInfo
|
||||||
funcInfo.Args = make(map[string]interface{})
|
funcInfo.Args = make(map[string]interface{})
|
||||||
var args []interface{}
|
var args []interface{}
|
||||||
@ -128,7 +130,7 @@ func SetRpcReqInfo(ctx context.Context, funcName string, req string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SetRpcRespInfo(ctx context.Context, funcName string, resp string) {
|
func SetRpcRespInfo(ctx context.Context, funcName string, resp string) {
|
||||||
t := ctx.Value(TraceLogKey).(*ApiInfo)
|
t := ctx.Value(TraceLogKey).(*FuncInfos)
|
||||||
var funcInfo FuncInfo
|
var funcInfo FuncInfo
|
||||||
funcInfo.Args = make(map[string]interface{})
|
funcInfo.Args = make(map[string]interface{})
|
||||||
var args []interface{}
|
var args []interface{}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user