mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	* refactor: refactor workflows contents. * add tool workflows. * update field. * fix: remove chat error. * Fix err. * fix error. * remove cn comment. * update workflows files. * update infra config. * move workflows. * feat: update bot. * fix: solve uncorrect outdated msg get. * update get docIDs logic. * update * update skip logic. * fix * update. * fix: delay deleteObject func. * remove unused content. * update log type. * feat: implement request batch count limit. * update * update * refactor: improve db structure in `storage/controller`
		
			
				
	
	
		
			287 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			287 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package controller
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						|
	pbmsg "github.com/openimsdk/protocol/msg"
 | 
						|
	"github.com/openimsdk/protocol/sdkws"
 | 
						|
	"github.com/openimsdk/tools/errs"
 | 
						|
	"github.com/openimsdk/tools/log"
 | 
						|
	"github.com/openimsdk/tools/mq/kafka"
 | 
						|
	"go.mongodb.org/mongo-driver/mongo"
 | 
						|
)
 | 
						|
 | 
						|
type MsgTransferDatabase interface {
 | 
						|
	// BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation.
 | 
						|
	BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
 | 
						|
	// DeleteMessagesFromCache deletes message caches from Redis by sequence numbers.
 | 
						|
	DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
 | 
						|
 | 
						|
	// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
 | 
						|
	BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
 | 
						|
	SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
 | 
						|
 | 
						|
	// to mq
 | 
						|
	MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
 | 
						|
	MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
 | 
						|
}
 | 
						|
 | 
						|
func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (MsgTransferDatabase, error) {
 | 
						|
	conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &msgTransferDatabase{
 | 
						|
		msgDocDatabase:  msgDocModel,
 | 
						|
		msg:             msg,
 | 
						|
		seqUser:         seqUser,
 | 
						|
		seqConversation: seqConversation,
 | 
						|
		producerToMongo: producerToMongo,
 | 
						|
		producerToPush:  producerToPush,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
type msgTransferDatabase struct {
 | 
						|
	msgDocDatabase  database.Msg
 | 
						|
	msgTable        model.MsgDocModel
 | 
						|
	msg             cache.MsgCache
 | 
						|
	seqConversation cache.SeqConversationCache
 | 
						|
	seqUser         cache.SeqUser
 | 
						|
	producerToMongo *kafka.Producer
 | 
						|
	producerToPush  *kafka.Producer
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
 | 
						|
	if len(msgList) == 0 {
 | 
						|
		return errs.ErrArgs.WrapMsg("msgList is empty")
 | 
						|
	}
 | 
						|
	msgs := make([]any, len(msgList))
 | 
						|
	for i, msg := range msgList {
 | 
						|
		if msg == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		var offlinePushModel *model.OfflinePushModel
 | 
						|
		if msg.OfflinePushInfo != nil {
 | 
						|
			offlinePushModel = &model.OfflinePushModel{
 | 
						|
				Title:         msg.OfflinePushInfo.Title,
 | 
						|
				Desc:          msg.OfflinePushInfo.Desc,
 | 
						|
				Ex:            msg.OfflinePushInfo.Ex,
 | 
						|
				IOSPushSound:  msg.OfflinePushInfo.IOSPushSound,
 | 
						|
				IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
 | 
						|
			}
 | 
						|
		}
 | 
						|
		msgs[i] = &model.MsgDataModel{
 | 
						|
			SendID:           msg.SendID,
 | 
						|
			RecvID:           msg.RecvID,
 | 
						|
			GroupID:          msg.GroupID,
 | 
						|
			ClientMsgID:      msg.ClientMsgID,
 | 
						|
			ServerMsgID:      msg.ServerMsgID,
 | 
						|
			SenderPlatformID: msg.SenderPlatformID,
 | 
						|
			SenderNickname:   msg.SenderNickname,
 | 
						|
			SenderFaceURL:    msg.SenderFaceURL,
 | 
						|
			SessionType:      msg.SessionType,
 | 
						|
			MsgFrom:          msg.MsgFrom,
 | 
						|
			ContentType:      msg.ContentType,
 | 
						|
			Content:          string(msg.Content),
 | 
						|
			Seq:              msg.Seq,
 | 
						|
			SendTime:         msg.SendTime,
 | 
						|
			CreateTime:       msg.CreateTime,
 | 
						|
			Status:           msg.Status,
 | 
						|
			Options:          msg.Options,
 | 
						|
			OfflinePush:      offlinePushModel,
 | 
						|
			AtUserIDList:     msg.AtUserIDList,
 | 
						|
			AttachedInfo:     msg.AttachedInfo,
 | 
						|
			Ex:               msg.Ex,
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
 | 
						|
	if len(fields) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	num := db.msgTable.GetSingleGocMsgNum()
 | 
						|
	// num = 100
 | 
						|
	for i, field := range fields { // Check the type of the field
 | 
						|
		var ok bool
 | 
						|
		switch key {
 | 
						|
		case updateKeyMsg:
 | 
						|
			var msg *model.MsgDataModel
 | 
						|
			msg, ok = field.(*model.MsgDataModel)
 | 
						|
			if msg != nil && msg.Seq != firstSeq+int64(i) {
 | 
						|
				return errs.ErrInternalServer.WrapMsg("seq is invalid")
 | 
						|
			}
 | 
						|
		case updateKeyRevoke:
 | 
						|
			_, ok = field.(*model.RevokeModel)
 | 
						|
		default:
 | 
						|
			return errs.ErrInternalServer.WrapMsg("key is invalid")
 | 
						|
		}
 | 
						|
		if !ok {
 | 
						|
			return errs.ErrInternalServer.WrapMsg("field type is invalid")
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Returns true if the document exists in the database, false if the document does not exist in the database
 | 
						|
	updateMsgModel := func(seq int64, i int) (bool, error) {
 | 
						|
		var (
 | 
						|
			res *mongo.UpdateResult
 | 
						|
			err error
 | 
						|
		)
 | 
						|
		docID := db.msgTable.GetDocID(conversationID, seq)
 | 
						|
		index := db.msgTable.GetMsgIndex(seq)
 | 
						|
		field := fields[i]
 | 
						|
		switch key {
 | 
						|
		case updateKeyMsg:
 | 
						|
			res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
 | 
						|
		case updateKeyRevoke:
 | 
						|
			res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			return false, err
 | 
						|
		}
 | 
						|
		return res.MatchedCount > 0, nil
 | 
						|
	}
 | 
						|
	tryUpdate := true
 | 
						|
	for i := 0; i < len(fields); i++ {
 | 
						|
		seq := firstSeq + int64(i) // Current sequence number
 | 
						|
		if tryUpdate {
 | 
						|
			matched, err := updateMsgModel(seq, i)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			if matched {
 | 
						|
				continue // The current data has been updated, skip the current data
 | 
						|
			}
 | 
						|
		}
 | 
						|
		doc := model.MsgDocModel{
 | 
						|
			DocID: db.msgTable.GetDocID(conversationID, seq),
 | 
						|
			Msg:   make([]*model.MsgInfoModel, num),
 | 
						|
		}
 | 
						|
		var insert int // Inserted data number
 | 
						|
		for j := i; j < len(fields); j++ {
 | 
						|
			seq = firstSeq + int64(j)
 | 
						|
			if db.msgTable.GetDocID(conversationID, seq) != doc.DocID {
 | 
						|
				break
 | 
						|
			}
 | 
						|
			insert++
 | 
						|
			switch key {
 | 
						|
			case updateKeyMsg:
 | 
						|
				doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
 | 
						|
					Msg: fields[j].(*model.MsgDataModel),
 | 
						|
				}
 | 
						|
			case updateKeyRevoke:
 | 
						|
				doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
 | 
						|
					Revoke: fields[j].(*model.RevokeModel),
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
		for i, msgInfo := range doc.Msg {
 | 
						|
			if msgInfo == nil {
 | 
						|
				msgInfo = &model.MsgInfoModel{}
 | 
						|
				doc.Msg[i] = msgInfo
 | 
						|
			}
 | 
						|
			if msgInfo.DelList == nil {
 | 
						|
				doc.Msg[i].DelList = []string{}
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
 | 
						|
			if mongo.IsDuplicateKeyError(err) {
 | 
						|
				i--              // already inserted
 | 
						|
				tryUpdate = true // next block use update mode
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially
 | 
						|
		i += insert - 1   // Skip the inserted data
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
 | 
						|
	return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
 | 
						|
	lenList := len(msgs)
 | 
						|
	if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
 | 
						|
		return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
 | 
						|
	}
 | 
						|
	if lenList < 1 {
 | 
						|
		return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
 | 
						|
	}
 | 
						|
	currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
 | 
						|
	if err != nil {
 | 
						|
		log.ZError(ctx, "storage.seq.Malloc", err)
 | 
						|
		return 0, false, err
 | 
						|
	}
 | 
						|
	isNew = currentMaxSeq == 0
 | 
						|
	lastMaxSeq := currentMaxSeq
 | 
						|
	userSeqMap := make(map[string]int64)
 | 
						|
	for _, m := range msgs {
 | 
						|
		currentMaxSeq++
 | 
						|
		m.Seq = currentMaxSeq
 | 
						|
		userSeqMap[m.SendID] = m.Seq
 | 
						|
	}
 | 
						|
 | 
						|
	failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs)
 | 
						|
	if err != nil {
 | 
						|
		prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
 | 
						|
		log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
 | 
						|
	} else {
 | 
						|
		prommetrics.MsgInsertRedisSuccessCounter.Inc()
 | 
						|
	}
 | 
						|
	err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
 | 
						|
	if err != nil {
 | 
						|
		log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
 | 
						|
		prommetrics.SeqSetFailedCounter.Inc()
 | 
						|
	}
 | 
						|
	return lastMaxSeq, isNew, errs.Wrap(err)
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
 | 
						|
	for userID, seq := range userSeqMap {
 | 
						|
		if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
 | 
						|
	return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
 | 
						|
	partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
 | 
						|
	if err != nil {
 | 
						|
		log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
 | 
						|
		return 0, 0, err
 | 
						|
	}
 | 
						|
	return partition, offset, nil
 | 
						|
}
 | 
						|
 | 
						|
func (db *msgTransferDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
 | 
						|
	if len(messages) > 0 {
 | 
						|
		_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
 | 
						|
		if err != nil {
 | 
						|
			log.ZError(ctx, "MsgToMongoMQ", err, "key", key, "conversationID", conversationID, "lastSeq", lastSeq)
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |