mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
s3 complete
This commit is contained in:
parent
a2674bfe9f
commit
3ea0d1f765
@ -3,6 +3,7 @@ package third
|
||||
import (
|
||||
"OpenIM/pkg/proto/third"
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (t *thirdServer) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*third.ApplyPutResp, error) {
|
||||
@ -16,3 +17,8 @@ func (t *thirdServer) GetPut(ctx context.Context, req *third.GetPutReq) (*third.
|
||||
func (t *thirdServer) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (*third.ConfirmPutResp, error) {
|
||||
return t.s3dataBase.ConfirmPut(ctx, req)
|
||||
}
|
||||
|
||||
func (t *thirdServer) CleanObject(ctx context.Context, now time.Time) {
|
||||
//清理过期的对象
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import "C"
|
||||
import (
|
||||
"OpenIM/pkg/common/db/obj"
|
||||
"OpenIM/pkg/common/db/table/relation"
|
||||
"OpenIM/pkg/common/tracelog"
|
||||
"OpenIM/pkg/proto/third"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
@ -12,7 +13,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"log"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
@ -92,11 +92,11 @@ func (c *s3Database) UUID() string {
|
||||
}
|
||||
|
||||
func (c *s3Database) HashName(hash string) string {
|
||||
return path.Join("hash", hash)
|
||||
return path.Join("hash", c.today(), c.UUID())
|
||||
}
|
||||
|
||||
func (c *s3Database) isNotFound(err error) bool {
|
||||
return false
|
||||
return relation.IsNotFound(err)
|
||||
}
|
||||
|
||||
func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*third.ApplyPutResp, error) {
|
||||
@ -205,46 +205,49 @@ func (c *s3Database) GetPut(ctx context.Context, req *third.GetPutReq) (*third.G
|
||||
}
|
||||
|
||||
func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (_ *third.ConfirmPutResp, _err error) {
|
||||
up, err := c.put.Take(ctx, req.PutID)
|
||||
put, err := c.put.Take(ctx, req.PutID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, pack := c.getFragmentNum(up.FragmentSize, up.ObjectSize)
|
||||
_, pack := c.getFragmentNum(put.FragmentSize, put.ObjectSize)
|
||||
defer func() {
|
||||
if _err == nil {
|
||||
// 清理上传的碎片
|
||||
for i := 0; i < pack; i++ {
|
||||
name := path.Join(up.Path, c.fragmentName(i))
|
||||
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{
|
||||
Bucket: c.obj.TempBucket(),
|
||||
Name: name,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("delete fragment %d %s %s failed %s\n", i, c.obj.TempBucket(), name, err)
|
||||
}
|
||||
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path})
|
||||
if err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
|
||||
}
|
||||
}
|
||||
}()
|
||||
if up.Complete {
|
||||
if put.Complete {
|
||||
return nil, errors.New("put completed")
|
||||
}
|
||||
now := time.Now().UnixMilli()
|
||||
if up.EffectiveTime.UnixMilli() < now {
|
||||
if put.EffectiveTime.UnixMilli() < now {
|
||||
return nil, errors.New("upload expired")
|
||||
}
|
||||
if up.ExpirationTime != nil && up.ExpirationTime.UnixMilli() < now {
|
||||
if put.ExpirationTime != nil && put.ExpirationTime.UnixMilli() < now {
|
||||
return nil, errors.New("object expired")
|
||||
}
|
||||
if hash, err := c.hash.Take(ctx, up.Hash, c.obj.Name()); err == nil {
|
||||
if hash, err := c.hash.Take(ctx, put.Hash, c.obj.Name()); err == nil {
|
||||
o := relation.ObjectInfoModel{
|
||||
Name: up.Name,
|
||||
Name: put.Name,
|
||||
Hash: hash.Hash,
|
||||
ExpirationTime: up.ExpirationTime,
|
||||
ExpirationTime: put.ExpirationTime,
|
||||
CreateTime: time.Now(),
|
||||
}
|
||||
if err := c.info.SetObject(ctx, &o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{
|
||||
Bucket: c.obj.TempBucket(),
|
||||
Name: put.Path,
|
||||
})
|
||||
if err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
|
||||
}
|
||||
}()
|
||||
// 服务端已存在
|
||||
return &third.ConfirmPutResp{
|
||||
Url: c.urlName(o.Name),
|
||||
@ -254,7 +257,7 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
||||
}
|
||||
src := make([]obj.BucketObject, pack)
|
||||
for i := 0; i < pack; i++ {
|
||||
name := path.Join(up.Path, c.fragmentName(i))
|
||||
name := path.Join(put.Path, c.fragmentName(i))
|
||||
o, err := c.obj.GetObjectInfo(ctx, &obj.BucketObject{
|
||||
Bucket: c.obj.TempBucket(),
|
||||
Name: name,
|
||||
@ -263,13 +266,13 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
||||
return nil, err
|
||||
}
|
||||
if i+1 == pack { // 最后一个
|
||||
size := up.ObjectSize - up.FragmentSize*int64(i)
|
||||
size := put.ObjectSize - put.FragmentSize*int64(i)
|
||||
if size != o.Size {
|
||||
return nil, fmt.Errorf("last fragment %d size %d not equal to %d hash %s", i, o.Size, size, o.Hash)
|
||||
}
|
||||
} else {
|
||||
if o.Size != up.FragmentSize {
|
||||
return nil, fmt.Errorf("fragment %d size %d not equal to %d hash %s", i, o.Size, up.FragmentSize, o.Hash)
|
||||
if o.Size != put.FragmentSize {
|
||||
return nil, fmt.Errorf("fragment %d size %d not equal to %d hash %s", i, o.Size, put.FragmentSize, o.Hash)
|
||||
}
|
||||
}
|
||||
src[i] = obj.BucketObject{
|
||||
@ -279,7 +282,7 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
||||
}
|
||||
dst := &obj.BucketObject{
|
||||
Bucket: c.obj.DataBucket(),
|
||||
Name: c.HashName(up.Hash),
|
||||
Name: c.HashName(put.Hash),
|
||||
}
|
||||
if len(src) == 1 { // 未分片直接触发copy
|
||||
// 检查数据完整性,避免脏数据
|
||||
@ -287,11 +290,11 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if up.ObjectSize != o.Size {
|
||||
return nil, fmt.Errorf("size mismatching should %d reality %d", up.ObjectSize, o.Size)
|
||||
if put.ObjectSize != o.Size {
|
||||
return nil, fmt.Errorf("size mismatching should %d reality %d", put.ObjectSize, o.Size)
|
||||
}
|
||||
if up.Hash != o.Hash {
|
||||
return nil, fmt.Errorf("hash mismatching should %s reality %s", up.Hash, o.Hash)
|
||||
if put.Hash != o.Hash {
|
||||
return nil, fmt.Errorf("hash mismatching should %s reality %s", put.Hash, o.Hash)
|
||||
}
|
||||
if err := c.obj.CopyObjet(ctx, &src[0], dst); err != nil {
|
||||
return nil, err
|
||||
@ -299,11 +302,11 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
||||
} else {
|
||||
tempBucket := &obj.BucketObject{
|
||||
Bucket: c.obj.TempBucket(),
|
||||
Name: path.Join("merge", c.today(), req.PutID, c.UUID()),
|
||||
Name: path.Join(put.Path, "merge_"+c.UUID()),
|
||||
}
|
||||
defer func() { // 清理合成的文件
|
||||
if err := c.obj.DeleteObjet(ctx, tempBucket); err != nil {
|
||||
log.Printf("delete %s %s %s failed %s\n", c.obj.Name(), tempBucket.Bucket, tempBucket.Name, err)
|
||||
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", tempBucket.Bucket, "Path", tempBucket.Name)
|
||||
}
|
||||
}()
|
||||
err := c.obj.ComposeObject(ctx, src, tempBucket)
|
||||
@ -314,40 +317,107 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if up.ObjectSize != info.Size {
|
||||
return nil, fmt.Errorf("size mismatch should %d reality %d", up.ObjectSize, info.Size)
|
||||
if put.ObjectSize != info.Size {
|
||||
return nil, fmt.Errorf("size mismatch should %d reality %d", put.ObjectSize, info.Size)
|
||||
}
|
||||
if up.Hash != info.Hash {
|
||||
return nil, fmt.Errorf("hash mismatch should %s reality %s", up.Hash, info.Hash)
|
||||
if put.Hash != info.Hash {
|
||||
return nil, fmt.Errorf("hash mismatch should %s reality %s", put.Hash, info.Hash)
|
||||
}
|
||||
if err := c.obj.CopyObjet(ctx, tempBucket, dst); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
o := &relation.ObjectInfoModel{
|
||||
Name: up.Name,
|
||||
Hash: up.Hash,
|
||||
ExpirationTime: up.ExpirationTime,
|
||||
CreateTime: time.Now(),
|
||||
}
|
||||
h := &relation.ObjectHashModel{
|
||||
Hash: up.Hash,
|
||||
Size: up.ObjectSize,
|
||||
Hash: put.Hash,
|
||||
Engine: c.obj.Name(),
|
||||
Size: put.ObjectSize,
|
||||
Bucket: c.obj.DataBucket(),
|
||||
Name: c.HashName(up.Hash),
|
||||
Name: dst.Name,
|
||||
CreateTime: time.Now(),
|
||||
}
|
||||
o := &relation.ObjectInfoModel{
|
||||
Name: put.Name,
|
||||
Hash: put.Hash,
|
||||
ExpirationTime: put.ExpirationTime,
|
||||
CreateTime: time.Now(),
|
||||
}
|
||||
if err := c.hash.Create(ctx, []*relation.ObjectHashModel{h}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.info.SetObject(ctx, o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.put.SetCompleted(ctx, up.PutID); err != nil {
|
||||
log.Printf("set uploaded %s failed %s\n", up.PutID, err)
|
||||
if err := c.put.SetCompleted(ctx, put.PutID); err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "SetCompleted", err, "PutID", put.PutID)
|
||||
}
|
||||
return &third.ConfirmPutResp{
|
||||
Url: c.urlName(o.Name),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *s3Database) CleanExpirationObject(ctx context.Context, t time.Time) {
|
||||
// 清理上传产生的临时文件
|
||||
c.cleanPutTemp(ctx, t, 10)
|
||||
// 清理hash引用全过期的文件
|
||||
c.cleanExpirationObject(ctx, t)
|
||||
// 清理没有引用的hash对象
|
||||
c.clearNoCitation(ctx, c.obj.Name(), 10)
|
||||
}
|
||||
|
||||
func (c *s3Database) cleanPutTemp(ctx context.Context, t time.Time, num int) {
|
||||
for {
|
||||
puts, err := c.put.FindExpirationPut(ctx, t, num)
|
||||
if err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "FindExpirationPut", err, "Time", t, "Num", num)
|
||||
return
|
||||
}
|
||||
if len(puts) == 0 {
|
||||
return
|
||||
}
|
||||
for _, put := range puts {
|
||||
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path})
|
||||
if err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
|
||||
return
|
||||
}
|
||||
}
|
||||
ids := utils.Slice(puts, func(e *relation.ObjectPutModel) string { return e.PutID })
|
||||
err = c.put.DelPut(ctx, ids)
|
||||
if err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "DelPut", err, "PutID", ids)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *s3Database) cleanExpirationObject(ctx context.Context, t time.Time) {
|
||||
err := c.info.DeleteExpiration(ctx, t)
|
||||
if err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "DeleteExpiration", err, "Time", t)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *s3Database) clearNoCitation(ctx context.Context, engine string, limit int) {
|
||||
for {
|
||||
list, err := c.hash.DeleteNoCitation(ctx, engine, limit)
|
||||
if err != nil {
|
||||
tracelog.SetCtxWarn(ctx, "DeleteNoCitation", err, "Engine", engine, "Limit", limit)
|
||||
return
|
||||
}
|
||||
if len(list) == 0 {
|
||||
return
|
||||
}
|
||||
var hasErr bool
|
||||
for _, h := range list {
|
||||
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: h.Bucket, Name: h.Name})
|
||||
if err != nil {
|
||||
hasErr = true
|
||||
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", h.Bucket, "Path", h.Name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if hasErr {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ type Interface interface {
|
||||
GetObjectInfo(ctx context.Context, args *BucketObject) (*ObjectInfo, error)
|
||||
// CopyObjet 复制对象
|
||||
CopyObjet(ctx context.Context, src *BucketObject, dst *BucketObject) error
|
||||
// DeleteObjet 删除对象
|
||||
// DeleteObjet 删除对象(不存在返回nil)
|
||||
DeleteObjet(ctx context.Context, info *BucketObject) error
|
||||
// ComposeObject 合并对象
|
||||
ComposeObject(ctx context.Context, src []BucketObject, dst *BucketObject) error
|
||||
|
@ -38,3 +38,15 @@ func (o *ObjectHashGorm) Create(ctx context.Context, h []*relation.ObjectHashMod
|
||||
}()
|
||||
return utils.Wrap1(o.DB.Create(h).Error)
|
||||
}
|
||||
|
||||
func (o *ObjectHashGorm) DeleteNoCitation(ctx context.Context, engine string, num int) (list []*relation.ObjectHashModel, err error) {
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "engine", engine, "num", num, "objectHash", list)
|
||||
}()
|
||||
err = o.DB.Table(relation.ObjectHashModelTableName, "as h").Select("h.*").
|
||||
Joins("LEFT JOIN "+relation.ObjectInfoModelTableName+" as i ON h.hash = i.hash").
|
||||
Where("h.engine = ? AND i.hash IS NULL", engine).
|
||||
Limit(num).
|
||||
Find(&list).Error
|
||||
return list, utils.Wrap1(err)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"gorm.io/gorm"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewObjectInfo(db *gorm.DB) relation.ObjectInfoModelInterface {
|
||||
@ -43,3 +44,10 @@ func (o *ObjectInfoGorm) Take(ctx context.Context, name string) (info *relation.
|
||||
info = &relation.ObjectInfoModel{}
|
||||
return info, utils.Wrap1(o.DB.Where("name = ?", name).Take(info).Error)
|
||||
}
|
||||
|
||||
func (o *ObjectInfoGorm) DeleteExpiration(ctx context.Context, expiration time.Time) (err error) {
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "expiration", expiration)
|
||||
}()
|
||||
return utils.Wrap1(o.DB.Where("expiration_time IS NOT NULL AND expiration_time <= ?", expiration).Delete(&relation.ObjectInfoModel{}).Error)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
"gorm.io/gorm"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewObjectPut(db *gorm.DB) relation.ObjectPutModelInterface {
|
||||
@ -45,3 +46,18 @@ func (o *ObjectPutGorm) SetCompleted(ctx context.Context, putID string) (err err
|
||||
}()
|
||||
return utils.Wrap1(o.DB.Model(&relation.ObjectPutModel{}).Where("put_id = ?", putID).Update("complete", true).Error)
|
||||
}
|
||||
|
||||
func (o *ObjectPutGorm) FindExpirationPut(ctx context.Context, expirationTime time.Time, num int) (list []*relation.ObjectPutModel, err error) {
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "expirationTime", expirationTime, "num", num, "list", list)
|
||||
}()
|
||||
err = o.DB.Where("effective_time <= ?", expirationTime).Limit(num).Find(&list).Error
|
||||
return list, utils.Wrap1(err)
|
||||
}
|
||||
|
||||
func (o *ObjectPutGorm) DelPut(ctx context.Context, ids []string) (err error) {
|
||||
defer func() {
|
||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ids", ids)
|
||||
}()
|
||||
return utils.Wrap1(o.DB.Where("put_id IN ?", ids).Delete(&relation.ObjectPutModel{}).Error)
|
||||
}
|
||||
|
@ -26,4 +26,5 @@ type ObjectHashModelInterface interface {
|
||||
NewTx(tx any) ObjectHashModelInterface
|
||||
Take(ctx context.Context, hash string, engine string) (*ObjectHashModel, error)
|
||||
Create(ctx context.Context, h []*ObjectHashModel) error
|
||||
DeleteNoCitation(ctx context.Context, engine string, num int) (list []*ObjectHashModel, err error)
|
||||
}
|
||||
|
@ -24,4 +24,5 @@ type ObjectInfoModelInterface interface {
|
||||
NewTx(tx any) ObjectInfoModelInterface
|
||||
SetObject(ctx context.Context, obj *ObjectInfoModel) error
|
||||
Take(ctx context.Context, name string) (*ObjectInfoModel, error)
|
||||
DeleteExpiration(ctx context.Context, expiration time.Time) error
|
||||
}
|
||||
|
@ -31,4 +31,6 @@ type ObjectPutModelInterface interface {
|
||||
Create(ctx context.Context, m []*ObjectPutModel) error
|
||||
Take(ctx context.Context, putID string) (*ObjectPutModel, error)
|
||||
SetCompleted(ctx context.Context, putID string) error
|
||||
FindExpirationPut(ctx context.Context, expirationTime time.Time, num int) ([]*ObjectPutModel, error)
|
||||
DelPut(ctx context.Context, ids []string) error
|
||||
}
|
||||
|
@ -1,5 +1,10 @@
|
||||
package relation
|
||||
|
||||
import (
|
||||
"OpenIM/pkg/utils"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type BatchUpdateGroupMember struct {
|
||||
GroupID string
|
||||
UserID string
|
||||
@ -10,3 +15,7 @@ type GroupSimpleUserID struct {
|
||||
Hash uint64
|
||||
MemberNum uint32
|
||||
}
|
||||
|
||||
func IsNotFound(err error) bool {
|
||||
return utils.Unwrap(err) == gorm.ErrRecordNotFound
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user