This commit is contained in:
wangchuxiao 2023-05-26 11:11:48 +08:00
parent 5f783fa078
commit 1d441a9623
3 changed files with 31 additions and 12 deletions

View File

@ -448,6 +448,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
var totalNotExistSeqs []int64 var totalNotExistSeqs []int64
// mongo index // mongo index
var delSeqs []int64
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs)
@ -456,26 +457,28 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
} }
log.ZDebug(ctx, "getMsgBySeqsRange", "unExistSeqs", notExistSeqs, "msgs", len(msgs)) log.ZDebug(ctx, "getMsgBySeqsRange", "unExistSeqs", notExistSeqs, "msgs", len(msgs))
for _, msg := range msgs { for _, msg := range msgs {
if utils.IsContain(userID, msg.DelList) {
delSeqs = append(delSeqs, msg.Msg.Seq)
}
seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg)) seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg))
} }
totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...) totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...)
} }
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs) log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs)
seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(totalNotExistSeqs)...) seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(totalNotExistSeqs)...)
var delSeqs []int64
for _, msg := range seqMsgs { for _, msg := range seqMsgs {
if msg.Status == constant.MsgDeleted { if msg.Status == constant.MsgDeleted {
delSeqs = append(delSeqs, msg.Seq) delSeqs = append(delSeqs, msg.Seq)
} }
} }
if len(delSeqs) > 0 { if len(delSeqs) > 0 {
msgs, err := db.refetchDelSeqsMsgs(ctx, conversationID, int64(len(delSeqs)), allSeqs[0], begin) // msgs, err := db.refetchDelSeqsMsgs(ctx, conversationID, int64(len(delSeqs)), allSeqs[0], begin)
if err != nil { // if err != nil {
log.ZWarn(ctx, "refetchDelSeqsMsgs", err, "delSeqs", delSeqs, "begin", begin) // log.ZWarn(ctx, "refetchDelSeqsMsgs", err, "delSeqs", delSeqs, "begin", begin)
} // }
for _, msg := range msgs { // for _, msg := range msgs {
seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg)) // seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg))
} // }
} }
// sort by seq // sort by seq
if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 { if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
@ -628,9 +631,9 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
} }
if hasMarkDelFlag { if hasMarkDelFlag {
// mark del all delMsgIndexs // mark del all delMsgIndexs
// if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgDocModel); err != nil { if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil {
// return delStruct.getSetMinSeq(), err return delStruct.getSetMinSeq(), err
// } }
} }
return MsgInfoModel.Msg.Seq, nil return MsgInfoModel.Msg.Seq, nil
} }
@ -640,7 +643,6 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
// 继续递归 index+1 // 继续递归 index+1
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err return seq, err
return 0, nil
} }
func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) {

View File

@ -78,6 +78,7 @@ type MsgDocModelInterface interface {
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
DeleteDocs(ctx context.Context, docIDs []string) error DeleteDocs(ctx context.Context, docIDs []string) error
GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*MsgDocModel, error) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*MsgDocModel, error)
DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
} }
func (MsgDocModel) TableName() string { func (MsgDocModel) TableName() string {

View File

@ -154,6 +154,22 @@ func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string
} }
} }
func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error {
updates := bson.M{
"$set": bson.M{},
}
for _, index := range indexes {
updates["$set"].(bson.M)[fmt.Sprintf("msgs.%d", index)] = bson.M{
"msg": nil,
}
}
_, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates)
if err != nil {
return utils.Wrap(err, "")
}
return nil
}
func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error { func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error {
if docIDs == nil { if docIDs == nil {
return nil return nil