From 6230b53501ec685bac1c8803671185668279a4e2 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 22 May 2023 18:36:49 +0800 Subject: [PATCH] BatchInsertChat2DB --- internal/rpc/group/group.go | 6 +- pkg/common/db/controller/msg.go | 125 ++++++++++++++------------ pkg/common/db/controller/msg_test.go | 51 +++++++++++ pkg/common/db/table/unrelation/msg.go | 6 ++ pkg/common/db/unrelation/msg.go | 16 ++++ 5 files changed, 143 insertions(+), 61 deletions(-) create mode 100644 pkg/common/db/controller/msg_test.go diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 6f10bc6d6..6e564d784 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -135,12 +135,12 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error { } func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (*pbGroup.CreateGroupResp, error) { - if err := tokenverify.CheckAccessV3(ctx, req.OwnerUserID); err != nil { - return nil, err - } if req.OwnerUserID == "" { return nil, errs.ErrArgs.Wrap("no group owner") } + if err := tokenverify.CheckAccessV3(ctx, req.OwnerUserID); err != nil { + return nil, err + } userIDs := append(append(req.InitMembers, req.AdminUserIDs...), req.OwnerUserID) opUserID := mcontext.GetOpUserID(ctx) if !utils.Contain(opUserID, userIDs...) { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 42ad43182..c0bd53353 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -140,76 +140,85 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation } func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { - if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { - return errors.New("too large") - } - var remain int64 - blk0 := db.msg.GetSingleGocMsgNum() - 1 - //currentMaxSeq 4998 - if currentMaxSeq < db.msg.GetSingleGocMsgNum() { - remain = blk0 - currentMaxSeq //1 - } else { - excludeBlk0 := currentMaxSeq - blk0 //=1 - //(5000-1)%5000 == 4999 - remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum() - } - //remain=1 - var insertCounter int64 - msgsToMongo := make([]unRelationTb.MsgInfoModel, 0) - msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0) - docID := "" - docIDNext := "" - var err error - for _, m := range msgList { - currentMaxSeq++ - sMsg := unRelationTb.MsgInfoModel{} - sMsg.SendTime = m.SendTime - m.Seq = currentMaxSeq - if sMsg.Msg, err = proto.Marshal(m); err != nil { - return utils.Wrap(err, "") + num := db.msg.GetSingleGocMsgNum() + currentIndex := currentMaxSeq / num + var blockMsgs []*[]*sdkws.MsgData + for i, data := range msgList { + data.Seq = currentMaxSeq + int64(i+1) + index := data.Seq/num - currentIndex + if i == 0 && index == 1 { + index-- + currentIndex++ } - if insertCounter < remain { - msgsToMongo = append(msgsToMongo, sMsg) - insertCounter++ - docID = db.msg.GetDocID(conversationID, currentMaxSeq) + var block *[]*sdkws.MsgData + if len(blockMsgs) == int(index) { + var size int64 + if i == 0 { + size = num - data.Seq%num + } else { + temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num + if temp >= num { + size = num + } else { + size = temp % num + } + } + temp := make([]*sdkws.MsgData, 0, size) + block = &temp + blockMsgs = append(blockMsgs, block) } else { - msgsToMongoNext = append(msgsToMongoNext, sMsg) - docIDNext = db.msg.GetDocID(conversationID, currentMaxSeq) + block = blockMsgs[index] } + *block = append(*block, msgList[i]) } - - if docID != "" { - err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo) + create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0) + if !create { + exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex)) if err != nil { - if err == mongo.ErrNoDocuments { - doc := &unRelationTb.MsgDocModel{} - doc.DocID = docID - doc.Msg = msgsToMongo - if err = db.msgDocDatabase.Create(ctx, doc); err != nil { + return err + } + create = !exist + } + for i, msgs := range blockMsgs { + docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i)) + if create || i != 0 { // 插入 + doc := unRelationTb.MsgDocModel{ + DocID: docID, + Msg: make([]unRelationTb.MsgInfoModel, num), + } + for _, msg := range *msgs { + data, err := proto.Marshal(msg) + if err != nil { + return err + } + doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{ + SendTime: msg.SendTime, + Msg: data, + } + } + if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { + prome.Inc(prome.MsgInsertMongoFailedCounter) + return utils.Wrap(err, "") + } + prome.Inc(prome.MsgInsertMongoSuccessCounter) + } else { // 修改 + for _, msg := range *msgs { + data, err := proto.Marshal(msg) + if err != nil { + return err + } + info := unRelationTb.MsgInfoModel{ + SendTime: msg.SendTime, + Msg: data, + } + if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil { prome.Inc(prome.MsgInsertMongoFailedCounter) return err } prome.Inc(prome.MsgInsertMongoSuccessCounter) - } else { - prome.Inc(prome.MsgInsertMongoFailedCounter) - return err } - } else { - log.ZDebug(ctx, "PushMsgsToDoc success", "docID", docID, "len", len(msgsToMongo)) - prome.Inc(prome.MsgInsertMongoSuccessCounter) } } - if docIDNext != "" { - nextDoc := &unRelationTb.MsgDocModel{} - nextDoc.DocID = docIDNext - nextDoc.Msg = msgsToMongoNext - log.ZDebug(ctx, "create next doc", "docIDNext", docIDNext, "len", len(nextDoc.Msg)) - if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil { - prome.Inc(prome.MsgInsertMongoFailedCounter) - return utils.Wrap(err, "") - } - prome.Inc(prome.MsgInsertMongoSuccessCounter) - } return nil } diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go new file mode 100644 index 000000000..24e632a42 --- /dev/null +++ b/pkg/common/db/controller/msg_test.go @@ -0,0 +1,51 @@ +package controller + +import ( + "context" + "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "testing" + "time" +) + +func Test_BatchInsertChat2DB(t *testing.T) { + config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} + config.Config.Mongo.DBTimeout = 60 + config.Config.Mongo.DBDatabase = "openIM" + config.Config.Mongo.DBSource = "admin" + config.Config.Mongo.DBUserName = "root" + config.Config.Mongo.DBPassword = "openIM123" + config.Config.Mongo.DBMaxPoolSize = 100 + config.Config.Mongo.DBRetainChatRecords = 3650 + config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3" + + mongo, err := unrelation.NewMongo() + if err != nil { + t.Fatal(err) + } + err = mongo.GetDatabase().Client().Ping(context.Background(), nil) + if err != nil { + panic(err) + } + + db := &commonMsgDatabase{ + msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()), + } + ctx := context.Background() + + msgs := make([]*sdkws.MsgData, 0, 15000) + + for i := 0; i < cap(msgs); i++ { + msgs = append(msgs, &sdkws.MsgData{ + Content: []byte(fmt.Sprintf("test-%d", i)), + SendTime: time.Now().UnixMilli(), + }) + } + err = db.BatchInsertChat2DB(ctx, "test", msgs, 4999) + if err != nil { + panic(err) + } + +} diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 7d67269d5..7548e01e4 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -29,6 +29,8 @@ type MsgInfoModel struct { type MsgDocModelInterface interface { PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error Create(ctx context.Context, model *MsgDocModel) error + UpdateMsg(ctx context.Context, docID string, index int64, info *MsgInfoModel) error + IsExistDocID(ctx context.Context, docID string) (bool, error) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) @@ -67,6 +69,10 @@ func (m MsgDocModel) GetDocID(conversationID string, seq int64) string { return m.indexGen(conversationID, seqSuffix) } +func (m MsgDocModel) IndexDocID(conversationID string, index int64) string { + return m.indexGen(conversationID, index) +} + func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { seqMaxSuffix := maxSeq / singleGocMsgNum var seqUserIDs []string diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 0db2f2faf..bf650c598 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -38,6 +38,14 @@ func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) e return err } +func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, info *table.MsgInfoModel) error { + _, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d", index): info}}) + if err != nil { + return utils.Wrap(err, "") + } + return nil +} + func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error { msg.Status = status bytes, err := proto.Marshal(msg) @@ -212,3 +220,11 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin } return msgs, nil } + +func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) { + count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID}) + if err != nil { + return false, errs.Wrap(err) + } + return count > 0, nil +}