mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-05 21:02:11 +08:00
feat: msg format
This commit is contained in:
parent
fdfb75a66f
commit
32b7738112
2
go.mod
2
go.mod
@ -16,7 +16,7 @@ require (
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/localcache v0.0.1
|
||||
github.com/openimsdk/protocol v0.0.58-google
|
||||
github.com/openimsdk/tools v0.0.46-alpha.11
|
||||
github.com/openimsdk/tools v0.0.46-alpha.14
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
|
||||
4
go.sum
4
go.sum
@ -270,8 +270,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
|
||||
github.com/openimsdk/protocol v0.0.58-google h1:cGNUVaXO9LqcFgIb4NvrtEOrv0spGecoQKyN8YWhyZs=
|
||||
github.com/openimsdk/protocol v0.0.58-google/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/tools v0.0.46-alpha.11 h1:2+Rd+rhjjwvdKe3aJ7voRhBG+KJhh6nCNwfjrZotpbE=
|
||||
github.com/openimsdk/tools v0.0.46-alpha.11/go.mod h1:NAHPJyNUJm0n0WaZfIRC5s6Np+timv+xKIn5I8SKYaM=
|
||||
github.com/openimsdk/tools v0.0.46-alpha.14 h1:9znmFmI9sQn5mNLUXhtf84lVL/KpnQNoxBi6yiM1vZM=
|
||||
github.com/openimsdk/tools v0.0.46-alpha.14/go.mod h1:NAHPJyNUJm0n0WaZfIRC5s6Np+timv+xKIn5I8SKYaM=
|
||||
github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo=
|
||||
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
|
||||
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -65,10 +66,6 @@ func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, ind
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = mongo.CreateMsgIndex(); err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := kdisc.NewDiscoveryRegister(config)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -80,7 +77,10 @@ func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, ind
|
||||
|
||||
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
|
||||
msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database))
|
||||
msgDocModel, err := mgo.NewMsgMongo(mongo.GetDatabase(config.Mongo.Database))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, &config.Kafka)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
@ -72,11 +73,11 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discoveryreg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mongo.CreateMsgIndex(); err != nil {
|
||||
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
|
||||
msgDocModel, err := mgo.NewMsgMongo(mongo.GetDatabase(config.Mongo.Database))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database))
|
||||
conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
|
||||
userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin)
|
||||
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
|
||||
|
||||
@ -80,7 +80,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client registry.Svc
|
||||
return err
|
||||
}
|
||||
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
|
||||
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database))
|
||||
userMongoDB := mgo.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database))
|
||||
database := controller.NewUserDatabase(userDB, cache, tx.NewMongo(mongo.GetClient()), userMongoDB)
|
||||
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName)
|
||||
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
|
||||
|
||||
@ -84,7 +84,7 @@ func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, er
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database))
|
||||
userMongoDB := mgo.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database))
|
||||
ctxTx := tx.NewMongo(mongo.GetClient())
|
||||
userDatabase := controller.NewUserDatabase(
|
||||
userDB,
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
@ -146,7 +147,10 @@ func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheMo
|
||||
|
||||
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) {
|
||||
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(database)
|
||||
msgDocModel, err := mgo.NewMsgMongo(database)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka)
|
||||
}
|
||||
|
||||
|
||||
@ -1,250 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/tools/log"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
||||
)
|
||||
|
||||
func Test_BatchInsertChat2DB(t *testing.T) {
|
||||
conf := config.NewGlobalConfig()
|
||||
conf.Mongo.Address = []string{"192.168.44.128:37017"}
|
||||
// conf.Mongo.Timeout = 60
|
||||
conf.Mongo.Database = "openIM"
|
||||
// conf.Mongo.Source = "admin"
|
||||
conf.Mongo.Username = "root"
|
||||
conf.Mongo.Password = "openIM123"
|
||||
conf.Mongo.MaxPoolSize = 100
|
||||
conf.RetainChatRecords = 3650
|
||||
conf.ChatRecordsClearTime = "0 2 * * 3"
|
||||
|
||||
mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = mongo.GetDatabase(conf.Mongo.Database).Client().Ping(context.Background(), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db := &commonMsgDatabase{
|
||||
msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase(conf.Mongo.Database)),
|
||||
}
|
||||
|
||||
//ctx := context.Background()
|
||||
//msgs := make([]*sdkws.MsgData, 0, 1)
|
||||
//for i := 0; i < cap(msgs); i++ {
|
||||
// msgs = append(msgs, &sdkws.MsgData{
|
||||
// Content: []byte(fmt.Sprintf("test-%d", i)),
|
||||
// SendTime: time.Now().UnixMilli(),
|
||||
// })
|
||||
//}
|
||||
//err = db.BatchInsertChat2DB(ctx, "test", msgs, 0)
|
||||
//if err != nil {
|
||||
// panic(err)
|
||||
//}
|
||||
|
||||
_ = db.BatchInsertChat2DB
|
||||
c := mongo.GetDatabase(conf.Mongo.Database).Collection("msg")
|
||||
|
||||
ch := make(chan int)
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
index := 10
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(1)
|
||||
go func(channelID int) {
|
||||
defer wg.Done()
|
||||
<-ch
|
||||
var arr []string
|
||||
for i := 0; i < 500; i++ {
|
||||
arr = append(arr, strconv.Itoa(i+1))
|
||||
}
|
||||
rand.Shuffle(len(arr), func(i, j int) {
|
||||
arr[i], arr[j] = arr[j], arr[i]
|
||||
})
|
||||
for j, s := range arr {
|
||||
if j == 0 {
|
||||
fmt.Printf("channnelID: %d, arr[0]: %s\n", channelID, arr[j])
|
||||
}
|
||||
filter := bson.M{"doc_id": "test:0"}
|
||||
update := bson.M{
|
||||
"$addToSet": bson.M{
|
||||
fmt.Sprintf("msgs.%d.del_list", index): bson.M{"$each": []string{s}},
|
||||
},
|
||||
}
|
||||
_, err := c.UpdateOne(context.Background(), filter, update)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-ch
|
||||
var arr []string
|
||||
for i := 0; i < 500; i++ {
|
||||
arr = append(arr, strconv.Itoa(1001+i))
|
||||
}
|
||||
rand.Shuffle(len(arr), func(i, j int) {
|
||||
arr[i], arr[j] = arr[j], arr[i]
|
||||
})
|
||||
for _, s := range arr {
|
||||
filter := bson.M{"doc_id": "test:0"}
|
||||
update := bson.M{
|
||||
"$addToSet": bson.M{
|
||||
fmt.Sprintf("msgs.%d.read_list", index): bson.M{"$each": []string{s}},
|
||||
},
|
||||
}
|
||||
_, err := c.UpdateOne(context.Background(), filter, update)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
close(ch)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
func GetDB() *commonMsgDatabase {
|
||||
conf := config.NewGlobalConfig()
|
||||
conf.Mongo.Address = []string{"203.56.175.233:37017"}
|
||||
// conf.Mongo.Timeout = 60
|
||||
conf.Mongo.Database = "openim_v3"
|
||||
// conf.Mongo.Source = "admin"
|
||||
conf.Mongo.Username = "root"
|
||||
conf.Mongo.Password = "openIM123"
|
||||
conf.Mongo.MaxPoolSize = 100
|
||||
conf.RetainChatRecords = 3650
|
||||
conf.ChatRecordsClearTime = "0 2 * * 3"
|
||||
|
||||
mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = mongo.GetDatabase(conf.Mongo.Database).Client().Ping(context.Background(), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &commonMsgDatabase{
|
||||
msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase(conf.Mongo.Database)),
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Insert(t *testing.T) {
|
||||
db := GetDB()
|
||||
ctx := context.Background()
|
||||
var arr []any
|
||||
for i := 0; i < 345; i++ {
|
||||
if i%2 == 0 {
|
||||
arr = append(arr, (*unrelationtb.MsgDataModel)(nil))
|
||||
continue
|
||||
}
|
||||
arr = append(arr, &unrelationtb.MsgDataModel{
|
||||
Seq: int64(i),
|
||||
Content: fmt.Sprintf("test-%d", i),
|
||||
})
|
||||
}
|
||||
if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Revoke(t *testing.T) {
|
||||
db := GetDB()
|
||||
ctx := context.Background()
|
||||
var arr []any
|
||||
for i := 0; i < 456; i++ {
|
||||
arr = append(arr, &unrelationtb.RevokeModel{
|
||||
UserID: "uid_" + strconv.Itoa(i),
|
||||
Nickname: "uname_" + strconv.Itoa(i),
|
||||
Time: time.Now().UnixMilli(),
|
||||
})
|
||||
}
|
||||
if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyRevoke, 123); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_FindBySeq(t *testing.T) {
|
||||
if err := log.InitFromConfig("", "", 6, true, false, "", 2, 1, "1,0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db := GetDB()
|
||||
ctx := context.Background()
|
||||
fmt.Println(
|
||||
db.msgDocDatabase.(*unrelation.MsgMongoDriver).GetMsgBySeqIndexIn1Doc(ctx, "100", "si_100_101:0", []int64{1}),
|
||||
)
|
||||
//res, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, "123456", "test:0", []int64{1, 2, 3})
|
||||
//if err != nil {
|
||||
// t.Fatal(err)
|
||||
//}
|
||||
//db.GetMsgBySeqs(ctx, "100", "si_100_101:0", []int64{6})
|
||||
//data, _ := json.Marshal(res)
|
||||
//fmt.Println(string(data))
|
||||
}
|
||||
|
||||
//func Test_Delete(t *testing.T) {
|
||||
// db := GetDB()
|
||||
// ctx := context.Background()
|
||||
// var arr []any
|
||||
// for i := 0; i < 123; i++ {
|
||||
// arr = append(arr, []string{"uid_1", "uid_2"})
|
||||
// }
|
||||
// if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
//}
|
||||
|
||||
func TestName(t *testing.T) {
|
||||
db := GetDB()
|
||||
var seqs []int64
|
||||
for i := int64(1); i <= 50; i++ {
|
||||
seqs = append(seqs, i)
|
||||
}
|
||||
msgs, err := db.getMsgBySeqsRange(context.Background(), "4931176757", "si_3866692501_4931176757", seqs, seqs[0], seqs[len(seqs)-1])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log(msgs)
|
||||
|
||||
}
|
||||
@ -87,10 +87,10 @@ type userDatabase struct {
|
||||
tx tx.CtxTx
|
||||
userDB relation.UserModelInterface
|
||||
cache cache.UserCache
|
||||
mongoDB unrelationtb.UserModelInterface
|
||||
mongoDB unrelationtb.SubscribeUserModelInterface
|
||||
}
|
||||
|
||||
func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB unrelationtb.UserModelInterface) UserDatabase {
|
||||
func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB unrelationtb.SubscribeUserModelInterface) UserDatabase {
|
||||
return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB}
|
||||
}
|
||||
|
||||
@ -161,7 +161,7 @@ func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel)
|
||||
}
|
||||
|
||||
//// Update (non-zero value) externally guarantees that userID exists.
|
||||
//func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) {
|
||||
//func (u *userDatabase) Update(ctx context.Context, user *relation.SubscribeUserModel) (err error) {
|
||||
// if err := u.userDB.Update(ctx, user); err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
@ -1,68 +1,56 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/mgoutil"
|
||||
"github.com/openimsdk/tools/utils"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ErrMsgListNotExist = errors.New("user not have msg in mongoDB")
|
||||
|
||||
type MsgMongoDriver struct {
|
||||
MsgCollection *mongo.Collection
|
||||
model table.MsgDocModel
|
||||
func NewMsgMongo(db *mongo.Database) (relation.MsgDocModelInterface, error) {
|
||||
coll := db.Collection(new(relation.MsgDocModel).TableName())
|
||||
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
|
||||
Keys: bson.D{
|
||||
{Key: "doc_id", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetUnique(true),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return &MsgMgo{coll: coll}, nil
|
||||
}
|
||||
|
||||
func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface {
|
||||
collection := database.Collection(table.MsgDocModel{}.TableName())
|
||||
return &MsgMongoDriver{MsgCollection: collection}
|
||||
type MsgMgo struct {
|
||||
coll *mongo.Collection
|
||||
model relation.MsgDocModel
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.MsgInfoModel) error {
|
||||
return m.MsgCollection.FindOneAndUpdate(ctx, bson.M{"doc_id": docID}, bson.M{"$push": bson.M{"msgs": bson.M{"$each": msgsToMongo}}}).
|
||||
Err()
|
||||
func (m *MsgMgo) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []relation.MsgInfoModel) error {
|
||||
filter := bson.M{"doc_id": docID}
|
||||
update := bson.M{"$push": bson.M{"msgs": bson.M{"$each": msgsToMongo}}}
|
||||
return mgoutil.UpdateOne(ctx, m.coll, filter, update, false)
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) error {
|
||||
_, err := m.MsgCollection.InsertOne(ctx, model)
|
||||
return err
|
||||
func (m *MsgMgo) Create(ctx context.Context, model *relation.MsgDocModel) error {
|
||||
return mgoutil.InsertMany(ctx, m.coll, []*relation.MsgDocModel{model})
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) UpdateMsg(
|
||||
ctx context.Context,
|
||||
docID string,
|
||||
index int64,
|
||||
key string,
|
||||
value any,
|
||||
) (*mongo.UpdateResult, error) {
|
||||
func (m *MsgMgo) UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
|
||||
var field string
|
||||
if key == "" {
|
||||
field = fmt.Sprintf("msgs.%d", index)
|
||||
@ -71,21 +59,10 @@ func (m *MsgMongoDriver) UpdateMsg(
|
||||
}
|
||||
filter := bson.M{"doc_id": docID}
|
||||
update := bson.M{"$set": bson.M{field: value}}
|
||||
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return res, nil
|
||||
return mgoutil.UpdateOneResult(ctx, m.coll, filter, update)
|
||||
}
|
||||
|
||||
// PushUnique value must slice.
|
||||
func (m *MsgMongoDriver) PushUnique(
|
||||
ctx context.Context,
|
||||
docID string,
|
||||
index int64,
|
||||
key string,
|
||||
value any,
|
||||
) (*mongo.UpdateResult, error) {
|
||||
func (m *MsgMgo) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
|
||||
var field string
|
||||
if key == "" {
|
||||
field = fmt.Sprintf("msgs.%d", index)
|
||||
@ -98,138 +75,24 @@ func (m *MsgMongoDriver) PushUnique(
|
||||
field: bson.M{"$each": value},
|
||||
},
|
||||
}
|
||||
res, err := m.MsgCollection.UpdateOne(ctx, filter, update)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return res, nil
|
||||
return mgoutil.UpdateOneResult(ctx, m.coll, filter, update)
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {
|
||||
_, err := m.MsgCollection.UpdateOne(
|
||||
ctx,
|
||||
bson.M{"doc_id": docID},
|
||||
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}},
|
||||
)
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
return nil
|
||||
func (m *MsgMgo) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {
|
||||
filter := bson.M{"doc_id": docID}
|
||||
update := bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}}
|
||||
return mgoutil.UpdateOne(ctx, m.coll, filter, update, false)
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error {
|
||||
msg.Status = status
|
||||
bytes, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
_, err = m.MsgCollection.UpdateOne(
|
||||
ctx,
|
||||
bson.M{"doc_id": docID},
|
||||
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}},
|
||||
)
|
||||
if err != nil {
|
||||
return errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqIndex is %d", docID, seqIndex))
|
||||
}
|
||||
return nil
|
||||
func (m *MsgMgo) IsExistDocID(ctx context.Context, docID string) (bool, error) {
|
||||
return mgoutil.Exist(ctx, m.coll, bson.M{"doc_id": docID})
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*table.MsgDocModel, error) {
|
||||
doc := &table.MsgDocModel{}
|
||||
err := m.MsgCollection.FindOne(ctx, bson.M{"doc_id": docID}).Decode(doc)
|
||||
return doc, err
|
||||
func (m *MsgMgo) FindOneByDocID(ctx context.Context, docID string) (*relation.MsgDocModel, error) {
|
||||
return mgoutil.FindOne[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": docID})
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) GetMsgDocModelByIndex(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
index, sort int64,
|
||||
) (*table.MsgDocModel, error) {
|
||||
if sort != 1 && sort != -1 {
|
||||
return nil, errs.ErrArgs.WrapMsg("mongo sort must be 1 or -1")
|
||||
}
|
||||
findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": sort})
|
||||
cursor, err := m.MsgCollection.Find(
|
||||
ctx,
|
||||
bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}},
|
||||
findOpts,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errs.WrapMsg(err, fmt.Sprintf("conversationID is %s", conversationID))
|
||||
}
|
||||
var msgs []table.MsgDocModel
|
||||
err = cursor.All(ctx, &msgs)
|
||||
if err != nil {
|
||||
return nil, errs.WrapMsg(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
|
||||
}
|
||||
if len(msgs) > 0 {
|
||||
return &msgs[0], nil
|
||||
}
|
||||
return nil, ErrMsgListNotExist
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) {
|
||||
var skip int64 = 0
|
||||
for {
|
||||
msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := len(msgDocModel.Msg) - 1; i >= 0; i-- {
|
||||
if msgDocModel.Msg[i].Msg != nil {
|
||||
return msgDocModel.Msg[i], nil
|
||||
}
|
||||
}
|
||||
skip++
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) {
|
||||
var skip int64 = 0
|
||||
for {
|
||||
msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, v := range msgDocModel.Msg {
|
||||
if v.Msg != nil {
|
||||
return msgDocModel.Msg[i], nil
|
||||
}
|
||||
}
|
||||
skip++
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error {
|
||||
updates := bson.M{
|
||||
"$set": bson.M{},
|
||||
}
|
||||
for _, index := range indexes {
|
||||
updates["$set"].(bson.M)[fmt.Sprintf("msgs.%d", index)] = bson.M{
|
||||
"msg": nil,
|
||||
}
|
||||
}
|
||||
_, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates)
|
||||
if err != nil {
|
||||
return errs.WrapMsg(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error {
|
||||
if docIDs == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": bson.M{"$in": docIDs}})
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
docID string,
|
||||
seqs []int64,
|
||||
) (msgs []*table.MsgInfoModel, err error) {
|
||||
func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*relation.MsgInfoModel, error) {
|
||||
indexs := make([]int64, 0, len(seqs))
|
||||
for _, seq := range seqs {
|
||||
indexs = append(indexs, m.model.GetMsgIndex(seq))
|
||||
@ -270,20 +133,14 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
|
||||
{Key: "msgs.del_list", Value: 0},
|
||||
}}},
|
||||
}
|
||||
|
||||
cur, err := m.MsgCollection.Aggregate(ctx, pipeline)
|
||||
msgDocModel, err := mgoutil.Aggregate[*relation.MsgDocModel](ctx, m.coll, pipeline)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
defer cur.Close(ctx)
|
||||
var msgDocModel []table.MsgDocModel
|
||||
if err := cur.All(ctx, &msgDocModel); err != nil {
|
||||
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
|
||||
return nil, err
|
||||
}
|
||||
if len(msgDocModel) == 0 {
|
||||
return nil, errs.Wrap(mongo.ErrNoDocuments)
|
||||
}
|
||||
msgs = make([]*table.MsgInfoModel, 0, len(msgDocModel[0].Msg))
|
||||
msgs := make([]*relation.MsgInfoModel, 0, len(msgDocModel[0].Msg))
|
||||
for i := range msgDocModel[0].Msg {
|
||||
msg := msgDocModel[0].Msg[i]
|
||||
if msg == nil || msg.Msg == nil {
|
||||
@ -303,14 +160,14 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
|
||||
Seq: msg.Msg.Seq,
|
||||
Ex: msg.Msg.Ex,
|
||||
}
|
||||
data, err := json.Marshal(&revokeContent)
|
||||
data, err := utils.JsonMarshal(&revokeContent)
|
||||
if err != nil {
|
||||
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
|
||||
}
|
||||
elem := sdkws.NotificationElem{
|
||||
Detail: string(data),
|
||||
}
|
||||
content, err := json.Marshal(&elem)
|
||||
content, err := utils.JsonMarshal(&elem)
|
||||
if err != nil {
|
||||
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
|
||||
}
|
||||
@ -322,16 +179,72 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) {
|
||||
count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID})
|
||||
func (m *MsgMgo) GetNewestMsg(ctx context.Context, conversationID string) (*relation.MsgInfoModel, error) {
|
||||
for skip := int64(0); ; skip++ {
|
||||
msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, -1)
|
||||
if err != nil {
|
||||
return false, errs.WrapMsg(err, fmt.Sprintf("docID is %s", docID))
|
||||
return nil, err
|
||||
}
|
||||
for i := len(msgDocModel.Msg) - 1; i >= 0; i-- {
|
||||
if msgDocModel.Msg[i].Msg != nil {
|
||||
return msgDocModel.Msg[i], nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error {
|
||||
updates := []mongo.WriteModel{}
|
||||
func (m *MsgMgo) GetOldestMsg(ctx context.Context, conversationID string) (*relation.MsgInfoModel, error) {
|
||||
for skip := int64(0); ; skip++ {
|
||||
msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, v := range msgDocModel.Msg {
|
||||
if v.Msg != nil {
|
||||
return msgDocModel.Msg[i], nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MsgMgo) DeleteDocs(ctx context.Context, docIDs []string) error {
|
||||
if len(docIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mgoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": bson.M{"$in": docIDs}})
|
||||
}
|
||||
|
||||
func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*relation.MsgDocModel, error) {
|
||||
if sort != 1 && sort != -1 {
|
||||
return nil, errs.ErrArgs.WrapMsg("mongo sort must be 1 or -1")
|
||||
}
|
||||
opt := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": sort}).SetLimit(1)
|
||||
filter := bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}
|
||||
msgs, err := mgoutil.Find[*relation.MsgDocModel](ctx, m.coll, filter, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(msgs) > 0 {
|
||||
return msgs[0], nil
|
||||
}
|
||||
return nil, errs.Wrap(ErrMsgListNotExist)
|
||||
}
|
||||
|
||||
func (m *MsgMgo) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error {
|
||||
update := bson.M{
|
||||
"$set": bson.M{},
|
||||
}
|
||||
for _, index := range indexes {
|
||||
update["$set"].(bson.M)[fmt.Sprintf("msgs.%d", index)] = bson.M{
|
||||
"msg": nil,
|
||||
}
|
||||
}
|
||||
_, err := mgoutil.UpdateMany(ctx, m.coll, bson.M{"doc_id": docID}, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error {
|
||||
var updates []mongo.WriteModel
|
||||
for _, index := range indexes {
|
||||
filter := bson.M{
|
||||
"doc_id": docID,
|
||||
@ -349,204 +262,110 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID st
|
||||
SetUpdate(update)
|
||||
updates = append(updates, updateModel)
|
||||
}
|
||||
_, err := m.MsgCollection.BulkWrite(ctx, updates)
|
||||
if _, err := m.coll.BulkWrite(ctx, updates); err != nil {
|
||||
return errs.WrapMsg(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RangeUserSendCount
|
||||
// db.msg.aggregate([
|
||||
//
|
||||
// {
|
||||
// $match: {
|
||||
// "msgs.msg.send_time": {
|
||||
// "$gte": 0,
|
||||
// "$lt": 1788122092317
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$addFields": {
|
||||
// "msgs": {
|
||||
// "$filter": {
|
||||
// "input": "$msgs",
|
||||
// "as": "item",
|
||||
// "cond": {
|
||||
// "$and": [
|
||||
// {
|
||||
// $gte: ["$$item.msg.send_time", 0]
|
||||
// },
|
||||
// {
|
||||
// $lt: ["$$item.msg.send_time", 1788122092317]
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$project": {
|
||||
// "_id": 0,
|
||||
//
|
||||
// },
|
||||
//
|
||||
// },
|
||||
// {
|
||||
// "$project": {
|
||||
// "result": {
|
||||
// "$map": {
|
||||
// "input": "$msgs",
|
||||
// "as": "item",
|
||||
// "in": {
|
||||
// user_id: "$$item.msg.send_id",
|
||||
// send_date: {
|
||||
// $dateToString: {
|
||||
// format: "%Y-%m-%d",
|
||||
// date: {
|
||||
// $toDate: "$$item.msg.send_time"
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
//
|
||||
// },
|
||||
// {
|
||||
// "$unwind": "$result"
|
||||
// },
|
||||
// {
|
||||
// "$group": {
|
||||
// _id: "$result.send_date",
|
||||
// count: {
|
||||
// $sum: 1
|
||||
// },
|
||||
// original: {
|
||||
// $push: "$$ROOT"
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$addFields": {
|
||||
// "dates": "$$ROOT"
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$project": {
|
||||
// "_id": 0,
|
||||
// "count": 0,
|
||||
// "dates.original": 0,
|
||||
//
|
||||
// },
|
||||
//
|
||||
// },
|
||||
// {
|
||||
// "$group": {
|
||||
// _id: null,
|
||||
// count: {
|
||||
// $sum: 1
|
||||
// },
|
||||
// dates: {
|
||||
// $push: "$dates"
|
||||
// },
|
||||
// original: {
|
||||
// $push: "$original"
|
||||
// },
|
||||
//
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$unwind": "$original"
|
||||
// },
|
||||
// {
|
||||
// "$unwind": "$original"
|
||||
// },
|
||||
// {
|
||||
// "$group": {
|
||||
// _id: "$original.result.user_id",
|
||||
// count: {
|
||||
// $sum: 1
|
||||
// },
|
||||
// original: {
|
||||
// $push: "$dates"
|
||||
// },
|
||||
//
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$addFields": {
|
||||
// "dates": {
|
||||
// $arrayElemAt: ["$original", 0]
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$project": {
|
||||
// original: 0
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// $sort: {
|
||||
// count: - 1
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$group": {
|
||||
// _id: null,
|
||||
// user_count: {
|
||||
// $sum: 1
|
||||
// },
|
||||
// users: {
|
||||
// $push: "$$ROOT"
|
||||
// },
|
||||
//
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$addFields": {
|
||||
// "dates": {
|
||||
// $arrayElemAt: ["$users", 0]
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$addFields": {
|
||||
// "dates": "$dates.dates"
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$project": {
|
||||
// _id: 0,
|
||||
// "users.dates": 0,
|
||||
//
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$addFields": {
|
||||
// "msg_count": {
|
||||
// $sum: "$users.count"
|
||||
// }
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "$addFields": {
|
||||
// users: {
|
||||
// $slice: ["$users", 0, 10]
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// ]).
|
||||
func (m *MsgMongoDriver) RangeUserSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
group bool,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, users []*table.UserCount, dateCount map[string]int64, err error) {
|
||||
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*relation.MsgInfoModel, error) {
|
||||
var pipe mongo.Pipeline
|
||||
condition := bson.A{}
|
||||
if req.SendTime != "" {
|
||||
// Changed to keyed fields for bson.M to avoid govet errors
|
||||
condition = append(condition, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}})
|
||||
}
|
||||
if req.ContentType != 0 {
|
||||
condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.ContentType}})
|
||||
}
|
||||
if req.SessionType != 0 {
|
||||
condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}})
|
||||
}
|
||||
if req.RecvID != "" {
|
||||
condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}})
|
||||
}
|
||||
if req.SendID != "" {
|
||||
condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}})
|
||||
}
|
||||
|
||||
or := bson.A{
|
||||
bson.M{"doc_id": bson.M{"$regex": "^si_", "$options": "i"}},
|
||||
bson.M{"doc_id": bson.M{"$regex": "^g_", "$options": "i"}},
|
||||
bson.M{"doc_id": bson.M{"$regex": "^sg_", "$options": "i"}},
|
||||
}
|
||||
|
||||
// Use bson.D with keyed fields to specify the order explicitly
|
||||
pipe = mongo.Pipeline{
|
||||
{{"$match", bson.D{{Key: "$or", Value: or}}}},
|
||||
{{"$project", bson.D{
|
||||
{Key: "msgs", Value: bson.D{
|
||||
{Key: "$filter", Value: bson.D{
|
||||
{Key: "input", Value: "$msgs"},
|
||||
{Key: "as", Value: "item"},
|
||||
{Key: "cond", Value: bson.D{{Key: "$and", Value: condition}}},
|
||||
}},
|
||||
}},
|
||||
{Key: "doc_id", Value: 1},
|
||||
}}},
|
||||
{{"$unwind", bson.M{"path": "$msgs"}}},
|
||||
{{"$sort", bson.M{"msgs.msg.send_time": -1}}},
|
||||
}
|
||||
type docModel struct {
|
||||
DocID string `bson:"doc_id"`
|
||||
Msg *relation.MsgInfoModel `bson:"msgs"`
|
||||
}
|
||||
msgsDocs, err := mgoutil.Aggregate[*docModel](ctx, m.coll, pipe)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
msgs := make([]*relation.MsgInfoModel, 0)
|
||||
for _, doc := range msgsDocs {
|
||||
msgInfo := doc.Msg
|
||||
if msgInfo == nil || msgInfo.Msg == nil {
|
||||
continue
|
||||
}
|
||||
if msgInfo.Revoke != nil {
|
||||
revokeContent := sdkws.MessageRevokedContent{
|
||||
RevokerID: msgInfo.Revoke.UserID,
|
||||
RevokerRole: msgInfo.Revoke.Role,
|
||||
ClientMsgID: msgInfo.Msg.ClientMsgID,
|
||||
RevokerNickname: msgInfo.Revoke.Nickname,
|
||||
RevokeTime: msgInfo.Revoke.Time,
|
||||
SourceMessageSendTime: msgInfo.Msg.SendTime,
|
||||
SourceMessageSendID: msgInfo.Msg.SendID,
|
||||
SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
|
||||
SessionType: msgInfo.Msg.SessionType,
|
||||
Seq: msgInfo.Msg.Seq,
|
||||
Ex: msgInfo.Msg.Ex,
|
||||
}
|
||||
data, err := utils.JsonMarshal(&revokeContent)
|
||||
if err != nil {
|
||||
return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
|
||||
}
|
||||
elem := sdkws.NotificationElem{Detail: string(data)}
|
||||
content, err := utils.JsonMarshal(&elem)
|
||||
if err != nil {
|
||||
return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
|
||||
}
|
||||
msgInfo.Msg.ContentType = constant.MsgRevokeNotification
|
||||
msgInfo.Msg.Content = string(content)
|
||||
}
|
||||
msgs = append(msgs, msgInfo)
|
||||
}
|
||||
start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
|
||||
n := int32(len(msgs))
|
||||
if start >= n {
|
||||
return n, []*relation.MsgInfoModel{}, nil
|
||||
}
|
||||
if start+req.Pagination.ShowNumber < n {
|
||||
msgs = msgs[start : start+req.Pagination.ShowNumber]
|
||||
} else {
|
||||
msgs = msgs[start:]
|
||||
}
|
||||
return n, msgs, nil
|
||||
}
|
||||
|
||||
func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) {
|
||||
var sort int
|
||||
if ase {
|
||||
sort = 1
|
||||
@ -773,21 +592,16 @@ func (m *MsgMongoDriver) RangeUserSendCount(
|
||||
},
|
||||
},
|
||||
}
|
||||
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
||||
result, err := mgoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
||||
if err != nil {
|
||||
return 0, 0, nil, nil, errs.Wrap(err)
|
||||
}
|
||||
defer cur.Close(ctx)
|
||||
var result []Result
|
||||
if err = cur.All(ctx, &result); err != nil {
|
||||
return 0, 0, nil, nil, errs.Wrap(err)
|
||||
return 0, 0, nil, nil, err
|
||||
}
|
||||
if len(result) == 0 {
|
||||
return 0, 0, nil, nil, errs.Wrap(err)
|
||||
}
|
||||
users = make([]*table.UserCount, len(result[0].Users))
|
||||
users = make([]*relation.UserCount, len(result[0].Users))
|
||||
for i, r := range result[0].Users {
|
||||
users[i] = &table.UserCount{
|
||||
users[i] = &relation.UserCount{
|
||||
UserID: r.UserID,
|
||||
Count: r.Count,
|
||||
}
|
||||
@ -799,14 +613,7 @@ func (m *MsgMongoDriver) RangeUserSendCount(
|
||||
return result[0].MsgCount, result[0].UserCount, users, dateCount, nil
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) RangeGroupSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*table.GroupCount, dateCount map[string]int64, err error) {
|
||||
func (m *MsgMgo) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error) {
|
||||
var sort int
|
||||
if ase {
|
||||
sort = 1
|
||||
@ -1022,21 +829,16 @@ func (m *MsgMongoDriver) RangeGroupSendCount(
|
||||
},
|
||||
},
|
||||
}
|
||||
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
||||
result, err := mgoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
||||
if err != nil {
|
||||
return 0, 0, nil, nil, errs.Wrap(err)
|
||||
}
|
||||
defer cur.Close(ctx)
|
||||
var result []Result
|
||||
if err = cur.All(ctx, &result); err != nil {
|
||||
return 0, 0, nil, nil, errs.Wrap(err)
|
||||
return 0, 0, nil, nil, err
|
||||
}
|
||||
if len(result) == 0 {
|
||||
return 0, 0, nil, nil, errs.Wrap(err)
|
||||
}
|
||||
groups = make([]*table.GroupCount, len(result[0].Groups))
|
||||
groups = make([]*relation.GroupCount, len(result[0].Groups))
|
||||
for i, r := range result[0].Groups {
|
||||
groups[i] = &table.GroupCount{
|
||||
groups[i] = &relation.GroupCount{
|
||||
GroupID: r.GroupID,
|
||||
Count: r.Count,
|
||||
}
|
||||
@ -1048,113 +850,51 @@ func (m *MsgMongoDriver) RangeGroupSendCount(
|
||||
return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*table.MsgInfoModel, error) {
|
||||
total, msgs, err := m.searchMessage(ctx, req)
|
||||
func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
|
||||
for _, conversationID := range conversationIDs {
|
||||
regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}
|
||||
msgDocs, err := mgoutil.Find[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": regex})
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
return total, msgs, nil
|
||||
}
|
||||
|
||||
func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*table.MsgInfoModel, error) {
|
||||
var pipe mongo.Pipeline
|
||||
condition := bson.A{}
|
||||
if req.SendTime != "" {
|
||||
// Changed to keyed fields for bson.M to avoid govet errors
|
||||
condition = append(condition, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}})
|
||||
}
|
||||
if req.ContentType != 0 {
|
||||
condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.ContentType}})
|
||||
}
|
||||
if req.SessionType != 0 {
|
||||
condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}})
|
||||
}
|
||||
if req.RecvID != "" {
|
||||
condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}})
|
||||
}
|
||||
if req.SendID != "" {
|
||||
condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}})
|
||||
}
|
||||
|
||||
or := bson.A{
|
||||
bson.M{"doc_id": bson.M{"$regex": "^si_", "$options": "i"}},
|
||||
bson.M{"doc_id": bson.M{"$regex": "^g_", "$options": "i"}},
|
||||
bson.M{"doc_id": bson.M{"$regex": "^sg_", "$options": "i"}},
|
||||
}
|
||||
|
||||
// Use bson.D with keyed fields to specify the order explicitly
|
||||
pipe = mongo.Pipeline{
|
||||
{{"$match", bson.D{{Key: "$or", Value: or}}}},
|
||||
{{"$project", bson.D{
|
||||
{Key: "msgs", Value: bson.D{
|
||||
{Key: "$filter", Value: bson.D{
|
||||
{Key: "input", Value: "$msgs"},
|
||||
{Key: "as", Value: "item"},
|
||||
{Key: "cond", Value: bson.D{{Key: "$and", Value: condition}}},
|
||||
}},
|
||||
}},
|
||||
{Key: "doc_id", Value: 1},
|
||||
}}},
|
||||
{{"$unwind", bson.M{"path": "$msgs"}}},
|
||||
{{"$sort", bson.M{"msgs.msg.send_time": -1}}},
|
||||
}
|
||||
cursor, err := m.MsgCollection.Aggregate(ctx, pipe)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
type docModel struct {
|
||||
DocID string `bson:"doc_id"`
|
||||
Msg *table.MsgInfoModel `bson:"msgs"`
|
||||
}
|
||||
var msgsDocs []docModel
|
||||
err = cursor.All(ctx, &msgsDocs)
|
||||
if err != nil {
|
||||
return 0, nil, errs.WrapMsg(err, "cursor.All msgsDocs")
|
||||
}
|
||||
log.ZDebug(ctx, "query mongoDB", "result", msgsDocs)
|
||||
msgs := make([]*table.MsgInfoModel, 0)
|
||||
for _, doc := range msgsDocs {
|
||||
msgInfo := doc.Msg
|
||||
if msgInfo == nil || msgInfo.Msg == nil {
|
||||
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
if msgInfo.Revoke != nil {
|
||||
revokeContent := sdkws.MessageRevokedContent{
|
||||
RevokerID: msgInfo.Revoke.UserID,
|
||||
RevokerRole: msgInfo.Revoke.Role,
|
||||
ClientMsgID: msgInfo.Msg.ClientMsgID,
|
||||
RevokerNickname: msgInfo.Revoke.Nickname,
|
||||
RevokeTime: msgInfo.Revoke.Time,
|
||||
SourceMessageSendTime: msgInfo.Msg.SendTime,
|
||||
SourceMessageSendID: msgInfo.Msg.SendID,
|
||||
SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
|
||||
SessionType: msgInfo.Msg.SessionType,
|
||||
Seq: msgInfo.Msg.Seq,
|
||||
Ex: msgInfo.Msg.Ex,
|
||||
if len(msgDocs) < 1 {
|
||||
continue
|
||||
}
|
||||
data, err := json.Marshal(&revokeContent)
|
||||
if err != nil {
|
||||
return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
|
||||
if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) {
|
||||
if err := mgoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": regex}); err != nil {
|
||||
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
elem := sdkws.NotificationElem{Detail: string(data)}
|
||||
content, err := json.Marshal(&elem)
|
||||
if err != nil {
|
||||
return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
|
||||
var newMsgDocs []any
|
||||
for _, msgDoc := range msgDocs {
|
||||
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
|
||||
continue
|
||||
}
|
||||
msgInfo.Msg.ContentType = constant.MsgRevokeNotification
|
||||
msgInfo.Msg.Content = string(content)
|
||||
}
|
||||
msgs = append(msgs, msgInfo)
|
||||
}
|
||||
start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
|
||||
n := int32(len(msgs))
|
||||
if start >= n {
|
||||
return n, []*table.MsgInfoModel{}, nil
|
||||
}
|
||||
if start+req.Pagination.ShowNumber < n {
|
||||
msgs = msgs[start : start+req.Pagination.ShowNumber]
|
||||
var index int64
|
||||
for index < int64(len(msgDoc.Msg)) {
|
||||
msg := msgDoc.Msg[index]
|
||||
if msg != nil && msg.Msg != nil {
|
||||
msgDocModel := relation.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
|
||||
end := index + m.model.GetSingleGocMsgNum()
|
||||
if int(end) >= len(msgDoc.Msg) {
|
||||
msgDocModel.Msg = msgDoc.Msg[index:]
|
||||
} else {
|
||||
msgs = msgs[start:]
|
||||
msgDocModel.Msg = msgDoc.Msg[index:end]
|
||||
}
|
||||
newMsgDocs = append(newMsgDocs, msgDocModel)
|
||||
index = end
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if err = mgoutil.InsertMany(ctx, m.coll, newMsgDocs); err != nil {
|
||||
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
} else {
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
}
|
||||
}
|
||||
}
|
||||
return n, msgs, nil
|
||||
}
|
||||
@ -12,12 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
@ -35,9 +35,9 @@ const (
|
||||
MaximumSubscription = 3000
|
||||
)
|
||||
|
||||
func NewUserMongoDriver(database *mongo.Database) unrelation.UserModelInterface {
|
||||
func NewUserMongoDriver(database *mongo.Database) relation.SubscribeUserModelInterface {
|
||||
return &UserMongoDriver{
|
||||
userCollection: database.Collection(unrelation.SubscribeUser),
|
||||
userCollection: database.Collection(relation.SubscribeUser),
|
||||
}
|
||||
}
|
||||
|
||||
@ -155,7 +155,7 @@ func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, user
|
||||
|
||||
// GetAllSubscribeList Get all users subscribed by this user.
|
||||
func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) {
|
||||
var user unrelation.UserModel
|
||||
var user relation.SubscribeUserModel
|
||||
cursor := u.userCollection.FindOne(
|
||||
ctx,
|
||||
bson.M{"user_id": SubscriptionPrefix + userID})
|
||||
@ -172,7 +172,7 @@ func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string
|
||||
|
||||
// GetSubscribedList Get the user subscribed by those users.
|
||||
func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) {
|
||||
var user unrelation.UserModel
|
||||
var user relation.SubscribeUserModel
|
||||
cursor := u.userCollection.FindOne(
|
||||
ctx,
|
||||
bson.M{"user_id": SubscribedPrefix + userID})
|
||||
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
package relation
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -110,23 +110,8 @@ type MsgDocModelInterface interface {
|
||||
DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
|
||||
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error
|
||||
SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*MsgInfoModel, error)
|
||||
RangeUserSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
group bool,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, users []*UserCount, dateCount map[string]int64, err error)
|
||||
RangeGroupSendCount(
|
||||
ctx context.Context,
|
||||
start time.Time,
|
||||
end time.Time,
|
||||
ase bool,
|
||||
pageNumber int32,
|
||||
showNumber int32,
|
||||
) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error)
|
||||
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*UserCount, dateCount map[string]int64, err error)
|
||||
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error)
|
||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||
}
|
||||
|
||||
@ -165,11 +150,11 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st
|
||||
return t
|
||||
}
|
||||
|
||||
func (m MsgDocModel) GetMsgIndex(seq int64) int64 {
|
||||
func (MsgDocModel) GetMsgIndex(seq int64) int64 {
|
||||
return (seq - 1) % singleGocMsgNum
|
||||
}
|
||||
|
||||
func (m MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||
func (MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||
}
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
package relation
|
||||
|
||||
import "context"
|
||||
|
||||
@ -21,18 +21,18 @@ const (
|
||||
SubscribeUser = "subscribe_user"
|
||||
)
|
||||
|
||||
// UserModel collection structure.
|
||||
type UserModel struct {
|
||||
// SubscribeUserModel collection structure.
|
||||
type SubscribeUserModel struct {
|
||||
UserID string `bson:"user_id" json:"userID"`
|
||||
UserIDList []string `bson:"user_id_list" json:"userIDList"`
|
||||
}
|
||||
|
||||
func (UserModel) TableName() string {
|
||||
func (SubscribeUserModel) TableName() string {
|
||||
return SubscribeUser
|
||||
}
|
||||
|
||||
// UserModelInterface Operation interface of user mongodb.
|
||||
type UserModelInterface interface {
|
||||
// SubscribeUserModelInterface Operation interface of user mongodb.
|
||||
type SubscribeUserModelInterface interface {
|
||||
// AddSubscriptionList Subscriber's handling of thresholds.
|
||||
AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error
|
||||
// UnsubscriptionList Handling of unsubscribe.
|
||||
@ -1,20 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
|
||||
type CommonUserModel struct {
|
||||
UserID string `bson:"user_id"`
|
||||
UserName string `bson:"user_name"`
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
// Copyright © 2024 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||
@ -1,53 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
|
||||
// import (
|
||||
// "context"
|
||||
//)
|
||||
//
|
||||
// const (
|
||||
// CSuperGroup = "super_group"
|
||||
// CUserToSuperGroup = "user_to_super_group"
|
||||
//)
|
||||
//
|
||||
// type SuperGroupModel struct {
|
||||
// GroupID string `bson:"group_id" json:"groupID"`
|
||||
// MemberIDs []string `bson:"member_id_list" json:"memberIDList"`
|
||||
//}
|
||||
//
|
||||
// func (SuperGroupModel) TableName() string {
|
||||
// return CSuperGroup
|
||||
//}
|
||||
//
|
||||
//type UserToSuperGroupModel struct {
|
||||
// UserID string `bson:"user_id" json:"userID"`
|
||||
// GroupIDs []string `bson:"group_id_list" json:"groupIDList"`
|
||||
//}
|
||||
//
|
||||
//func (UserToSuperGroupModel) TableName() string {
|
||||
// return CUserToSuperGroup
|
||||
//}
|
||||
//
|
||||
//type SuperGroupModelInterface interface {
|
||||
// CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error
|
||||
// TakeSuperGroup(ctx context.Context, groupID string) (group *SuperGroupModel, err error)
|
||||
// FindSuperGroup(ctx context.Context, groupIDs []string) (groups []*SuperGroupModel, err error)
|
||||
// AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error
|
||||
// RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error
|
||||
// GetSuperGroupByUserID(ctx context.Context, userID string) (*UserToSuperGroupModel, error)
|
||||
// DeleteSuperGroup(ctx context.Context, groupID string) error
|
||||
// RemoveGroupFromUser(ctx context.Context, groupID string, userIDs []string) error
|
||||
//}
|
||||
@ -1,15 +0,0 @@
|
||||
// Copyright © 2024 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
||||
@ -23,10 +23,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/mw/specialerror"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
@ -127,44 +125,3 @@ func (m *Mongo) GetClient() *mongo.Client {
|
||||
func (m *Mongo) GetDatabase(database string) *mongo.Database {
|
||||
return m.db.Database(database)
|
||||
}
|
||||
|
||||
// CreateMsgIndex creates an index for messages in MongoDB.
|
||||
func (m *Mongo) CreateMsgIndex() error {
|
||||
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
|
||||
}
|
||||
|
||||
// createMongoIndex creates an index in a MongoDB collection.
|
||||
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
|
||||
db := m.GetDatabase(m.mongoConf.Database).Collection(collection)
|
||||
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
|
||||
indexView := db.Indexes()
|
||||
|
||||
keysDoc := buildIndexKeys(keys)
|
||||
|
||||
index := mongo.IndexModel{
|
||||
Keys: keysDoc,
|
||||
}
|
||||
if isUnique {
|
||||
index.Options = options.Index().SetUnique(true)
|
||||
}
|
||||
|
||||
_, err := indexView.CreateOne(context.Background(), index, opts)
|
||||
if err != nil {
|
||||
return errs.WrapMsg(err, "CreateIndex")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildIndexKeys builds the BSON document for index keys.
|
||||
func buildIndexKeys(keys []string) bson.D {
|
||||
keysDoc := bson.D{}
|
||||
for _, key := range keys {
|
||||
direction := 1 // default direction is ascending
|
||||
if strings.HasPrefix(key, "-") {
|
||||
direction = -1 // descending order for prefixed with "-"
|
||||
key = strings.TrimLeft(key, "-")
|
||||
}
|
||||
keysDoc = append(keysDoc, bson.E{Key: key, Value: direction})
|
||||
}
|
||||
return keysDoc
|
||||
}
|
||||
|
||||
@ -1,81 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
|
||||
for _, conversationID := range conversationIDs {
|
||||
regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}
|
||||
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex})
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
var msgDocs []table.MsgDocModel
|
||||
err = cursor.All(ctx, &msgDocs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
if len(msgDocs) < 1 {
|
||||
continue
|
||||
}
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
|
||||
if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) {
|
||||
if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil {
|
||||
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
|
||||
continue
|
||||
}
|
||||
var newMsgDocs []any
|
||||
for _, msgDoc := range msgDocs {
|
||||
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
|
||||
continue
|
||||
}
|
||||
var index int64
|
||||
for index < int64(len(msgDoc.Msg)) {
|
||||
msg := msgDoc.Msg[index]
|
||||
if msg != nil && msg.Msg != nil {
|
||||
msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
|
||||
end := index + m.model.GetSingleGocMsgNum()
|
||||
if int(end) >= len(msgDoc.Msg) {
|
||||
msgDocModel.Msg = msgDoc.Msg[index:]
|
||||
} else {
|
||||
msgDocModel.Msg = msgDoc.Msg[index:end]
|
||||
}
|
||||
newMsgDocs = append(newMsgDocs, msgDocModel)
|
||||
index = end
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err = m.MsgCollection.InsertMany(ctx, newMsgDocs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
} else {
|
||||
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,163 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package unrelation
|
||||
|
||||
//
|
||||
// import (
|
||||
// "context"
|
||||
//
|
||||
// "go.mongodb.org/mongo-driver/bson"
|
||||
// "go.mongodb.org/mongo-driver/mongo"
|
||||
// "go.mongodb.org/mongo-driver/mongo/options"
|
||||
//
|
||||
// "github.com/openimsdk/tools/utils"
|
||||
//
|
||||
// "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||
//)
|
||||
//
|
||||
// func NewSuperGroupMongoDriver(database *mongo.Database) unrelation.SuperGroupModelInterface {
|
||||
// return &SuperGroupMongoDriver{
|
||||
// superGroupCollection: database.Collection(unrelation.CSuperGroup),
|
||||
// userToSuperGroupCollection: database.Collection(unrelation.CUserToSuperGroup),
|
||||
// }
|
||||
//}
|
||||
//
|
||||
// type SuperGroupMongoDriver struct {
|
||||
// superGroupCollection *mongo.Collection
|
||||
// userToSuperGroupCollection *mongo.Collection
|
||||
//}
|
||||
//
|
||||
// func (s *SuperGroupMongoDriver) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error {
|
||||
// _, err := s.superGroupCollection.InsertOne(ctx, &unrelation.SuperGroupModel{
|
||||
// GroupID: groupID,
|
||||
// MemberIDs: initMemberIDs,
|
||||
// })
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// for _, userID := range initMemberIDs {
|
||||
// _, err = s.userToSuperGroupCollection.UpdateOne(
|
||||
// ctx,
|
||||
// bson.M{"user_id": userID},
|
||||
// bson.M{"$addToSet": bson.M{"group_id_list": groupID}},
|
||||
// &options.UpdateOptions{
|
||||
// Upsert: utils.ToPtr(true),
|
||||
// },
|
||||
// )
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
//}
|
||||
//
|
||||
//func (s *SuperGroupMongoDriver) TakeSuperGroup(
|
||||
// ctx context.Context,
|
||||
// groupID string,
|
||||
//) (group *unrelation.SuperGroupModel, err error) {
|
||||
// if err := s.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&group); err != nil {
|
||||
// return nil, utils.Wrap(err, "")
|
||||
// }
|
||||
// return group, nil
|
||||
//}
|
||||
//
|
||||
//func (s *SuperGroupMongoDriver) FindSuperGroup(
|
||||
// ctx context.Context,
|
||||
// groupIDs []string,
|
||||
//) (groups []*unrelation.SuperGroupModel, err error) {
|
||||
// cursor, err := s.superGroupCollection.Find(ctx, bson.M{"group_id": bson.M{
|
||||
// "$in": groupIDs,
|
||||
// }})
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// defer cursor.Close(ctx)
|
||||
// if err := cursor.All(ctx, &groups); err != nil {
|
||||
// return nil, utils.Wrap(err, "")
|
||||
// }
|
||||
// return groups, nil
|
||||
//}
|
||||
//
|
||||
//func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
|
||||
// _, err := s.superGroupCollection.UpdateOne(
|
||||
// ctx,
|
||||
// bson.M{"group_id": groupID},
|
||||
// bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}},
|
||||
// )
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// upsert := true
|
||||
// opts := &options.UpdateOptions{
|
||||
// Upsert: &upsert,
|
||||
// }
|
||||
// for _, userID := range userIDs {
|
||||
// _, err = s.userToSuperGroupCollection.UpdateOne(
|
||||
// ctx,
|
||||
// bson.M{"user_id": userID},
|
||||
// bson.M{"$addToSet": bson.M{"group_id_list": groupID}},
|
||||
// opts,
|
||||
// )
|
||||
// if err != nil {
|
||||
// return utils.Wrap(err, "transaction failed")
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
//}
|
||||
//
|
||||
//func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
|
||||
// _, err := s.superGroupCollection.UpdateOne(
|
||||
// ctx,
|
||||
// bson.M{"group_id": groupID},
|
||||
// bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}},
|
||||
// )
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// err = s.RemoveGroupFromUser(ctx, groupID, userIDs)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return nil
|
||||
//}
|
||||
//
|
||||
//func (s *SuperGroupMongoDriver) GetSuperGroupByUserID(
|
||||
// ctx context.Context,
|
||||
// userID string,
|
||||
//) (*unrelation.UserToSuperGroupModel, error) {
|
||||
// var user unrelation.UserToSuperGroupModel
|
||||
// err := s.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user)
|
||||
// return &user, utils.Wrap(err, "")
|
||||
//}
|
||||
//
|
||||
//func (s *SuperGroupMongoDriver) DeleteSuperGroup(ctx context.Context, groupID string) error {
|
||||
// group, err := s.TakeSuperGroup(ctx, groupID)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if _, err := s.superGroupCollection.DeleteOne(ctx, bson.M{"group_id": groupID}); err != nil {
|
||||
// return utils.Wrap(err, "")
|
||||
// }
|
||||
// return s.RemoveGroupFromUser(ctx, groupID, group.MemberIDs)
|
||||
//}
|
||||
//
|
||||
//func (s *SuperGroupMongoDriver) RemoveGroupFromUser(ctx context.Context, groupID string, userIDs []string) error {
|
||||
// _, err := s.userToSuperGroupCollection.UpdateOne(
|
||||
// ctx,
|
||||
// bson.M{"user_id": bson.M{"$in": userIDs}},
|
||||
// bson.M{"$pull": bson.M{"group_id_list": groupID}},
|
||||
// )
|
||||
// return utils.Wrap(err, "")
|
||||
//}
|
||||
Loading…
x
Reference in New Issue
Block a user