diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go index b57baa81f..24f0762eb 100644 --- a/internal/tools/cron/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -114,8 +114,7 @@ func (c *cronServer) registerClearS3() error { } _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() { c.locker.ExecuteWithLock(c.ctx, c.clearS3) - }, - ) + }) return errs.WrapMsg(err, "failed to register clear s3 cron task") } diff --git a/internal/tools/cron/dist_look.go b/internal/tools/cron/dist_look.go index 57da8254f..9edfbc794 100644 --- a/internal/tools/cron/dist_look.go +++ b/internal/tools/cron/dist_look.go @@ -56,7 +56,6 @@ func (e *EtcdLocker) Start(ctx context.Context) error { func (e *EtcdLocker) runLockLoop(ctx context.Context) { defer close(e.stoppedCh) - for { select { case <-e.stopCh: @@ -75,7 +74,6 @@ func (e *EtcdLocker) runLockLoop(ctx context.Context) { if acquired { e.runKeepAlive(ctx) - time.Sleep(e.acquireDelay) } else { e.watchLock(ctx) @@ -134,12 +132,10 @@ func (e *EtcdLocker) runKeepAlive(ctx context.Context) { atomic.StoreInt32(&e.isLockOwner, 0) // Set to false atomically return } - case <-ctx.Done(): log.ZInfo(ctx, "Context canceled, releasing lock", "instanceID", e.instanceID) e.releaseLock(ctx) return - case <-e.stopCh: log.ZInfo(ctx, "Stop signal received, releasing lock", "instanceID", e.instanceID) e.releaseLock(ctx) @@ -151,15 +147,12 @@ func (e *EtcdLocker) runKeepAlive(ctx context.Context) { // Watch lock status directly in etcd func (e *EtcdLocker) watchLock(ctx context.Context) { log.ZInfo(ctx, "Starting to watch lock status", "instanceID", e.instanceID) - watchCtx, cancel := context.WithCancel(ctx) e.watchCancel = cancel - defer e.cancelWatch() // Watch for changes to the lock key e.watchCh = e.client.Watch(watchCtx, lockKey) - for { select { case resp, ok := <-e.watchCh: @@ -167,7 +160,6 @@ func (e *EtcdLocker) watchLock(ctx context.Context) { log.ZWarn(ctx, "Watch channel closed", nil, "instanceID", e.instanceID) return } - for _, event := range resp.Events { if event.Type == clientv3.EventTypeDelete { log.ZInfo(ctx, "Lock released, attempting to acquire", "instanceID", e.instanceID) @@ -191,7 +183,6 @@ func (e *EtcdLocker) releaseLock(ctx context.Context) { leaseID := e.leaseID atomic.StoreInt32(&e.isLockOwner, 0) e.leaseID = 0 - if leaseID != 0 { _, err := e.client.Revoke(context.Background(), leaseID) if err != nil {