diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index ca1e87cf8..e0bf7837d 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -3,6 +3,7 @@ package third import ( "OpenIM/pkg/proto/third" "context" + "time" ) func (t *thirdServer) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*third.ApplyPutResp, error) { @@ -16,3 +17,8 @@ func (t *thirdServer) GetPut(ctx context.Context, req *third.GetPutReq) (*third. func (t *thirdServer) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (*third.ConfirmPutResp, error) { return t.s3dataBase.ConfirmPut(ctx, req) } + +func (t *thirdServer) CleanObject(ctx context.Context, now time.Time) { + //清理过期的对象 + +} diff --git a/pkg/common/db/controller/s3.go b/pkg/common/db/controller/s3.go index 685a8b999..149e7d15f 100644 --- a/pkg/common/db/controller/s3.go +++ b/pkg/common/db/controller/s3.go @@ -4,6 +4,7 @@ import "C" import ( "OpenIM/pkg/common/db/obj" "OpenIM/pkg/common/db/table/relation" + "OpenIM/pkg/common/tracelog" "OpenIM/pkg/proto/third" "OpenIM/pkg/utils" "context" @@ -12,7 +13,6 @@ import ( "errors" "fmt" "github.com/google/uuid" - "log" "path" "strconv" "time" @@ -92,11 +92,11 @@ func (c *s3Database) UUID() string { } func (c *s3Database) HashName(hash string) string { - return path.Join("hash", hash) + return path.Join("hash", c.today(), c.UUID()) } func (c *s3Database) isNotFound(err error) bool { - return false + return relation.IsNotFound(err) } func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*third.ApplyPutResp, error) { @@ -205,46 +205,49 @@ func (c *s3Database) GetPut(ctx context.Context, req *third.GetPutReq) (*third.G } func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (_ *third.ConfirmPutResp, _err error) { - up, err := c.put.Take(ctx, req.PutID) + put, err := c.put.Take(ctx, req.PutID) if err != nil { return nil, err } - _, pack := c.getFragmentNum(up.FragmentSize, up.ObjectSize) + _, pack := c.getFragmentNum(put.FragmentSize, put.ObjectSize) defer func() { if _err == nil { // 清理上传的碎片 - for i := 0; i < pack; i++ { - name := path.Join(up.Path, c.fragmentName(i)) - err := c.obj.DeleteObjet(ctx, &obj.BucketObject{ - Bucket: c.obj.TempBucket(), - Name: name, - }) - if err != nil { - log.Printf("delete fragment %d %s %s failed %s\n", i, c.obj.TempBucket(), name, err) - } + err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path}) + if err != nil { + tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path) } } }() - if up.Complete { + if put.Complete { return nil, errors.New("put completed") } now := time.Now().UnixMilli() - if up.EffectiveTime.UnixMilli() < now { + if put.EffectiveTime.UnixMilli() < now { return nil, errors.New("upload expired") } - if up.ExpirationTime != nil && up.ExpirationTime.UnixMilli() < now { + if put.ExpirationTime != nil && put.ExpirationTime.UnixMilli() < now { return nil, errors.New("object expired") } - if hash, err := c.hash.Take(ctx, up.Hash, c.obj.Name()); err == nil { + if hash, err := c.hash.Take(ctx, put.Hash, c.obj.Name()); err == nil { o := relation.ObjectInfoModel{ - Name: up.Name, + Name: put.Name, Hash: hash.Hash, - ExpirationTime: up.ExpirationTime, + ExpirationTime: put.ExpirationTime, CreateTime: time.Now(), } if err := c.info.SetObject(ctx, &o); err != nil { return nil, err } + defer func() { + err := c.obj.DeleteObjet(ctx, &obj.BucketObject{ + Bucket: c.obj.TempBucket(), + Name: put.Path, + }) + if err != nil { + tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path) + } + }() // 服务端已存在 return &third.ConfirmPutResp{ Url: c.urlName(o.Name), @@ -254,7 +257,7 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) ( } src := make([]obj.BucketObject, pack) for i := 0; i < pack; i++ { - name := path.Join(up.Path, c.fragmentName(i)) + name := path.Join(put.Path, c.fragmentName(i)) o, err := c.obj.GetObjectInfo(ctx, &obj.BucketObject{ Bucket: c.obj.TempBucket(), Name: name, @@ -263,13 +266,13 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) ( return nil, err } if i+1 == pack { // 最后一个 - size := up.ObjectSize - up.FragmentSize*int64(i) + size := put.ObjectSize - put.FragmentSize*int64(i) if size != o.Size { return nil, fmt.Errorf("last fragment %d size %d not equal to %d hash %s", i, o.Size, size, o.Hash) } } else { - if o.Size != up.FragmentSize { - return nil, fmt.Errorf("fragment %d size %d not equal to %d hash %s", i, o.Size, up.FragmentSize, o.Hash) + if o.Size != put.FragmentSize { + return nil, fmt.Errorf("fragment %d size %d not equal to %d hash %s", i, o.Size, put.FragmentSize, o.Hash) } } src[i] = obj.BucketObject{ @@ -279,7 +282,7 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) ( } dst := &obj.BucketObject{ Bucket: c.obj.DataBucket(), - Name: c.HashName(up.Hash), + Name: c.HashName(put.Hash), } if len(src) == 1 { // 未分片直接触发copy // 检查数据完整性,避免脏数据 @@ -287,11 +290,11 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) ( if err != nil { return nil, err } - if up.ObjectSize != o.Size { - return nil, fmt.Errorf("size mismatching should %d reality %d", up.ObjectSize, o.Size) + if put.ObjectSize != o.Size { + return nil, fmt.Errorf("size mismatching should %d reality %d", put.ObjectSize, o.Size) } - if up.Hash != o.Hash { - return nil, fmt.Errorf("hash mismatching should %s reality %s", up.Hash, o.Hash) + if put.Hash != o.Hash { + return nil, fmt.Errorf("hash mismatching should %s reality %s", put.Hash, o.Hash) } if err := c.obj.CopyObjet(ctx, &src[0], dst); err != nil { return nil, err @@ -299,11 +302,11 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) ( } else { tempBucket := &obj.BucketObject{ Bucket: c.obj.TempBucket(), - Name: path.Join("merge", c.today(), req.PutID, c.UUID()), + Name: path.Join(put.Path, "merge_"+c.UUID()), } defer func() { // 清理合成的文件 if err := c.obj.DeleteObjet(ctx, tempBucket); err != nil { - log.Printf("delete %s %s %s failed %s\n", c.obj.Name(), tempBucket.Bucket, tempBucket.Name, err) + tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", tempBucket.Bucket, "Path", tempBucket.Name) } }() err := c.obj.ComposeObject(ctx, src, tempBucket) @@ -314,40 +317,107 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) ( if err != nil { return nil, err } - if up.ObjectSize != info.Size { - return nil, fmt.Errorf("size mismatch should %d reality %d", up.ObjectSize, info.Size) + if put.ObjectSize != info.Size { + return nil, fmt.Errorf("size mismatch should %d reality %d", put.ObjectSize, info.Size) } - if up.Hash != info.Hash { - return nil, fmt.Errorf("hash mismatch should %s reality %s", up.Hash, info.Hash) + if put.Hash != info.Hash { + return nil, fmt.Errorf("hash mismatch should %s reality %s", put.Hash, info.Hash) } if err := c.obj.CopyObjet(ctx, tempBucket, dst); err != nil { return nil, err } } - o := &relation.ObjectInfoModel{ - Name: up.Name, - Hash: up.Hash, - ExpirationTime: up.ExpirationTime, - CreateTime: time.Now(), - } h := &relation.ObjectHashModel{ - Hash: up.Hash, - Size: up.ObjectSize, + Hash: put.Hash, Engine: c.obj.Name(), + Size: put.ObjectSize, Bucket: c.obj.DataBucket(), - Name: c.HashName(up.Hash), + Name: dst.Name, CreateTime: time.Now(), } + o := &relation.ObjectInfoModel{ + Name: put.Name, + Hash: put.Hash, + ExpirationTime: put.ExpirationTime, + CreateTime: time.Now(), + } if err := c.hash.Create(ctx, []*relation.ObjectHashModel{h}); err != nil { return nil, err } if err := c.info.SetObject(ctx, o); err != nil { return nil, err } - if err := c.put.SetCompleted(ctx, up.PutID); err != nil { - log.Printf("set uploaded %s failed %s\n", up.PutID, err) + if err := c.put.SetCompleted(ctx, put.PutID); err != nil { + tracelog.SetCtxWarn(ctx, "SetCompleted", err, "PutID", put.PutID) } return &third.ConfirmPutResp{ Url: c.urlName(o.Name), }, nil } + +func (c *s3Database) CleanExpirationObject(ctx context.Context, t time.Time) { + // 清理上传产生的临时文件 + c.cleanPutTemp(ctx, t, 10) + // 清理hash引用全过期的文件 + c.cleanExpirationObject(ctx, t) + // 清理没有引用的hash对象 + c.clearNoCitation(ctx, c.obj.Name(), 10) +} + +func (c *s3Database) cleanPutTemp(ctx context.Context, t time.Time, num int) { + for { + puts, err := c.put.FindExpirationPut(ctx, t, num) + if err != nil { + tracelog.SetCtxWarn(ctx, "FindExpirationPut", err, "Time", t, "Num", num) + return + } + if len(puts) == 0 { + return + } + for _, put := range puts { + err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path}) + if err != nil { + tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path) + return + } + } + ids := utils.Slice(puts, func(e *relation.ObjectPutModel) string { return e.PutID }) + err = c.put.DelPut(ctx, ids) + if err != nil { + tracelog.SetCtxWarn(ctx, "DelPut", err, "PutID", ids) + return + } + } +} + +func (c *s3Database) cleanExpirationObject(ctx context.Context, t time.Time) { + err := c.info.DeleteExpiration(ctx, t) + if err != nil { + tracelog.SetCtxWarn(ctx, "DeleteExpiration", err, "Time", t) + } +} + +func (c *s3Database) clearNoCitation(ctx context.Context, engine string, limit int) { + for { + list, err := c.hash.DeleteNoCitation(ctx, engine, limit) + if err != nil { + tracelog.SetCtxWarn(ctx, "DeleteNoCitation", err, "Engine", engine, "Limit", limit) + return + } + if len(list) == 0 { + return + } + var hasErr bool + for _, h := range list { + err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: h.Bucket, Name: h.Name}) + if err != nil { + hasErr = true + tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", h.Bucket, "Path", h.Name) + continue + } + } + if hasErr { + return + } + } +} diff --git a/pkg/common/db/obj/obj.go b/pkg/common/db/obj/obj.go index 5cfb2f9b5..7e4a4492f 100644 --- a/pkg/common/db/obj/obj.go +++ b/pkg/common/db/obj/obj.go @@ -47,7 +47,7 @@ type Interface interface { GetObjectInfo(ctx context.Context, args *BucketObject) (*ObjectInfo, error) // CopyObjet 复制对象 CopyObjet(ctx context.Context, src *BucketObject, dst *BucketObject) error - // DeleteObjet 删除对象 + // DeleteObjet 删除对象(不存在返回nil) DeleteObjet(ctx context.Context, info *BucketObject) error // ComposeObject 合并对象 ComposeObject(ctx context.Context, src []BucketObject, dst *BucketObject) error diff --git a/pkg/common/db/relation/object_hash_model.go b/pkg/common/db/relation/object_hash_model.go index 9f3d28200..825b85100 100644 --- a/pkg/common/db/relation/object_hash_model.go +++ b/pkg/common/db/relation/object_hash_model.go @@ -38,3 +38,15 @@ func (o *ObjectHashGorm) Create(ctx context.Context, h []*relation.ObjectHashMod }() return utils.Wrap1(o.DB.Create(h).Error) } + +func (o *ObjectHashGorm) DeleteNoCitation(ctx context.Context, engine string, num int) (list []*relation.ObjectHashModel, err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "engine", engine, "num", num, "objectHash", list) + }() + err = o.DB.Table(relation.ObjectHashModelTableName, "as h").Select("h.*"). + Joins("LEFT JOIN "+relation.ObjectInfoModelTableName+" as i ON h.hash = i.hash"). + Where("h.engine = ? AND i.hash IS NULL", engine). + Limit(num). + Find(&list).Error + return list, utils.Wrap1(err) +} diff --git a/pkg/common/db/relation/object_info_model.go b/pkg/common/db/relation/object_info_model.go index 54b11ec8e..4b60ce82a 100644 --- a/pkg/common/db/relation/object_info_model.go +++ b/pkg/common/db/relation/object_info_model.go @@ -6,6 +6,7 @@ import ( "OpenIM/pkg/utils" "context" "gorm.io/gorm" + "time" ) func NewObjectInfo(db *gorm.DB) relation.ObjectInfoModelInterface { @@ -43,3 +44,10 @@ func (o *ObjectInfoGorm) Take(ctx context.Context, name string) (info *relation. info = &relation.ObjectInfoModel{} return info, utils.Wrap1(o.DB.Where("name = ?", name).Take(info).Error) } + +func (o *ObjectInfoGorm) DeleteExpiration(ctx context.Context, expiration time.Time) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "expiration", expiration) + }() + return utils.Wrap1(o.DB.Where("expiration_time IS NOT NULL AND expiration_time <= ?", expiration).Delete(&relation.ObjectInfoModel{}).Error) +} diff --git a/pkg/common/db/relation/object_put_model.go b/pkg/common/db/relation/object_put_model.go index 818ee7e07..5e92b81b3 100644 --- a/pkg/common/db/relation/object_put_model.go +++ b/pkg/common/db/relation/object_put_model.go @@ -6,6 +6,7 @@ import ( "OpenIM/pkg/utils" "context" "gorm.io/gorm" + "time" ) func NewObjectPut(db *gorm.DB) relation.ObjectPutModelInterface { @@ -45,3 +46,18 @@ func (o *ObjectPutGorm) SetCompleted(ctx context.Context, putID string) (err err }() return utils.Wrap1(o.DB.Model(&relation.ObjectPutModel{}).Where("put_id = ?", putID).Update("complete", true).Error) } + +func (o *ObjectPutGorm) FindExpirationPut(ctx context.Context, expirationTime time.Time, num int) (list []*relation.ObjectPutModel, err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "expirationTime", expirationTime, "num", num, "list", list) + }() + err = o.DB.Where("effective_time <= ?", expirationTime).Limit(num).Find(&list).Error + return list, utils.Wrap1(err) +} + +func (o *ObjectPutGorm) DelPut(ctx context.Context, ids []string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ids", ids) + }() + return utils.Wrap1(o.DB.Where("put_id IN ?", ids).Delete(&relation.ObjectPutModel{}).Error) +} diff --git a/pkg/common/db/table/relation/object_hash.go b/pkg/common/db/table/relation/object_hash.go index f3d161697..86d038fc4 100644 --- a/pkg/common/db/table/relation/object_hash.go +++ b/pkg/common/db/table/relation/object_hash.go @@ -26,4 +26,5 @@ type ObjectHashModelInterface interface { NewTx(tx any) ObjectHashModelInterface Take(ctx context.Context, hash string, engine string) (*ObjectHashModel, error) Create(ctx context.Context, h []*ObjectHashModel) error + DeleteNoCitation(ctx context.Context, engine string, num int) (list []*ObjectHashModel, err error) } diff --git a/pkg/common/db/table/relation/object_info.go b/pkg/common/db/table/relation/object_info.go index e96996ef3..cc1d75dd1 100644 --- a/pkg/common/db/table/relation/object_info.go +++ b/pkg/common/db/table/relation/object_info.go @@ -24,4 +24,5 @@ type ObjectInfoModelInterface interface { NewTx(tx any) ObjectInfoModelInterface SetObject(ctx context.Context, obj *ObjectInfoModel) error Take(ctx context.Context, name string) (*ObjectInfoModel, error) + DeleteExpiration(ctx context.Context, expiration time.Time) error } diff --git a/pkg/common/db/table/relation/object_put.go b/pkg/common/db/table/relation/object_put.go index 73d0478c7..e914519f6 100644 --- a/pkg/common/db/table/relation/object_put.go +++ b/pkg/common/db/table/relation/object_put.go @@ -31,4 +31,6 @@ type ObjectPutModelInterface interface { Create(ctx context.Context, m []*ObjectPutModel) error Take(ctx context.Context, putID string) (*ObjectPutModel, error) SetCompleted(ctx context.Context, putID string) error + FindExpirationPut(ctx context.Context, expirationTime time.Time, num int) ([]*ObjectPutModel, error) + DelPut(ctx context.Context, ids []string) error } diff --git a/pkg/common/db/table/relation/utils.go b/pkg/common/db/table/relation/utils.go index a046de1e8..3e0d09820 100644 --- a/pkg/common/db/table/relation/utils.go +++ b/pkg/common/db/table/relation/utils.go @@ -1,5 +1,10 @@ package relation +import ( + "OpenIM/pkg/utils" + "gorm.io/gorm" +) + type BatchUpdateGroupMember struct { GroupID string UserID string @@ -10,3 +15,7 @@ type GroupSimpleUserID struct { Hash uint64 MemberNum uint32 } + +func IsNotFound(err error) bool { + return utils.Unwrap(err) == gorm.ErrRecordNotFound +}