From f21bc47d7173885cb47ef9d00d2e234e025e31d2 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 3 Jun 2025 18:27:10 +0800 Subject: [PATCH] Update contents. --- internal/tools/cron/cron_task.go | 1 + internal/tools/cron/dist_look.go | 140 +++++++------------------------ 2 files changed, 32 insertions(+), 109 deletions(-) diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go index 33ebda57d..a4de309d4 100644 --- a/internal/tools/cron/cron_task.go +++ b/internal/tools/cron/cron_task.go @@ -87,6 +87,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp log.ZDebug(ctx, "cron task server is running") <-ctx.Done() log.ZDebug(ctx, "cron task server is shutting down") + srv.cron.Stop() return nil } diff --git a/internal/tools/cron/dist_look.go b/internal/tools/cron/dist_look.go index f7911951e..d1d2d1cb0 100644 --- a/internal/tools/cron/dist_look.go +++ b/internal/tools/cron/dist_look.go @@ -8,10 +8,11 @@ import ( "github.com/openimsdk/tools/log" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" ) const ( - lockLeaseTTL = 3000 + lockLeaseTTL = 300 ) type EtcdLocker struct { @@ -33,131 +34,52 @@ func NewEtcdLocker(client *clientv3.Client) (*EtcdLocker, error) { return locker, nil } -func (e *EtcdLocker) tryAcquireTaskLock(ctx context.Context, taskName string) (clientv3.LeaseID, bool, error) { - lockKey := fmt.Sprintf("openim/crontask/%s-lock", taskName) - - lease, err := e.client.Grant(ctx, lockLeaseTTL) - if err != nil { - return 0, false, fmt.Errorf("failed to create lease: %w", err) - } - - txnResp, err := e.client.Txn(ctx). - If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)). - Then(clientv3.OpPut(lockKey, e.instanceID, clientv3.WithLease(lease.ID))). - Else(clientv3.OpGet(lockKey)). - Commit() - - if err != nil { - e.client.Revoke(ctx, lease.ID) - return 0, false, fmt.Errorf("transaction failed: %w", err) - } - - if !txnResp.Succeeded { - rangeResp := txnResp.Responses[0].GetResponseRange() - if len(rangeResp.Kvs) > 0 { - currentOwner := string(rangeResp.Kvs[0].Value) - log.ZInfo(ctx, "Task lock already owned, skipping execution", - "taskName", taskName, - "instanceID", e.instanceID, - "currentOwner", currentOwner) - } - e.client.Revoke(ctx, lease.ID) - return 0, false, nil - } - - log.ZInfo(ctx, "Successfully acquired task lock", - "taskName", taskName, - "instanceID", e.instanceID, - "leaseID", lease.ID) - return lease.ID, true, nil -} - -func (e *EtcdLocker) releaseTaskLock(ctx context.Context, taskName string, leaseID clientv3.LeaseID) { - if leaseID == 0 { - return - } - - _, err := e.client.Revoke(ctx, leaseID) - if err != nil { - log.ZWarn(ctx, "Failed to revoke task lease", err, - "taskName", taskName, - "instanceID", e.instanceID, - "leaseID", leaseID) - } else { - log.ZInfo(ctx, "Successfully released task lock", - "taskName", taskName, - "instanceID", e.instanceID) - } -} - -func (e *EtcdLocker) startLeaseKeepAlive(ctx context.Context, taskName string, leaseID clientv3.LeaseID) (context.CancelFunc, error) { - keepAliveCtx, cancel := context.WithCancel(ctx) - - keepAliveCh, err := e.client.KeepAlive(keepAliveCtx, leaseID) - if err != nil { - cancel() - return nil, fmt.Errorf("failed to start keepalive: %w", err) - } - - go func() { - defer cancel() - for { - select { - case _, ok := <-keepAliveCh: - if !ok { - log.ZWarn(keepAliveCtx, "KeepAlive channel closed, lease may have expired", nil, - "taskName", taskName, - "instanceID", e.instanceID, - "leaseID", leaseID) - return - } - - case <-keepAliveCtx.Done(): - log.ZDebug(keepAliveCtx, "KeepAlive stopped", - "taskName", taskName, - "instanceID", e.instanceID) - return - } - } - }() - - return cancel, nil -} - func (e *EtcdLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) { - leaseID, acquired, err := e.tryAcquireTaskLock(ctx, taskName) + session, err := concurrency.NewSession(e.client, concurrency.WithTTL(lockLeaseTTL)) if err != nil { - log.ZWarn(ctx, "Failed to acquire task lock", err, + log.ZWarn(ctx, "Failed to create etcd session", err, "taskName", taskName, "instanceID", e.instanceID) return } + defer session.Close() - if !acquired { - log.ZDebug(ctx, "Task is being executed by another instance, skipping", - "taskName", taskName, - "instanceID", e.instanceID) - return - } + lockKey := fmt.Sprintf("openim/crontask/%s", taskName) + mutex := concurrency.NewMutex(session, lockKey) - cancelKeepAlive, err := e.startLeaseKeepAlive(ctx, taskName, leaseID) + ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + + err = mutex.TryLock(ctxWithTimeout) if err != nil { - log.ZWarn(ctx, "Failed to start lease keepalive", err, - "taskName", taskName, - "instanceID", e.instanceID) - e.releaseTaskLock(ctx, taskName, leaseID) + if err == context.DeadlineExceeded { + log.ZDebug(ctx, "Task is being executed by another instance, skipping", + "taskName", taskName, + "instanceID", e.instanceID) + } else { + log.ZWarn(ctx, "Failed to acquire task lock", err, + "taskName", taskName, + "instanceID", e.instanceID) + } return } defer func() { - cancelKeepAlive() - e.releaseTaskLock(ctx, taskName, leaseID) + if err := mutex.Unlock(ctx); err != nil { + log.ZWarn(ctx, "Failed to release task lock", err, + "taskName", taskName, + "instanceID", e.instanceID) + } else { + log.ZInfo(ctx, "Successfully released task lock", + "taskName", taskName, + "instanceID", e.instanceID) + } }() - log.ZInfo(ctx, "Starting task execution with lease keepalive", + log.ZInfo(ctx, "Successfully acquired task lock, starting execution", "taskName", taskName, "instanceID", e.instanceID, - "leaseID", leaseID) + "sessionID", session.Lease()) task()