mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-28 21:19:02 +08:00
Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode
This commit is contained in:
commit
d0f0fd6e66
@ -135,12 +135,12 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (*pbGroup.CreateGroupResp, error) {
|
func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupReq) (*pbGroup.CreateGroupResp, error) {
|
||||||
if err := tokenverify.CheckAccessV3(ctx, req.OwnerUserID); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if req.OwnerUserID == "" {
|
if req.OwnerUserID == "" {
|
||||||
return nil, errs.ErrArgs.Wrap("no group owner")
|
return nil, errs.ErrArgs.Wrap("no group owner")
|
||||||
}
|
}
|
||||||
|
if err := tokenverify.CheckAccessV3(ctx, req.OwnerUserID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
userIDs := append(append(req.InitMembers, req.AdminUserIDs...), req.OwnerUserID)
|
userIDs := append(append(req.InitMembers, req.AdminUserIDs...), req.OwnerUserID)
|
||||||
opUserID := mcontext.GetOpUserID(ctx)
|
opUserID := mcontext.GetOpUserID(ctx)
|
||||||
if !utils.Contain(opUserID, userIDs...) {
|
if !utils.Contain(opUserID, userIDs...) {
|
||||||
|
@ -140,76 +140,85 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
||||||
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
|
num := db.msg.GetSingleGocMsgNum()
|
||||||
return errors.New("too large")
|
currentIndex := currentMaxSeq / num
|
||||||
}
|
var blockMsgs []*[]*sdkws.MsgData
|
||||||
var remain int64
|
for i, data := range msgList {
|
||||||
blk0 := db.msg.GetSingleGocMsgNum() - 1
|
data.Seq = currentMaxSeq + int64(i+1)
|
||||||
//currentMaxSeq 4998
|
index := data.Seq/num - currentIndex
|
||||||
if currentMaxSeq < db.msg.GetSingleGocMsgNum() {
|
if i == 0 && index == 1 {
|
||||||
remain = blk0 - currentMaxSeq //1
|
index--
|
||||||
} else {
|
currentIndex++
|
||||||
excludeBlk0 := currentMaxSeq - blk0 //=1
|
|
||||||
//(5000-1)%5000 == 4999
|
|
||||||
remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum()
|
|
||||||
}
|
|
||||||
//remain=1
|
|
||||||
var insertCounter int64
|
|
||||||
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0)
|
|
||||||
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0)
|
|
||||||
docID := ""
|
|
||||||
docIDNext := ""
|
|
||||||
var err error
|
|
||||||
for _, m := range msgList {
|
|
||||||
currentMaxSeq++
|
|
||||||
sMsg := unRelationTb.MsgInfoModel{}
|
|
||||||
sMsg.SendTime = m.SendTime
|
|
||||||
m.Seq = currentMaxSeq
|
|
||||||
if sMsg.Msg, err = proto.Marshal(m); err != nil {
|
|
||||||
return utils.Wrap(err, "")
|
|
||||||
}
|
}
|
||||||
if insertCounter < remain {
|
var block *[]*sdkws.MsgData
|
||||||
msgsToMongo = append(msgsToMongo, sMsg)
|
if len(blockMsgs) == int(index) {
|
||||||
insertCounter++
|
var size int64
|
||||||
docID = db.msg.GetDocID(conversationID, currentMaxSeq)
|
if i == 0 {
|
||||||
|
size = num - data.Seq%num
|
||||||
|
} else {
|
||||||
|
temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num
|
||||||
|
if temp >= num {
|
||||||
|
size = num
|
||||||
|
} else {
|
||||||
|
size = temp % num
|
||||||
|
}
|
||||||
|
}
|
||||||
|
temp := make([]*sdkws.MsgData, 0, size)
|
||||||
|
block = &temp
|
||||||
|
blockMsgs = append(blockMsgs, block)
|
||||||
} else {
|
} else {
|
||||||
msgsToMongoNext = append(msgsToMongoNext, sMsg)
|
block = blockMsgs[index]
|
||||||
docIDNext = db.msg.GetDocID(conversationID, currentMaxSeq)
|
|
||||||
}
|
}
|
||||||
|
*block = append(*block, msgList[i])
|
||||||
}
|
}
|
||||||
|
create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0)
|
||||||
if docID != "" {
|
if !create {
|
||||||
err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo)
|
exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == mongo.ErrNoDocuments {
|
return err
|
||||||
doc := &unRelationTb.MsgDocModel{}
|
}
|
||||||
doc.DocID = docID
|
create = !exist
|
||||||
doc.Msg = msgsToMongo
|
}
|
||||||
if err = db.msgDocDatabase.Create(ctx, doc); err != nil {
|
for i, msgs := range blockMsgs {
|
||||||
|
docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i))
|
||||||
|
if create || i != 0 { // 插入
|
||||||
|
doc := unRelationTb.MsgDocModel{
|
||||||
|
DocID: docID,
|
||||||
|
Msg: make([]unRelationTb.MsgInfoModel, num),
|
||||||
|
}
|
||||||
|
for _, msg := range *msgs {
|
||||||
|
data, err := proto.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{
|
||||||
|
SendTime: msg.SendTime,
|
||||||
|
Msg: data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
|
||||||
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
||||||
|
return utils.Wrap(err, "")
|
||||||
|
}
|
||||||
|
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
||||||
|
} else { // 修改
|
||||||
|
for _, msg := range *msgs {
|
||||||
|
data, err := proto.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
info := unRelationTb.MsgInfoModel{
|
||||||
|
SendTime: msg.SendTime,
|
||||||
|
Msg: data,
|
||||||
|
}
|
||||||
|
if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil {
|
||||||
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
||||||
} else {
|
|
||||||
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.ZDebug(ctx, "PushMsgsToDoc success", "docID", docID, "len", len(msgsToMongo))
|
|
||||||
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if docIDNext != "" {
|
|
||||||
nextDoc := &unRelationTb.MsgDocModel{}
|
|
||||||
nextDoc.DocID = docIDNext
|
|
||||||
nextDoc.Msg = msgsToMongoNext
|
|
||||||
log.ZDebug(ctx, "create next doc", "docIDNext", docIDNext, "len", len(nextDoc.Msg))
|
|
||||||
if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil {
|
|
||||||
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
||||||
return utils.Wrap(err, "")
|
|
||||||
}
|
|
||||||
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
51
pkg/common/db/controller/msg_test.go
Normal file
51
pkg/common/db/controller/msg_test.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_BatchInsertChat2DB(t *testing.T) {
|
||||||
|
config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
|
||||||
|
config.Config.Mongo.DBTimeout = 60
|
||||||
|
config.Config.Mongo.DBDatabase = "openIM"
|
||||||
|
config.Config.Mongo.DBSource = "admin"
|
||||||
|
config.Config.Mongo.DBUserName = "root"
|
||||||
|
config.Config.Mongo.DBPassword = "openIM123"
|
||||||
|
config.Config.Mongo.DBMaxPoolSize = 100
|
||||||
|
config.Config.Mongo.DBRetainChatRecords = 3650
|
||||||
|
config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3"
|
||||||
|
|
||||||
|
mongo, err := unrelation.NewMongo()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = mongo.GetDatabase().Client().Ping(context.Background(), nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db := &commonMsgDatabase{
|
||||||
|
msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()),
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
msgs := make([]*sdkws.MsgData, 0, 15000)
|
||||||
|
|
||||||
|
for i := 0; i < cap(msgs); i++ {
|
||||||
|
msgs = append(msgs, &sdkws.MsgData{
|
||||||
|
Content: []byte(fmt.Sprintf("test-%d", i)),
|
||||||
|
SendTime: time.Now().UnixMilli(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err = db.BatchInsertChat2DB(ctx, "test", msgs, 4999)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -29,6 +29,8 @@ type MsgInfoModel struct {
|
|||||||
type MsgDocModelInterface interface {
|
type MsgDocModelInterface interface {
|
||||||
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
|
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
|
||||||
Create(ctx context.Context, model *MsgDocModel) error
|
Create(ctx context.Context, model *MsgDocModel) error
|
||||||
|
UpdateMsg(ctx context.Context, docID string, index int64, info *MsgInfoModel) error
|
||||||
|
IsExistDocID(ctx context.Context, docID string) (bool, error)
|
||||||
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
|
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
|
||||||
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
|
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
|
||||||
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
|
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
|
||||||
@ -67,6 +69,10 @@ func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
|||||||
return m.indexGen(conversationID, seqSuffix)
|
return m.indexGen(conversationID, seqSuffix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m MsgDocModel) IndexDocID(conversationID string, index int64) string {
|
||||||
|
return m.indexGen(conversationID, index)
|
||||||
|
}
|
||||||
|
|
||||||
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
|
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
|
||||||
seqMaxSuffix := maxSeq / singleGocMsgNum
|
seqMaxSuffix := maxSeq / singleGocMsgNum
|
||||||
var seqUserIDs []string
|
var seqUserIDs []string
|
||||||
|
@ -38,6 +38,14 @@ func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) e
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, info *table.MsgInfoModel) error {
|
||||||
|
_, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d", index): info}})
|
||||||
|
if err != nil {
|
||||||
|
return utils.Wrap(err, "")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error {
|
func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error {
|
||||||
msg.Status = status
|
msg.Status = status
|
||||||
bytes, err := proto.Marshal(msg)
|
bytes, err := proto.Marshal(msg)
|
||||||
@ -212,3 +220,11 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin
|
|||||||
}
|
}
|
||||||
return msgs, nil
|
return msgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) {
|
||||||
|
count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID})
|
||||||
|
if err != nil {
|
||||||
|
return false, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
return count > 0, nil
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user