This commit is contained in:
wangchuxiao 2023-05-15 11:44:51 +08:00
parent 54a5e06f37
commit dbec5f2191
4 changed files with 16 additions and 21 deletions

View File

@ -71,7 +71,6 @@ func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgDat
return err
}
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
//log.NewInfo(operationID, "push_result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush)
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userID)
p.successCount++
if isOfflinePush && userID != msg.SendID {

View File

@ -396,23 +396,23 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio
func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) {
beginSeq, endSeq := db.msg.GetSeqsBeginEnd(seqs)
log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq)
msgs, _, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq)
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq)
if err != nil {
return nil, nil, err
}
for _, seq := range seqs {
for i, msg := range msgs {
log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "len(msgs)", len(msgs))
seqMsgs = append(seqMsgs, msgs...)
for i, seq := range seqs {
for _, msg := range msgs {
if seq == msg.Seq {
seqMsgs = append(seqMsgs, msg)
continue
}
if i == len(msgs)-1 {
unExistSeqs = append(unExistSeqs, seq)
break
}
}
if i == len(seqs)-1 {
unExistSeqs = append(unExistSeqs, seq)
}
}
msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
if err != nil {
return nil, nil, err
}

View File

@ -31,7 +31,7 @@ type MsgDocModelInterface interface {
Create(ctx context.Context, model *MsgDocModel) error
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) ([]*sdkws.MsgData, []int64, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) ([]*sdkws.MsgData, error)
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
Delete(ctx context.Context, docIDs []string) error

View File

@ -134,7 +134,7 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode
return err
}
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, seqs []int64, err error) {
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, err error) {
beginIndex := m.msg.GetMsgIndex(beginSeq)
num := endSeq - beginSeq + 1
pipeline := bson.A{
@ -151,7 +151,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin
}
cursor, err := m.MsgCollection.Aggregate(ctx, pipeline)
if err != nil {
return nil, nil, errs.Wrap(err)
return nil, errs.Wrap(err)
}
defer cursor.Close(ctx)
var doc table.MsgDocModel
@ -159,28 +159,24 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin
for cursor.Next(ctx) {
err := cursor.Decode(&doc)
if err != nil {
return nil, nil, err
return nil, err
}
if i == 0 {
break
}
}
if len(doc.Msg) < 1 {
return nil, nil, errs.ErrRecordNotFound.Wrap("mongo GetMsgBySeqIndex failed, len is 0")
}
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg))
for _, v := range doc.Msg {
var msg sdkws.MsgData
if err := proto.Unmarshal(v.Msg, &msg); err != nil {
return nil, nil, err
return nil, err
}
if msg.Seq >= beginSeq && msg.Seq <= endSeq {
log.ZDebug(ctx, "find msg", "msg", &msg)
msgs = append(msgs, &msg)
seqs = append(seqs, msg.Seq)
} else {
log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg)
}
}
return msgs, seqs, nil
return msgs, nil
}