mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	Merge pull request #6 from mo3et/minio-cron-del
feature: implement minio expire delete logic.
This commit is contained in:
		
						commit
						bf2cf427f5
					
				@ -1,2 +1,3 @@
 | 
			
		||||
chatRecordsClearTime: "0 2 * * *"
 | 
			
		||||
retainChatRecords: 365
 | 
			
		||||
fileExpireTime: 90
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.mod
									
									
									
									
									
								
							@ -13,8 +13,8 @@ require (
 | 
			
		||||
	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 | 
			
		||||
	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
 | 
			
		||||
	github.com/mitchellh/mapstructure v1.5.0
 | 
			
		||||
	github.com/openimsdk/protocol v0.0.69-alpha.17
 | 
			
		||||
	github.com/openimsdk/tools v0.0.49-alpha.25
 | 
			
		||||
	github.com/openimsdk/protocol v0.0.69-alpha.22
 | 
			
		||||
	github.com/openimsdk/tools v0.0.49-alpha.30
 | 
			
		||||
	github.com/pkg/errors v0.9.1 // indirect
 | 
			
		||||
	github.com/prometheus/client_golang v1.18.0
 | 
			
		||||
	github.com/stretchr/testify v1.9.0
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										8
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								go.sum
									
									
									
									
									
								
							@ -270,10 +270,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 | 
			
		||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.30 h1:iT2+1F8cJmlwKEris25YgK0seiJRUear+wTgc1bzcg8=
 | 
			
		||||
github.com/openimsdk/tools v0.0.49-alpha.30/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU=
 | 
			
		||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
			
		||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 | 
			
		||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
 | 
			
		||||
 | 
			
		||||
@ -19,11 +19,12 @@ import (
 | 
			
		||||
	"encoding/base64"
 | 
			
		||||
	"encoding/hex"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"path"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 | 
			
		||||
	"github.com/openimsdk/protocol/third"
 | 
			
		||||
@ -283,6 +284,19 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
 | 
			
		||||
	return prefix + name
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
 | 
			
		||||
	expireTime := time.UnixMilli(req.ExpireTime)
 | 
			
		||||
	models, err := t.s3dataBase.FindByExpires(ctx, expireTime)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, model := range models {
 | 
			
		||||
		t.s3dataBase.DeleteObject(ctx, model.Key)
 | 
			
		||||
		t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Key)
 | 
			
		||||
	}
 | 
			
		||||
	return &third.DeleteOutdatedDataResp{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type FormDataMate struct {
 | 
			
		||||
	Name        string `json:"name"`
 | 
			
		||||
	Size        int64  `json:"size"`
 | 
			
		||||
 | 
			
		||||
@ -17,15 +17,17 @@ package tools
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
 | 
			
		||||
	"github.com/openimsdk/protocol/msg"
 | 
			
		||||
	"github.com/openimsdk/protocol/third"
 | 
			
		||||
	"github.com/openimsdk/tools/mcontext"
 | 
			
		||||
	"github.com/openimsdk/tools/mw"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/credentials/insecure"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/log"
 | 
			
		||||
@ -69,6 +71,25 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
			
		||||
	if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	deleteFunc := func() {
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
 | 
			
		||||
		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
 | 
			
		||||
		log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
 | 
			
		||||
		tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		thirdClient := third.NewThirdClient(tConn)
 | 
			
		||||
		if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
 | 
			
		||||
			log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
 | 
			
		||||
	}
 | 
			
		||||
	if _, err := crontab.AddFunc(string(config.CronTask.FileExpireTime), deleteFunc); err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
 | 
			
		||||
	crontab.Start()
 | 
			
		||||
	<-ctx.Done()
 | 
			
		||||
 | 
			
		||||
@ -15,14 +15,15 @@
 | 
			
		||||
package config
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
			
		||||
	"github.com/openimsdk/tools/db/redisutil"
 | 
			
		||||
	"github.com/openimsdk/tools/mq/kafka"
 | 
			
		||||
	"github.com/openimsdk/tools/s3/cos"
 | 
			
		||||
	"github.com/openimsdk/tools/s3/minio"
 | 
			
		||||
	"github.com/openimsdk/tools/s3/oss"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type CacheConfig struct {
 | 
			
		||||
@ -107,6 +108,7 @@ type API struct {
 | 
			
		||||
type CronTask struct {
 | 
			
		||||
	ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
 | 
			
		||||
	RetainChatRecords    int    `mapstructure:"retainChatRecords"`
 | 
			
		||||
	FileExpireTime       int    `mapstructure:"fileExpireTime"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type OfflinePushConfig struct {
 | 
			
		||||
 | 
			
		||||
@ -16,11 +16,12 @@ package controller
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
			
		||||
	"github.com/openimsdk/tools/s3"
 | 
			
		||||
@ -38,6 +39,9 @@ type S3Database interface {
 | 
			
		||||
	SetObject(ctx context.Context, info *model.Object) error
 | 
			
		||||
	StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
 | 
			
		||||
	FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
 | 
			
		||||
	FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
 | 
			
		||||
	DeleteObject(ctx context.Context, name string) error
 | 
			
		||||
	DeleteSpecifiedData(ctx context.Context, engine string, name string) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
 | 
			
		||||
@ -111,3 +115,13 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
 | 
			
		||||
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
 | 
			
		||||
	return s.s3.FormData(ctx, name, size, contentType, duration)
 | 
			
		||||
}
 | 
			
		||||
func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) {
 | 
			
		||||
	return s.db.FindByExpires(ctx, duration)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
 | 
			
		||||
	return s.s3.DeleteObject(ctx, name)
 | 
			
		||||
}
 | 
			
		||||
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error {
 | 
			
		||||
	return s.db.Delete(ctx, engine, name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,8 @@ package mgo
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
 | 
			
		||||
@ -68,3 +70,8 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
 | 
			
		||||
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
 | 
			
		||||
	return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
 | 
			
		||||
}
 | 
			
		||||
func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error) {
 | 
			
		||||
	return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{
 | 
			
		||||
		"create_time": bson.M{"$lt": duration},
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,8 @@ package database
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -23,4 +25,5 @@ type ObjectInfo interface {
 | 
			
		||||
	SetObject(ctx context.Context, obj *model.Object) error
 | 
			
		||||
	Take(ctx context.Context, engine string, name string) (*model.Object, error)
 | 
			
		||||
	Delete(ctx context.Context, engine string, name string) error
 | 
			
		||||
	FindByExpires(ctx context.Context, duration time.Time) ([]*model.Object, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -41,3 +41,7 @@ func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl
 | 
			
		||||
	}
 | 
			
		||||
	return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl}
 | 
			
		||||
}
 | 
			
		||||
func (t *Third) DeleteOutdatedData(ctx context.Context, expires int64) error {
 | 
			
		||||
	_, err := t.Client.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expires})
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user