mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-05 05:12:45 +08:00
fix: the abnormal message has no sending time, causing the SDK to be abnormal (#3087)
* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time
This commit is contained in:
parent
83c7b7134c
commit
274a9bee65
@ -1091,22 +1091,148 @@ func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []i
|
||||
return msgDocModel[0].Msg, nil
|
||||
}
|
||||
|
||||
//func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
|
||||
// if len(seqs) == 0 {
|
||||
// return nil, nil
|
||||
// }
|
||||
// result := make([]*model.MsgInfoModel, 0, len(seqs))
|
||||
// for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
|
||||
// res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// for i, re := range res {
|
||||
// if re == nil || re.Msg == nil {
|
||||
// continue
|
||||
// }
|
||||
// result = append(result, res[i])
|
||||
// }
|
||||
// }
|
||||
// return result, nil
|
||||
//}
|
||||
|
||||
func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit int64) (int64, int64, error) {
|
||||
if limit == 0 {
|
||||
return 0, 0, nil
|
||||
}
|
||||
pipeline := []bson.M{
|
||||
{
|
||||
"$match": bson.M{
|
||||
"doc_id": docID,
|
||||
},
|
||||
},
|
||||
{
|
||||
"$project": bson.M{
|
||||
"_id": 0,
|
||||
"doc_id": 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
"$unwind": "$msgs",
|
||||
},
|
||||
{
|
||||
"$project": bson.M{
|
||||
//"_id": 0,
|
||||
//"doc_id": 0,
|
||||
"msgs.msg.send_time": 1,
|
||||
"msgs.msg.seq": 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
if limit > 0 {
|
||||
pipeline = append(pipeline, bson.M{"$limit": limit})
|
||||
}
|
||||
type Result struct {
|
||||
Msgs *model.MsgInfoModel `bson:"msgs"`
|
||||
}
|
||||
res, err := mongoutil.Aggregate[Result](ctx, m.coll, pipeline)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
for i := len(res) - 1; i > 0; i-- {
|
||||
v := res[i]
|
||||
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
|
||||
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
|
||||
}
|
||||
}
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string, seq int64) (int64, int64, error) {
|
||||
first := true
|
||||
for i := m.model.GetDocIndex(seq); i >= 0; i-- {
|
||||
limit := int64(-1)
|
||||
if first {
|
||||
first = false
|
||||
limit = m.model.GetMsgIndex(seq)
|
||||
}
|
||||
docID := m.model.BuildDocIDByIndex(conversationID, i)
|
||||
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
if msgSendTime > 0 {
|
||||
return msgSeq, msgSendTime, nil
|
||||
}
|
||||
}
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
|
||||
if len(seqs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var abnormalSeq []int64
|
||||
result := make([]*model.MsgInfoModel, 0, len(seqs))
|
||||
for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
|
||||
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
|
||||
for docID, docSeqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
|
||||
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(docSeqs, m.model.GetMsgIndex))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(res) == 0 {
|
||||
abnormalSeq = append(abnormalSeq, docSeqs...)
|
||||
continue
|
||||
}
|
||||
for i, re := range res {
|
||||
if re == nil || re.Msg == nil {
|
||||
if re == nil || re.Msg == nil || re.Msg.SendTime == 0 {
|
||||
abnormalSeq = append(abnormalSeq, docSeqs[i])
|
||||
continue
|
||||
}
|
||||
result = append(result, res[i])
|
||||
}
|
||||
}
|
||||
if len(abnormalSeq) > 0 {
|
||||
datautil.Sort(abnormalSeq, false)
|
||||
sendTime := make(map[int64]int64)
|
||||
var (
|
||||
lastSeq int64
|
||||
lastSendTime int64
|
||||
)
|
||||
for _, seq := range abnormalSeq {
|
||||
if lastSendTime > 0 && lastSeq <= seq {
|
||||
sendTime[seq] = lastSendTime
|
||||
continue
|
||||
}
|
||||
msgSeq, msgSendTime, err := m.findBeforeSendTime(ctx, conversationID, seq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msgSendTime <= 0 {
|
||||
break
|
||||
}
|
||||
sendTime[seq] = msgSendTime
|
||||
lastSeq = msgSeq
|
||||
lastSendTime = msgSendTime
|
||||
}
|
||||
for _, seq := range abnormalSeq {
|
||||
result = append(result, &model.MsgInfoModel{
|
||||
Msg: &model.MsgDataModel{
|
||||
Seq: seq,
|
||||
Status: constant.MsgStatusHasDeleted,
|
||||
SendTime: sendTime[seq],
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
@ -15,9 +15,10 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -108,6 +109,10 @@ func (m *MsgDocModel) IsFull() bool {
|
||||
return m.Msg[len(m.Msg)-1].Msg != nil
|
||||
}
|
||||
|
||||
func (m *MsgDocModel) GetDocIndex(seq int64) int64 {
|
||||
return (seq - 1) / singleGocMsgNum
|
||||
}
|
||||
|
||||
func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
||||
seqSuffix := (seq - 1) / singleGocMsgNum
|
||||
return m.indexGen(conversationID, seqSuffix)
|
||||
@ -135,6 +140,10 @@ func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||
}
|
||||
|
||||
func (*MsgDocModel) BuildDocIDByIndex(conversationID string, index int64) string {
|
||||
return conversationID + ":" + strconv.FormatInt(index, 10)
|
||||
}
|
||||
|
||||
func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
|
||||
for _, v := range seqs {
|
||||
msgModel := new(sdkws.MsgData)
|
||||
|
Loading…
x
Reference in New Issue
Block a user