mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	* refactor: db refactor and cache key add. * refactor: db refactor and cache key add. * refactor: go version update. * refactor: file name change.
		
			
				
	
	
		
			483 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			483 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright © 2023 OpenIM. All rights reserved.
 | 
						|
//
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package msgtransfer
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/IBM/sarama"
 | 
						|
	"github.com/go-redis/redis"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 | 
						|
	"github.com/openimsdk/protocol/constant"
 | 
						|
	"github.com/openimsdk/protocol/sdkws"
 | 
						|
	"github.com/openimsdk/tools/errs"
 | 
						|
	"github.com/openimsdk/tools/log"
 | 
						|
	"github.com/openimsdk/tools/mcontext"
 | 
						|
	"github.com/openimsdk/tools/mq/kafka"
 | 
						|
	"github.com/openimsdk/tools/utils/idutil"
 | 
						|
	"github.com/openimsdk/tools/utils/stringutil"
 | 
						|
	"google.golang.org/protobuf/proto"
 | 
						|
)
 | 
						|
 | 
						|
// Command identifiers carried in Cmd2Value.Cmd.
const (
	// ConsumerMsgs tags a batch of raw Kafka messages handed to the
	// distribution goroutine (payload: TriggerChannelValue).
	ConsumerMsgs = 3
	// SourceMessages tags a per-key batch of decoded messages handed to a
	// worker channel (payload: MsgChannelValue).
	SourceMessages = 4
	// MongoMessages is declared for completeness; it is not referenced in
	// this file.
	MongoMessages = 5
)

// ChannelNum is the number of worker channels, and therefore of Run
// goroutines, owned by one OnlineHistoryRedisConsumerHandler.
const ChannelNum = 100
 | 
						|
 | 
						|
type MsgChannelValue struct {
 | 
						|
	uniqueKey  string
 | 
						|
	ctx        context.Context
 | 
						|
	ctxMsgList []*ContextMsg
 | 
						|
}
 | 
						|
 | 
						|
type TriggerChannelValue struct {
 | 
						|
	ctx      context.Context
 | 
						|
	cMsgList []*sarama.ConsumerMessage
 | 
						|
}
 | 
						|
 | 
						|
// Cmd2Value is the envelope sent over the handler's internal channels.
type Cmd2Value struct {
	// Cmd selects how Value is interpreted (ConsumerMsgs, SourceMessages, ...).
	Cmd int
	// Value is the command payload; receivers type-assert it according to Cmd.
	Value any
}
 | 
						|
type ContextMsg struct {
 | 
						|
	message *sdkws.MsgData
 | 
						|
	ctx     context.Context
 | 
						|
}
 | 
						|
 | 
						|
type OnlineHistoryRedisConsumerHandler struct {
 | 
						|
	historyConsumerGroup *kafka.MConsumerGroup
 | 
						|
	chArrays             [ChannelNum]chan Cmd2Value
 | 
						|
	msgDistributionCh    chan Cmd2Value
 | 
						|
 | 
						|
	// singleMsgSuccessCount      uint64
 | 
						|
	// singleMsgFailedCount       uint64
 | 
						|
	// singleMsgSuccessCountMutex sync.Mutex
 | 
						|
	// singleMsgFailedCountMutex  sync.Mutex
 | 
						|
 | 
						|
	msgDatabase           controller.CommonMsgDatabase
 | 
						|
	conversationRpcClient *rpcclient.ConversationRpcClient
 | 
						|
	groupRpcClient        *rpcclient.GroupRpcClient
 | 
						|
}
 | 
						|
 | 
						|
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
 | 
						|
	conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
 | 
						|
	historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var och OnlineHistoryRedisConsumerHandler
 | 
						|
	och.msgDatabase = database
 | 
						|
	och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
 | 
						|
	go och.MessagesDistributionHandle()
 | 
						|
	for i := 0; i < ChannelNum; i++ {
 | 
						|
		och.chArrays[i] = make(chan Cmd2Value, 50)
 | 
						|
		go och.Run(i)
 | 
						|
	}
 | 
						|
	och.conversationRpcClient = conversationRpcClient
 | 
						|
	och.groupRpcClient = groupRpcClient
 | 
						|
	och.historyConsumerGroup = historyConsumerGroup
 | 
						|
	return &och, err
 | 
						|
}
 | 
						|
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
 | 
						|
	for cmd := range och.chArrays[channelID] {
 | 
						|
		switch cmd.Cmd {
 | 
						|
		case SourceMessages:
 | 
						|
			msgChannelValue := cmd.Value.(MsgChannelValue)
 | 
						|
			ctxMsgList := msgChannelValue.ctxMsgList
 | 
						|
			ctx := msgChannelValue.ctx
 | 
						|
			log.ZDebug(
 | 
						|
				ctx,
 | 
						|
				"msg arrived channel",
 | 
						|
				"channel id",
 | 
						|
				channelID,
 | 
						|
				"msgList length",
 | 
						|
				len(ctxMsgList),
 | 
						|
				"uniqueKey",
 | 
						|
				msgChannelValue.uniqueKey,
 | 
						|
			)
 | 
						|
			storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
 | 
						|
				ctxMsgList,
 | 
						|
			)
 | 
						|
			log.ZDebug(
 | 
						|
				ctx,
 | 
						|
				"msg lens",
 | 
						|
				"storageMsgList",
 | 
						|
				len(storageMsgList),
 | 
						|
				"notStorageMsgList",
 | 
						|
				len(notStorageMsgList),
 | 
						|
				"storageNotificationList",
 | 
						|
				len(storageNotificationList),
 | 
						|
				"notStorageNotificationList",
 | 
						|
				len(notStorageNotificationList),
 | 
						|
				"modifyMsgList",
 | 
						|
				len(modifyMsgList),
 | 
						|
			)
 | 
						|
			conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
 | 
						|
			conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
 | 
						|
			och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
 | 
						|
			och.handleNotification(
 | 
						|
				ctx,
 | 
						|
				msgChannelValue.uniqueKey,
 | 
						|
				conversationIDNotification,
 | 
						|
				storageNotificationList,
 | 
						|
				notStorageNotificationList,
 | 
						|
			)
 | 
						|
			if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
 | 
						|
				log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Get messages/notifications stored message list, not stored and pushed message list.
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
 | 
						|
	totalMsgs []*ContextMsg,
 | 
						|
) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
 | 
						|
	isStorage := func(msg *sdkws.MsgData) bool {
 | 
						|
		options2 := msgprocessor.Options(msg.Options)
 | 
						|
		if options2.IsHistory() {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
		// if !(!options2.IsSenderSync() && conversationID == msg.MsgData.SendID) {
 | 
						|
		// 	return false
 | 
						|
		// }
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	for _, v := range totalMsgs {
 | 
						|
		options := msgprocessor.Options(v.message.Options)
 | 
						|
		if !options.IsNotNotification() {
 | 
						|
			// clone msg from notificationMsg
 | 
						|
			if options.IsSendMsg() {
 | 
						|
				msg := proto.Clone(v.message).(*sdkws.MsgData)
 | 
						|
				// message
 | 
						|
				if v.message.Options != nil {
 | 
						|
					msg.Options = msgprocessor.NewMsgOptions()
 | 
						|
				}
 | 
						|
				msg.Options = msgprocessor.WithOptions(msg.Options,
 | 
						|
					msgprocessor.WithOfflinePush(options.IsOfflinePush()),
 | 
						|
					msgprocessor.WithUnreadCount(options.IsUnreadCount()),
 | 
						|
				)
 | 
						|
				v.message.Options = msgprocessor.WithOptions(
 | 
						|
					v.message.Options,
 | 
						|
					msgprocessor.WithOfflinePush(false),
 | 
						|
					msgprocessor.WithUnreadCount(false),
 | 
						|
				)
 | 
						|
				storageMsgList = append(storageMsgList, msg)
 | 
						|
			}
 | 
						|
			if isStorage(v.message) {
 | 
						|
				storageNotificatoinList = append(storageNotificatoinList, v.message)
 | 
						|
			} else {
 | 
						|
				notStorageNotificationList = append(notStorageNotificationList, v.message)
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			if isStorage(v.message) {
 | 
						|
				storageMsgList = append(storageMsgList, v.message)
 | 
						|
			} else {
 | 
						|
				notStorageMsgList = append(notStorageMsgList, v.message)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if v.message.ContentType == constant.ReactionMessageModifier ||
 | 
						|
			v.message.ContentType == constant.ReactionMessageDeleter {
 | 
						|
			modifyMsgList = append(modifyMsgList, v.message)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
 | 
						|
	ctx context.Context,
 | 
						|
	key, conversationID string,
 | 
						|
	storageList, notStorageList []*sdkws.MsgData,
 | 
						|
) {
 | 
						|
	och.toPushTopic(ctx, key, conversationID, notStorageList)
 | 
						|
	if len(storageList) > 0 {
 | 
						|
		lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
 | 
						|
		if err != nil {
 | 
						|
			log.ZError(
 | 
						|
				ctx,
 | 
						|
				"notification batch insert to redis error",
 | 
						|
				err,
 | 
						|
				"conversationID",
 | 
						|
				conversationID,
 | 
						|
				"storageList",
 | 
						|
				storageList,
 | 
						|
			)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
 | 
						|
		err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
 | 
						|
		if err != nil {
 | 
						|
			log.ZError(ctx, "MsgToMongoMQ error", err)
 | 
						|
		}
 | 
						|
		och.toPushTopic(ctx, key, conversationID, storageList)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData) {
 | 
						|
	for _, v := range msgs {
 | 
						|
		och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v) // nolint: errcheck
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
 | 
						|
	och.toPushTopic(ctx, key, conversationID, notStorageList)
 | 
						|
	if len(storageList) > 0 {
 | 
						|
		lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
 | 
						|
		if err != nil && errs.Unwrap(err) != redis.Nil {
 | 
						|
			log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		if isNewConversation {
 | 
						|
			switch storageList[0].SessionType {
 | 
						|
			case constant.ReadGroupChatType:
 | 
						|
				log.ZInfo(ctx, "group chat first create conversation", "conversationID",
 | 
						|
					conversationID)
 | 
						|
				userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
 | 
						|
				if err != nil {
 | 
						|
					log.ZWarn(ctx, "get group member ids error", err, "conversationID",
 | 
						|
						conversationID)
 | 
						|
				} else {
 | 
						|
					if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx,
 | 
						|
						storageList[0].GroupID, userIDs); err != nil {
 | 
						|
						log.ZWarn(ctx, "single chat first create conversation error", err,
 | 
						|
							"conversationID", conversationID)
 | 
						|
					}
 | 
						|
				}
 | 
						|
			case constant.SingleChatType, constant.NotificationChatType:
 | 
						|
				if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID,
 | 
						|
					storageList[0].SendID, conversationID, storageList[0].SessionType); err != nil {
 | 
						|
					log.ZWarn(ctx, "single chat or notification first create conversation error", err,
 | 
						|
						"conversationID", conversationID, "sessionType", storageList[0].SessionType)
 | 
						|
				}
 | 
						|
			default:
 | 
						|
				log.ZWarn(ctx, "unknown session type", nil, "sessionType",
 | 
						|
					storageList[0].SessionType)
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		log.ZDebug(ctx, "success incr to next topic")
 | 
						|
		err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
 | 
						|
		if err != nil {
 | 
						|
			log.ZError(ctx, "MsgToMongoMQ error", err)
 | 
						|
		}
 | 
						|
		och.toPushTopic(ctx, key, conversationID, storageList)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
 | 
						|
	for {
 | 
						|
		aggregationMsgs := make(map[string][]*ContextMsg, ChannelNum)
 | 
						|
		select {
 | 
						|
		case cmd := <-och.msgDistributionCh:
 | 
						|
			switch cmd.Cmd {
 | 
						|
			case ConsumerMsgs:
 | 
						|
				triggerChannelValue := cmd.Value.(TriggerChannelValue)
 | 
						|
				ctx := triggerChannelValue.ctx
 | 
						|
				consumerMessages := triggerChannelValue.cMsgList
 | 
						|
				// Aggregation map[userid]message list
 | 
						|
				log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
 | 
						|
				for i := 0; i < len(consumerMessages); i++ {
 | 
						|
					ctxMsg := &ContextMsg{}
 | 
						|
					msgFromMQ := &sdkws.MsgData{}
 | 
						|
					err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
 | 
						|
					if err != nil {
 | 
						|
						log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					var arr []string
 | 
						|
					for i, header := range consumerMessages[i].Headers {
 | 
						|
						arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
 | 
						|
					}
 | 
						|
					log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers),
 | 
						|
						"header", strings.Join(arr, ", "))
 | 
						|
					ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
 | 
						|
					ctxMsg.message = msgFromMQ
 | 
						|
					log.ZDebug(
 | 
						|
						ctx,
 | 
						|
						"single msg come to distribution center",
 | 
						|
						"message",
 | 
						|
						msgFromMQ,
 | 
						|
						"key",
 | 
						|
						string(consumerMessages[i].Key),
 | 
						|
					)
 | 
						|
					// aggregationMsgs[string(consumerMessages[i].Key)] =
 | 
						|
					// append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
 | 
						|
					if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
 | 
						|
						oldM = append(oldM, ctxMsg)
 | 
						|
						aggregationMsgs[string(consumerMessages[i].Key)] = oldM
 | 
						|
					} else {
 | 
						|
						m := make([]*ContextMsg, 0, 100)
 | 
						|
						m = append(m, ctxMsg)
 | 
						|
						aggregationMsgs[string(consumerMessages[i].Key)] = m
 | 
						|
					}
 | 
						|
				}
 | 
						|
				log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
 | 
						|
				for uniqueKey, v := range aggregationMsgs {
 | 
						|
					if len(v) >= 0 {
 | 
						|
						hashCode := stringutil.GetHashCode(uniqueKey)
 | 
						|
						channelID := hashCode % ChannelNum
 | 
						|
						newCtx := withAggregationCtx(ctx, v)
 | 
						|
						log.ZDebug(
 | 
						|
							newCtx,
 | 
						|
							"generate channelID",
 | 
						|
							"hashCode",
 | 
						|
							hashCode,
 | 
						|
							"channelID",
 | 
						|
							channelID,
 | 
						|
							"uniqueKey",
 | 
						|
							uniqueKey,
 | 
						|
						)
 | 
						|
						och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: newCtx}}
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context {
 | 
						|
	var allMessageOperationID string
 | 
						|
	for i, v := range values {
 | 
						|
		if opid := mcontext.GetOperationID(v.ctx); opid != "" {
 | 
						|
			if i == 0 {
 | 
						|
				allMessageOperationID += opid
 | 
						|
			} else {
 | 
						|
				allMessageOperationID += "$" + opid
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return mcontext.SetOperationID(ctx, allMessageOperationID)
 | 
						|
}
 | 
						|
 | 
						|
// Setup takes a consumer group session, ignores it, and always returns nil.
func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
 | 
						|
	sess sarama.ConsumerGroupSession,
 | 
						|
	claim sarama.ConsumerGroupClaim,
 | 
						|
) error { // a instance in the consumer group
 | 
						|
	for {
 | 
						|
		if sess == nil {
 | 
						|
			log.ZWarn(context.Background(), "sess == nil, waiting", nil)
 | 
						|
			time.Sleep(100 * time.Millisecond)
 | 
						|
		} else {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
	log.ZInfo(context.Background(), "online new session msg come", "highWaterMarkOffset",
 | 
						|
		claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
 | 
						|
 | 
						|
	var (
 | 
						|
		split    = 1000
 | 
						|
		rwLock   = new(sync.RWMutex)
 | 
						|
		messages = make([]*sarama.ConsumerMessage, 0, 1000)
 | 
						|
		ticker   = time.NewTicker(time.Millisecond * 100)
 | 
						|
 | 
						|
		wg      = sync.WaitGroup{}
 | 
						|
		running = new(atomic.Bool)
 | 
						|
	)
 | 
						|
	running.Store(true)
 | 
						|
 | 
						|
	wg.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer wg.Done()
 | 
						|
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ticker.C:
 | 
						|
				// if the buffer is empty and running is false, return loop.
 | 
						|
				if len(messages) == 0 {
 | 
						|
					if !running.Load() {
 | 
						|
						return
 | 
						|
					}
 | 
						|
 | 
						|
					continue
 | 
						|
				}
 | 
						|
 | 
						|
				rwLock.Lock()
 | 
						|
				buffer := make([]*sarama.ConsumerMessage, 0, len(messages))
 | 
						|
				buffer = append(buffer, messages...)
 | 
						|
 | 
						|
				// reuse slice, set cap to 0
 | 
						|
				messages = messages[:0]
 | 
						|
				rwLock.Unlock()
 | 
						|
 | 
						|
				start := time.Now()
 | 
						|
				ctx := mcontext.WithTriggerIDContext(context.Background(), idutil.OperationIDGenerator())
 | 
						|
				log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
 | 
						|
				for i := 0; i < len(buffer)/split; i++ {
 | 
						|
					och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
 | 
						|
						ctx: ctx, cMsgList: buffer[i*split : (i+1)*split],
 | 
						|
					}}
 | 
						|
				}
 | 
						|
				if (len(buffer) % split) > 0 {
 | 
						|
					och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
 | 
						|
						ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):],
 | 
						|
					}}
 | 
						|
				}
 | 
						|
 | 
						|
				log.ZDebug(ctx, "timer trigger msg consumer end",
 | 
						|
					"length", len(buffer), "time_cost", time.Since(start),
 | 
						|
				)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	wg.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer wg.Done()
 | 
						|
 | 
						|
		for running.Load() {
 | 
						|
			select {
 | 
						|
			case msg, ok := <-claim.Messages():
 | 
						|
				if !ok {
 | 
						|
					running.Store(false)
 | 
						|
					return
 | 
						|
				}
 | 
						|
 | 
						|
				if len(msg.Value) == 0 {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
 | 
						|
				rwLock.Lock()
 | 
						|
				messages = append(messages, msg)
 | 
						|
				rwLock.Unlock()
 | 
						|
 | 
						|
				sess.MarkMessage(msg, "")
 | 
						|
 | 
						|
			case <-sess.Context().Done():
 | 
						|
				running.Store(false)
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	wg.Wait()
 | 
						|
	return nil
 | 
						|
}
 |