BatchInsertBlock

This commit is contained in:
withchao 2023-05-26 11:08:45 +08:00
parent b17037f8d5
commit 2401cd3525

View File

@ -28,6 +28,13 @@ import (
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
) )
const (
updateKeyMsg = iota
updateKeyRevoke
updateKeyDel
updateKeyRead
)
type CommonMsgDatabase interface { type CommonMsgDatabase interface {
// 批量插入消息 // 批量插入消息
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
@ -141,14 +148,33 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation
return nil return nil
} }
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, msgList []*unRelationTb.MsgInfoModel, firstSeq int64) error { func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
if len(msgList) == 0 { if len(fields) == 0 {
return nil return nil
} }
num := db.msg.GetSingleGocMsgNum() num := db.msg.GetSingleGocMsgNum()
//num = 100 //num = 100
if msgList[0].Msg != nil { for i, field := range fields { // 检查类型
firstSeq = msgList[0].Msg.Seq var ok bool
switch key {
case updateKeyMsg:
var msg *unRelationTb.MsgDataModel
msg, ok = field.(*unRelationTb.MsgDataModel)
if msg != nil && msg.Seq != firstSeq+int64(i) {
return errs.ErrInternalServer.Wrap("seq is invalid")
}
case updateKeyRevoke:
_, ok = field.(*unRelationTb.RevokeModel)
case updateKeyDel:
_, ok = field.([]string)
case updateKeyRead:
_, ok = field.([]string)
default:
return errs.ErrInternalServer.Wrap("key is invalid")
}
if !ok {
return errs.ErrInternalServer.Wrap("field type is invalid")
}
} }
getDocID := func(seq int64) string { getDocID := func(seq int64) string {
return conversationID + ":" + strconv.FormatInt(seq/num, 10) return conversationID + ":" + strconv.FormatInt(seq/num, 10)
@ -157,21 +183,23 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
return seq % num return seq % num
} }
// 返回值为true表示数据库存在该文档false表示数据库不存在该文档 // 返回值为true表示数据库存在该文档false表示数据库不存在该文档
updateMsgModel := func(docID string, index int64, msg *unRelationTb.MsgInfoModel) (bool, error) { updateMsgModel := func(seq int64, i int) (bool, error) {
var ( var (
res *mongo.UpdateResult res *mongo.UpdateResult
err error err error
) )
if msg.Msg != nil { docID := getDocID(seq)
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msg.Msg) index := getIndex(seq)
} else if msg.Revoke != nil { field := fields[i]
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke) switch key {
} else if msg.DelList != nil { case updateKeyMsg:
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList) res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
} else if msg.ReadList != nil { case updateKeyRevoke:
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList) res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
} else { case updateKeyDel:
return false, errs.ErrArgs.Wrap("msg all field is nil") res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", field)
case updateKeyRead:
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", field)
} }
if err != nil { if err != nil {
return false, err return false, err
@ -179,39 +207,52 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
return res.MatchedCount > 0, nil return res.MatchedCount > 0, nil
} }
tryUpdate := true tryUpdate := true
for i := 0; i < len(msgList); i++ { for i := 0; i < len(fields); i++ {
msg := msgList[i] seq := firstSeq + int64(i) // 当前seq
seq := firstSeq + int64(i)
docID := getDocID(seq)
if tryUpdate { if tryUpdate {
matched, err := updateMsgModel(docID, getIndex(seq), msg) matched, err := updateMsgModel(seq, i)
if err != nil { if err != nil {
return err return err
} }
if matched { if matched {
continue continue // 匹配到了,继续下一个(不一定修改)
} }
} }
doc := unRelationTb.MsgDocModel{ doc := unRelationTb.MsgDocModel{
DocID: docID, DocID: getDocID(seq),
Msg: make([]*unRelationTb.MsgInfoModel, num), Msg: make([]*unRelationTb.MsgInfoModel, num),
} }
var insert int var insert int // 插入的数量
for j := i; j < len(msgList); j++ { for j := i; j < len(fields); j++ {
seq = firstSeq + int64(j) seq = firstSeq + int64(j)
if getDocID(seq) != docID { if getDocID(seq) != doc.DocID {
break break
} }
insert++ insert++
doc.Msg[getIndex(seq)] = msgList[j] switch key {
case updateKeyMsg:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
Msg: fields[j].(*unRelationTb.MsgDataModel),
}
case updateKeyRevoke:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
Revoke: fields[j].(*unRelationTb.RevokeModel),
}
case updateKeyDel:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
DelList: fields[j].([]string),
}
case updateKeyRead:
doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{
ReadList: fields[j].([]string),
}
}
} }
for i, model := range doc.Msg { for i, model := range doc.Msg {
if model == nil { if model == nil {
doc.Msg[i] = &unRelationTb.MsgInfoModel{ model = &unRelationTb.MsgInfoModel{}
DelList: []string{}, doc.Msg[i] = model
ReadList: []string{},
} }
} else {
if model.DelList == nil { if model.DelList == nil {
doc.Msg[i].DelList = []string{} doc.Msg[i].DelList = []string{}
} }
@ -219,114 +260,65 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
doc.Msg[i].ReadList = []string{} doc.Msg[i].ReadList = []string{}
} }
} }
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) { if mongo.IsDuplicateKeyError(err) {
i-- i-- // 存在并发,重试当前数据
tryUpdate = true tryUpdate = true // 以修改模式
continue continue
} }
return err return err
} }
tryUpdate = false tryUpdate = false // 当前以插入成功,下一块优先插入模式
i += insert - 1 i += insert - 1 // 跳过已插入的数据
} }
return nil return nil
} }
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
//num := db.msg.GetSingleGocMsgNum() msgs := make([]any, len(msgList))
//currentIndex := currentMaxSeq / num for i, msg := range msgList {
//var blockMsgs []*[]*sdkws.MsgData if msg == nil {
//for i, data := range msgList { continue
// data.Seq = currentMaxSeq + int64(i+1) }
// index := data.Seq/num - currentIndex var offlinePushModel *unRelationTb.OfflinePushModel
// if i == 0 && index == 1 { if msg.OfflinePushInfo != nil {
// index-- offlinePushModel = &unRelationTb.OfflinePushModel{
// currentIndex++ Title: msg.OfflinePushInfo.Title,
// } Desc: msg.OfflinePushInfo.Desc,
// var block *[]*sdkws.MsgData Ex: msg.OfflinePushInfo.Ex,
// if len(blockMsgs) == int(index) { IOSPushSound: msg.OfflinePushInfo.IOSPushSound,
// var size int64 IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
// if i == 0 { }
// size = num - data.Seq%num }
// } else { msgs[i] = &unRelationTb.MsgDataModel{
// temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num SendID: msg.SendID,
// if temp >= num { RecvID: msg.RecvID,
// size = num GroupID: msg.GroupID,
// } else { ClientMsgID: msg.ClientMsgID,
// size = temp % num ServerMsgID: msg.ServerMsgID,
// } SenderPlatformID: msg.SenderPlatformID,
// } SenderNickname: msg.SenderNickname,
// temp := make([]*sdkws.MsgData, 0, size) SenderFaceURL: msg.SenderFaceURL,
// block = &temp SessionType: msg.SessionType,
// blockMsgs = append(blockMsgs, block) MsgFrom: msg.MsgFrom,
// } else { ContentType: msg.ContentType,
// block = blockMsgs[index] Content: string(msg.Content),
// } Seq: msg.Seq,
// *block = append(*block, msgList[i]) SendTime: msg.SendTime,
//} CreateTime: msg.CreateTime,
//create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0) Status: msg.Status,
//if !create { Options: msg.Options,
// exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex)) OfflinePush: offlinePushModel,
// if err != nil { AtUserIDList: msg.AtUserIDList,
// return err AttachedInfo: msg.AttachedInfo,
// } Ex: msg.Ex,
// create = !exist }
//} }
//for i, msgs := range blockMsgs { return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, currentMaxSeq-int64(len(msgList)))
// docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
// if create || i != 0 { // 插入
// doc := unRelationTb.MsgDocModel{
// DocID: docID,
// Msg: make([]unRelationTb.MsgInfoModel, num),
// }
// for i := 0; i < len(doc.Msg); i++ {
// doc.Msg[i].ReadList = []string{}
// doc.Msg[i].DelList = []string{}
// }
// for _, msg := range *msgs {
// data, err := proto.Marshal(msg)
// if err != nil {
// return err
// }
// doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
// SendTime: msg.SendTime,
// Msg: data,
// ReadList: []string{},
// DelList: []string{},
// }
// }
// if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
// prome.Inc(prome.MsgInsertMongoFailedCounter)
// return utils.Wrap(err, "")
// }
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
// } else { // 修改
// for _, msg := range *msgs {
// data, err := proto.Marshal(msg)
// if err != nil {
// return err
// }
// info := unRelationTb.MsgInfoModel{
// SendTime: msg.SendTime,
// Msg: data,
// }
// if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
// prome.Inc(prome.MsgInsertMongoFailedCounter)
// return err
// }
// prome.Inc(prome.MsgInsertMongoSuccessCounter)
// }
// }
//}
return nil
} }
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error { func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error {
msgs := []*unRelationTb.MsgInfoModel{{Revoke: revoke}} return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
return db.BatchInsertBlock(ctx, conversationID, msgs, seq)
//return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg)
} }
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {