From fd42c6dced1dc82432c86a38d62d5a8fa76ae22d Mon Sep 17 00:00:00 2001
From: "fengyun.rui" <rfyiamcool@163.com>
Date: Mon, 13 Nov 2023 13:17:50 +0800
Subject: [PATCH] fix: reduce lock msg transfer (#1308)

* fix: reduce lock msg transfer

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

* fix: reduce lock msg transfer

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

---------

Signed-off-by: rfyiamcool <rfyiamcool@163.com>
---
 internal/msgtransfer/init.go                  |  3 +-
 .../msgtransfer/online_history_msg_handler.go | 73 +++++++++++--------
 2 files changed, 44 insertions(+), 32 deletions(-)

diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go
index bebf6819a..4ce015543 100644
--- a/internal/msgtransfer/init.go
+++ b/internal/msgtransfer/init.go
@@ -21,14 +21,13 @@ import (
 	"net/http"
 	"sync"
 
+	"github.com/OpenIMSDK/tools/mw"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 
-	"github.com/OpenIMSDK/tools/mw"
-
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go
index b4556634c..b019b0120 100644
--- a/internal/msgtransfer/online_history_msg_handler.go
+++ b/internal/msgtransfer/online_history_msg_handler.go
@@ -427,49 +427,62 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
 			break
 		}
 	}
-	rwLock := new(sync.RWMutex)
 	log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
 		claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
-	cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
-	t := time.NewTicker(time.Millisecond * 100)
+
+	split := 1000
+	rwLock := new(sync.RWMutex)
+	messages := make([]*sarama.ConsumerMessage, 0, 1000)
+	ticker := time.NewTicker(time.Millisecond * 100)
+
 	go func() {
 		for {
 			select {
-			case <-t.C:
-				if len(cMsg) > 0 {
-					rwLock.Lock()
-					ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
-					for _, v := range cMsg {
-						ccMsg = append(ccMsg, v)
-					}
-					cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
-					rwLock.Unlock()
-					split := 1000
-					ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
-					log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
-					for i := 0; i < len(ccMsg)/split; i++ {
-						// log.Debug()
-						och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
-							ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split],
-						}}
-					}
-					if (len(ccMsg) % split) > 0 {
-						och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
-							ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):],
-						}}
-					}
-					log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg))
+			case <-ticker.C:
+				if len(messages) == 0 {
+					continue
 				}
+
+				rwLock.Lock()
+				buffer := make([]*sarama.ConsumerMessage, 0, len(messages))
+				buffer = append(buffer, messages...)
+
+				// reuse slice, set cap to 0
+				messages = messages[:0]
+				rwLock.Unlock()
+
+				start := time.Now()
+				ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
+				log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
+				for i := 0; i < len(buffer)/split; i++ {
+					och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
+						ctx: ctx, cMsgList: buffer[i*split : (i+1)*split],
+					}}
+				}
+				if (len(buffer) % split) > 0 {
+					och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
+						ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):],
+					}}
+				}
+
+				log.ZDebug(ctx, "timer trigger msg consumer end",
+					"length", len(buffer), "time_cost", time.Since(start),
+				)
 			}
 		}
 	}()
+
 	for msg := range claim.Messages() {
-		rwLock.Lock()
-		if len(msg.Value) != 0 {
-			cMsg = append(cMsg, msg)
+		if len(msg.Value) == 0 {
+			continue
 		}
+
+		rwLock.Lock()
+		messages = append(messages, msg)
 		rwLock.Unlock()
+
 		sess.MarkMessage(msg, "")
 	}
+
 	return nil
 }