diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 7d62e7c8f..8d4bd0c8e 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -2,6 +2,7 @@ package msg import ( "context" + "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -26,52 +27,42 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) if req.Timestamp > time.Now().UnixMilli() { return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error") } - var ( - docNum int - msgNum int - start = time.Now() - getLimit = 5000 - ) - - destructMsg := func(ctx context.Context) (bool, error) { - docIDs, err := m.MsgDatabase.GetDocIDs(ctx) - if err != nil { - return false, err - } - - msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit) - if err != nil { - return false, err - } - if len(msgs) == 0 { - return false, nil - } - - for _, msg := range msgs { - index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg) - if err != nil { - return false, err - } - if len(index) == 0 { - return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed") - } - - docNum++ - msgNum += len(index) - } - - return true, nil + if req.Limit <= 0 { + return nil, errs.ErrArgs.WrapMsg("request limit error") } - - _, err = destructMsg(ctx) + docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit)) if err != nil { - log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) return nil, err } - - log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) - - return &msg.DestructMsgsResp{}, nil + for _, doc := range docs { + if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil { + return nil, err + } + index := strings.LastIndex(doc.DocID, ":") + if index < 0 { + continue + } + var minSeq int64 + for _, model := range doc.Msg { + if model.Msg == nil { + continue + } + if model.Msg.Seq > minSeq { + minSeq = model.Msg.Seq + } + } + if minSeq <= 0 { + continue + } + conversationID := doc.DocID[:index] + if conversationID == "" { + continue + } + if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil { + return nil, err + } + } + return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil } // soft delete for user self diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index d4146fdf4..a541462a2 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,6 +19,7 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "path" "strconv" "time" @@ -284,10 +285,13 @@ func (t *thirdServer) apiAddress(prefix, name string) string { } func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err + } engine := t.config.RpcConfig.Object.Enable expireTime := time.UnixMilli(req.ExpireTime) // Find all expired data in S3 database - models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Count)) + models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Limit)) if err != nil { return nil, err } diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 1149b1f29..46f1573f2 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -12,12 +12,25 @@ import ( func (c *cronServer) deleteMsg() { now := time.Now() deltime := now.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.RetainChatRecords)) - ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) + operationID := fmt.Sprintf("cron_msg_%d_%d", os.Getpid(), deltime.UnixMilli()) + ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) - - if _, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil { - log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now)) - return + const ( + deleteCount = 20 + deleteLimit = 50 + ) + var count int + for i := 1; i <= deleteCount; i++ { + ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i)) + resp, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli(), Limit: deleteLimit}) + if err != nil { + log.ZError(ctx, "cron destruct chat records failed", err) + break + } + count += int(resp.Count) + if resp.Count <= deleteLimit { + break + } } - log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now)) + log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "deleteDocs", count) } diff --git a/internal/tools/s3.go b/internal/tools/s3.go index 38d8badc6..3b0122c84 100644 --- a/internal/tools/s3.go +++ b/internal/tools/s3.go @@ -15,20 +15,21 @@ func (c *cronServer) clearS3() { // number of pagination. if need modify, need update value in third.DeleteOutdatedData pageShowNumber := 500 deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime)) - operationID := fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()) + operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli()) ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + var count int for i := 1; i <= executeNum; i++ { ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i)) - resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Count: int32(pageShowNumber)}) + resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: int32(pageShowNumber)}) if err != nil { - log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(start)) + log.ZError(ctx, "cron deleteoutDatedData failed", err) return } + count += int(resp.Count) if resp.Count < int32(pageShowNumber) { break } } - - log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start)) + log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count) } diff --git a/internal/tools/user_msg.go b/internal/tools/user_msg.go index 9dfaf58c3..52a2b6bdc 100644 --- a/internal/tools/user_msg.go +++ b/internal/tools/user_msg.go @@ -12,9 +12,9 @@ import ( func (c *cronServer) clearUserMsg() { now := time.Now() - ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) - log.ZDebug(ctx, "clear msg cron start", "now", now) - + operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli()) + ctx := mcontext.SetOperationID(c.ctx, operationID) + log.ZDebug(ctx, "clear msg cron start") conversations, err := c.conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) if err != nil { log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 3c3cd9671..88089f1a0 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "errors" - "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" @@ -69,6 +68,7 @@ type CommonMsgDatabase interface { GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) SetMinSeqs(ctx context.Context, seqs map[string]int64) error + SetMinSeq(ctx context.Context, conversationID string, seq int64) error SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error @@ -95,13 +95,14 @@ type CommonMsgDatabase interface { ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) // get Msg when destruct msg before - GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) - DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) + //DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) - GetDocIDs(ctx context.Context) ([]string, error) + GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error + + DeleteDoc(ctx context.Context, docID string) error } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { @@ -806,9 +807,10 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin return db.seqConversation.GetMaxSeq(ctx, conversationID) } -func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) -} +// +//func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { +// return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) +//} func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { return db.seqConversation.SetMinSeqs(ctx, seqs) @@ -947,56 +949,40 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) } -func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) { - var msgs []*model.MsgDocModel - for i := 0; i < len(docIDs); i += 1000 { - end := i + 1000 - if end > len(docIDs) { - end = len(docIDs) - } - - res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:end], limit) - if err != nil { - return nil, err - } - msgs = append(msgs, res...) - - if len(msgs) >= limit { - return msgs[:limit], nil - } - } - return msgs, nil +func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { + return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit) } -func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { - var notNull int - index := make([]int, 0, len(doc.Msg)) - for i, message := range doc.Msg { - if message.Msg != nil { - notNull++ - if message.Msg.SendTime < ts { - index = append(index, i) - } - } - } - if len(index) == 0 { - return index, nil - } - maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq - conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")] - if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil { - return index, err - } - if len(index) == notNull { - log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) - return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) - } else { - log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) - return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) - } -} +// +//func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { +// var notNull int +// index := make([]int, 0, len(doc.Msg)) +// for i, message := range doc.Msg { +// if message.Msg != nil { +// notNull++ +// if message.Msg.SendTime < ts { +// index = append(index, i) +// } +// } +// } +// if len(index) == 0 { +// return index, nil +// } +// maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq +// conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")] +// if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { +// return index, err +// } +// if len(index) == notNull { +// log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) +// return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) +// } else { +// log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) +// return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) +// } +//} -func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error { +func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { if errors.Is(errs.Unwrap(err), redis.Nil) { @@ -1010,8 +996,8 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin return db.seqConversation.SetMinSeq(ctx, conversationID, seq) } -func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) { - return db.msgDocDatabase.GetDocIDs(ctx) +func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) { + return db.msgDocDatabase.GetRandDocIDs(ctx, limit) } func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { @@ -1026,3 +1012,7 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio // todo: only the time in the redis cache will be taken, not the message time return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs) } + +func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error { + return db.msgDocDatabase.DeleteDoc(ctx, docID) +} diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index fc1fe47ea..937dbae1d 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -1227,8 +1227,7 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string } } -func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) { - limit := 5000 +func (m *MsgMgo) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) { var skip int var docIDs []string var offset int @@ -1267,15 +1266,18 @@ func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) { return docIDs, errs.Wrap(err) } -func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) { +func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ { "$match": bson.M{ - "doc_id": bson.M{ - "$in": docIDs, - }, - "msgs.msg.send_time": bson.M{ - "$lt": ts, + "msgs": bson.M{ + "$not": bson.M{ + "$elemMatch": bson.M{ + "msg.send_time": bson.M{ + "$gt": ts, + }, + }, + }, }, }, }, @@ -1288,7 +1290,7 @@ func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, li }, }, { - "$limit": limit, + "$sample": limit, }, }) } diff --git a/pkg/common/storage/database/mgo/msg_test.go b/pkg/common/storage/database/mgo/msg_test.go index 5aed4dc51..eff847d26 100644 --- a/pkg/common/storage/database/mgo/msg_test.go +++ b/pkg/common/storage/database/mgo/msg_test.go @@ -3,12 +3,11 @@ package mgo import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/db/mongoutil" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "math" "math/rand" "strconv" "testing" @@ -18,33 +17,43 @@ import ( func TestName1(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) defer cancel() - cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - v := &MsgMgo{ - coll: cli.Database("openim_v3").Collection("msg3"), - } + //v := &MsgMgo{ + // coll: cli.Database("openim_v3").Collection("msg3"), + //} + // + //req := &msg.SearchMessageReq{ + // //RecvID: "3187706596", + // //SendID: "7009965934", + // ContentType: 101, + // //SendTime: "2024-05-06", + // //SessionType: 3, + // Pagination: &sdkws.RequestPagination{ + // PageNumber: 1, + // ShowNumber: 10, + // }, + //} + //total, res, err := v.SearchMessage(ctx, req) + //if err != nil { + // panic(err) + //} + // + //for i, re := range res { + // t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content) + //} + // + //t.Log(total) - req := &msg.SearchMessageReq{ - //RecvID: "3187706596", - //SendID: "7009965934", - ContentType: 101, - //SendTime: "2024-05-06", - //SessionType: 3, - Pagination: &sdkws.RequestPagination{ - PageNumber: 1, - ShowNumber: 10, - }, - } - total, res, err := v.SearchMessage(ctx, req) + msg, err := NewMsgMongo(cli.Database("openim_v3")) if err != nil { panic(err) } - - for i, re := range res { - t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content) + res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000) + if err != nil { + panic(err) } - - t.Log(total) + t.Log(len(res)) } func TestName10(t *testing.T) { @@ -73,3 +82,26 @@ func TestName10(t *testing.T) { } } + +func TestName3(t *testing.T) { + t.Log(uint64(math.MaxUint64)) + t.Log(int64(math.MaxInt64)) + + t.Log(int64(math.MinInt64)) +} + +func TestName4(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + defer cancel() + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + + msg, err := NewMsgMongo(cli.Database("openim_v3")) + if err != nil { + panic(err) + } + res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000) + if err != nil { + panic(err) + } + t.Log(len(res)) +} diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index 23a99f5b9..17aaf2342 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -45,7 +45,7 @@ type Msg interface { DeleteDoc(ctx context.Context, docID string) error DeleteMsgByIndex(ctx context.Context, docID string, index []int) error - GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) + GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) - GetDocIDs(ctx context.Context) ([]string, error) + GetRandDocIDs(ctx context.Context, limit int) ([]string, error) }