diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index b5d01510e..faaed5aa9 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -41,7 +41,7 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg) + ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg, "modify consumer") mmc.ModifyMsg(ctx, msg, string(msg.Key), sess) } else { log.Error("", "msg get from kafka but is nil", msg.Key) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 2c470ef4a..dbf2a2590 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -89,7 +89,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { notStoragePushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.aggregationID, len(msgList)) var modifyMsgList []*pbMsg.MsgDataToMQ - ctx := context.Background() + ctx := tracelog.NewCtx("redis consumer", triggerID) tracelog.SetOperationID(ctx, triggerID) for _, v := range msgList { log.Debug(triggerID, "msg come to storage center", v.String()) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 5b34c0ec0..359c88749 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -78,7 +78,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGr for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) + ctx := mc.historyConsumerGroup.GetContextFromMsg(msg, "mongoDB consumer") mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess) } else { log.Error("", "mongo msg get from kafka but is nil", msg.Key) diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index 481b3f505..72c336d41 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -78,7 +78,7 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg) + ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg, "mysql consumer") pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess) } else { log.Error("", "msg get from kafka but is nil", msg.Key) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index a2433866c..d40f1d628 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -68,7 +68,7 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - ctx := c.pushConsumerGroup.GetContextFromMsg(msg) + ctx := c.pushConsumerGroup.GetContextFromMsg(msg, "push consumer") c.handleMs2PsChat(ctx, msg.Value) sess.MarkMessage(msg, "") } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 6db83e125..0daf05fb4 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -42,8 +42,8 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str } } -func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { - ctx := context.Background() +func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage, rootFuncName string) context.Context { + ctx := tracelog.NewCtx(rootFuncName, "") var operationID string for _, v := range cMsg.Headers { if string(v.Key) == constant.OperationID { diff --git a/pkg/common/log/logrus.go b/pkg/common/log/logrus.go index 3c24cf8a4..694394a1f 100644 --- a/pkg/common/log/logrus.go +++ b/pkg/common/log/logrus.go @@ -237,7 +237,7 @@ func NewWarn(OperationID string, args ...interface{}) { } func ShowLog(ctx context.Context) { - t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo) + t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos) OperationID := tracelog.GetOperationID(ctx) for _, v := range *t.Funcs { @@ -273,7 +273,7 @@ func ShowLog(ctx context.Context) { } func InfoWithCtx(ctx context.Context, args ...interface{}) { - t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo) + t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos) OperationID := tracelog.GetOperationID(ctx) for _, v := range *t.Funcs { logger.WithFields(logrus.Fields{ @@ -284,7 +284,7 @@ func InfoWithCtx(ctx context.Context, args ...interface{}) { } func DebugWithCtx(ctx context.Context, args ...interface{}) { - t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo) + t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos) OperationID := tracelog.GetOperationID(ctx) for _, v := range *t.Funcs { logger.WithFields(logrus.Fields{ @@ -295,7 +295,7 @@ func DebugWithCtx(ctx context.Context, args ...interface{}) { } func ErrorWithCtx(ctx context.Context, args ...interface{}) { - t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo) + t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos) OperationID := tracelog.GetOperationID(ctx) for _, v := range *t.Funcs { if v.Err != nil { @@ -308,7 +308,7 @@ func ErrorWithCtx(ctx context.Context, args ...interface{}) { } func WarnWithCtx(ctx context.Context, args ...interface{}) { - t := ctx.Value(tracelog.TraceLogKey).(*tracelog.ApiInfo) + t := ctx.Value(tracelog.TraceLogKey).(*tracelog.FuncInfos) OperationID := tracelog.GetOperationID(ctx) for _, v := range *t.Funcs { logger.WithFields(logrus.Fields{ diff --git a/pkg/common/mw/rpc.go b/pkg/common/mw/rpc.go index d68099dc3..f9004b648 100644 --- a/pkg/common/mw/rpc.go +++ b/pkg/common/mw/rpc.go @@ -39,7 +39,7 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary if opts := md.Get(OpUserID); len(opts) == 1 { opUserID = opts[0] } - ctx = tracelog.NewRpcCtx(ctx, funcName, operationID) + ctx = tracelog.SetFuncInfos(ctx, funcName, operationID) defer log.ShowLog(ctx) tracelog.SetCtxInfo(ctx, funcName, err, "opUserID", opUserID, "rpcReq", rpcString(req)) resp, err = handler(ctx, req) diff --git a/pkg/common/tracelog/ctx.go b/pkg/common/tracelog/ctx.go index 76d152862..3ec433520 100644 --- a/pkg/common/tracelog/ctx.go +++ b/pkg/common/tracelog/ctx.go @@ -1,7 +1,6 @@ package tracelog import ( - "OpenIM/pkg/common/constant" "OpenIM/pkg/utils" "context" "github.com/sirupsen/logrus" @@ -15,24 +14,27 @@ import ( const TraceLogKey = "tracelog" -func NewCtx(c *gin.Context, api string) context.Context { - req := &ApiInfo{ApiName: api, GinCtx: c, OperationID: c.GetHeader(constant.OperationID), Funcs: &[]FuncInfo{}} - return context.WithValue(c, TraceLogKey, req) +func SetFuncInfos(c context.Context, rootFuncName string, operationID string) context.Context { + req := &FuncInfos{RootFuncName: rootFuncName, Funcs: &[]FuncInfo{}} + ctx := context.WithValue(c, TraceLogKey, req) + SetOperationID(ctx, operationID) + return ctx } -func NewRpcCtx(c context.Context, rpc string, operationID string) context.Context { - req := &ApiInfo{ApiName: rpc, Funcs: &[]FuncInfo{}} +func NewCtx(rootFuncName string, operationID string) context.Context { + c := context.Background() + req := &FuncInfos{RootFuncName: rootFuncName, Funcs: &[]FuncInfo{}} ctx := context.WithValue(c, TraceLogKey, req) SetOperationID(ctx, operationID) return ctx } func SetOperationID(ctx context.Context, operationID string) { - ctx.Value(TraceLogKey).(*ApiInfo).OperationID = operationID + ctx.Value(TraceLogKey).(*FuncInfos).OperationID = operationID } func GetOperationID(ctx context.Context) string { - return ctx.Value(TraceLogKey).(*ApiInfo).OperationID + return ctx.Value(TraceLogKey).(*FuncInfos).OperationID } func GetOpUserID(ctx context.Context) string { @@ -53,11 +55,11 @@ func Unwrap(err error) error { return err } -type ApiInfo struct { - ApiName string - OperationID string - Funcs *[]FuncInfo - GinCtx *gin.Context +type FuncInfos struct { + RootFuncName string + OperationID string + Funcs *[]FuncInfo + GinCtx *gin.Context } type FuncInfo struct { @@ -99,7 +101,7 @@ func SetCtxWarn(ctx context.Context, funcName string, err error, args ...interfa } func SetContextInfo(ctx context.Context, funcName string, logLevel logrus.Level, err error, args ...interface{}) { - t := ctx.Value(TraceLogKey).(*ApiInfo) + t := ctx.Value(TraceLogKey).(*FuncInfos) var funcInfo FuncInfo funcInfo.Args = make(map[string]interface{}) argsHandle(args, funcInfo.Args) @@ -117,7 +119,7 @@ func SetContextInfo(ctx context.Context, funcName string, logLevel logrus.Level, } func SetRpcReqInfo(ctx context.Context, funcName string, req string) { - t := ctx.Value(TraceLogKey).(*ApiInfo) + t := ctx.Value(TraceLogKey).(*FuncInfos) var funcInfo FuncInfo funcInfo.Args = make(map[string]interface{}) var args []interface{} @@ -128,7 +130,7 @@ func SetRpcReqInfo(ctx context.Context, funcName string, req string) { } func SetRpcRespInfo(ctx context.Context, funcName string, resp string) { - t := ctx.Value(TraceLogKey).(*ApiInfo) + t := ctx.Value(TraceLogKey).(*FuncInfos) var funcInfo FuncInfo funcInfo.Args = make(map[string]interface{}) var args []interface{}