Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

This commit is contained in:
wangchuxiao 2022-05-26 10:08:22 +08:00
commit 9beff11125

View File

@ -17,6 +17,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"hash/crc32" "hash/crc32"
"strings" "strings"
"sync"
"time" "time"
) )
@ -50,11 +51,11 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
go och.MessagesDistributionHandle() go och.MessagesDistributionHandle()
och.cmdCh = cmdCh och.cmdCh = cmdCh
for i := 0; i < ChannelNum; i++ { for i := 0; i < ChannelNum; i++ {
och.chArrays[i] = make(chan Cmd2Value, 100) och.chArrays[i] = make(chan Cmd2Value, 50)
go och.Run(i) go och.Run(i)
} }
for i := 0; i < ChannelNum; i++ { for i := 0; i < ChannelNum; i++ {
och.chMongoArrays[i] = make(chan Cmd2Value, 1000) och.chMongoArrays[i] = make(chan Cmd2Value, 10000)
go och.MongoMessageRun(i) go och.MongoMessageRun(i)
} }
if config.Config.ReliableStorage { if config.Config.ReliableStorage {
@ -153,7 +154,6 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
} }
} }
func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
return
hashCode := getHashCode(userID) hashCode := getHashCode(userID)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
@ -389,26 +389,13 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
break break
} }
} }
rwLock := new(sync.RWMutex)
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
cMsg := make([]*sarama.ConsumerMessage, 0, 1000) cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
t := time.NewTicker(time.Duration(100) * time.Millisecond) t := time.NewTicker(time.Duration(100) * time.Millisecond)
var triggerID string var triggerID string
for msg := range claim.Messages() { go func() {
//msgFromMQ := pbMsg.MsgDataToMQ{} for {
//err := proto.Unmarshal(msg.Value, &msgFromMQ)
//if err != nil {
// log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error())
//}
//userID := string(msg.Key)
//hashCode := getHashCode(userID)
//channelID := hashCode % ChannelNum
//log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
//och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
//sess.MarkMessage(msg, "")
cMsg = append(cMsg, msg)
//och.TriggerCmd(OnlineTopicBusy)
select { select {
//case : //case :
// triggerID = utils.OperationIDGenerator() // triggerID = utils.OperationIDGenerator()
@ -430,24 +417,57 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
case <-t.C: case <-t.C:
if len(cMsg) > 0 { if len(cMsg) > 0 {
rwLock.Lock()
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
for _, v := range cMsg { for _, v := range cMsg {
ccMsg = append(ccMsg, v) ccMsg = append(ccMsg, v)
} }
triggerID = utils.OperationIDGenerator()
log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg))
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: ccMsg}}
sess.MarkMessage(ccMsg[len(cMsg)-1], "")
cMsg = make([]*sarama.ConsumerMessage, 0, 1000) cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) rwLock.Unlock()
split := 1000
triggerID = utils.OperationIDGenerator()
log.NewWarn(triggerID, "timer trigger msg consumer start", len(ccMsg))
for i := 0; i < len(ccMsg)/split; i++ {
//log.Debug()
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}}
}
if (len(ccMsg) % split) > 0 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}}
}
//sess.MarkMessage(ccMsg[len(cMsg)-1], "")
log.NewWarn(triggerID, "timer trigger msg consumer end", len(cMsg))
} }
default:
} }
}
}()
for msg := range claim.Messages() {
//msgFromMQ := pbMsg.MsgDataToMQ{}
//err := proto.Unmarshal(msg.Value, &msgFromMQ)
//if err != nil {
// log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error())
//}
//userID := string(msg.Key)
//hashCode := getHashCode(userID)
//channelID := hashCode % ChannelNum
//log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
//och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
//sess.MarkMessage(msg, "")
rwLock.Lock()
cMsg = append(cMsg, msg)
rwLock.Unlock()
sess.MarkMessage(msg, "")
//och.TriggerCmd(OnlineTopicBusy)
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
} }
return nil return nil
} }