diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 624eb84a0..ee48c6693 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -112,3 +112,15 @@ func (o *S3Mongo) FindExpirationObject(ctx context.Context, engine string, expir func (o *S3Mongo) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) { return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine, "key": key}) } + +func (o *S3Mongo) GetEngineCount(ctx context.Context, engine string) (int64, error) { + return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine}) +} + +func (o *S3Mongo) GetEngineInfo(ctx context.Context, engine string, limit int, skip int) ([]*model.Object, error) { + return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{"engine": engine}, options.Find().SetLimit(int64(limit)).SetSkip(int64(skip))) +} + +func (o *S3Mongo) UpdateEngine(ctx context.Context, oldEngine, oldName string, newEngine string) error { + return mongoutil.UpdateOne(ctx, o.coll, bson.M{"engine": oldEngine, "name": oldName}, bson.M{"$set": bson.M{"engine": newEngine}}, false) +} diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 5541a159b..a0e4ebe2b 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -27,4 +27,8 @@ type ObjectInfo interface { Delete(ctx context.Context, engine string, name []string) error FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) + + GetEngineCount(ctx context.Context, engine string) (int64, error) + GetEngineInfo(ctx context.Context, engine string, limit int, skip int) ([]*model.Object, error) + UpdateEngine(ctx context.Context, oldEngine, oldName string, newEngine string) error } diff --git a/tools/s3/internal/conversion.go b/tools/s3/internal/conversion.go new file mode 100644 index 000000000..ba2174535 --- /dev/null +++ b/tools/s3/internal/conversion.go @@ -0,0 +1,202 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "github.com/mitchellh/mapstructure" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/s3" + "github.com/openimsdk/tools/s3/aws" + "github.com/openimsdk/tools/s3/cos" + "github.com/openimsdk/tools/s3/kodo" + "github.com/openimsdk/tools/s3/minio" + "github.com/openimsdk/tools/s3/oss" + "github.com/spf13/viper" + "go.mongodb.org/mongo-driver/mongo" + "log" + "net/http" + "path/filepath" + "time" +) + +const defaultTimeout = time.Second * 10 + +func readConf(path string, val any) error { + v := viper.New() + v.SetConfigFile(path) + if err := v.ReadInConfig(); err != nil { + return err + } + fn := func(config *mapstructure.DecoderConfig) { + config.TagName = "mapstructure" + } + return v.Unmarshal(val, fn) +} + +func getS3(path string, name string, thirdConf *config.Third) (s3.Interface, error) { + switch name { + case "minio": + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + var minioConf config.Minio + if err := readConf(filepath.Join(path, minioConf.GetConfigFileName()), &minioConf); err != nil { + return nil, err + } + var redisConf config.Redis + if err := readConf(filepath.Join(path, redisConf.GetConfigFileName()), &redisConf); err != nil { + return nil, err + } + rdb, err := redisutil.NewRedisClient(ctx, redisConf.Build()) + if err != nil { + return nil, err + } + return minio.NewMinio(ctx, redis.NewMinioCache(rdb), *minioConf.Build()) + case "cos": + return cos.NewCos(*thirdConf.Object.Cos.Build()) + case "oss": + return oss.NewOSS(*thirdConf.Object.Oss.Build()) + case "kodo": + return kodo.NewKodo(*thirdConf.Object.Kodo.Build()) + case "aws": + return aws.NewAws(*thirdConf.Object.Aws.Build()) + default: + return nil, fmt.Errorf("invalid object enable: %s", name) + } +} + +func getMongo(path string) (database.ObjectInfo, error) { + var mongoConf config.Mongo + if err := readConf(filepath.Join(path, mongoConf.GetConfigFileName()), &mongoConf); err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + mgocli, err := mongoutil.NewMongoDB(ctx, mongoConf.Build()) + if err != nil { + return nil, err + } + return mgo.NewS3Mongo(mgocli.GetDB()) +} + +func Main(path string, engine string) error { + var thirdConf config.Third + if err := readConf(filepath.Join(path, thirdConf.GetConfigFileName()), &thirdConf); err != nil { + return err + } + if thirdConf.Object.Enable == engine { + return errors.New("same s3 storage") + } + s3db, err := getMongo(path) + if err != nil { + return err + } + oldS3, err := getS3(path, engine, &thirdConf) + if err != nil { + return err + } + newS3, err := getS3(path, thirdConf.Object.Enable, &thirdConf) + if err != nil { + return err + } + count, err := getEngineCount(s3db, oldS3.Engine()) + if err != nil { + return err + } + log.Printf("engine %s count: %d", oldS3.Engine(), count) + var skip int + for i := 1; i <= count+1; i++ { + log.Printf("start %d/%d", i, count) + start := time.Now() + res, err := doObject(s3db, newS3, oldS3, skip) + if err != nil { + log.Printf("end [%s] %d/%d error %s", time.Since(start), i, count, err) + return err + } + log.Printf("end [%s] %d/%d result %+v", time.Since(start), i, count, *res) + if res.Skip { + skip++ + } + if res.End { + break + } + } + return nil +} + +func getEngineCount(db database.ObjectInfo, name string) (int, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + count, err := db.GetEngineCount(ctx, name) + if err != nil { + return 0, err + } + return int(count), nil +} + +func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Result, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + infos, err := db.GetEngineInfo(ctx, oldS3.Engine(), 1, skip) + if err != nil { + return nil, err + } + if len(infos) == 0 { + return &Result{End: true}, nil + } + obj := infos[0] + if _, err := db.Take(ctx, newS3.Engine(), obj.Name); err == nil { + return &Result{Skip: true}, nil + } else if !errors.Is(err, mongo.ErrNoDocuments) { + return nil, err + } + downloadURL, err := oldS3.AccessURL(ctx, obj.Key, time.Hour, &s3.AccessURLOption{}) + if err != nil { + return nil, err + } + putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour) + if err != nil { + return nil, err + } + downloadResp, err := http.Get(downloadURL) + if err != nil { + return nil, err + } + defer downloadResp.Body.Close() + switch downloadResp.StatusCode { + case http.StatusNotFound: + return &Result{Skip: true}, nil + case http.StatusOK: + default: + return nil, fmt.Errorf("download object failed %s", downloadResp.Status) + } + log.Printf("file size %d", obj.Size) + request, err := http.NewRequest(http.MethodPut, putURL, downloadResp.Body) + if err != nil { + return nil, err + } + putResp, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + defer putResp.Body.Close() + if putResp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("put object failed %s", putResp.Status) + } + ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + if err := db.UpdateEngine(ctx, obj.Engine, obj.Name, newS3.Engine()); err != nil { + return nil, err + } + return &Result{}, nil +} + +type Result struct { + Skip bool + End bool +} diff --git a/tools/s3/main.go b/tools/s3/main.go new file mode 100644 index 000000000..1e661c9a7 --- /dev/null +++ b/tools/s3/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "flag" + "fmt" + "github.com/openimsdk/open-im-server/v3/tools/s3/internal" + "os" +) + +func main() { + var ( + name string + config string + ) + flag.StringVar(&name, "name", "", "old previous storage name") + flag.StringVar(&config, "config", "", "config directory") + flag.Parse() + if err := internal.Main(config, name); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + fmt.Fprintln(os.Stdout, "success") +}