From ae7e681bcaba124e3e8130973fb5c3ceabaabd6a Mon Sep 17 00:00:00 2001
From: skiffer-git <44203734@qq.com>
Date: Fri, 20 May 2022 11:00:11 +0800
Subject: [PATCH] batch to mongo

---
 internal/msg_transfer/logic/db.go  |  5 ++
 pkg/common/db/batch_insert_chat.go | 78 ++++++++++++++++++++++++++++++
 pkg/common/db/mongoModel.go        | 47 +++++++++++++++++-
 3 files changed, 129 insertions(+), 1 deletion(-)
 create mode 100644 pkg/common/db/batch_insert_chat.go

diff --git a/internal/msg_transfer/logic/db.go b/internal/msg_transfer/logic/db.go
index 806b782c0..052b5f000 100644
--- a/internal/msg_transfer/logic/db.go
+++ b/internal/msg_transfer/logic/db.go
@@ -21,3 +21,8 @@ func saveUserChat(uid string, msg *pbMsg.MsgDataToMQ) error {
 	return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData)
 //	return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData)
 }
+
+func saveUserChatList(uid string, msgList []*pbMsg.MsgDataToMQ, operationID string) error {
+	log.Info(operationID, utils.GetSelfFuncName(), "args ", uid, len(msgList))
+	return db.DB.BatchInsertChat(uid, msgList, operationID)
+}
diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/batch_insert_chat.go
new file mode 100644
index 000000000..9c23244f6
--- /dev/null
+++ b/pkg/common/db/batch_insert_chat.go
@@ -0,0 +1,78 @@
+package db
+
+import (
+	"Open_IM/pkg/common/config"
+	"Open_IM/pkg/common/log"
+	pbMsg "Open_IM/pkg/proto/chat"
+	"Open_IM/pkg/utils"
+	"context"
+	"errors"
+	"github.com/garyburd/redigo/redis"
+	"github.com/golang/protobuf/proto"
+	"go.mongodb.org/mongo-driver/bson"
+	"time"
+)
+
+func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error {
+	if len(msgList) > GetSingleGocMsgNum() {
+		return errors.New("too large")
+	}
+	currentMaxSeq, err := d.GetUserMaxSeq(userID)
+	if err == nil {
+
+	} else if err == redis.ErrNil {
+		currentMaxSeq = 0
+	} else {
+		return utils.Wrap(err, "")
+	}
+
+	remain := currentMaxSeq % uint64(GetSingleGocMsgNum())
+	insertCounter := uint64(0)
+	msgListToMongo := make([]MsgInfo, 0)
+	msgListToMongoNext := make([]MsgInfo, 0)
+	seqUid := ""
+	seqUidNext := ""
+	for _, m := range msgList {
+		currentMaxSeq++
+		sMsg := MsgInfo{}
+		sMsg.SendTime = m.MsgData.SendTime
+		m.MsgData.Seq = uint32(currentMaxSeq)
+		if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
+			return utils.Wrap(err, "")
+		}
+		if insertCounter < remain {
+			msgListToMongo = append(msgListToMongo, sMsg)
+			insertCounter++
+			seqUid = getSeqUid(userID, uint32(currentMaxSeq))
+		} else {
+			msgListToMongoNext = append(msgListToMongoNext, sMsg)
+			seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
+		}
+	}
+	ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
+	c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
+	newTime := getCurrentTimestampByMill()
+
+	if seqUid != "" {
+		filter := bson.M{"uid": seqUid}
+		log.NewDebug(operationID, "filter ", seqUid)
+		err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
+		if err != nil {
+			log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
+			return utils.Wrap(err, "")
+		}
+	}
+
+	if seqUidNext != "" {
+		filter := bson.M{"uid": seqUidNext}
+		sChat := UserChat{}
+		sChat.UID = seqUidNext
+		sChat.Msg = msgListToMongoNext
+		if _, err = c.InsertOne(ctx, &sChat); err != nil {
+			log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
+			return utils.Wrap(err, "")
+		}
+	}
+	log.NewDebug(operationID, "find mgo uid cost time", getCurrentTimestampByMill()-newTime)
+	return nil
+}
diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go
index 986f5ab78..02131a7df 100644
--- a/pkg/common/db/mongoModel.go
+++ b/pkg/common/db/mongoModel.go
@@ -29,6 +29,10 @@ const cTag = "tag"
 const cSendLog = "send_log"
 const singleGocMsgNum = 5000
 
+func GetSingleGocMsgNum() int {
+	return singleGocMsgNum
+}
+
 type MsgInfo struct {
 	SendTime int64
 	Msg      []byte
@@ -349,7 +353,7 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD
 		return utils.Wrap(err, "")
 	}
 	err = c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": sMsg}}).Err()
-	log.NewDebug(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime)
+	log.NewWarn(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime)
 	if err != nil {
 		sChat := UserChat{}
 		sChat.UID = seqUid
@@ -366,6 +370,47 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD
 	return nil
 }
 
+//
+//func (d *DataBases) SaveUserChatListMongo2(uid string, sendTime int64, msgList []*pbMsg.MsgDataToDB) error {
+//	ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
+//	c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
+//	newTime := getCurrentTimestampByMill()
+//	operationID := ""
+//	seqUid := ""
+//	msgListToMongo := make([]MsgInfo, 0)
+//
+//	for _, m := range msgList {
+//		seqUid = getSeqUid(uid, m.MsgData.Seq)
+//		var err error
+//		sMsg := MsgInfo{}
+//		sMsg.SendTime = sendTime
+//		if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
+//			return utils.Wrap(err, "")
+//		}
+//		msgListToMongo = append(msgListToMongo, sMsg)
+//	}
+//
+//	filter := bson.M{"uid": seqUid}
+//	log.NewDebug(operationID, "filter ", seqUid)
+//	err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
+//	log.NewWarn(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime)
+//	if err != nil {
+//		sChat := UserChat{}
+//		sChat.UID = seqUid
+//		sChat.Msg = msgListToMongo
+//
+//		if _, err = c.InsertOne(ctx, &sChat); err != nil {
+//			log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
+//			return utils.Wrap(err, "")
+//		}
+//	} else {
+//		log.NewDebug(operationID, "FindOneAndUpdate ok", filter)
+//	}
+//
+//	log.NewDebug(operationID, "find mgo uid cost time", getCurrentTimestampByMill()-newTime)
+//	return nil
+//}
+
 func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
 	var seqUid string
 	newTime := getCurrentTimestampByMill()