From 4629b03272dadb074e2472edad08fb205ddd4c4b Mon Sep 17 00:00:00 2001 From: withchao <48119764+withchao@users.noreply.github.com> Date: Wed, 19 Jul 2023 16:17:46 +0800 Subject: [PATCH] fix: optimize minio initialization (#602) --- pkg/common/db/s3/minio/minio.go | 76 ++++++++++++++++++++++++++++----- 1 file changed, 65 insertions(+), 11 deletions(-) diff --git a/pkg/common/db/s3/minio/minio.go b/pkg/common/db/s3/minio/minio.go index baf9b64e6..bb6e6ce24 100644 --- a/pkg/common/db/s3/minio/minio.go +++ b/pkg/common/db/s3/minio/minio.go @@ -22,6 +22,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/minio/minio-go/v7" @@ -56,21 +57,20 @@ func NewMinio() (s3.Interface, error) { if err != nil { return nil, err } - exists, err := client.BucketExists(context.Background(), conf.Bucket) - if err != nil { - return nil, err - } - if !exists { - if err := client.MakeBucket(context.Background(), conf.Bucket, minio.MakeBucketOptions{}); err != nil { - return nil, err - } - } - return &Minio{ + m := &Minio{ bucket: conf.Bucket, bucketURL: conf.Endpoint + "/" + conf.Bucket + "/", opts: opts, core: &minio.Core{Client: client}, - }, nil + lock: &sync.Mutex{}, + init: false, + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := m.initMinio(ctx); err != nil { + fmt.Println("init minio error:", err) + } + return m, nil } type Minio struct { @@ -78,6 +78,30 @@ type Minio struct { bucketURL string opts *minio.Options core *minio.Core + lock sync.Locker + init bool +} + +func (m *Minio) initMinio(ctx context.Context) error { + if m.init { + return nil + } + m.lock.Lock() + defer m.lock.Unlock() + if m.init { + return nil + } + exists, err := m.core.Client.BucketExists(ctx, config.Config.Object.Minio.Bucket) + if err != nil { + return fmt.Errorf("check bucket exists error: %w", err) + } + if !exists { + if err := m.core.Client.MakeBucket(ctx, config.Config.Object.Minio.Bucket, minio.MakeBucketOptions{}); err != nil { + return fmt.Errorf("make bucket error: %w", err) + } + } + m.init = true + return nil } func (m *Minio) Engine() string { @@ -93,6 +117,9 @@ func (m *Minio) PartLimit() *s3.PartLimit { } func (m *Minio) InitiateMultipartUpload(ctx context.Context, name string) (*s3.InitiateMultipartUploadResult, error) { + if err := m.initMinio(ctx); err != nil { + return nil, err + } uploadID, err := m.core.NewMultipartUpload(ctx, m.bucket, name, minio.PutObjectOptions{}) if err != nil { return nil, err @@ -105,6 +132,9 @@ func (m *Minio) InitiateMultipartUpload(ctx context.Context, name string) (*s3.I } func (m *Minio) CompleteMultipartUpload(ctx context.Context, uploadID string, name string, parts []s3.Part) (*s3.CompleteMultipartUploadResult, error) { + if err := m.initMinio(ctx); err != nil { + return nil, err + } minioParts := make([]minio.CompletePart, len(parts)) for i, part := range parts { minioParts[i] = minio.CompletePart{ @@ -142,6 +172,9 @@ func (m *Minio) PartSize(ctx context.Context, size int64) (int64, error) { } func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expire time.Duration, partNumbers []int) (*s3.AuthSignResult, error) { + if err := m.initMinio(ctx); err != nil { + return nil, err + } creds, err := m.opts.Creds.Get() if err != nil { return nil, err @@ -170,6 +203,9 @@ func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expi } func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time.Duration) (string, error) { + if err := m.initMinio(ctx); err != nil { + return "", err + } rawURL, err := m.core.Client.PresignedPutObject(ctx, m.bucket, name, expire) if err != nil { return "", err @@ -178,10 +214,16 @@ func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time } func (m *Minio) DeleteObject(ctx context.Context, name string) error { + if err := m.initMinio(ctx); err != nil { + return err + } return m.core.Client.RemoveObject(ctx, m.bucket, name, minio.RemoveObjectOptions{}) } func (m *Minio) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) { + if err := m.initMinio(ctx); err != nil { + return nil, err + } info, err := m.core.Client.StatObject(ctx, m.bucket, name, minio.StatObjectOptions{}) if err != nil { return nil, err @@ -195,6 +237,9 @@ func (m *Minio) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, er } func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) { + if err := m.initMinio(ctx); err != nil { + return nil, err + } result, err := m.core.Client.CopyObject(ctx, minio.CopyDestOptions{ Bucket: m.bucket, Object: dst, @@ -226,10 +271,16 @@ func (m *Minio) IsNotFound(err error) bool { } func (m *Minio) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error { + if err := m.initMinio(ctx); err != nil { + return err + } return m.core.AbortMultipartUpload(ctx, m.bucket, name, uploadID) } func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name string, partNumberMarker int, maxParts int) (*s3.ListUploadedPartsResult, error) { + if err := m.initMinio(ctx); err != nil { + return nil, err + } result, err := m.core.ListObjectParts(ctx, m.bucket, name, uploadID, partNumberMarker, maxParts) if err != nil { return nil, err @@ -253,6 +304,9 @@ func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name str } func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) { + if err := m.initMinio(ctx); err != nil { + return "", err + } reqParams := make(url.Values) if opt != nil { if opt.ContentType != "" {