BatchInsertBlock

This commit is contained in:
withchao 2023-05-25 12:24:21 +08:00
parent 597a3cf9b5
commit 4c8c30f496
4 changed files with 158 additions and 13 deletions

View File

@ -3,6 +3,7 @@ package controller
import (
"fmt"
"sort"
"strconv"
"sync"
"time"
@ -141,6 +142,82 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation
return nil
}
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, msgList []*unRelationTb.MsgInfoModel, firstSeq int64) error {
if len(msgList) == 0 {
return nil
}
num := db.msg.GetSingleGocMsgNum()
if msgList[0].Msg != nil {
firstSeq = msgList[0].Msg.Seq
}
getDocID := func(seq int64) string {
return conversationID + ":" + strconv.FormatInt(seq/num, 10)
}
getIndex := func(seq int64) int64 {
return seq % num
}
// 返回值为true表示数据库存在该文档false表示数据库不存在该文档
updateMsgModel := func(docID string, index int64, msg *unRelationTb.MsgInfoModel) (bool, error) {
var (
res *mongo.UpdateResult
err error
)
if msg.Msg != nil {
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msgList[0].Msg)
} else if msg.Revoke != nil {
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msgList[0].Revoke)
} else if msg.DelList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msgList[0].DelList)
} else if msg.ReadList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msgList[0].ReadList)
} else {
return false, errs.ErrArgs.Wrap("msg all field is nil")
}
if err != nil {
return false, err
}
return res.MatchedCount > 0, nil
}
tryUpdate := true
for i := 0; i < len(msgList); i++ {
msg := msgList[i]
seq := firstSeq + int64(i)
docID := getDocID(seq)
if tryUpdate {
matched, err := updateMsgModel(docID, getIndex(seq), msg)
if err != nil {
return err
}
if matched {
continue
}
}
doc := unRelationTb.MsgDocModel{
DocID: docID,
Msg: make([]unRelationTb.MsgInfoModel, num),
}
var insert int
for j := i; j < len(msgList); j++ {
if getDocID(seq) != docID {
break
}
insert++
doc.Msg[getIndex(seq)] = *msgList[j]
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) {
i--
tryUpdate = true
continue
}
return err
}
tryUpdate = false
i += insert - 1
}
return nil
}
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
num := db.msg.GetSingleGocMsgNum()
currentIndex := currentMaxSeq / num

View File

@ -38,6 +38,7 @@ func Test_BatchInsertChat2DB(t *testing.T) {
db := &commonMsgDatabase{
msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()),
}
//ctx := context.Background()
//msgs := make([]*sdkws.MsgData, 0, 1)
//for i := 0; i < cap(msgs); i++ {

View File

@ -2,10 +2,10 @@ package unrelation
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"go.mongodb.org/mongo-driver/mongo"
"strconv"
"strings"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
)
const (
@ -20,18 +20,56 @@ type MsgDocModel struct {
Msg []MsgInfoModel `bson:"msgs"`
}
type RevokeModel struct {
UserID string `bson:"user_id"`
Nickname string `bson:"nickname"`
Time int64 `bson:"time"`
}
type OfflinePushModel struct {
Title string `bson:"title"`
Desc string `bson:"desc"`
Ex string `bson:"ex"`
IOSPushSound string `bson:"ios_push_sound"`
IOSBadgeCount bool `bson:"ios_badge_count"`
}
type MsgDataModel struct {
SendID string `bson:"send_id"`
RecvID string `bson:"recv_id"`
GroupID string `bson:"group_id"`
ClientMsgID string `bson:"client_msg_id"`
ServerMsgID string `bson:"server_msg_id"`
SenderPlatformID int32 `bson:"sender_platform_id"`
SenderNickname string `bson:"sender_nickname"`
SenderFaceURL string `bson:"sender_face_url"`
SessionType int32 `bson:"session_type"`
MsgFrom int32 `bson:"msg_from"`
ContentType int32 `bson:"content_type"`
Content []byte `bson:"content"`
Seq int64 `bson:"seq"`
SendTime int64 `bson:"send_time"`
CreateTime int64 `bson:"create_time"`
Status int32 `bson:"status"`
Options map[string]bool `bson:"options"`
OfflinePush *OfflinePushModel `bson:"offline_push"`
AtUserIDList []string `bson:"at_user_id_list"`
AttachedInfo string `bson:"attached_info"`
Ex string `bson:"ex"`
}
type MsgInfoModel struct {
SendTime int64 `bson:"sendtime"`
Msg []byte `bson:"msg"`
Revoke bool `bson:"revoke"`
ReadList []string `bson:"read_list"`
DelList []string `bson:"del_list"`
Msg *MsgDataModel `bson:"msg"`
Revoke *RevokeModel `bson:"revoke"`
DelList []string `bson:"del_list"`
ReadList []string `bson:"read_list"`
}
type MsgDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error
UpdateMsg(ctx context.Context, docID string, index int64, info *MsgInfoModel) error
UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error)
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error

View File

@ -38,12 +38,41 @@ func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) e
return err
}
func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, info *table.MsgInfoModel) error {
_, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d", index): info}})
if err != nil {
return utils.Wrap(err, "")
func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
var field string
if key == "" {
field = fmt.Sprintf("msgs.%d", index)
} else {
field = fmt.Sprintf("msgs.%d.%s", index, key)
}
return nil
filter := bson.M{"doc_id": docID}
update := bson.M{"$set": bson.M{field: value}}
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil {
return nil, utils.Wrap(err, "")
}
return res, nil
}
// PushUnique value must slice
func (m *MsgMongoDriver) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
var field string
if key == "" {
field = fmt.Sprintf("msgs.%d", index)
} else {
field = fmt.Sprintf("msgs.%d.%s", index, key)
}
filter := bson.M{"doc_id": docID}
update := bson.M{
"$addToSet": bson.M{
field: bson.M{"$each": value},
},
}
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
if err != nil {
return nil, utils.Wrap(err, "")
}
return res, nil
}
func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {