mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-05 20:11:14 +08:00
fix: user seq, asynchronous friend notification, message search (#2447)
* fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * friend incr sync * friend incr sync * friend incr sync * friend incr sync * friend incr sync * mage * optimization version log * optimization version log * sync * sync * sync * group sync * sync option * sync option * refactor: replace `friend` package with `realtion`. * refactor: update lastest commit to relation. * sync option * sync option * sync option * sync * sync * go.mod * seq * update: go mod * refactor: change incremental to full * feat: get full friend user ids * feat: api and config * seq * group version * merge * seq * seq * seq * fix: sort by id avoid unstable sort friends. * group * group * group * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * user version * seq * seq * seq user * user online * implement minio expire delete. * user online * config * fix * fix * implement minio expire delete logic. * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * feat: implement scheduled delete outdated object in minio. * update gomake version * update gomake version * implement FindExpires pagination. * remove unnesseary incr. * fix uncorrect args call. * online push * online push * online push * resolving conflicts * resolving conflicts * test * api prommetrics * api prommetrics * api prommetrics * api prommetrics * api prommetrics * rpc prommetrics * rpc prommetrics * online status * online status * online status * online status * sub * conversation version incremental * merge seq * merge online * merge online * merge online * merge seq * GetOwnerConversation * fix: change incremental syncer router name. * rockscache batch get * rockscache seq batch get * fix: GetMsgDocModelByIndex bug * update go.mod * update go.mod * merge * feat: prometheus * feat: prometheus * group member sort * sub * sub * fix: seq conversion bug * fix: redis pipe exec * sort version * sort version * sort version * remove old version online subscription * remove old version online subscription * version log index * version log index * batch push * batch push * seq void filling * fix: batchGetMaxSeq * fix: batchGetMaxSeq * cache db error log * 111 * fix bug --------- Co-authored-by: withchao <withchao@users.noreply.github.com> Co-authored-by: Monet Lee <monet_lee@163.com> Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> Co-authored-by: icey-yu <1186114839@qq.com>
This commit is contained in:
parent
c3c9969f2f
commit
5c52840d67
@ -16,6 +16,7 @@ package friend
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/tools/mq/memamq"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||
@ -49,6 +50,7 @@ type friendServer struct {
|
||||
RegisterCenter discovery.SvcDiscoveryRegistry
|
||||
config *Config
|
||||
webhookClient *webhook.Client
|
||||
queue *memamq.MemoryQueue
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@ -118,8 +120,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
|
||||
config: config,
|
||||
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
|
||||
queue: memamq.NewMemoryQueue(128, 1024*8),
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"slices"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
|
||||
@ -17,13 +18,22 @@ func (s *friendServer) NotificationUserInfoUpdate(ctx context.Context, req *rela
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(userIDs) > 0 {
|
||||
friendUserIDs := []string{req.UserID}
|
||||
noCancelCtx := context.WithoutCancel(ctx)
|
||||
err := s.queue.PushCtx(ctx, func() {
|
||||
for _, userID := range userIDs {
|
||||
if err := s.db.OwnerIncrVersion(ctx, userID, []string{req.UserID}, model.VersionStateUpdate); err != nil {
|
||||
return nil, err
|
||||
if err := s.db.OwnerIncrVersion(noCancelCtx, userID, friendUserIDs, model.VersionStateUpdate); err != nil {
|
||||
log.ZError(ctx, "OwnerIncrVersion", err, "userID", userID, "friendUserIDs", friendUserIDs)
|
||||
}
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserID, userID)
|
||||
s.notificationSender.FriendInfoUpdatedNotification(noCancelCtx, req.UserID, userID)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
log.ZError(ctx, "NotificationUserInfoUpdate timeout", err, "userID", req.UserID)
|
||||
}
|
||||
}
|
||||
return &relation.NotificationUserInfoUpdateResp{}, nil
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
|
||||
|
||||
func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (resp *msg.SearchMessageResp, err error) {
|
||||
var chatLogs []*sdkws.MsgData
|
||||
var total int32
|
||||
var total int64
|
||||
resp = &msg.SearchMessageResp{}
|
||||
if total, chatLogs, err = m.MsgDatabase.SearchMessage(ctx, req); err != nil {
|
||||
return nil, err
|
||||
@ -194,7 +194,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
|
||||
}
|
||||
resp.ChatLogs = append(resp.ChatLogs, pbchatLog)
|
||||
}
|
||||
resp.ChatLogsNum = total
|
||||
resp.ChatLogsNum = int32(total)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
2
pkg/common/storage/cache/redis/batch_test.go
vendored
2
pkg/common/storage/cache/redis/batch_test.go
vendored
@ -45,7 +45,7 @@ func TestName(t *testing.T) {
|
||||
}
|
||||
seqUser := NewSeqUserCacheRedis(rdb, mgoSeqUser)
|
||||
|
||||
res, err := seqUser.GetReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"})
|
||||
res, err := seqUser.GetUserReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
32
pkg/common/storage/cache/redis/seq_user_test.go
vendored
32
pkg/common/storage/cache/redis/seq_user_test.go
vendored
@ -4,7 +4,10 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
mgo2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"log"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
@ -77,3 +80,32 @@ func TestRecvOnline(t *testing.T) {
|
||||
fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
|
||||
}
|
||||
}
|
||||
|
||||
func TestName1(t *testing.T) {
|
||||
opt := &redis.Options{
|
||||
Addr: "172.16.8.48:16379",
|
||||
Password: "openIM123",
|
||||
DB: 0,
|
||||
}
|
||||
rdb := redis.NewClient(opt)
|
||||
|
||||
mgo, err := mongo.Connect(context.Background(),
|
||||
options.Client().
|
||||
ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").
|
||||
SetConnectTimeout(5*time.Second))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
model, err := mgo2.NewSeqUserMongo(mgo.Database("openim_v3"))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
seq := NewSeqUserCacheRedis(rdb, model)
|
||||
|
||||
res, err := seq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Log(res)
|
||||
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ type CommonMsgDatabase interface {
|
||||
//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
||||
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error)
|
||||
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error)
|
||||
FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error)
|
||||
|
||||
// to mq
|
||||
@ -878,7 +878,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount(
|
||||
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) {
|
||||
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error) {
|
||||
var totalMsgs []*sdkws.MsgData
|
||||
total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
|
||||
if err != nil {
|
||||
|
@ -278,124 +278,409 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) {
|
||||
where := make(bson.A, 0, 6)
|
||||
//func (m *MsgMgo) searchCount(ctx context.Context, filter any) (int64, error) {
|
||||
//
|
||||
// return nil, nil
|
||||
//}
|
||||
|
||||
//func (m *MsgMgo) searchMessage(ctx context.Context, filter any, nextID primitive.ObjectID, content bool, limit int) (int64, []*model.MsgInfoModel, primitive.ObjectID, error) {
|
||||
// var pipeline bson.A
|
||||
// if !nextID.IsZero() {
|
||||
// pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
|
||||
// }
|
||||
// pipeline = append(pipeline,
|
||||
// bson.M{"$match": filter},
|
||||
// bson.M{"$limit": limit},
|
||||
// bson.M{"$unwind": "$msgs"},
|
||||
// bson.M{"$match": filter},
|
||||
// bson.M{
|
||||
// "$group": bson.M{
|
||||
// "_id": "$_id",
|
||||
// "doc_id": bson.M{
|
||||
// "$first": "$doc_id",
|
||||
// },
|
||||
// "msgs": bson.M{"$push": "$msgs"},
|
||||
// },
|
||||
// },
|
||||
// )
|
||||
// if !content {
|
||||
// pipeline = append(pipeline,
|
||||
// bson.M{
|
||||
// "$project": bson.M{
|
||||
// "_id": 1,
|
||||
// "count": bson.M{"$size": "$msgs"},
|
||||
// },
|
||||
// },
|
||||
// )
|
||||
// type result struct {
|
||||
// ID primitive.ObjectID `bson:"_id"`
|
||||
// Count int64 `bson:"count"`
|
||||
// }
|
||||
// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
|
||||
// if err != nil {
|
||||
// return 0, nil, primitive.ObjectID{}, err
|
||||
// }
|
||||
// if len(res) == 0 {
|
||||
// return 0, nil, primitive.ObjectID{}, nil
|
||||
// }
|
||||
// var count int64
|
||||
// for _, r := range res {
|
||||
// count += r.Count
|
||||
// }
|
||||
// return count, nil, res[len(res)-1].ID, nil
|
||||
// }
|
||||
// type result struct {
|
||||
// ID primitive.ObjectID `bson:"_id"`
|
||||
// Msg []*model.MsgInfoModel `bson:"msgs"`
|
||||
// }
|
||||
// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
|
||||
// if err != nil {
|
||||
// return 0, nil, primitive.ObjectID{}, err
|
||||
// }
|
||||
// if len(res) == 0 {
|
||||
// return 0, nil, primitive.ObjectID{}, err
|
||||
// }
|
||||
// var count int
|
||||
// for _, r := range res {
|
||||
// count += len(r.Msg)
|
||||
// }
|
||||
// msgs := make([]*model.MsgInfoModel, 0, count)
|
||||
// for _, r := range res {
|
||||
// msgs = append(msgs, r.Msg...)
|
||||
// }
|
||||
// return int64(count), msgs, res[len(res)-1].ID, nil
|
||||
//}
|
||||
|
||||
/*
|
||||
|
||||
db.msg3.aggregate(
|
||||
[
|
||||
{
|
||||
"$match": {
|
||||
"doc_id": "si_7009965934_8710838466:0"
|
||||
},
|
||||
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
*/
|
||||
|
||||
type searchMessageIndex struct {
|
||||
ID primitive.ObjectID `bson:"_id"`
|
||||
Index []int64 `bson:"index"`
|
||||
}
|
||||
|
||||
func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID primitive.ObjectID, limit int) ([]searchMessageIndex, error) {
|
||||
var pipeline bson.A
|
||||
if !nextID.IsZero() {
|
||||
pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
|
||||
}
|
||||
pipeline = append(pipeline,
|
||||
bson.M{"$sort": bson.M{"_id": 1}},
|
||||
bson.M{"$match": filter},
|
||||
bson.M{"$limit": limit},
|
||||
bson.M{
|
||||
"$project": bson.M{
|
||||
"_id": 1,
|
||||
"msgs": bson.M{
|
||||
"$map": bson.M{
|
||||
"input": "$msgs",
|
||||
"as": "msg",
|
||||
"in": bson.M{
|
||||
"$mergeObjects": bson.A{
|
||||
"$$msg",
|
||||
bson.M{
|
||||
"_search_temp_index": bson.M{
|
||||
"$indexOfArray": bson.A{
|
||||
"$msgs", "$$msg",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
bson.M{"$unwind": "$msgs"},
|
||||
bson.M{"$match": filter},
|
||||
bson.M{
|
||||
"$project": bson.M{
|
||||
"_id": 1,
|
||||
"msgs._search_temp_index": 1,
|
||||
},
|
||||
},
|
||||
bson.M{
|
||||
"$group": bson.M{
|
||||
"_id": "$_id",
|
||||
"index": bson.M{"$push": "$msgs._search_temp_index"},
|
||||
},
|
||||
},
|
||||
bson.M{"$sort": bson.M{"_id": 1}},
|
||||
)
|
||||
return mongoutil.Aggregate[searchMessageIndex](ctx, m.coll, pipeline)
|
||||
}
|
||||
|
||||
func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []searchMessageIndex, error) {
|
||||
filter := bson.M{}
|
||||
if req.RecvID != "" {
|
||||
where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID})
|
||||
filter["$or"] = bson.A{
|
||||
bson.M{"msgs.msg.recv_id": req.RecvID},
|
||||
bson.M{"msgs.msg.group_id": req.RecvID},
|
||||
}
|
||||
}
|
||||
if req.SendID != "" {
|
||||
where = append(where, bson.M{"msgs.msg.send_id": req.SendID})
|
||||
filter["msgs.msg.send_id"] = req.SendID
|
||||
}
|
||||
if req.ContentType != 0 {
|
||||
where = append(where, bson.M{"msgs.msg.content_type": req.ContentType})
|
||||
filter["msgs.msg.content_type"] = req.ContentType
|
||||
}
|
||||
if req.SessionType != 0 {
|
||||
where = append(where, bson.M{"msgs.msg.session_type": req.SessionType})
|
||||
filter["msgs.msg.session_type"] = req.SessionType
|
||||
}
|
||||
if req.SendTime != "" {
|
||||
sendTime, err := time.Parse(time.DateOnly, req.SendTime)
|
||||
if err != nil {
|
||||
return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
|
||||
}
|
||||
where = append(where,
|
||||
bson.M{
|
||||
"msgs.msg.send_time": bson.M{
|
||||
filter["$and"] = bson.A{
|
||||
bson.M{"msgs.msg.send_time": bson.M{
|
||||
"$gte": sendTime.UnixMilli(),
|
||||
},
|
||||
},
|
||||
}},
|
||||
bson.M{
|
||||
"msgs.msg.send_time": bson.M{
|
||||
"$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
nextID primitive.ObjectID
|
||||
count int
|
||||
dataRange []searchMessageIndex
|
||||
skip = int((req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber())
|
||||
)
|
||||
_, _ = dataRange, skip
|
||||
const maxDoc = 50
|
||||
data := make([]searchMessageIndex, 0, req.Pagination.GetShowNumber())
|
||||
push := cap(data)
|
||||
for i := 0; ; i++ {
|
||||
res, err := m.searchMessageIndex(ctx, filter, nextID, maxDoc)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
pipeline := bson.A{
|
||||
bson.M{
|
||||
"$unwind": "$msgs",
|
||||
},
|
||||
if len(res) > 0 {
|
||||
nextID = res[len(res)-1].ID
|
||||
}
|
||||
if len(where) > 0 {
|
||||
pipeline = append(pipeline, bson.M{
|
||||
"$match": bson.M{"$and": where},
|
||||
for _, r := range res {
|
||||
var dataIndex []int64
|
||||
for _, index := range r.Index {
|
||||
if push > 0 && count >= skip {
|
||||
dataIndex = append(dataIndex, index)
|
||||
push--
|
||||
}
|
||||
count++
|
||||
}
|
||||
if len(dataIndex) > 0 {
|
||||
data = append(data, searchMessageIndex{
|
||||
ID: r.ID,
|
||||
Index: dataIndex,
|
||||
})
|
||||
}
|
||||
pipeline = append(pipeline,
|
||||
bson.M{
|
||||
"$project": bson.M{
|
||||
"_id": 0,
|
||||
"msg": "$msgs.msg",
|
||||
},
|
||||
},
|
||||
bson.M{
|
||||
"$count": "count",
|
||||
},
|
||||
)
|
||||
count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
if len(count) == 0 || count[0] == 0 {
|
||||
return 0, nil, nil
|
||||
if push <= 0 {
|
||||
push--
|
||||
}
|
||||
if len(res) < maxDoc || push < -10 {
|
||||
return int64(count), data, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MsgMgo) getDocRange(ctx context.Context, id primitive.ObjectID, index []int64) ([]*model.MsgInfoModel, error) {
|
||||
if len(index) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pipeline := bson.A{
|
||||
bson.M{"$match": bson.M{"_id": id}},
|
||||
bson.M{"$project": "$msgs"},
|
||||
}
|
||||
pipeline = pipeline[:len(pipeline)-1]
|
||||
pipeline = append(pipeline,
|
||||
bson.M{
|
||||
"$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(),
|
||||
},
|
||||
bson.M{
|
||||
"$limit": req.Pagination.GetShowNumber(),
|
||||
},
|
||||
)
|
||||
msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) {
|
||||
count, data, err := m.searchMessage(ctx, req)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
for i := range msgs {
|
||||
msgInfo := msgs[i]
|
||||
if msgInfo == nil || msgInfo.Msg == nil {
|
||||
var msgs []*model.MsgInfoModel
|
||||
if len(data) > 0 {
|
||||
var n int
|
||||
for _, d := range data {
|
||||
n += len(d.Index)
|
||||
}
|
||||
msgs = make([]*model.MsgInfoModel, 0, n)
|
||||
}
|
||||
for _, val := range data {
|
||||
res, err := mongoutil.FindOne[*model.MsgDocModel](ctx, m.coll, bson.M{"_id": val.ID})
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
for _, i := range val.Index {
|
||||
if i >= int64(len(res.Msg)) {
|
||||
continue
|
||||
}
|
||||
if msgInfo.Revoke != nil {
|
||||
revokeContent := sdkws.MessageRevokedContent{
|
||||
RevokerID: msgInfo.Revoke.UserID,
|
||||
RevokerRole: msgInfo.Revoke.Role,
|
||||
ClientMsgID: msgInfo.Msg.ClientMsgID,
|
||||
RevokerNickname: msgInfo.Revoke.Nickname,
|
||||
RevokeTime: msgInfo.Revoke.Time,
|
||||
SourceMessageSendTime: msgInfo.Msg.SendTime,
|
||||
SourceMessageSendID: msgInfo.Msg.SendID,
|
||||
SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
|
||||
SessionType: msgInfo.Msg.SessionType,
|
||||
Seq: msgInfo.Msg.Seq,
|
||||
Ex: msgInfo.Msg.Ex,
|
||||
}
|
||||
data, err := jsonutil.JsonMarshal(&revokeContent)
|
||||
if err != nil {
|
||||
return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
|
||||
}
|
||||
elem := sdkws.NotificationElem{Detail: string(data)}
|
||||
content, err := jsonutil.JsonMarshal(&elem)
|
||||
if err != nil {
|
||||
return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
|
||||
}
|
||||
msgInfo.Msg.ContentType = constant.MsgRevokeNotification
|
||||
msgInfo.Msg.Content = string(content)
|
||||
msgs = append(msgs, res.Msg[i])
|
||||
}
|
||||
}
|
||||
//start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
|
||||
//n := int32(len(msgs))
|
||||
//if start >= n {
|
||||
// return n, []*relation.MsgInfoModel{}, nil
|
||||
//}
|
||||
//if start+req.Pagination.ShowNumber < n {
|
||||
// msgs = msgs[start : start+req.Pagination.ShowNumber]
|
||||
//} else {
|
||||
// msgs = msgs[start:]
|
||||
//}
|
||||
return count[0], msgs, nil
|
||||
return count, msgs, nil
|
||||
}
|
||||
|
||||
//func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) {
|
||||
// where := make(bson.A, 0, 6)
|
||||
// if req.RecvID != "" {
|
||||
// if req.SessionType == constant.ReadGroupChatType {
|
||||
// where = append(where, bson.M{
|
||||
// "$or": bson.A{
|
||||
// bson.M{"doc_id": "^n_" + req.RecvID + ":"},
|
||||
// bson.M{"doc_id": "^sg_" + req.RecvID + ":"},
|
||||
// },
|
||||
// })
|
||||
// } else {
|
||||
// where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID})
|
||||
// }
|
||||
// }
|
||||
// if req.SendID != "" {
|
||||
// where = append(where, bson.M{"msgs.msg.send_id": req.SendID})
|
||||
// }
|
||||
// if req.ContentType != 0 {
|
||||
// where = append(where, bson.M{"msgs.msg.content_type": req.ContentType})
|
||||
// }
|
||||
// if req.SessionType != 0 {
|
||||
// where = append(where, bson.M{"msgs.msg.session_type": req.SessionType})
|
||||
// }
|
||||
// if req.SendTime != "" {
|
||||
// sendTime, err := time.Parse(time.DateOnly, req.SendTime)
|
||||
// if err != nil {
|
||||
// return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
|
||||
// }
|
||||
// where = append(where,
|
||||
// bson.M{
|
||||
// "msgs.msg.send_time": bson.M{
|
||||
// "$gte": sendTime.UnixMilli(),
|
||||
// },
|
||||
// },
|
||||
// bson.M{
|
||||
// "msgs.msg.send_time": bson.M{
|
||||
// "$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
|
||||
// },
|
||||
// },
|
||||
// )
|
||||
// }
|
||||
// opt := options.Find().SetLimit(100)
|
||||
// res, err := mongoutil.Find[model.MsgDocModel](ctx, m.coll, bson.M{"$and": where}, opt)
|
||||
// if err != nil {
|
||||
// return 0, nil, err
|
||||
// }
|
||||
// _ = res
|
||||
// fmt.Println()
|
||||
//
|
||||
// return 0, nil, nil
|
||||
// pipeline := bson.A{
|
||||
// bson.M{
|
||||
// "$unwind": "$msgs",
|
||||
// },
|
||||
// }
|
||||
// if len(where) > 0 {
|
||||
// pipeline = append(pipeline, bson.M{
|
||||
// "$match": bson.M{"$and": where},
|
||||
// })
|
||||
// }
|
||||
// pipeline = append(pipeline,
|
||||
// bson.M{
|
||||
// "$project": bson.M{
|
||||
// "_id": 0,
|
||||
// "msg": "$msgs.msg",
|
||||
// },
|
||||
// },
|
||||
// bson.M{
|
||||
// "$count": "count",
|
||||
// },
|
||||
// )
|
||||
// //count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline)
|
||||
// //if err != nil {
|
||||
// // return 0, nil, err
|
||||
// //}
|
||||
// //if len(count) == 0 || count[0] == 0 {
|
||||
// // return 0, nil, nil
|
||||
// //}
|
||||
// count := []int32{0}
|
||||
// pipeline = pipeline[:len(pipeline)-1]
|
||||
// pipeline = append(pipeline,
|
||||
// bson.M{
|
||||
// "$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(),
|
||||
// },
|
||||
// bson.M{
|
||||
// "$limit": req.Pagination.GetShowNumber(),
|
||||
// },
|
||||
// )
|
||||
// msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
|
||||
// if err != nil {
|
||||
// return 0, nil, err
|
||||
// }
|
||||
// for i := range msgs {
|
||||
// msgInfo := msgs[i]
|
||||
// if msgInfo == nil || msgInfo.Msg == nil {
|
||||
// continue
|
||||
// }
|
||||
// if msgInfo.Revoke != nil {
|
||||
// revokeContent := sdkws.MessageRevokedContent{
|
||||
// RevokerID: msgInfo.Revoke.UserID,
|
||||
// RevokerRole: msgInfo.Revoke.Role,
|
||||
// ClientMsgID: msgInfo.Msg.ClientMsgID,
|
||||
// RevokerNickname: msgInfo.Revoke.Nickname,
|
||||
// RevokeTime: msgInfo.Revoke.Time,
|
||||
// SourceMessageSendTime: msgInfo.Msg.SendTime,
|
||||
// SourceMessageSendID: msgInfo.Msg.SendID,
|
||||
// SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
|
||||
// SessionType: msgInfo.Msg.SessionType,
|
||||
// Seq: msgInfo.Msg.Seq,
|
||||
// Ex: msgInfo.Msg.Ex,
|
||||
// }
|
||||
// data, err := jsonutil.JsonMarshal(&revokeContent)
|
||||
// if err != nil {
|
||||
// return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
|
||||
// }
|
||||
// elem := sdkws.NotificationElem{Detail: string(data)}
|
||||
// content, err := jsonutil.JsonMarshal(&elem)
|
||||
// if err != nil {
|
||||
// return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
|
||||
// }
|
||||
// msgInfo.Msg.ContentType = constant.MsgRevokeNotification
|
||||
// msgInfo.Msg.Content = string(content)
|
||||
// }
|
||||
// }
|
||||
// //start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
|
||||
// //n := int32(len(msgs))
|
||||
// //if start >= n {
|
||||
// // return n, []*relation.MsgInfoModel{}, nil
|
||||
// //}
|
||||
// //if start+req.Pagination.ShowNumber < n {
|
||||
// // msgs = msgs[start : start+req.Pagination.ShowNumber]
|
||||
// //} else {
|
||||
// // msgs = msgs[start:]
|
||||
// //}
|
||||
// return count[0], msgs, nil
|
||||
//}
|
||||
|
||||
func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
|
||||
var sort int
|
||||
if ase {
|
||||
|
75
pkg/common/storage/database/mgo/msg_test.go
Normal file
75
pkg/common/storage/database/mgo/msg_test.go
Normal file
@ -0,0 +1,75 @@
|
||||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestName1(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
|
||||
defer cancel()
|
||||
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||
|
||||
v := &MsgMgo{
|
||||
coll: cli.Database("openim_v3").Collection("msg3"),
|
||||
}
|
||||
|
||||
req := &msg.SearchMessageReq{
|
||||
//RecvID: "3187706596",
|
||||
//SendID: "7009965934",
|
||||
ContentType: 101,
|
||||
//SendTime: "2024-05-06",
|
||||
//SessionType: 3,
|
||||
Pagination: &sdkws.RequestPagination{
|
||||
PageNumber: 1,
|
||||
ShowNumber: 10,
|
||||
},
|
||||
}
|
||||
total, res, err := v.SearchMessage(ctx, req)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for i, re := range res {
|
||||
t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content)
|
||||
}
|
||||
|
||||
t.Log(total)
|
||||
}
|
||||
|
||||
func TestName10(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||
|
||||
v := &MsgMgo{
|
||||
coll: cli.Database("openim_v3").Collection("msg3"),
|
||||
}
|
||||
opt := options.Find().SetLimit(1000)
|
||||
|
||||
res, err := mongoutil.Find[model.MsgDocModel](ctx, v.coll, bson.M{}, opt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ctx = context.Background()
|
||||
for i := 0; i < 100000; i++ {
|
||||
for j := range res {
|
||||
res[j].DocID = strconv.FormatUint(rand.Uint64(), 10) + ":0"
|
||||
}
|
||||
if err := mongoutil.InsertMany(ctx, v.coll, res); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Log("====>", time.Now(), i)
|
||||
}
|
||||
|
||||
}
|
@ -26,7 +26,7 @@ func Mongodb() *mongo.Database {
|
||||
|
||||
func TestUserSeq(t *testing.T) {
|
||||
uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
|
||||
t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4))
|
||||
t.Log(uSeq.SetUserMinSeq(context.Background(), "1000", "2000", 4))
|
||||
}
|
||||
|
||||
func TestConversationSeq(t *testing.T) {
|
||||
@ -35,3 +35,8 @@ func TestConversationSeq(t *testing.T) {
|
||||
t.Log(cSeq.Malloc(context.Background(), "2000", 10))
|
||||
t.Log(cSeq.GetMaxSeq(context.Background(), "2000"))
|
||||
}
|
||||
|
||||
func TestUserGetUserReadSeqs(t *testing.T) {
|
||||
uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
|
||||
t.Log(uSeq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"}))
|
||||
}
|
||||
|
@ -88,6 +88,14 @@ func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string
|
||||
return s.getSeq(ctx, conversationID, userID, "read_seq")
|
||||
}
|
||||
|
||||
func (s *seqUserMongo) notFoundSet0(seq map[string]int64, conversationIDs []string) {
|
||||
for _, conversationID := range conversationIDs {
|
||||
if _, ok := seq[conversationID]; !ok {
|
||||
seq[conversationID] = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
|
||||
if len(conversationID) == 0 {
|
||||
return map[string]int64{}, nil
|
||||
@ -102,6 +110,7 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve
|
||||
for _, seq := range seqs {
|
||||
res[seq.ConversationID] = seq.ReadSeq
|
||||
}
|
||||
s.notFoundSet0(res, conversationID)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,7 @@ type Msg interface {
|
||||
GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error)
|
||||
DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
|
||||
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error
|
||||
SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error)
|
||||
SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error)
|
||||
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
|
||||
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
|
||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||
|
Loading…
x
Reference in New Issue
Block a user