From d992d1fb4114e0b3b18708a046976593897e3104 Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Wed, 13 May 2026 20:50:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=85=E5=90=8E=E5=8D=B3?= =?UTF-8?q?=E7=84=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/openim-crontask.yml | 5 +++- internal/tools/burn_msg.go | 10 ++++---- internal/tools/cron_task.go | 34 +++++++++++++++----------- pkg/common/config/config.go | 48 +++++++++++++++++++++++++++++++++---- protocol | 2 +- 5 files changed, 72 insertions(+), 27 deletions(-) diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index ccce6cfd7..0cf66780c 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,6 +1,9 @@ cronExecuteTime: 0 2 * * * +burnCronExecuteTime: "*/1 * * * *" # 可选:仅阅后即焚清理使用;不配置则与 cronExecuteTime 相同 +burnClearLimit: 100 # 可选:单次 RPC 批大小,默认 100 +burnClearMaxLoop: 100 # 可选:单次定时内最大循环轮数,默认 10000 retainChatRecords: 365 fileExpireTime: 180 deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"] chatAPI: - address: http://127.0.0.1:10008 \ No newline at end of file + address: http://127.0.0.1:10008 diff --git a/internal/tools/burn_msg.go b/internal/tools/burn_msg.go index 597acccda..560bb232f 100644 --- a/internal/tools/burn_msg.go +++ b/internal/tools/burn_msg.go @@ -25,17 +25,15 @@ import ( ) // clearBurnExpiredMsgs 阅后即焚 cron 入口:循环调用 conversation 服务的 -// ClearBurnExpiredMsgs,每次至多处理 burnLimit 个 (user, conversation) 分组, -// 直至本轮没有新的过期分组或达到防御性的最大循环次数。 +// ClearBurnExpiredMsgs,每次至多处理 burnClearLimit 个 (user, conversation) 分组, +// 直至本轮没有新的过期分组或达到 burnClearMaxLoop 轮。 func (c *cronServer) clearBurnExpiredMsgs() { now := time.Now() operationID := fmt.Sprintf("cron_burn_msg_%d_%d", os.Getpid(), now.UnixMilli()) ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "clear burn expired msgs cron start") - const ( - maxLoop = 10000 - burnLimit = 100 - ) + burnLimit := int32(c.config.CronTask.BurnClearLimit) + maxLoop := c.config.CronTask.BurnClearMaxLoop var count int for i := 1; i <= maxLoop; i++ { resp, err := c.conversationClient.ClearBurnExpiredMsgs(ctx, &pbconversation.ClearBurnExpiredMsgsReq{ diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 8dcb0195a..7b6c027a6 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -16,6 +16,7 @@ package tools import ( "context" + "strings" "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" @@ -44,36 +45,37 @@ type CronTaskConfig struct { MongodbConfig config.Mongo } -func Start(ctx context.Context, config *CronTaskConfig) error { - log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) - if config.CronTask.RetainChatRecords < 1 { +func Start(ctx context.Context, cfg *CronTaskConfig) error { + config.FillCronTaskDefaults(&cfg.CronTask) + log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", cfg.CronTask.CronExecuteTime, "msgDestructTime", cfg.CronTask.RetainChatRecords) + if cfg.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) + client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, nil) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) + ctx = mcontext.SetOpUserID(ctx, cfg.Share.IMAdminUserID[0]) - msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) + msgConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Msg) if err != nil { return err } - thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + thirdConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Third) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) + conversationConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Conversation) if err != nil { return err } - authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth) + authConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Auth) if err != nil { return err } - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + mgocli, err := mongoutil.NewMongoDB(ctx, cfg.MongodbConfig.Build()) if err != nil { return errs.WrapMsg(err, "crontask: connect mongodb failed") } @@ -86,14 +88,14 @@ func Start(ctx context.Context, config *CronTaskConfig) error { srv := &cronServer{ ctx: ctx, - config: config, + config: cfg, cron: cron.New(), msgClient: msg.NewMsgClient(msgConn), conversationClient: pbconversation.NewConversationClient(conversationConn), thirdClient: third.NewThirdClient(thirdConn), authClient: rpcli.NewAuthClient(authConn), userOfflineRecordDB: userOfflineRecordDB, - chatAPIAddress: config.CronTask.ChatAPI.Address, + chatAPIAddress: cfg.CronTask.ChatAPI.Address, } if err := srv.registerClearS3(); err != nil { @@ -111,7 +113,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err := srv.registerDeleteExpiredOfflineUsers(); err != nil { return err } - log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) + log.ZDebug(ctx, "start cron task", "CronExecuteTime", cfg.CronTask.CronExecuteTime) srv.cron.Start() <-ctx.Done() return nil @@ -153,7 +155,11 @@ func (c *cronServer) registerClearUserMsg() error { } func (c *cronServer) registerClearBurnExpiredMsgs() error { - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs) + schedule := strings.TrimSpace(c.config.CronTask.BurnCronExecuteTime) + if schedule == "" { + schedule = c.config.CronTask.CronExecuteTime + } + _, err := c.cron.AddFunc(schedule, c.clearBurnExpiredMsgs) return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task") } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 40ce852b1..cf79a6fc2 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -115,10 +115,18 @@ type API struct { } type CronTask struct { + // CronExecuteTime 标准 5 段 cron 表达式,供聊天记录清理、S3 清理、用户消息清理及阅后即焚清理等任务共用(除非 burnCronExecuteTime 单独指定)。 + // 未配置时由 FillCronTaskDefaults 设为每天 02:00(与 config/openim-crontask.yml 一致)。 CronExecuteTime string `mapstructure:"cronExecuteTime"` RetainChatRecords int `mapstructure:"retainChatRecords"` FileExpireTime int `mapstructure:"fileExpireTime"` DeleteObjectType []string `mapstructure:"deleteObjectType"` + // BurnCronExecuteTime 仅「单聊阅后即焚」清理(ClearBurnExpiredMsgs)的 cron;留空则与 CronExecuteTime 相同。 + BurnCronExecuteTime string `mapstructure:"burnCronExecuteTime"` + // BurnClearLimit 单次 RPC 最多处理多少个 (user, conversation) 分组;<=0 时默认 100。 + BurnClearLimit int `mapstructure:"burnClearLimit"` + // BurnClearMaxLoop 单次定时触发内最多循环轮数;<=0 时默认 10000。 + BurnClearMaxLoop int `mapstructure:"burnClearMaxLoop"` // ChatAPI 是 chat HTTP API 服务的访问配置,用于调用 /account/del 等需要管理员权限的接口。 ChatAPI ChatAPI `mapstructure:"chatAPI"` } @@ -290,9 +298,9 @@ type Group struct { AutoSetPorts bool `mapstructure:"autoSetPorts"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` - CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"` + Prometheus Prometheus `mapstructure:"prometheus"` + EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` + CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"` } type Msg struct { @@ -483,7 +491,7 @@ type Crypto struct { AutoSetPorts bool `mapstructure:"autoSetPorts"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` Virgil VirgilConfig `mapstructure:"virgil"` } @@ -526,7 +534,6 @@ type RedPacketIndexer struct { PollInterval int `mapstructure:"pollInterval"` } - // FullConfig stores all configurations for before and after events type Webhooks struct { URL string `mapstructure:"url"` @@ -780,6 +787,37 @@ func (ct *CronTask) GetConfigFileName() string { return OpenIMCronTaskCfgFileName } +// FillCronTaskDefaults applies defaults after YAML/env load. Only fills empty or invalid placeholders. +func FillCronTaskDefaults(ct *CronTask) { + if ct == nil { + return + } + if strings.TrimSpace(ct.CronExecuteTime) == "" { + ct.CronExecuteTime = "0 2 * * *" + } + if strings.TrimSpace(ct.BurnCronExecuteTime) == "" { + ct.BurnCronExecuteTime = "*/1 * * * *" + } + if strings.TrimSpace(ct.BurnClearLimit) == "" { + ct.BurnClearLimit = 100 + } + if strings.TrimSpace(ct.BurnClearMaxLoop) == "" { + ct.BurnClearMaxLoop = 10000 + } + if ct.ChatAPI.Address == "" { + ct.ChatAPI.Address = "http://127.0.0.1:10008" + } + if ct.RetainChatRecords < 1 { + ct.RetainChatRecords = 365 + } + if ct.BurnClearLimit < 1 { + ct.BurnClearLimit = 100 + } + if ct.BurnClearMaxLoop < 1 { + ct.BurnClearMaxLoop = 10000 + } +} + func (mg *MsgGateway) GetConfigFileName() string { return OpenIMMsgGatewayCfgFileName } diff --git a/protocol b/protocol index eaf121f66..a9d1f9d49 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit eaf121f66a1529357897cf48f0a19643deb043f6 +Subproject commit a9d1f9d497b399e9c1bc9bcf7b752a24c15959ec