消息阅后即焚

This commit is contained in:
hawklin2017 2026-05-13 20:50:34 +08:00
parent 3e5bf3a4e0
commit d992d1fb41
5 changed files with 72 additions and 27 deletions

View File

@ -1,6 +1,9 @@
cronExecuteTime: 0 2 * * *
burnCronExecuteTime: "*/1 * * * *" # 可选:仅阅后即焚清理使用;不配置则与 cronExecuteTime 相同
burnClearLimit: 100 # 可选:单次 RPC 批大小,默认 100
burnClearMaxLoop: 100 # 可选:单次定时内最大循环轮数,默认 10000
retainChatRecords: 365
fileExpireTime: 180
deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"]
chatAPI:
address: http://127.0.0.1:10008
address: http://127.0.0.1:10008

View File

@ -25,17 +25,15 @@ import (
)
// clearBurnExpiredMsgs 阅后即焚 cron 入口:循环调用 conversation 服务的
// ClearBurnExpiredMsgs每次至多处理 burnLimit 个 (user, conversation) 分组,
// 直至本轮没有新的过期分组或达到防御性的最大循环次数
// ClearBurnExpiredMsgs每次至多处理 burnClearLimit 个 (user, conversation) 分组,
// 直至本轮没有新的过期分组或达到 burnClearMaxLoop 轮
func (c *cronServer) clearBurnExpiredMsgs() {
now := time.Now()
operationID := fmt.Sprintf("cron_burn_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear burn expired msgs cron start")
const (
maxLoop = 10000
burnLimit = 100
)
burnLimit := int32(c.config.CronTask.BurnClearLimit)
maxLoop := c.config.CronTask.BurnClearMaxLoop
var count int
for i := 1; i <= maxLoop; i++ {
resp, err := c.conversationClient.ClearBurnExpiredMsgs(ctx, &pbconversation.ClearBurnExpiredMsgsReq{

View File

@ -16,6 +16,7 @@ package tools
import (
"context"
"strings"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
@ -44,36 +45,37 @@ type CronTaskConfig struct {
MongodbConfig config.Mongo
}
func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 {
func Start(ctx context.Context, cfg *CronTaskConfig) error {
config.FillCronTaskDefaults(&cfg.CronTask)
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", cfg.CronTask.CronExecuteTime, "msgDestructTime", cfg.CronTask.RetainChatRecords)
if cfg.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
}
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, nil)
client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, nil)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
ctx = mcontext.SetOpUserID(ctx, cfg.Share.IMAdminUserID[0])
msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
msgConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Msg)
if err != nil {
return err
}
thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
thirdConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Third)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
conversationConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Conversation)
if err != nil {
return err
}
authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth)
authConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Auth)
if err != nil {
return err
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
mgocli, err := mongoutil.NewMongoDB(ctx, cfg.MongodbConfig.Build())
if err != nil {
return errs.WrapMsg(err, "crontask: connect mongodb failed")
}
@ -86,14 +88,14 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
srv := &cronServer{
ctx: ctx,
config: config,
config: cfg,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
authClient: rpcli.NewAuthClient(authConn),
userOfflineRecordDB: userOfflineRecordDB,
chatAPIAddress: config.CronTask.ChatAPI.Address,
chatAPIAddress: cfg.CronTask.ChatAPI.Address,
}
if err := srv.registerClearS3(); err != nil {
@ -111,7 +113,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerDeleteExpiredOfflineUsers(); err != nil {
return err
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
log.ZDebug(ctx, "start cron task", "CronExecuteTime", cfg.CronTask.CronExecuteTime)
srv.cron.Start()
<-ctx.Done()
return nil
@ -153,7 +155,11 @@ func (c *cronServer) registerClearUserMsg() error {
}
func (c *cronServer) registerClearBurnExpiredMsgs() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs)
schedule := strings.TrimSpace(c.config.CronTask.BurnCronExecuteTime)
if schedule == "" {
schedule = c.config.CronTask.CronExecuteTime
}
_, err := c.cron.AddFunc(schedule, c.clearBurnExpiredMsgs)
return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task")
}

View File

@ -115,10 +115,18 @@ type API struct {
}
type CronTask struct {
// CronExecuteTime 标准 5 段 cron 表达式供聊天记录清理、S3 清理、用户消息清理及阅后即焚清理等任务共用(除非 burnCronExecuteTime 单独指定)。
// 未配置时由 FillCronTaskDefaults 设为每天 02:00与 config/openim-crontask.yml 一致)。
CronExecuteTime string `mapstructure:"cronExecuteTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"`
DeleteObjectType []string `mapstructure:"deleteObjectType"`
// BurnCronExecuteTime 仅「单聊阅后即焚」清理ClearBurnExpiredMsgs的 cron留空则与 CronExecuteTime 相同。
BurnCronExecuteTime string `mapstructure:"burnCronExecuteTime"`
// BurnClearLimit 单次 RPC 最多处理多少个 (user, conversation) 分组;<=0 时默认 100。
BurnClearLimit int `mapstructure:"burnClearLimit"`
// BurnClearMaxLoop 单次定时触发内最多循环轮数;<=0 时默认 10000。
BurnClearMaxLoop int `mapstructure:"burnClearMaxLoop"`
// ChatAPI 是 chat HTTP API 服务的访问配置,用于调用 /account/del 等需要管理员权限的接口。
ChatAPI ChatAPI `mapstructure:"chatAPI"`
}
@ -290,9 +298,9 @@ type Group struct {
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"`
}
type Msg struct {
@ -483,7 +491,7 @@ type Crypto struct {
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
Virgil VirgilConfig `mapstructure:"virgil"`
}
@ -526,7 +534,6 @@ type RedPacketIndexer struct {
PollInterval int `mapstructure:"pollInterval"`
}
// FullConfig stores all configurations for before and after events
type Webhooks struct {
URL string `mapstructure:"url"`
@ -780,6 +787,37 @@ func (ct *CronTask) GetConfigFileName() string {
return OpenIMCronTaskCfgFileName
}
// FillCronTaskDefaults applies defaults after YAML/env load. Only fills empty or invalid placeholders.
func FillCronTaskDefaults(ct *CronTask) {
if ct == nil {
return
}
if strings.TrimSpace(ct.CronExecuteTime) == "" {
ct.CronExecuteTime = "0 2 * * *"
}
if strings.TrimSpace(ct.BurnCronExecuteTime) == "" {
ct.BurnCronExecuteTime = "*/1 * * * *"
}
if strings.TrimSpace(ct.BurnClearLimit) == "" {
ct.BurnClearLimit = 100
}
if strings.TrimSpace(ct.BurnClearMaxLoop) == "" {
ct.BurnClearMaxLoop = 10000
}
if ct.ChatAPI.Address == "" {
ct.ChatAPI.Address = "http://127.0.0.1:10008"
}
if ct.RetainChatRecords < 1 {
ct.RetainChatRecords = 365
}
if ct.BurnClearLimit < 1 {
ct.BurnClearLimit = 100
}
if ct.BurnClearMaxLoop < 1 {
ct.BurnClearMaxLoop = 10000
}
}
func (mg *MsgGateway) GetConfigFileName() string {
return OpenIMMsgGatewayCfgFileName
}

@ -1 +1 @@
Subproject commit eaf121f66a1529357897cf48f0a19643deb043f6
Subproject commit a9d1f9d497b399e9c1bc9bcf7b752a24c15959ec