This commit is contained in:
wangchuxiao 2022-05-25 15:56:39 +08:00
parent 6022aff965
commit 841566a5d1
3 changed files with 41 additions and 20 deletions

View File

@ -91,10 +91,11 @@ func (rpc *rpcChat) runCh() {
for { for {
select { select {
case msg := <-rpc.delMsgCh: case msg := <-rpc.delMsgCh:
if err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID); err != nil { if unexistSeqList, err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID); err != nil {
log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList qrgs: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error()) log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error())
DeleteMessageNotification(msg.OpUserID, msg.UserID, unexistSeqList, msg.OperationID)
} }
db.DataBases.DelMsgFromCache(msg.SeqList, msg.UserID, msg.OperationID) db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID)
} }
} }
} }

View File

@ -85,7 +85,7 @@ func (d *DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) {
} }
// deleteMsgByLogic // deleteMsgByLogic
func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (err error) { func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (totalUnexistSeqList []uint32, err error) {
log.Debug(operationID, utils.GetSelfFuncName(), "args ", userID, seqList) log.Debug(operationID, utils.GetSelfFuncName(), "args ", userID, seqList)
sortkeys.Uint32s(seqList) sortkeys.Uint32s(seqList)
suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 { suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 {
@ -102,33 +102,38 @@ func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID
return t return t
}(userID, seqList) }(userID, seqList)
lock := sync.Mutex{}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(suffixUserID2SubSeqList)) wg.Add(len(suffixUserID2SubSeqList))
for k, v := range suffixUserID2SubSeqList { for k, v := range suffixUserID2SubSeqList {
go func(suffixUserID string, subSeqList []uint32, operationID string) { go func(suffixUserID string, subSeqList []uint32, operationID string) {
if e := d.DelMsgBySeqListInOneDoc(suffixUserID, subSeqList, operationID); e != nil { defer wg.Done()
unexistSeqList, err := d.DelMsgBySeqListInOneDoc(suffixUserID, subSeqList, operationID)
if err != nil {
log.Error(operationID, "DelMsgBySeqListInOneDoc failed ", e.Error(), suffixUserID, subSeqList) log.Error(operationID, "DelMsgBySeqListInOneDoc failed ", e.Error(), suffixUserID, subSeqList)
err = e return
} }
lock.Lock()
totalUnexistSeqList = append(totalUnexistSeqList, unexistSeqList...)
lock.Unlock()
wg.Done() wg.Done()
}(k, v, operationID) }(k, v, operationID)
} }
wg.Wait() return totalUnexistSeqList, err
return err
} }
func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) error { func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) ([]uint32, error) {
log.Debug(operationID, utils.GetSelfFuncName(), "args ", suffixUserID, seqList) log.Debug(operationID, utils.GetSelfFuncName(), "args ", suffixUserID, seqList)
seqMsgList, indexList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID) seqMsgList, indexList, unexistSeqList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID)
if err != nil { if err != nil {
return utils.Wrap(err, "") return nil, utils.Wrap(err, "")
} }
for i, v := range seqMsgList { for i, v := range seqMsgList {
if err := d.ReplaceMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil { if err := d.ReplaceMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil {
return utils.Wrap(err, "") return nil, utils.Wrap(err, "")
} }
} }
return nil return unexistSeqList, nil
} }
// deleteMsgByLogic // deleteMsgByLogic
@ -302,13 +307,13 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio
return seqMsg, nil return seqMsg, nil
} }
func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, err error) { func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
sChat := UserChat{} sChat := UserChat{}
if err = c.FindOne(ctx, bson.M{"uid": suffixUserID}).Decode(&sChat); err != nil { if err = c.FindOne(ctx, bson.M{"uid": suffixUserID}).Decode(&sChat); err != nil {
log.NewError(operationID, "not find seqUid", suffixUserID, err.Error()) log.NewError(operationID, "not find seqUid", suffixUserID, err.Error())
return nil, nil, utils.Wrap(err, "") return nil, nil, nil, utils.Wrap(err, "")
} }
singleCount := 0 singleCount := 0
var hasSeqList []uint32 var hasSeqList []uint32
@ -316,7 +321,7 @@ func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqL
msg := new(open_im_sdk.MsgData) msg := new(open_im_sdk.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil { if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
log.NewError(operationID, "Unmarshal err", msg.String(), err.Error()) log.NewError(operationID, "Unmarshal err", msg.String(), err.Error())
return nil, nil, err return nil, nil, nil, err
} }
if isContainInt32(msg.Seq, seqList) { if isContainInt32(msg.Seq, seqList) {
indexList = append(indexList, i) indexList = append(indexList, i)
@ -328,8 +333,13 @@ func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqL
} }
} }
} }
for _, i := range seqList {
return seqMsg, indexList, nil if isContainInt32(i, hasSeqList) {
continue
}
unexistSeqList = append(unexistSeqList, i)
}
return seqMsg, indexList, unexistSeqList, nil
} }
func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) { func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) {
@ -915,8 +925,17 @@ func isContainInt32(target uint32, List []uint32) bool {
} }
} }
return false return false
} }
func isNotContainInt32(target uint32, List []uint32) bool {
for _, i := range List {
if i == target {
return false
}
}
return true
}
func indexGen(uid string, seqSuffix uint32) string { func indexGen(uid string, seqSuffix uint32) string {
return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10) return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10)
} }

View File

@ -314,7 +314,7 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string,
return nil return nil
} }
func (d *DataBases) DelMsgFromCache(seqList []uint32, uid, operationID string) { func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) {
for _, seq := range seqList { for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq)) key := messageCache + uid + "_" + strconv.Itoa(int(seq))
result, err := redis.String(d.Exec("GET", key)) result, err := redis.String(d.Exec("GET", key))
@ -325,6 +325,7 @@ func (d *DataBases) DelMsgFromCache(seqList []uint32, uid, operationID string) {
log2.Debug(operationID, utils.GetSelfFuncName(), "del result", result) log2.Debug(operationID, utils.GetSelfFuncName(), "del result", result)
var msg pbCommon.MsgData var msg pbCommon.MsgData
err = utils.String2Pb(result, &msg) err = utils.String2Pb(result, &msg)
log2.NewDebug(operationID, utils.GetSelfFuncName(), "msg", msg)
if err != nil { if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "string2Pb failed", msg, err.Error()) log2.NewWarn(operationID, utils.GetSelfFuncName(), "string2Pb failed", msg, err.Error())
continue continue