mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	implement FindExpires pagination.
This commit is contained in:
		
							parent
							
								
									330611796c
								
							
						
					
					
						commit
						7897044e28
					
				@ -23,6 +23,8 @@ import (
 | 
				
			|||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -286,39 +288,48 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
 | 
					func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
 | 
				
			||||||
 | 
						var conf config.Third
 | 
				
			||||||
	expireTime := time.UnixMilli(req.ExpireTime)
 | 
						expireTime := time.UnixMilli(req.ExpireTime)
 | 
				
			||||||
	models, err := t.s3dataBase.FindByExpires(ctx, expireTime)
 | 
						findPagination := &common.FindPagination{
 | 
				
			||||||
	if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
 | 
							PageNumber: 1,
 | 
				
			||||||
		return nil, errs.Wrap(err)
 | 
							ShowNumber: 1000,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	needDelObjectKeys := make([]string, 0)
 | 
						for {
 | 
				
			||||||
	for _, model := range models {
 | 
							total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination)
 | 
				
			||||||
		needDelObjectKeys = append(needDelObjectKeys, model.Key)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
 | 
					 | 
				
			||||||
	for _, key := range needDelObjectKeys {
 | 
					 | 
				
			||||||
		count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime)
 | 
					 | 
				
			||||||
		if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
 | 
							if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
 | 
				
			||||||
			return nil, errs.Wrap(err)
 | 
								return nil, errs.Wrap(err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if int(count) < 1 {
 | 
							needDelObjectKeys := make([]string, 0)
 | 
				
			||||||
			thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key)
 | 
							for _, model := range models {
 | 
				
			||||||
 | 
								needDelObjectKeys = append(needDelObjectKeys, model.Key)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
 | 
				
			||||||
 | 
							for _, key := range needDelObjectKeys {
 | 
				
			||||||
 | 
								count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime)
 | 
				
			||||||
 | 
								if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
 | 
				
			||||||
 | 
									return nil, errs.Wrap(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if int(count) < 1 {
 | 
				
			||||||
 | 
									thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key)
 | 
				
			||||||
 | 
									if err != nil {
 | 
				
			||||||
 | 
										return nil, errs.Wrap(err)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									t.s3dataBase.DeleteObject(ctx, thumbnailKey)
 | 
				
			||||||
 | 
									t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...)
 | 
				
			||||||
 | 
									t.s3dataBase.DeleteObject(ctx, key)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							for _, model := range models {
 | 
				
			||||||
 | 
								err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				return nil, errs.Wrap(err)
 | 
									return nil, errs.Wrap(err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			t.s3dataBase.DeleteObject(ctx, thumbnailKey)
 | 
					 | 
				
			||||||
			t.s3dataBase.DelS3Key(ctx, "minio", needDelObjectKeys...)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			t.s3dataBase.DeleteObject(ctx, key)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
							if total < int64(findPagination.ShowNumber) {
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
	for _, model := range models {
 | 
					 | 
				
			||||||
		err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return nil, errs.Wrap(err)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							findPagination.PageNumber++
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &third.DeleteOutdatedDataResp{}, nil
 | 
						return &third.DeleteOutdatedDataResp{}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -24,3 +24,15 @@ type GroupSimpleUserID struct {
 | 
				
			|||||||
	Hash      uint64
 | 
						Hash      uint64
 | 
				
			||||||
	MemberNum uint32
 | 
						MemberNum uint32
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type FindPagination struct {
 | 
				
			||||||
 | 
						PageNumber int32
 | 
				
			||||||
 | 
						ShowNumber int32
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FindPagination) GetPageNumber() int32 {
 | 
				
			||||||
 | 
						return f.PageNumber
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					func (f *FindPagination) GetShowNumber() int32 {
 | 
				
			||||||
 | 
						return f.ShowNumber
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -24,6 +24,7 @@ import (
 | 
				
			|||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/pagination"
 | 
				
			||||||
	"github.com/openimsdk/tools/s3"
 | 
						"github.com/openimsdk/tools/s3"
 | 
				
			||||||
	"github.com/openimsdk/tools/s3/cont"
 | 
						"github.com/openimsdk/tools/s3/cont"
 | 
				
			||||||
	"github.com/redis/go-redis/v9"
 | 
						"github.com/redis/go-redis/v9"
 | 
				
			||||||
@ -39,7 +40,7 @@ type S3Database interface {
 | 
				
			|||||||
	SetObject(ctx context.Context, info *model.Object) error
 | 
						SetObject(ctx context.Context, info *model.Object) error
 | 
				
			||||||
	StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
 | 
						StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
 | 
				
			||||||
	FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
 | 
						FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
 | 
				
			||||||
	FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
 | 
						FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
 | 
				
			||||||
	DeleteObject(ctx context.Context, name string) error
 | 
						DeleteObject(ctx context.Context, name string) error
 | 
				
			||||||
	DeleteSpecifiedData(ctx context.Context, engine string, name string) error
 | 
						DeleteSpecifiedData(ctx context.Context, engine string, name string) error
 | 
				
			||||||
	FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
 | 
						FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
 | 
				
			||||||
@ -120,8 +121,9 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
 | 
				
			|||||||
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
 | 
					func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
 | 
				
			||||||
	return s.s3.FormData(ctx, name, size, contentType, duration)
 | 
						return s.s3.FormData(ctx, name, size, contentType, duration)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) {
 | 
					func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
 | 
				
			||||||
	return s.db.FindByExpires(ctx, duration)
 | 
					
 | 
				
			||||||
 | 
						return s.db.FindByExpires(ctx, duration, pagination)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
 | 
					func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
 | 
				
			||||||
 | 
				
			|||||||
@ -22,6 +22,7 @@ import (
 | 
				
			|||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
						"github.com/openimsdk/tools/db/mongoutil"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/pagination"
 | 
				
			||||||
	"github.com/openimsdk/tools/errs"
 | 
						"github.com/openimsdk/tools/errs"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/bson"
 | 
						"go.mongodb.org/mongo-driver/bson"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
@ -70,10 +71,10 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
 | 
				
			|||||||
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
 | 
					func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
 | 
				
			||||||
	return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
 | 
						return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) {
 | 
					func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
 | 
				
			||||||
	return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{
 | 
						return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{
 | 
				
			||||||
		"create_time": bson.M{"$lt": duration},
 | 
							"create_time": bson.M{"$lt": duration},
 | 
				
			||||||
	})
 | 
						}, pagination)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
 | 
					func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
 | 
				
			||||||
	return mongoutil.Count(ctx, o.coll, bson.M{
 | 
						return mongoutil.Count(ctx, o.coll, bson.M{
 | 
				
			||||||
 | 
				
			|||||||
@ -19,12 +19,13 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/pagination"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type ObjectInfo interface {
 | 
					type ObjectInfo interface {
 | 
				
			||||||
	SetObject(ctx context.Context, obj *model.Object) error
 | 
						SetObject(ctx context.Context, obj *model.Object) error
 | 
				
			||||||
	Take(ctx context.Context, engine string, name string) (*model.Object, error)
 | 
						Take(ctx context.Context, engine string, name string) (*model.Object, error)
 | 
				
			||||||
	Delete(ctx context.Context, engine string, name string) error
 | 
						Delete(ctx context.Context, engine string, name string) error
 | 
				
			||||||
	FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
 | 
						FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
 | 
				
			||||||
	FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
 | 
						FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user