From 11ebe4e23440830759393b2665f8c95d92a73510 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 18:35:21 +0800 Subject: [PATCH 1/9] msg update --- .../msgtransfer/online_history_msg_handler.go | 102 ++++++++++-------- internal/push/push_handler.go | 8 +- pkg/common/constant/constant.go | 1 + pkg/common/db/controller/msg.go | 7 +- pkg/common/kafka/producer.go | 27 +++-- pkg/common/log/zap.go | 10 +- pkg/common/mcontext/ctx.go | 60 ++++++++--- 7 files changed, 134 insertions(+), 81 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index ddfd2259e..dd3c07a3f 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -1,6 +1,7 @@ package msgtransfer import ( + "context" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" @@ -22,20 +23,24 @@ const ChannelNum = 100 type MsgChannelValue struct { aggregationID string //maybe userID or super groupID - triggerID string - msgList []*pbMsg.MsgDataToMQ + ctx context.Context + ctxMsgList []*ContextMsg lastSeq uint64 } type TriggerChannelValue struct { - triggerID string - cMsgList []*sarama.ConsumerMessage + ctx context.Context + cMsgList []*sarama.ConsumerMessage } type Cmd2Value struct { Cmd int Value interface{} } +type ContextMsg struct { + message *pbMsg.MsgDataToMQ + ctx context.Context +} type OnlineHistoryRedisConsumerHandler struct { historyConsumerGroup *kafka.MConsumerGroup @@ -80,38 +85,39 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { switch cmd.Cmd { case AggregationMessages: msgChannelValue := cmd.Value.(MsgChannelValue) - msgList := msgChannelValue.msgList - triggerID := msgChannelValue.triggerID + ctxMsgList := msgChannelValue.ctxMsgList + ctx := msgChannelValue.ctx storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - notStoragePushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.aggregationID, len(msgList)) + storagePushMsgList := make([]*ContextMsg, 0, 80) + notStoragePushMsgList := make([]*ContextMsg, 0, 80) + log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID) var modifyMsgList []*pbMsg.MsgDataToMQ - ctx := mcontext.NewCtx("redis consumer") - mcontext.SetOperationID(ctx, triggerID) - for _, v := range msgList { - log.Debug(triggerID, "msg come to storage center", v.String()) - isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) - isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) + //ctx := mcontext.NewCtx("redis consumer") + //mcontext.SetOperationID(ctx, triggerID) + for _, v := range ctxMsgList { + log.ZDebug(ctx, "msg come to storage center", v.message.String()) + isHistory := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsHistory) + isSenderSync := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsSenderSync) if isHistory { - storageMsgList = append(storageMsgList, v) - //log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) + storageMsgList = append(storageMsgList, v.message) + storagePushMsgList = append(storagePushMsgList, v) } else { - if !(!isSenderSync && msgChannelValue.aggregationID == v.MsgData.SendID) { + if !(!isSenderSync && msgChannelValue.aggregationID == v.message.MsgData.SendID) { notStoragePushMsgList = append(notStoragePushMsgList, v) } } - if v.MsgData.ContentType == constant.ReactionMessageModifier || v.MsgData.ContentType == constant.ReactionMessageDeleter { - modifyMsgList = append(modifyMsgList, v) + if v.message.MsgData.ContentType == constant.ReactionMessageModifier || v.message.MsgData.ContentType == constant.ReactionMessageDeleter { + modifyMsgList = append(modifyMsgList, v.message) } } if len(modifyMsgList) > 0 { - och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, triggerID, modifyMsgList) + och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, "", modifyMsgList) } - log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList)) + log.ZDebug(ctx, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList)) if len(storageMsgList) > 0 { lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList) if err != nil { - log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList) + log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMsgList) och.singleMsgFailedCountMutex.Lock() och.singleMsgFailedCount += uint64(len(storageMsgList)) och.singleMsgFailedCountMutex.Unlock() @@ -119,18 +125,20 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { och.singleMsgSuccessCountMutex.Lock() och.singleMsgSuccessCount += uint64(len(storageMsgList)) och.singleMsgSuccessCountMutex.Unlock() - och.msgDatabase.MsgToMongoMQ(ctx, msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) - for _, v := range storageMsgList { - och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v) + och.msgDatabase.MsgToMongoMQ(ctx, msgChannelValue.aggregationID, "", storageMsgList, lastSeq) + for _, v := range storagePushMsgList { + och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message) } for _, v := range notStoragePushMsgList { - och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v) + och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message) } } } else { for _, v := range notStoragePushMsgList { - och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v) - + p, o, err := och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message) + if err != nil { + log.ZError(v.ctx, "kafka send failed", err, "msg", v.message.String(), "pid", p, "offset", o) + } } } } @@ -140,40 +148,43 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { for { - aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) + aggregationMsgs := make(map[string][]*ContextMsg, ChannelNum) select { case cmd := <-och.msgDistributionCh: switch cmd.Cmd { case ConsumerMsgs: triggerChannelValue := cmd.Value.(TriggerChannelValue) - triggerID := triggerChannelValue.triggerID + ctx := triggerChannelValue.ctx consumerMessages := triggerChannelValue.cMsgList //Aggregation map[userid]message list - log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) + log.ZDebug(ctx, "batch messages come to distribution center", len(consumerMessages)) for i := 0; i < len(consumerMessages); i++ { + ctxMsg := &ContextMsg{} msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) if err != nil { - log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) + log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) return } - log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) + ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) + ctxMsg.message = &msgFromMQ + log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { - oldM = append(oldM, &msgFromMQ) + oldM = append(oldM, ctxMsg) aggregationMsgs[string(consumerMessages[i].Key)] = oldM } else { - m := make([]*pbMsg.MsgDataToMQ, 0, 100) - m = append(m, &msgFromMQ) + m := make([]*ContextMsg, 0, 100) + m = append(m, ctxMsg) aggregationMsgs[string(consumerMessages[i].Key)] = m } } - log.Debug(triggerID, "generate map list users len", len(aggregationMsgs)) + log.ZDebug(ctx, "generate map list users len", len(aggregationMsgs)) for aggregationID, v := range aggregationMsgs { if len(v) >= 0 { hashCode := utils.GetHashCode(aggregationID) channelID := hashCode % ChannelNum - log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID) - och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: v, triggerID: triggerID}} + log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "aggregationID", aggregationID) + och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, ctxMsgList: v, ctx: ctx}} } } } @@ -194,10 +205,9 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG } } rwLock := new(sync.RWMutex) - log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + log.ZDebug(context.Background(), "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) - var triggerID string go func() { for { select { @@ -211,18 +221,18 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG cMsg = make([]*sarama.ConsumerMessage, 0, 1000) rwLock.Unlock() split := 1000 - triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) + ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) + log.ZDebug(ctx, "timer trigger msg consumer start", len(ccMsg)) for i := 0; i < len(ccMsg)/split; i++ { //log.Debug() och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cMsgList: ccMsg[i*split : (i+1)*split]}} + ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split]}} } if (len(ccMsg) % split) > 0 { och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cMsgList: ccMsg[split*(len(ccMsg)/split):]}} + ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):]}} } - log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) + log.ZDebug(ctx, "timer trigger msg consumer end", len(ccMsg)) } } } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index c6a1ac660..b9edcb732 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -6,7 +6,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -29,10 +28,10 @@ func NewConsumerHandler(pusher *Pusher) *ConsumerHandler { } func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { - log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg)) + log.ZDebug(ctx, "msg come from kafka And push!!!", "msg", string(msg)) msgFromMQ := pbChat.PushMsgDataToMQ{} if err := proto.Unmarshal(msg, &msgFromMQ); err != nil { - log.Error("", "push Unmarshal msg err", "msg", string(msg), "err", err.Error()) + log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg)) return } pbData := &pbPush.PushMsgReq{ @@ -44,7 +43,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { if nowSec-sec > 10 { return } - mcontext.SetOperationID(ctx, "") var err error switch msgFromMQ.MsgData.SessionType { case constant.SuperGroupChatType: @@ -53,7 +51,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData) } if err != nil { - log.NewError("", "push failed", *pbData) + log.ZError(ctx, "push failed", err, "msg", pbData.String()) } } func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index cbb350eb0..563e9b670 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -277,6 +277,7 @@ const OpUserPlatform = "platform" const Token = "token" const RpcCustomHeader = "customHeader" // rpc中间件自定义ctx参数 const CheckKey = "CheckKey" +const TriggerID = "triggerID" const ( UnreliableNotification = 1 diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index ec06c1a43..33fef2314 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -77,7 +77,7 @@ type MsgDatabase interface { MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error MsgToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) error - MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) error + MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error) MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error } @@ -193,10 +193,9 @@ func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, return nil } -func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error) { mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq.MsgData, SourceID: key} - _, _, err := db.producerToPush.SendMessage(ctx, key, &mqPushMsg) - return err + return db.producerToPush.SendMessage(ctx, key, &mqPushMsg) } func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error { diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index bfeb84295..0a2a4625d 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -47,7 +47,24 @@ func NewKafkaProducer(addr []string, topic string) *Producer { p.producer = producer return &p } - +func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { + operationID, opUserID, platform, connID, err := mcontext.GetMustCtxInfo(ctx) + if err != nil { + return nil, err + } + return []sarama.RecordHeader{ + {Key: []byte(constant.OperationID), Value: []byte(operationID)}, + {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, + {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, + {Key: []byte(constant.ConnID), Value: []byte(connID)}}, err +} +func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { + var values []string + for _, recordHeader := range header { + values = append(values, string(recordHeader.Value)) + } + return mcontext.WithMustInfoCtx(values) +} func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) { log.ZDebug(ctx, "SendMessage", "key ", key, "msg", m.String()) kMsg := &sarama.ProducerMessage{} @@ -65,15 +82,11 @@ func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) return 0, 0, utils.Wrap(emptyMsg, "") } kMsg.Metadata = ctx - operationID, opUserID, platform, connID, err := mcontext.GetMustCtxInfo(ctx) + header, err := GetMQHeaderWithContext(ctx) if err != nil { return 0, 0, utils.Wrap(err, "") } - kMsg.Headers = []sarama.RecordHeader{ - {Key: []byte(constant.OperationID), Value: []byte(operationID)}, - {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, - {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, - {Key: []byte(constant.ConnID), Value: []byte(connID)}} + kMsg.Headers = header partition, offset, err := p.producer.SendMessage(kMsg) log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length()) if err == nil { diff --git a/pkg/common/log/zap.go b/pkg/common/log/zap.go index a405fd2e4..8fd8c5c21 100644 --- a/pkg/common/log/zap.go +++ b/pkg/common/log/zap.go @@ -147,14 +147,18 @@ func (l *ZapLogger) kvAppend(ctx context.Context, keysAndValues []interface{}) [ operationID := mcontext.GetOperationID(ctx) opUserID := mcontext.GetOpUserID(ctx) connID := mcontext.GetConnID(ctx) + triggerID := mcontext.GetTriggerID(ctx) if opUserID != "" { - keysAndValues = append([]interface{}{constant.OpUserID, mcontext.GetOpUserID(ctx)}, keysAndValues...) + keysAndValues = append([]interface{}{constant.OpUserID, opUserID}, keysAndValues...) } if operationID != "" { - keysAndValues = append([]interface{}{constant.OperationID, mcontext.GetOperationID(ctx)}, keysAndValues...) + keysAndValues = append([]interface{}{constant.OperationID, operationID}, keysAndValues...) } if connID != "" { - keysAndValues = append([]interface{}{constant.ConnID, mcontext.GetConnID(ctx)}, keysAndValues...) + keysAndValues = append([]interface{}{constant.ConnID, connID}, keysAndValues...) + } + if triggerID != "" { + keysAndValues = append([]interface{}{constant.TriggerID, triggerID}, keysAndValues...) } return keysAndValues } diff --git a/pkg/common/mcontext/ctx.go b/pkg/common/mcontext/ctx.go index bbb26f41c..898aacc53 100644 --- a/pkg/common/mcontext/ctx.go +++ b/pkg/common/mcontext/ctx.go @@ -6,6 +6,17 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" ) +var mapper = []string{constant.OperationID, constant.OpUserID, constant.OpUserPlatform, constant.ConnID} + +func WithOpUserIDContext(ctx context.Context, opUserID string) context.Context { + return context.WithValue(ctx, constant.OpUserID, opUserID) +} +func WithOpUserPlatformContext(ctx context.Context, platform string) context.Context { + return context.WithValue(ctx, constant.OpUserPlatform, platform) +} +func WithTriggerIDContext(ctx context.Context, triggerID string) context.Context { + return context.WithValue(ctx, constant.TriggerID, triggerID) +} func NewCtx(operationID string) context.Context { c := context.Background() ctx := context.WithValue(c, constant.OperationID, operationID) @@ -34,6 +45,34 @@ func GetOperationID(ctx context.Context) string { } return "" } +func GetOpUserID(ctx context.Context) string { + if ctx.Value(constant.OpUserID) != "" { + s, ok := ctx.Value(constant.OpUserID).(string) + if ok { + return s + } + } + return "" +} +func GetConnID(ctx context.Context) string { + if ctx.Value(constant.ConnID) != "" { + s, ok := ctx.Value(constant.ConnID).(string) + if ok { + return s + } + } + return "" +} + +func GetTriggerID(ctx context.Context) string { + if ctx.Value(constant.TriggerID) != "" { + s, ok := ctx.Value(constant.TriggerID).(string) + if ok { + return s + } + } + return "" +} func GetMustCtxInfo(ctx context.Context) (operationID, opUserID, platform, connID string, err error) { operationID, ok := ctx.Value(constant.OperationID).(string) if !ok { @@ -54,23 +93,12 @@ func GetMustCtxInfo(ctx context.Context) (operationID, opUserID, platform, connI return } +func WithMustInfoCtx(values []string) context.Context { + ctx := context.Background() + for i, v := range values { + ctx = context.WithValue(ctx, mapper[i], v) -func GetOpUserID(ctx context.Context) string { - if ctx.Value(constant.OpUserID) != "" { - s, ok := ctx.Value(constant.OpUserID).(string) - if ok { - return s - } } - return "" -} + return ctx -func GetConnID(ctx context.Context) string { - if ctx.Value(constant.ConnID) != "" { - s, ok := ctx.Value(constant.ConnID).(string) - if ok { - return s - } - } - return "" } From 254313d11faee9e24e9c2e8ccb1852ef8a75731e Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 18:47:29 +0800 Subject: [PATCH 2/9] msg update --- internal/msgtransfer/online_history_msg_handler.go | 3 ++- internal/push/push_handler.go | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index dd3c07a3f..43349b19a 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -205,7 +205,8 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG } } rwLock := new(sync.RWMutex) - log.ZDebug(context.Background(), "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", + claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) go func() { diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index b9edcb732..8748563d3 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -28,7 +28,6 @@ func NewConsumerHandler(pusher *Pusher) *ConsumerHandler { } func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { - log.ZDebug(ctx, "msg come from kafka And push!!!", "msg", string(msg)) msgFromMQ := pbChat.PushMsgDataToMQ{} if err := proto.Unmarshal(msg, &msgFromMQ); err != nil { log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg)) @@ -59,7 +58,6 @@ func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) ctx := c.pushConsumerGroup.GetContextFromMsg(msg, "push consumer") c.handleMs2PsChat(ctx, msg.Value) sess.MarkMessage(msg, "") From 2b0e0eb9e57b7cee05bb1569332ad5119f4b4b73 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 19:11:44 +0800 Subject: [PATCH 3/9] msg update --- internal/push/push_rpc_server.go | 3 ++- internal/push/push_to_client.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 9da1eaaf5..08ab12563 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -9,6 +9,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "sync" ) @@ -22,7 +23,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e return err } cacheModel := cache.NewCacheModel(rdb) - + client.AddOption(grpc.WithTransportCredentials(insecure.NewCredentials())) offlinePusher := NewOfflinePusher(cacheModel) database := controller.NewPushDatabase(cacheModel) pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client)) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 62d5d4728..72eb6547a 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -185,7 +185,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, msgClient := msggateway.NewMsgGatewayClient(v) reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}) if err != nil { - log.NewError(mcontext.GetOperationID(ctx), msg, len(pushToUserIDs), "err", err) + continue } if reply != nil && reply.SinglePushResult != nil { From 19b8bd0ff60d6cc67cea9923b0b2ccbb1fee12ef Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 19:24:38 +0800 Subject: [PATCH 4/9] msg update --- internal/msgtransfer/modify_msg_handler.go | 2 +- .../msgtransfer/online_msg_to_mongo_handler.go | 2 +- internal/msgtransfer/persistent_msg_handler.go | 2 +- internal/push/push_handler.go | 2 +- pkg/common/kafka/consumer_group.go | 15 +++------------ 5 files changed, 7 insertions(+), 16 deletions(-) diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index 2d3f92027..f9c0228c3 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -41,7 +41,7 @@ func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg, "modify consumer") + ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg) mmc.ModifyMsg(ctx, msg, string(msg.Key), sess) } else { log.Error("", "msg get from kafka but is nil", msg.Key) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 3dd8a6e64..d0f4ff001 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -78,7 +78,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGr for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mc.historyConsumerGroup.GetContextFromMsg(msg, "mongoDB consumer") + ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess) } else { log.Error("", "mongo msg get from kafka but is nil", msg.Key) diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index 19ec67201..ddc264a2b 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -78,7 +78,7 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg, "mysql consumer") + ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg) pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess) } else { log.Error("", "msg get from kafka but is nil", msg.Key) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 8748563d3..3beafe75d 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -58,7 +58,7 @@ func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - ctx := c.pushConsumerGroup.GetContextFromMsg(msg, "push consumer") + ctx := c.pushConsumerGroup.GetContextFromMsg(msg) c.handleMs2PsChat(ctx, msg.Value) sess.MarkMessage(msg, "") } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 7bc29fda2..c0c3823de 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -8,8 +8,6 @@ package kafka import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/Shopify/sarama" ) @@ -42,16 +40,9 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str } } -func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage, rootFuncName string) context.Context { - ctx := mcontext.NewCtx(rootFuncName) - var operationID string - for _, v := range cMsg.Headers { - if string(v.Key) == constant.OperationID { - operationID = string(v.Value) - } - } - mcontext.SetOperationID(ctx, operationID) - return ctx +func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { + return GetContextWithMQHeader(cMsg.Headers) + } func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) { From d2d3ee49794e06878c5937e62374e3a2293afc26 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 19:31:43 +0800 Subject: [PATCH 5/9] log --- internal/msgtransfer/online_history_msg_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 43349b19a..0cce342f6 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -95,7 +95,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { //ctx := mcontext.NewCtx("redis consumer") //mcontext.SetOperationID(ctx, triggerID) for _, v := range ctxMsgList { - log.ZDebug(ctx, "msg come to storage center", v.message.String()) + log.ZDebug(ctx, "msg come to storage center", "message", v.message.String()) isHistory := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsHistory) isSenderSync := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsSenderSync) if isHistory { @@ -113,7 +113,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { if len(modifyMsgList) > 0 { och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, "", modifyMsgList) } - log.ZDebug(ctx, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList)) + log.ZDebug(ctx, "msg storage length", "storageMsgList", len(storageMsgList), "push length", len(notStoragePushMsgList)) if len(storageMsgList) > 0 { lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList) if err != nil { From 1f80758ebb93caef7d1ccdd9c2491825fc2487b2 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 19:38:35 +0800 Subject: [PATCH 6/9] log --- internal/msgtransfer/online_history_msg_handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 0cce342f6..81f0497d8 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -157,7 +157,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { ctx := triggerChannelValue.ctx consumerMessages := triggerChannelValue.cMsgList //Aggregation map[userid]message list - log.ZDebug(ctx, "batch messages come to distribution center", len(consumerMessages)) + log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages)) for i := 0; i < len(consumerMessages); i++ { ctxMsg := &ContextMsg{} msgFromMQ := pbMsg.MsgDataToMQ{} @@ -178,7 +178,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { aggregationMsgs[string(consumerMessages[i].Key)] = m } } - log.ZDebug(ctx, "generate map list users len", len(aggregationMsgs)) + log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs)) for aggregationID, v := range aggregationMsgs { if len(v) >= 0 { hashCode := utils.GetHashCode(aggregationID) @@ -223,7 +223,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG rwLock.Unlock() split := 1000 ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) - log.ZDebug(ctx, "timer trigger msg consumer start", len(ccMsg)) + log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) for i := 0; i < len(ccMsg)/split; i++ { //log.Debug() och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ @@ -233,7 +233,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):]}} } - log.ZDebug(ctx, "timer trigger msg consumer end", len(ccMsg)) + log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg)) } } } From 77a9fad1022ee619693739115cae4a8d0bae60e5 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 19:39:20 +0800 Subject: [PATCH 7/9] log --- internal/msgtransfer/online_history_msg_handler.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 81f0497d8..fa70ca435 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -192,8 +192,10 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } } -func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + return nil +} func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group for { From 56cd2b0a4620bfb6dbce390676d9e81b542ce912 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 20:24:13 +0800 Subject: [PATCH 8/9] log --- internal/msggateway/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 05f5182da..97955d77b 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "runtime/debug" @@ -127,6 +128,8 @@ func (c *Client) handleMessage(message []byte) error { switch binaryReq.ReqIdentifier { case WSGetNewestSeq: resp, messageErr = c.longConnServer.GetSeq(ctx, binaryReq) + log.ZError(ctx, "WSGetNewestSeq", messageErr, "resp", resp) + case WSSendMsg: resp, messageErr = c.longConnServer.SendMessage(ctx, binaryReq) case WSSendSignalMsg: From 646cf767f9061482ef7e1d6ee468dd6a0bbcb582 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 22 Mar 2023 20:33:37 +0800 Subject: [PATCH 9/9] log --- internal/msggateway/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 97955d77b..3299be1ca 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -92,6 +92,7 @@ func (c *Client) readMessage() { } returnErr = c.handleMessage(message) if returnErr != nil { + log.ZError(context.Background(), "WSGetNewestSeq", returnErr) break }