mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 02:16:16 +08:00
refactoring scheduled tasks
This commit is contained in:
parent
1949d6c756
commit
86d58253c9
@ -295,18 +295,23 @@ func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteO
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(models) > 0 {
|
keyCount := make(map[string]int)
|
||||||
names := datautil.Batch(func(o *model.Object) string {
|
for _, obj := range models {
|
||||||
return o.Name
|
keyCount[obj.Key]++
|
||||||
}, models)
|
|
||||||
if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, names); err != nil {
|
|
||||||
return nil, errs.Wrap(err)
|
|
||||||
}
|
}
|
||||||
if err := t.s3dataBase.DelS3Key(ctx, engine, names...); err != nil {
|
for _, obj := range models {
|
||||||
|
count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, object := range models {
|
if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil {
|
||||||
if err := t.s3.DeleteObject(ctx, object.Key); err != nil {
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if int(count) <= keyCount[obj.Key] {
|
||||||
|
if err := t.s3.DeleteObject(ctx, obj.Key); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
63
internal/tools/cron_test.go
Normal file
63
internal/tools/cron_test.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
|
"github.com/openimsdk/protocol/msg"
|
||||||
|
"github.com/openimsdk/protocol/third"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"github.com/openimsdk/tools/mw"
|
||||||
|
"github.com/robfig/cron/v3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestName(t *testing.T) {
|
||||||
|
conf := &config.Discovery{
|
||||||
|
Enable: config.ETCD,
|
||||||
|
Etcd: config.Etcd{
|
||||||
|
RootDirectory: "openim",
|
||||||
|
Address: []string{"localhost:12379"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
client, err := kdisc.NewDiscoveryRegister(conf, "source")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
ctx := mcontext.SetOpUserID(context.Background(), "imAdmin")
|
||||||
|
msgConn, err := client.GetConn(ctx, "msg-rpc-service")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
thirdConn, err := client.GetConn(ctx, "third-rpc-service")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
conversationConn, err := client.GetConn(ctx, "conversation-rpc-service")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := &cronServer{
|
||||||
|
ctx: ctx,
|
||||||
|
config: &CronTaskConfig{
|
||||||
|
CronTask: config.CronTask{
|
||||||
|
RetainChatRecords: 1,
|
||||||
|
FileExpireTime: 1,
|
||||||
|
DeleteObjectType: []string{"msg-picture", "msg-file", "msg-voice", "msg-video", "msg-video-snapshot", "sdklog", ""},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
cron: cron.New(),
|
||||||
|
msgClient: msg.NewMsgClient(msgConn),
|
||||||
|
conversationClient: pbconversation.NewConversationClient(conversationConn),
|
||||||
|
thirdClient: third.NewThirdClient(thirdConn),
|
||||||
|
}
|
||||||
|
srv.deleteMsg()
|
||||||
|
//srv.clearS3()
|
||||||
|
//srv.clearUserMsg()
|
||||||
|
}
|
@ -16,7 +16,7 @@ func (c *cronServer) deleteMsg() {
|
|||||||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
||||||
const (
|
const (
|
||||||
deleteCount = 200
|
deleteCount = 10000
|
||||||
deleteLimit = 50
|
deleteLimit = 50
|
||||||
)
|
)
|
||||||
var count int
|
var count int
|
||||||
@ -28,9 +28,9 @@ func (c *cronServer) deleteMsg() {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
count += int(resp.Count)
|
count += int(resp.Count)
|
||||||
if resp.Count <= deleteLimit {
|
if resp.Count < deleteLimit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "deleteDocs", count)
|
log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "count", count)
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ func (c *cronServer) clearS3() {
|
|||||||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
|
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
|
||||||
const (
|
const (
|
||||||
deleteCount = 200
|
deleteCount = 10000
|
||||||
deleteLimit = 100
|
deleteLimit = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@ func (c *cronServer) clearUserMsg() {
|
|||||||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
log.ZDebug(ctx, "clear user msg cron start")
|
log.ZDebug(ctx, "clear user msg cron start")
|
||||||
const (
|
const (
|
||||||
deleteCount = 200
|
deleteCount = 10000
|
||||||
deleteLimit = 100
|
deleteLimit = 100
|
||||||
)
|
)
|
||||||
var count int
|
var count int
|
||||||
|
@ -42,6 +42,7 @@ type S3Database interface {
|
|||||||
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
|
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
|
||||||
DeleteSpecifiedData(ctx context.Context, engine string, name []string) error
|
DeleteSpecifiedData(ctx context.Context, engine string, name []string) error
|
||||||
DelS3Key(ctx context.Context, engine string, keys ...string) error
|
DelS3Key(ctx context.Context, engine string, keys ...string) error
|
||||||
|
GetKeyCount(ctx context.Context, engine string, key string) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
|
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
|
||||||
@ -117,10 +118,15 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
|
|||||||
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
|
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
|
||||||
return s.s3.FormData(ctx, name, size, contentType, duration)
|
return s.s3.FormData(ctx, name, size, contentType, duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *s3Database) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) {
|
func (s *s3Database) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) {
|
||||||
return s.db.FindExpirationObject(ctx, engine, expiration, needDelType, count)
|
return s.db.FindExpirationObject(ctx, engine, expiration, needDelType, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *s3Database) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) {
|
||||||
|
return s.db.GetKeyCount(ctx, engine, key)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name []string) error {
|
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name []string) error {
|
||||||
return s.db.Delete(ctx, engine, name)
|
return s.db.Delete(ctx, engine, name)
|
||||||
}
|
}
|
||||||
|
@ -108,3 +108,7 @@ func (o *S3Mongo) FindExpirationObject(ctx context.Context, engine string, expir
|
|||||||
"group": bson.M{"$in": needDelType},
|
"group": bson.M{"$in": needDelType},
|
||||||
}, opt)
|
}, opt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *S3Mongo) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) {
|
||||||
|
return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine, "key": key})
|
||||||
|
}
|
||||||
|
@ -26,4 +26,5 @@ type ObjectInfo interface {
|
|||||||
Take(ctx context.Context, engine string, name string) (*model.Object, error)
|
Take(ctx context.Context, engine string, name string) (*model.Object, error)
|
||||||
Delete(ctx context.Context, engine string, name []string) error
|
Delete(ctx context.Context, engine string, name []string) error
|
||||||
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
|
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
|
||||||
|
GetKeyCount(ctx context.Context, engine string, key string) (int64, error)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user