diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index a541462a2..e1b42ca33 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -295,18 +295,23 @@ func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteO if err != nil { return nil, err } - if len(models) > 0 { - names := datautil.Batch(func(o *model.Object) string { - return o.Name - }, models) - if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, names); err != nil { - return nil, errs.Wrap(err) - } - if err := t.s3dataBase.DelS3Key(ctx, engine, names...); err != nil { + keyCount := make(map[string]int) + for _, obj := range models { + keyCount[obj.Key]++ + } + for _, obj := range models { + count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key) + if err != nil { return nil, err } - for _, object := range models { - if err := t.s3.DeleteObject(ctx, object.Key); err != nil { + if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil { + return nil, errs.Wrap(err) + } + if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil { + return nil, err + } + if int(count) <= keyCount[obj.Key] { + if err := t.s3.DeleteObject(ctx, obj.Key); err != nil { return nil, err } } diff --git a/internal/tools/cron_test.go b/internal/tools/cron_test.go new file mode 100644 index 000000000..890349069 --- /dev/null +++ b/internal/tools/cron_test.go @@ -0,0 +1,63 @@ +package tools + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/mw" + "github.com/robfig/cron/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "testing" +) + +func TestName(t *testing.T) { + conf := &config.Discovery{ + Enable: config.ETCD, + Etcd: config.Etcd{ + RootDirectory: "openim", + Address: []string{"localhost:12379"}, + }, + } + client, err := kdisc.NewDiscoveryRegister(conf, "source") + if err != nil { + panic(err) + } + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + ctx := mcontext.SetOpUserID(context.Background(), "imAdmin") + msgConn, err := client.GetConn(ctx, "msg-rpc-service") + if err != nil { + panic(err) + } + thirdConn, err := client.GetConn(ctx, "third-rpc-service") + if err != nil { + panic(err) + } + + conversationConn, err := client.GetConn(ctx, "conversation-rpc-service") + if err != nil { + panic(err) + } + + srv := &cronServer{ + ctx: ctx, + config: &CronTaskConfig{ + CronTask: config.CronTask{ + RetainChatRecords: 1, + FileExpireTime: 1, + DeleteObjectType: []string{"msg-picture", "msg-file", "msg-voice", "msg-video", "msg-video-snapshot", "sdklog", ""}, + }, + }, + cron: cron.New(), + msgClient: msg.NewMsgClient(msgConn), + conversationClient: pbconversation.NewConversationClient(conversationConn), + thirdClient: third.NewThirdClient(thirdConn), + } + srv.deleteMsg() + //srv.clearS3() + //srv.clearUserMsg() +} diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 522649645..cc00cc5b8 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -16,7 +16,7 @@ func (c *cronServer) deleteMsg() { ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) const ( - deleteCount = 200 + deleteCount = 10000 deleteLimit = 50 ) var count int @@ -28,9 +28,9 @@ func (c *cronServer) deleteMsg() { break } count += int(resp.Count) - if resp.Count <= deleteLimit { + if resp.Count < deleteLimit { break } } - log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "deleteDocs", count) + log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "count", count) } diff --git a/internal/tools/s3.go b/internal/tools/s3.go index 469b0d0cb..9b6b9c408 100644 --- a/internal/tools/s3.go +++ b/internal/tools/s3.go @@ -16,7 +16,7 @@ func (c *cronServer) clearS3() { ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) const ( - deleteCount = 200 + deleteCount = 10000 deleteLimit = 100 ) diff --git a/internal/tools/user_msg.go b/internal/tools/user_msg.go index 3d4ca40d3..a4afa769e 100644 --- a/internal/tools/user_msg.go +++ b/internal/tools/user_msg.go @@ -15,7 +15,7 @@ func (c *cronServer) clearUserMsg() { ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "clear user msg cron start") const ( - deleteCount = 200 + deleteCount = 10000 deleteLimit = 100 ) var count int diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 6a0f11b1c..6693d2dde 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -42,6 +42,7 @@ type S3Database interface { FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) DeleteSpecifiedData(ctx context.Context, engine string, name []string) error DelS3Key(ctx context.Context, engine string, keys ...string) error + GetKeyCount(ctx context.Context, engine string, key string) (int64, error) } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { @@ -117,10 +118,15 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } + func (s *s3Database) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) { return s.db.FindExpirationObject(ctx, engine, expiration, needDelType, count) } +func (s *s3Database) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) { + return s.db.GetKeyCount(ctx, engine, key) +} + func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name []string) error { return s.db.Delete(ctx, engine, name) } diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 53f2a6ba4..624eb84a0 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -108,3 +108,7 @@ func (o *S3Mongo) FindExpirationObject(ctx context.Context, engine string, expir "group": bson.M{"$in": needDelType}, }, opt) } + +func (o *S3Mongo) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) { + return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine, "key": key}) +} diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 86d47a2a6..5541a159b 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -26,4 +26,5 @@ type ObjectInfo interface { Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name []string) error FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) + GetKeyCount(ctx context.Context, engine string, key string) (int64, error) }