diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 1dee643c7..59b36f65d 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -91,10 +91,11 @@ func (rpc *rpcChat) runCh() { for { select { case msg := <-rpc.delMsgCh: - if err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID); err != nil { - log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList qrgs: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error()) + if unexistSeqList, err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID); err != nil { + log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error()) + DeleteMessageNotification(msg.OpUserID, msg.UserID, unexistSeqList, msg.OperationID) } - db.DataBases.DelMsgFromCache(msg.SeqList, msg.UserID, msg.OperationID) + db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID) } } } diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 1e824c11b..0152941d3 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -85,7 +85,7 @@ func (d *DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) { } // deleteMsgByLogic -func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (err error) { +func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (totalUnexistSeqList []uint32, err error) { log.Debug(operationID, utils.GetSelfFuncName(), "args ", userID, seqList) sortkeys.Uint32s(seqList) suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 { @@ -102,33 +102,38 @@ func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID return t }(userID, seqList) + lock := sync.Mutex{} var wg sync.WaitGroup wg.Add(len(suffixUserID2SubSeqList)) for k, v := range suffixUserID2SubSeqList { go func(suffixUserID string, subSeqList []uint32, operationID string) { - if e := d.DelMsgBySeqListInOneDoc(suffixUserID, subSeqList, operationID); e != nil { + defer wg.Done() + unexistSeqList, err := d.DelMsgBySeqListInOneDoc(suffixUserID, subSeqList, operationID) + if err != nil { log.Error(operationID, "DelMsgBySeqListInOneDoc failed ", e.Error(), suffixUserID, subSeqList) - err = e + return } + lock.Lock() + totalUnexistSeqList = append(totalUnexistSeqList, unexistSeqList...) + lock.Unlock() wg.Done() }(k, v, operationID) } - wg.Wait() - return err + return totalUnexistSeqList, err } -func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) error { +func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) ([]uint32, error) { log.Debug(operationID, utils.GetSelfFuncName(), "args ", suffixUserID, seqList) - seqMsgList, indexList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID) + seqMsgList, indexList, unexistSeqList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID) if err != nil { - return utils.Wrap(err, "") + return nil, utils.Wrap(err, "") } for i, v := range seqMsgList { if err := d.ReplaceMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil { - return utils.Wrap(err, "") + return nil, utils.Wrap(err, "") } } - return nil + return unexistSeqList, nil } // deleteMsgByLogic @@ -302,13 +307,13 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio return seqMsg, nil } -func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, err error) { +func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) sChat := UserChat{} if err = c.FindOne(ctx, bson.M{"uid": suffixUserID}).Decode(&sChat); err != nil { log.NewError(operationID, "not find seqUid", suffixUserID, err.Error()) - return nil, nil, utils.Wrap(err, "") + return nil, nil, nil, utils.Wrap(err, "") } singleCount := 0 var hasSeqList []uint32 @@ -316,7 +321,7 @@ func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqL msg := new(open_im_sdk.MsgData) if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil { log.NewError(operationID, "Unmarshal err", msg.String(), err.Error()) - return nil, nil, err + return nil, nil, nil, err } if isContainInt32(msg.Seq, seqList) { indexList = append(indexList, i) @@ -328,8 +333,13 @@ func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqL } } } - - return seqMsg, indexList, nil + for _, i := range seqList { + if isContainInt32(i, hasSeqList) { + continue + } + unexistSeqList = append(unexistSeqList, i) + } + return seqMsg, indexList, unexistSeqList, nil } func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) { @@ -915,8 +925,17 @@ func isContainInt32(target uint32, List []uint32) bool { } } return false - } + +func isNotContainInt32(target uint32, List []uint32) bool { + for _, i := range List { + if i == target { + return false + } + } + return true +} + func indexGen(uid string, seqSuffix uint32) string { return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10) } diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index 32ff05326..524d9ca56 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -314,7 +314,7 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, return nil } -func (d *DataBases) DelMsgFromCache(seqList []uint32, uid, operationID string) { +func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) { for _, seq := range seqList { key := messageCache + uid + "_" + strconv.Itoa(int(seq)) result, err := redis.String(d.Exec("GET", key)) @@ -325,6 +325,7 @@ func (d *DataBases) DelMsgFromCache(seqList []uint32, uid, operationID string) { log2.Debug(operationID, utils.GetSelfFuncName(), "del result", result) var msg pbCommon.MsgData err = utils.String2Pb(result, &msg) + log2.NewDebug(operationID, utils.GetSelfFuncName(), "msg", msg) if err != nil { log2.NewWarn(operationID, utils.GetSelfFuncName(), "string2Pb failed", msg, err.Error()) continue