mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-05 03:42:08 +08:00
implement minio expire delete.
This commit is contained in:
parent
9b043feca7
commit
2b2a75f19e
@ -1,2 +1,3 @@
|
|||||||
chatRecordsClearTime: "0 2 * * *"
|
chatRecordsClearTime: "0 2 * * *"
|
||||||
retainChatRecords: 365
|
retainChatRecords: 365
|
||||||
|
fileExpireTime: 90
|
||||||
|
|||||||
2
go.mod
2
go.mod
@ -13,7 +13,7 @@ require (
|
|||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
|
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.17
|
github.com/openimsdk/protocol v0.0.69-alpha.22
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.25
|
github.com/openimsdk/tools v0.0.49-alpha.25
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
|
|||||||
4
go.sum
4
go.sum
@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
|
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
|
||||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M=
|
github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI=
|
github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU=
|
github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||||
|
|||||||
@ -19,11 +19,12 @@ import (
|
|||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
"github.com/openimsdk/protocol/third"
|
"github.com/openimsdk/protocol/third"
|
||||||
@ -283,6 +284,15 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
|
|||||||
return prefix + name
|
return prefix + name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
|
||||||
|
expireTime := time.UnixMilli(req.ExpireTime)
|
||||||
|
err := t.s3dataBase.DeleteByExpires(ctx, expireTime)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &third.DeleteOutdatedDataResp{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
type FormDataMate struct {
|
type FormDataMate struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Size int64 `json:"size"`
|
Size int64 `json:"size"`
|
||||||
|
|||||||
@ -17,15 +17,17 @@ package tools
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
|
"github.com/openimsdk/protocol/third"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/mw"
|
"github.com/openimsdk/tools/mw"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
@ -69,6 +71,25 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
|
|||||||
if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil {
|
if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil {
|
||||||
return errs.Wrap(err)
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
deleteFunc := func() {
|
||||||
|
now := time.Now()
|
||||||
|
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
|
||||||
|
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
|
||||||
|
log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
|
||||||
|
tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
thirdClient := third.NewThirdClient(tConn)
|
||||||
|
if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
|
||||||
|
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
|
||||||
|
}
|
||||||
|
if _, err := crontab.AddFunc(string(config.CronTask.FileExpireTime), deleteFunc); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
|
log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
|
||||||
crontab.Start()
|
crontab.Start()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|||||||
@ -15,14 +15,15 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/tools/db/mongoutil"
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
"github.com/openimsdk/tools/db/redisutil"
|
"github.com/openimsdk/tools/db/redisutil"
|
||||||
"github.com/openimsdk/tools/mq/kafka"
|
"github.com/openimsdk/tools/mq/kafka"
|
||||||
"github.com/openimsdk/tools/s3/cos"
|
"github.com/openimsdk/tools/s3/cos"
|
||||||
"github.com/openimsdk/tools/s3/minio"
|
"github.com/openimsdk/tools/s3/minio"
|
||||||
"github.com/openimsdk/tools/s3/oss"
|
"github.com/openimsdk/tools/s3/oss"
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type CacheConfig struct {
|
type CacheConfig struct {
|
||||||
@ -107,6 +108,7 @@ type API struct {
|
|||||||
type CronTask struct {
|
type CronTask struct {
|
||||||
ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
|
ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
|
||||||
RetainChatRecords int `mapstructure:"retainChatRecords"`
|
RetainChatRecords int `mapstructure:"retainChatRecords"`
|
||||||
|
FileExpireTime int `mapstructure:"fileExpireTime"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type OfflinePushConfig struct {
|
type OfflinePushConfig struct {
|
||||||
|
|||||||
@ -16,11 +16,12 @@ package controller
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"path/filepath"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
"github.com/openimsdk/tools/s3"
|
"github.com/openimsdk/tools/s3"
|
||||||
@ -38,6 +39,7 @@ type S3Database interface {
|
|||||||
SetObject(ctx context.Context, info *model.Object) error
|
SetObject(ctx context.Context, info *model.Object) error
|
||||||
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
|
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
|
||||||
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
|
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
|
||||||
|
DeleteByExpires(ctx context.Context, duration time.Time) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
|
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
|
||||||
@ -111,3 +113,6 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
|
|||||||
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
|
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
|
||||||
return s.s3.FormData(ctx, name, size, contentType, duration)
|
return s.s3.FormData(ctx, name, size, contentType, duration)
|
||||||
}
|
}
|
||||||
|
func (s *s3Database) DeleteByExpires(ctx context.Context, duration time.Time) error {
|
||||||
|
return s.db.DeleteByExpires(ctx, duration)
|
||||||
|
}
|
||||||
|
|||||||
@ -16,6 +16,8 @@ package mgo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
|
||||||
@ -68,3 +70,8 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
|
|||||||
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
|
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
|
||||||
return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
|
return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
|
||||||
}
|
}
|
||||||
|
func (o *S3Mongo) DeleteByExpires(ctx context.Context, duration time.Time) error {
|
||||||
|
return mongoutil.DeleteMany(ctx, o.coll, bson.M{
|
||||||
|
"create_time": bson.M{"$lt": duration},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -16,6 +16,8 @@ package database
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -23,4 +25,5 @@ type ObjectInfo interface {
|
|||||||
SetObject(ctx context.Context, obj *model.Object) error
|
SetObject(ctx context.Context, obj *model.Object) error
|
||||||
Take(ctx context.Context, engine string, name string) (*model.Object, error)
|
Take(ctx context.Context, engine string, name string) (*model.Object, error)
|
||||||
Delete(ctx context.Context, engine string, name string) error
|
Delete(ctx context.Context, engine string, name string) error
|
||||||
|
DeleteByExpires(ctx context.Context, duration time.Time) error
|
||||||
}
|
}
|
||||||
|
|||||||
@ -41,3 +41,7 @@ func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl
|
|||||||
}
|
}
|
||||||
return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl}
|
return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl}
|
||||||
}
|
}
|
||||||
|
func (t *Third) DeleteOutdatedData(ctx context.Context, expires int64) error {
|
||||||
|
_, err := t.Client.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expires})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user