mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
aggres
This commit is contained in:
parent
573b39f248
commit
d855beb354
@ -23,14 +23,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const ConsumerMsgs = 3
|
const ConsumerMsgs = 3
|
||||||
const AggregationMessages = 4
|
const SourceMessages = 4
|
||||||
const MongoMessages = 5
|
const MongoMessages = 5
|
||||||
const ChannelNum = 100
|
const ChannelNum = 100
|
||||||
|
|
||||||
type MsgChannelValue struct {
|
type MsgChannelValue struct {
|
||||||
aggregationID string //maybe userID or super groupID
|
sourceID string //maybe userID or super groupID
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxMsgList []*ContextMsg
|
ctxMsgList []*ContextMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
type TriggerChannelValue struct {
|
type TriggerChannelValue struct {
|
||||||
@ -84,16 +84,16 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|||||||
select {
|
select {
|
||||||
case cmd := <-och.chArrays[channelID]:
|
case cmd := <-och.chArrays[channelID]:
|
||||||
switch cmd.Cmd {
|
switch cmd.Cmd {
|
||||||
case AggregationMessages:
|
case SourceMessages:
|
||||||
msgChannelValue := cmd.Value.(MsgChannelValue)
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
||||||
ctxMsgList := msgChannelValue.ctxMsgList
|
ctxMsgList := msgChannelValue.ctxMsgList
|
||||||
ctx := msgChannelValue.ctx
|
ctx := msgChannelValue.ctx
|
||||||
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID)
|
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "sourceID", msgChannelValue.sourceID)
|
||||||
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.aggregationID, ctxMsgList)
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.sourceID, ctxMsgList)
|
||||||
och.handleMsg(ctx, msgChannelValue.aggregationID, storageMsgList, notStorageMsgList)
|
och.handleMsg(ctx, msgChannelValue.sourceID, storageMsgList, notStorageMsgList)
|
||||||
och.handleNotification(ctx, msgChannelValue.aggregationID, storageNotificationList, notStorageNotificationList)
|
och.handleNotification(ctx, msgChannelValue.sourceID, storageNotificationList, notStorageNotificationList)
|
||||||
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, modifyMsgList); err != nil {
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.sourceID, modifyMsgList); err != nil {
|
||||||
log.ZError(ctx, "msg to modify mq error", err, "aggregationID", msgChannelValue.aggregationID, "modifyMsgList", modifyMsgList)
|
log.ZError(ctx, "msg to modify mq error", err, "sourceID", msgChannelValue.sourceID, "modifyMsgList", modifyMsgList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -101,13 +101,13 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
|
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
|
||||||
isStorage := func(msg *pbMsg.MsgDataToMQ) bool {
|
isStorage := func(msg *pbMsg.MsgDataToMQ) bool {
|
||||||
options2 := utils.Options(msg.MsgData.Options)
|
options2 := utils.Options(msg.MsgData.Options)
|
||||||
if options2.IsHistory() {
|
if options2.IsHistory() {
|
||||||
return true
|
return true
|
||||||
} else {
|
} else {
|
||||||
if !(!options2.IsSenderSync() && aggregationID == msg.MsgData.SendID) {
|
if !(!options2.IsSenderSync() && sourceID == msg.MsgData.SendID) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -142,52 +142,52 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationI
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, sourceID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
|
||||||
och.toPushTopic(ctx, aggregationID, notStorageList)
|
och.toPushTopic(ctx, sourceID, notStorageList)
|
||||||
if len(storageList) > 0 {
|
if len(storageList) > 0 {
|
||||||
lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, aggregationID, storageList)
|
lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, sourceID, storageList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "notification batch insert to redis error", err, "aggregationID", aggregationID, "storageList", storageList)
|
log.ZError(ctx, "notification batch insert to redis error", err, "sourceID", sourceID, "storageList", storageList)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq)
|
och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq)
|
||||||
och.toPushTopic(ctx, aggregationID, storageList)
|
och.toPushTopic(ctx, sourceID, storageList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, aggregationID string, msgs []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) {
|
||||||
for _, v := range msgs {
|
for _, v := range msgs {
|
||||||
och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v)
|
och.msgDatabase.MsgToPushMQ(ctx, sourceID, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, sourceID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
|
||||||
och.toPushTopic(ctx, aggregationID, notStorageList)
|
och.toPushTopic(ctx, sourceID, notStorageList)
|
||||||
if len(storageList) > 0 {
|
if len(storageList) > 0 {
|
||||||
var currentMaxSeq int64
|
var currentMaxSeq int64
|
||||||
var err error
|
var err error
|
||||||
if storageList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
if storageList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
||||||
currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, aggregationID)
|
currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, sourceID)
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
if err := och.GroupChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
if err := och.GroupChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
||||||
log.ZError(ctx, "single chat first create conversation error", err, "aggregationID", aggregationID)
|
log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, aggregationID)
|
currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, sourceID)
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
if err := och.SingleChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
if err := och.SingleChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
|
||||||
log.ZError(ctx, "single chat first create conversation error", err, "aggregationID", aggregationID)
|
log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
prome.Inc(prome.SeqGetFailedCounter)
|
prome.Inc(prome.SeqGetFailedCounter)
|
||||||
log.ZError(ctx, "get max seq err", err, "aggregationID", aggregationID)
|
log.ZError(ctx, "get max seq err", err, "sourceID", sourceID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
prome.Inc(prome.SeqGetSuccessCounter)
|
prome.Inc(prome.SeqGetSuccessCounter)
|
||||||
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, aggregationID, storageList, currentMaxSeq)
|
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, sourceID, storageList, currentMaxSeq)
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
|
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
|
||||||
och.singleMsgFailedCountMutex.Lock()
|
och.singleMsgFailedCountMutex.Lock()
|
||||||
@ -198,8 +198,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, agg
|
|||||||
och.singleMsgSuccessCountMutex.Lock()
|
och.singleMsgSuccessCountMutex.Lock()
|
||||||
och.singleMsgSuccessCount += uint64(len(storageList))
|
och.singleMsgSuccessCount += uint64(len(storageList))
|
||||||
och.singleMsgSuccessCountMutex.Unlock()
|
och.singleMsgSuccessCountMutex.Unlock()
|
||||||
och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq)
|
och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq)
|
||||||
och.toPushTopic(ctx, aggregationID, storageList)
|
och.toPushTopic(ctx, sourceID, storageList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -265,12 +265,12 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
|
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
|
||||||
for aggregationID, v := range aggregationMsgs {
|
for sourceID, v := range aggregationMsgs {
|
||||||
if len(v) >= 0 {
|
if len(v) >= 0 {
|
||||||
hashCode := utils.GetHashCode(aggregationID)
|
hashCode := utils.GetHashCode(sourceID)
|
||||||
channelID := hashCode % ChannelNum
|
channelID := hashCode % ChannelNum
|
||||||
log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "aggregationID", aggregationID)
|
log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "sourceID", sourceID)
|
||||||
och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, ctxMsgList: v, ctx: ctx}}
|
och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{sourceID: sourceID, ctxMsgList: v, ctx: ctx}}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,14 +40,14 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
|
|||||||
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
|
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.SourceID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
|
||||||
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq)
|
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.SourceID, msgFromMQ.Messages, msgFromMQ.LastSeq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.SourceID, msgFromMQ.TriggerID)
|
||||||
}
|
}
|
||||||
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages)
|
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.SourceID, msgFromMQ.Messages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.SourceID, msgFromMQ.TriggerID)
|
||||||
}
|
}
|
||||||
for _, v := range msgFromMQ.Messages {
|
for _, v := range msgFromMQ.Messages {
|
||||||
if v.MsgData.ContentType == constant.DeleteMessageNotification {
|
if v.MsgData.ContentType == constant.DeleteMessageNotification {
|
||||||
|
@ -80,9 +80,9 @@ type MsgDatabase interface {
|
|||||||
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
|
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
|
||||||
|
|
||||||
MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error
|
MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error
|
||||||
MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error
|
MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error
|
||||||
MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error)
|
MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error)
|
||||||
MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error
|
MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase {
|
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase {
|
||||||
@ -189,9 +189,9 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.Ms
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error {
|
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error {
|
||||||
if len(messages) > 0 {
|
if len(messages) > 0 {
|
||||||
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages})
|
_, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToModifyByMQ{SourceID: sourceID, Messages: messages})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -206,9 +206,9 @@ func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMs
|
|||||||
return partition, offset, err
|
return partition, offset, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
|
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
|
||||||
if len(messages) > 0 {
|
if len(messages) > 0 {
|
||||||
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages})
|
_, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, SourceID: sourceID, Messages: messages})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user