From ee47c6d7274abd95d33f9e913ecdcf0459a4ec9e Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 20:29:32 +0800 Subject: [PATCH 1/9] channelNum --- .../logic/online_history_msg_handler.go | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index a915fce0e..3deed6a23 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -17,6 +17,7 @@ import ( "github.com/golang/protobuf/proto" "hash/crc32" "strings" + "sync" "time" ) @@ -389,7 +390,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS break } } - + rwLock := new(sync.RWMutex) log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) @@ -407,8 +408,16 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} //sess.MarkMessage(msg, "") + rwLock.Lock() cMsg = append(cMsg, msg) + rwLock.Unlock() + sess.MarkMessage(msg, "") //och.TriggerCmd(OnlineTopicBusy) + + //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + + } + go func() { select { //case : // triggerID = utils.OperationIDGenerator() @@ -430,24 +439,26 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS case <-t.C: if len(cMsg) > 0 { + rwLock.Lock() ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) for _, v := range cMsg { ccMsg = append(ccMsg, v) } + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + rwLock.Unlock() + triggerID = utils.OperationIDGenerator() log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(ccMsg[len(cMsg)-1], "") - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + //sess.MarkMessage(ccMsg[len(cMsg)-1], "") + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } - default: } - //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + }() - } return nil } From bf87eb68e2f9aa5d11c0b19aa9ae7e4a98750edb Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 21:03:48 +0800 Subject: [PATCH 2/9] channelNum --- .../logic/online_history_msg_handler.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 3deed6a23..095319f10 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -446,11 +446,17 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } cMsg = make([]*sarama.ConsumerMessage, 0, 1000) rwLock.Unlock() - + split := 1000 triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg}} + for i := 0; i < len(ccMsg)/split; i++ { + log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} + } + if (len(ccMsg) % split) > 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}} + } //sess.MarkMessage(ccMsg[len(cMsg)-1], "") log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) From bcfa3efef7b3d729bfab6139ebf08ad704709b16 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 21:15:17 +0800 Subject: [PATCH 3/9] channelNum --- .../logic/online_history_msg_handler.go | 86 ++++++++++--------- 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 095319f10..48f190dde 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -418,51 +418,55 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } go func() { - select { - //case : - // triggerID = utils.OperationIDGenerator() - // - // log.NewDebug(triggerID, "claim.Messages ", msg) - // cMsg = append(cMsg, msg) - // if len(cMsg) >= 1000 { - // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - // for _, v := range cMsg { - // ccMsg = append(ccMsg, v) - // } - // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) - // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - // triggerID: triggerID, cmsgList: ccMsg}} - // sess.MarkMessage(msg, "") - // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) - // } + for { + select { + //case : + // triggerID = utils.OperationIDGenerator() + // + // log.NewDebug(triggerID, "claim.Messages ", msg) + // cMsg = append(cMsg, msg) + // if len(cMsg) >= 1000 { + // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + // for _, v := range cMsg { + // ccMsg = append(ccMsg, v) + // } + // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + // triggerID: triggerID, cmsgList: ccMsg}} + // sess.MarkMessage(msg, "") + // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + // } - case <-t.C: - if len(cMsg) > 0 { - rwLock.Lock() - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - rwLock.Unlock() - split := 1000 - triggerID = utils.OperationIDGenerator() - for i := 0; i < len(ccMsg)/split; i++ { - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} - } - if (len(ccMsg) % split) > 0 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}} - } - //sess.MarkMessage(ccMsg[len(cMsg)-1], "") + case <-t.C: + if len(cMsg) > 0 { + rwLock.Lock() + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + } + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + rwLock.Unlock() + split := 1000 + triggerID = utils.OperationIDGenerator() + for i := 0; i < len(ccMsg)/split; i++ { + log.NewWarn(triggerID, "timer trigger msg consumer start", len(ccMsg)) + //log.Debug() + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} + } + if (len(ccMsg) % split) > 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}} + } + //sess.MarkMessage(ccMsg[len(cMsg)-1], "") + + log.NewWarn(triggerID, "timer trigger msg consumer end", len(cMsg)) + } - log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } - } + }() return nil From 94cd17909d0cc9039c54ebba8d8d98b090e6471a Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 21:23:10 +0800 Subject: [PATCH 4/9] channelNum --- .../logic/online_history_msg_handler.go | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 48f190dde..7434dfd5d 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -395,28 +395,6 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string - for msg := range claim.Messages() { - //msgFromMQ := pbMsg.MsgDataToMQ{} - //err := proto.Unmarshal(msg.Value, &msgFromMQ) - //if err != nil { - // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) - //} - //userID := string(msg.Key) - //hashCode := getHashCode(userID) - //channelID := hashCode % ChannelNum - //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) - ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} - //sess.MarkMessage(msg, "") - rwLock.Lock() - cMsg = append(cMsg, msg) - rwLock.Unlock() - sess.MarkMessage(msg, "") - //och.TriggerCmd(OnlineTopicBusy) - - //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) - - } go func() { for { select { @@ -468,6 +446,28 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } }() + for msg := range claim.Messages() { + //msgFromMQ := pbMsg.MsgDataToMQ{} + //err := proto.Unmarshal(msg.Value, &msgFromMQ) + //if err != nil { + // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) + //} + //userID := string(msg.Key) + //hashCode := getHashCode(userID) + //channelID := hashCode % ChannelNum + //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} + //sess.MarkMessage(msg, "") + rwLock.Lock() + cMsg = append(cMsg, msg) + rwLock.Unlock() + sess.MarkMessage(msg, "") + //och.TriggerCmd(OnlineTopicBusy) + + //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + + } return nil } From 948e5a2ddea3957dc240d319c09ac4e5938ce719 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 21:43:35 +0800 Subject: [PATCH 5/9] channelNum --- internal/msg_transfer/logic/online_history_msg_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 7434dfd5d..695691107 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -427,8 +427,8 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS rwLock.Unlock() split := 1000 triggerID = utils.OperationIDGenerator() + log.NewWarn(triggerID, "timer trigger msg consumer start", len(ccMsg)) for i := 0; i < len(ccMsg)/split; i++ { - log.NewWarn(triggerID, "timer trigger msg consumer start", len(ccMsg)) //log.Debug() och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} From c83e816bdae0011b9034b12561479268c4564203 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 22:15:34 +0800 Subject: [PATCH 6/9] channelNum --- internal/msg_transfer/logic/online_history_msg_handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 695691107..2baa30a36 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -154,7 +154,6 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { } } func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { - return hashCode := getHashCode(userID) channelID := hashCode % ChannelNum log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) From 5621a651a16ff1eda44e64e93863ad5cf6d4ca6e Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 22:17:11 +0800 Subject: [PATCH 7/9] channelNum --- internal/msg_transfer/logic/online_history_msg_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 2baa30a36..0f07bd63f 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -51,11 +51,11 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { go och.MessagesDistributionHandle() och.cmdCh = cmdCh for i := 0; i < ChannelNum; i++ { - och.chArrays[i] = make(chan Cmd2Value, 100) + och.chArrays[i] = make(chan Cmd2Value, 10) go och.Run(i) } for i := 0; i < ChannelNum; i++ { - och.chMongoArrays[i] = make(chan Cmd2Value, 1000) + och.chMongoArrays[i] = make(chan Cmd2Value, 100) go och.MongoMessageRun(i) } if config.Config.ReliableStorage { From 2910a6effd29978b3c5f1c14ff9da6967d9acaa5 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 22:30:30 +0800 Subject: [PATCH 8/9] channelNum --- internal/msg_transfer/logic/online_history_msg_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 0f07bd63f..328669a7e 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -55,7 +55,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { go och.Run(i) } for i := 0; i < ChannelNum; i++ { - och.chMongoArrays[i] = make(chan Cmd2Value, 100) + och.chMongoArrays[i] = make(chan Cmd2Value, 1000) go och.MongoMessageRun(i) } if config.Config.ReliableStorage { From 2add9a02f966e9aa691c272f943444f3a2ac49b1 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 22:42:44 +0800 Subject: [PATCH 9/9] channelNum --- internal/msg_transfer/logic/online_history_msg_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 328669a7e..fb5b33ce9 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -51,11 +51,11 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { go och.MessagesDistributionHandle() och.cmdCh = cmdCh for i := 0; i < ChannelNum; i++ { - och.chArrays[i] = make(chan Cmd2Value, 10) + och.chArrays[i] = make(chan Cmd2Value, 50) go och.Run(i) } for i := 0; i < ChannelNum; i++ { - och.chMongoArrays[i] = make(chan Cmd2Value, 1000) + och.chMongoArrays[i] = make(chan Cmd2Value, 10000) go och.MongoMessageRun(i) } if config.Config.ReliableStorage {