Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode

This commit is contained in:
wangchuxiao 2023-05-29 15:47:57 +08:00
commit b4844a8a88
2 changed files with 75 additions and 45 deletions

View File

@ -2,6 +2,7 @@ package controller
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"strconv" "strconv"
@ -185,17 +186,28 @@ func Test_Revoke(t *testing.T) {
} }
} }
// func Test_Delete(t *testing.T) { func Test_FindBySeq(t *testing.T) {
// db := GetDB() db := GetDB()
// ctx := context.Background() ctx := context.Background()
// var arr []any res, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, "123456", "test:0", []int64{1, 2, 3})
// for i := 0; i < 123; i++ { if err != nil {
// arr = append(arr, []string{"uid_1", "uid_2"}) t.Fatal(err)
// } }
// if err := db.BatchInsertBlock(ctx, "test", arr, "", 210); err != nil { data, _ := json.Marshal(res)
// t.Fatal(err) fmt.Println(string(data))
// } }
// }
//func Test_Delete(t *testing.T) {
// db := GetDB()
// ctx := context.Background()
// var arr []any
// for i := 0; i < 123; i++ {
// arr = append(arr, []string{"uid_1", "uid_2"})
// }
// if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil {
// t.Fatal(err)
// }
//}
//func Test_Delete1(t *testing.T) { //func Test_Delete1(t *testing.T) {
// config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} // config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -178,47 +177,66 @@ func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error
return err return err
} }
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID string, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) {
beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) indexs := make([]int64, 0, len(seqs))
beginIndex := m.model.GetMsgIndex(beginSeq) for _, seq := range seqs {
num := endSeq - beginSeq + 1 indexs = append(indexs, m.model.GetMsgIndex(seq))
log.ZInfo(ctx, "GetMsgBySeqIndexIn1Doc", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "beginIndex", beginIndex, "num", num) }
pipeline := bson.A{ pipeline := mongo.Pipeline{
bson.M{ {
"$match": bson.M{"doc_id": docID}, {"$match", bson.D{
{"doc_id", docID},
}},
}, },
bson.M{ {
"$project": bson.M{ {"$project", bson.D{
"msgs": bson.M{ {"_id", 0},
"$slice": bson.A{"$msgs", beginIndex, num}, {"msgs", bson.D{
}, {"$map", bson.D{
}, {"input", indexs},
{"as", "index"},
{"in", bson.D{
{"$let", bson.D{
{"vars", bson.D{
{"currentMsg", bson.D{
{"$arrayElemAt", []string{"$msgs", "$$index"}},
}},
}},
{"in", bson.D{
{"$cond", bson.D{
{"if", bson.D{
{"$in", []string{userID, "$$currentMsg.del_list"}},
}},
{"then", nil},
{"else", "$$currentMsg"},
}},
}},
}},
}},
}},
}},
}},
},
{
{"$project", bson.D{
{"doc_id", 0},
{"msgs.del_list", 0},
}},
}, },
} }
cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) cur, err := m.MsgCollection.Aggregate(ctx, pipeline)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
defer cursor.Close(ctx) defer cur.Close(ctx)
var doc table.MsgDocModel var msgDocModel []table.MsgDocModel
if cursor.Next(ctx) { if err := cur.All(ctx, &msgDocModel); err != nil {
if err := cursor.Decode(&doc); err != nil { return nil, errs.Wrap(err)
return nil, err
}
} }
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) if len(msgDocModel) == 0 {
for _, v := range doc.Msg { return nil, errs.Wrap(mongo.ErrNoDocuments)
if v.Msg == nil {
continue
}
if v.Msg.Seq >= beginSeq && v.Msg.Seq <= endSeq {
log.ZDebug(ctx, "find msg", "msg", v.Msg)
msgs = append(msgs, v)
} else {
log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", v.Msg)
}
} }
return msgs, nil return msgDocModel[0].Msg, nil
} }
func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) { func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) {