mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	Merge pull request #8 from mo3et/minio-cron-del
feat: implement scheduled delete outdated object in minio.
This commit is contained in:
		
						commit
						c117ef8399
					
				@ -1,3 +1,3 @@
 | 
				
			|||||||
chatRecordsClearTime: "0 2 * * *"
 | 
					cronExecuteTime: "0 2 * * *"
 | 
				
			||||||
retainChatRecords: 365
 | 
					retainChatRecords: 365
 | 
				
			||||||
fileExpireTime: 90
 | 
					fileExpireTime: 90
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							@ -14,7 +14,7 @@ require (
 | 
				
			|||||||
	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
 | 
						github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
 | 
				
			||||||
	github.com/mitchellh/mapstructure v1.5.0
 | 
						github.com/mitchellh/mapstructure v1.5.0
 | 
				
			||||||
	github.com/openimsdk/protocol v0.0.69-alpha.22
 | 
						github.com/openimsdk/protocol v0.0.69-alpha.22
 | 
				
			||||||
	github.com/openimsdk/tools v0.0.49-alpha.30
 | 
						github.com/openimsdk/tools v0.0.49-alpha.39
 | 
				
			||||||
	github.com/pkg/errors v0.9.1 // indirect
 | 
						github.com/pkg/errors v0.9.1 // indirect
 | 
				
			||||||
	github.com/prometheus/client_golang v1.18.0
 | 
						github.com/prometheus/client_golang v1.18.0
 | 
				
			||||||
	github.com/stretchr/testify v1.9.0
 | 
						github.com/stretchr/testify v1.9.0
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@ -272,8 +272,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
 | 
				
			|||||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
					github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ=
 | 
					github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
					github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
				
			||||||
github.com/openimsdk/tools v0.0.49-alpha.30 h1:iT2+1F8cJmlwKEris25YgK0seiJRUear+wTgc1bzcg8=
 | 
					github.com/openimsdk/tools v0.0.49-alpha.39 h1:bl5+q7xHsc/j1NnkN8/gYmn23RsNNbRizDY58d2EY1w=
 | 
				
			||||||
github.com/openimsdk/tools v0.0.49-alpha.30/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU=
 | 
					github.com/openimsdk/tools v0.0.49-alpha.39/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU=
 | 
				
			||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
					github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
				
			||||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 | 
					github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 | 
				
			||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
 | 
					github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
 | 
				
			||||||
 | 
				
			|||||||
@ -23,10 +23,14 @@ import (
 | 
				
			|||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/google/uuid"
 | 
						"github.com/google/uuid"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 | 
				
			||||||
 | 
						"github.com/openimsdk/protocol/sdkws"
 | 
				
			||||||
	"github.com/openimsdk/protocol/third"
 | 
						"github.com/openimsdk/protocol/third"
 | 
				
			||||||
	"github.com/openimsdk/tools/errs"
 | 
						"github.com/openimsdk/tools/errs"
 | 
				
			||||||
	"github.com/openimsdk/tools/log"
 | 
						"github.com/openimsdk/tools/log"
 | 
				
			||||||
@ -285,14 +289,47 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
 | 
					func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
 | 
				
			||||||
 | 
						var conf config.Third
 | 
				
			||||||
	expireTime := time.UnixMilli(req.ExpireTime)
 | 
						expireTime := time.UnixMilli(req.ExpireTime)
 | 
				
			||||||
	models, err := t.s3dataBase.FindByExpires(ctx, expireTime)
 | 
						findPagination := &sdkws.RequestPagination{
 | 
				
			||||||
	if err != nil {
 | 
							PageNumber: 1,
 | 
				
			||||||
		return nil, err
 | 
							ShowNumber: 1000,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, model := range models {
 | 
						for {
 | 
				
			||||||
		t.s3dataBase.DeleteObject(ctx, model.Key)
 | 
							total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination)
 | 
				
			||||||
		t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Key)
 | 
							if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
 | 
				
			||||||
 | 
								return nil, errs.Wrap(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							needDelObjectKeys := make([]string, 0)
 | 
				
			||||||
 | 
							for _, model := range models {
 | 
				
			||||||
 | 
								needDelObjectKeys = append(needDelObjectKeys, model.Key)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
 | 
				
			||||||
 | 
							for _, key := range needDelObjectKeys {
 | 
				
			||||||
 | 
								count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime)
 | 
				
			||||||
 | 
								if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
 | 
				
			||||||
 | 
									return nil, errs.Wrap(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if int(count) < 1 {
 | 
				
			||||||
 | 
									thumbnailKey, err := t.s3dataBase.GetImageThumbnailKey(ctx, key)
 | 
				
			||||||
 | 
									if err != nil {
 | 
				
			||||||
 | 
										return nil, errs.Wrap(err)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									t.s3dataBase.DeleteObject(ctx, thumbnailKey)
 | 
				
			||||||
 | 
									t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...)
 | 
				
			||||||
 | 
									t.s3dataBase.DeleteObject(ctx, key)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							for _, model := range models {
 | 
				
			||||||
 | 
								err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return nil, errs.Wrap(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if total < int64(findPagination.ShowNumber) {
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &third.DeleteOutdatedDataResp{}, nil
 | 
						return &third.DeleteOutdatedDataResp{}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -41,7 +41,7 @@ type CronTaskConfig struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
					func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
				
			||||||
	log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords)
 | 
						log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
 | 
				
			||||||
	if config.CronTask.RetainChatRecords < 1 {
 | 
						if config.CronTask.RetainChatRecords < 1 {
 | 
				
			||||||
		return errs.New("msg destruct time must be greater than 1").Wrap()
 | 
							return errs.New("msg destruct time must be greater than 1").Wrap()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@ -68,29 +68,31 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
 | 
							log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil {
 | 
						if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil {
 | 
				
			||||||
		return errs.Wrap(err)
 | 
							return errs.Wrap(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						thirdClient := third.NewThirdClient(tConn)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	deleteFunc := func() {
 | 
						deleteFunc := func() {
 | 
				
			||||||
		now := time.Now()
 | 
							now := time.Now()
 | 
				
			||||||
		deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
 | 
							deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
 | 
				
			||||||
		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
 | 
							ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
 | 
				
			||||||
		log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
 | 
							log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
 | 
				
			||||||
		tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		thirdClient := third.NewThirdClient(tConn)
 | 
					 | 
				
			||||||
		if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
 | 
							if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
 | 
				
			||||||
			log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
 | 
								log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
 | 
							log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if _, err := crontab.AddFunc(string(config.CronTask.FileExpireTime), deleteFunc); err != nil {
 | 
						if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil {
 | 
				
			||||||
		return errs.Wrap(err)
 | 
							return errs.Wrap(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
 | 
						log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
 | 
				
			||||||
	crontab.Start()
 | 
						crontab.Start()
 | 
				
			||||||
	<-ctx.Done()
 | 
						<-ctx.Done()
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 | 
				
			|||||||
@ -106,9 +106,9 @@ type API struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type CronTask struct {
 | 
					type CronTask struct {
 | 
				
			||||||
	ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
 | 
						CronExecuteTime   string `mapstructure:"cronExecuteTime"`
 | 
				
			||||||
	RetainChatRecords    int    `mapstructure:"retainChatRecords"`
 | 
						RetainChatRecords int    `mapstructure:"retainChatRecords"`
 | 
				
			||||||
	FileExpireTime       int    `mapstructure:"fileExpireTime"`
 | 
						FileExpireTime    int    `mapstructure:"fileExpireTime"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type OfflinePushConfig struct {
 | 
					type OfflinePushConfig struct {
 | 
				
			||||||
 | 
				
			|||||||
@ -19,11 +19,12 @@ import (
 | 
				
			|||||||
	"path/filepath"
 | 
						"path/filepath"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
						redisCache "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/pagination"
 | 
				
			||||||
	"github.com/openimsdk/tools/s3"
 | 
						"github.com/openimsdk/tools/s3"
 | 
				
			||||||
	"github.com/openimsdk/tools/s3/cont"
 | 
						"github.com/openimsdk/tools/s3/cont"
 | 
				
			||||||
	"github.com/redis/go-redis/v9"
 | 
						"github.com/redis/go-redis/v9"
 | 
				
			||||||
@ -39,23 +40,28 @@ type S3Database interface {
 | 
				
			|||||||
	SetObject(ctx context.Context, info *model.Object) error
 | 
						SetObject(ctx context.Context, info *model.Object) error
 | 
				
			||||||
	StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
 | 
						StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
 | 
				
			||||||
	FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
 | 
						FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
 | 
				
			||||||
	FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
 | 
						FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
 | 
				
			||||||
	DeleteObject(ctx context.Context, name string) error
 | 
						DeleteObject(ctx context.Context, name string) error
 | 
				
			||||||
	DeleteSpecifiedData(ctx context.Context, engine string, name string) error
 | 
						DeleteSpecifiedData(ctx context.Context, engine string, name string) error
 | 
				
			||||||
 | 
						FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
 | 
				
			||||||
 | 
						DelS3Key(ctx context.Context, engine string, keys ...string) error
 | 
				
			||||||
 | 
						GetImageThumbnailKey(ctx context.Context, name string) (string, error)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
 | 
					func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
 | 
				
			||||||
	return &s3Database{
 | 
						return &s3Database{
 | 
				
			||||||
		s3:    cont.New(redis2.NewS3Cache(rdb, s3), s3),
 | 
							s3:      cont.New(redisCache.NewS3Cache(rdb, s3), s3),
 | 
				
			||||||
		cache: redis2.NewObjectCacheRedis(rdb, obj),
 | 
							cache:   redisCache.NewObjectCacheRedis(rdb, obj),
 | 
				
			||||||
		db:    obj,
 | 
							s3cache: redisCache.NewS3Cache(rdb, s3),
 | 
				
			||||||
 | 
							db:      obj,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type s3Database struct {
 | 
					type s3Database struct {
 | 
				
			||||||
	s3    *cont.Controller
 | 
						s3      *cont.Controller
 | 
				
			||||||
	cache cache.ObjectCache
 | 
						cache   cache.ObjectCache
 | 
				
			||||||
	db    database.ObjectInfo
 | 
						s3cache cont.S3Cache
 | 
				
			||||||
 | 
						db      database.ObjectInfo
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) {
 | 
					func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) {
 | 
				
			||||||
@ -115,8 +121,9 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
 | 
				
			|||||||
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
 | 
					func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
 | 
				
			||||||
	return s.s3.FormData(ctx, name, size, contentType, duration)
 | 
						return s.s3.FormData(ctx, name, size, contentType, duration)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) {
 | 
					func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
 | 
				
			||||||
	return s.db.FindByExpires(ctx, duration)
 | 
					
 | 
				
			||||||
 | 
						return s.db.FindByExpires(ctx, duration, pagination)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
 | 
					func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
 | 
				
			||||||
@ -125,3 +132,15 @@ func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
 | 
				
			|||||||
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error {
 | 
					func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error {
 | 
				
			||||||
	return s.db.Delete(ctx, engine, name)
 | 
						return s.db.Delete(ctx, engine, name)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
 | 
				
			||||||
 | 
						return s.db.FindNotDelByS3(ctx, key, duration)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {
 | 
				
			||||||
 | 
						return s.s3cache.DelS3Key(ctx, engine, keys...)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *s3Database) GetImageThumbnailKey(ctx context.Context, name string) (string, error) {
 | 
				
			||||||
 | 
						return s.s3.GetImageThumbnailKey(ctx, name)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -16,9 +16,10 @@ package controller
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
				
			||||||
	"github.com/openimsdk/tools/db/pagination"
 | 
						"github.com/openimsdk/tools/db/pagination"
 | 
				
			||||||
 | 
				
			|||||||
@ -22,6 +22,7 @@ import (
 | 
				
			|||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
						"github.com/openimsdk/tools/db/mongoutil"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/pagination"
 | 
				
			||||||
	"github.com/openimsdk/tools/errs"
 | 
						"github.com/openimsdk/tools/errs"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/bson"
 | 
						"go.mongodb.org/mongo-driver/bson"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
@ -70,8 +71,14 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
 | 
				
			|||||||
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
 | 
					func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
 | 
				
			||||||
	return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
 | 
						return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) {
 | 
					func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
 | 
				
			||||||
	return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{
 | 
						return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{
 | 
				
			||||||
		"create_time": bson.M{"$lt": duration},
 | 
							"create_time": bson.M{"$lt": duration},
 | 
				
			||||||
 | 
						}, pagination)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
 | 
				
			||||||
 | 
						return mongoutil.Count(ctx, o.coll, bson.M{
 | 
				
			||||||
 | 
							"key":         key,
 | 
				
			||||||
 | 
							"create_time": bson.M{"$gt": duration},
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -19,11 +19,13 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/pagination"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type ObjectInfo interface {
 | 
					type ObjectInfo interface {
 | 
				
			||||||
	SetObject(ctx context.Context, obj *model.Object) error
 | 
						SetObject(ctx context.Context, obj *model.Object) error
 | 
				
			||||||
	Take(ctx context.Context, engine string, name string) (*model.Object, error)
 | 
						Take(ctx context.Context, engine string, name string) (*model.Object, error)
 | 
				
			||||||
	Delete(ctx context.Context, engine string, name string) error
 | 
						Delete(ctx context.Context, engine string, name string) error
 | 
				
			||||||
	FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
 | 
						FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
 | 
				
			||||||
 | 
						FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user