From e78b01b296e5eec63e50ec727e9801905b3d8d1e Mon Sep 17 00:00:00 2001
From: Monet Lee
Date: Tue, 3 Jun 2025 16:30:21 +0800
Subject: [PATCH] feat: support distributed lock in crontask.

---
 internal/tools/cron/cron_task.go |  27 +++-
 internal/tools/cron/dist_lock.go | 260 +++++++++++++++++++++++++++++++
 start-config.yml                 |   2 +-
 3 files changed, 285 insertions(+), 4 deletions(-)
 create mode 100644 internal/tools/cron/dist_lock.go

diff --git a/internal/tools/cron/cron_task.go b/internal/tools/cron/cron_task.go
index 7ae314193..b57baa81f 100644
--- a/internal/tools/cron/cron_task.go
+++ b/internal/tools/cron/cron_task.go
@@ -58,6 +58,15 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
 		cm.Watch(ctx)
 	}
 
+	locker, err := NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient())
+	if err != nil {
+		return err
+	}
+
+	if err := locker.Start(ctx); err != nil {
+		return err
+	}
+
 	srv := &cronServer{
 		ctx:    ctx,
 		config: conf,
@@ -65,6 +74,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
 		msgClient:          msg.NewMsgClient(msgConn),
 		conversationClient: pbconversation.NewConversationClient(conversationConn),
 		thirdClient:        third.NewThirdClient(thirdConn),
+		locker:             locker,
 	}
 
 	if err := srv.registerClearS3(); err != nil {
@@ -81,6 +91,9 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
 	log.ZDebug(ctx, "cron task server is running")
 	<-ctx.Done()
 	log.ZDebug(ctx, "cron task server is shutting down")
+
+	locker.Stop() // release distributed lock
+
 	return nil
 }
 
@@ -91,6 +104,7 @@ type cronServer struct {
 	msgClient          msg.MsgClient
 	conversationClient pbconversation.ConversationClient
 	thirdClient        third.ThirdClient
+	locker             *EtcdLocker
 }
 
 func (c *cronServer) registerClearS3() error {
@@ -98,7 +112,10 @@ func (c *cronServer) registerClearS3() error {
 		log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
 		return nil
 	}
-	_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3)
+	_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
+		c.locker.ExecuteWithLock(c.ctx, c.clearS3)
+	})
 	return errs.WrapMsg(err, "failed to register clear s3 cron task")
 }
 
@@ -107,11 +124,15 @@ func (c *cronServer) registerDeleteMsg() error {
 		log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
 		return nil
 	}
-	_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg)
+	_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
+		c.locker.ExecuteWithLock(c.ctx, c.deleteMsg)
+	})
 	return errs.WrapMsg(err, "failed to register delete msg cron task")
 }
 
 func (c *cronServer) registerClearUserMsg() error {
-	_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
+	_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
+		c.locker.ExecuteWithLock(c.ctx, c.clearUserMsg)
+	})
 	return errs.WrapMsg(err, "failed to register clear user msg cron task")
 }
diff --git a/internal/tools/cron/dist_lock.go b/internal/tools/cron/dist_lock.go
new file mode 100644
index 000000000..57da8254f
--- /dev/null
+++ b/internal/tools/cron/dist_lock.go
@@ -0,0 +1,260 @@
+package cron
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"sync/atomic"
+	"time"
+
+	"github.com/openimsdk/tools/log"
+	clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+const (
+	lockKey           = "openim/crontask/dist-lock"
+	lockLeaseTTL      = 15 // Lease TTL in seconds
+	acquireRetryDelay = 500 * time.Millisecond
+)
+
+// EtcdLocker implements a distributed lock on top of etcd leases.
+// At most one crontask instance holds the lock at a time; the others
+// watch the lock key and compete again once it is released.
+type EtcdLocker struct {
+	client       *clientv3.Client
+	instanceID   string
+	leaseID      clientv3.LeaseID
+	isLockOwner  int32 // accessed atomically; 1 means this instance owns the lock
+	watchCh      clientv3.WatchChan
+	watchCancel  context.CancelFunc
+	leaseTTL     int64
+	stopCh       chan struct{}
+	stoppedCh    chan struct{}
+	acquireDelay time.Duration
+}
+
+// NewEtcdLocker creates a new etcd distributed lock.
+func NewEtcdLocker(client *clientv3.Client) (*EtcdLocker, error) {
+	hostname, _ := os.Hostname()
+	pid := os.Getpid()
+	instanceID := fmt.Sprintf("%s-pid-%d-%d", hostname, pid, time.Now().UnixNano())
+
+	locker := &EtcdLocker{
+		client:       client,
+		instanceID:   instanceID,
+		leaseTTL:     lockLeaseTTL,
+		stopCh:       make(chan struct{}),
+		stoppedCh:    make(chan struct{}),
+		acquireDelay: acquireRetryDelay,
+	}
+
+	return locker, nil
+}
+
+func (e *EtcdLocker) Start(ctx context.Context) error {
+	log.ZInfo(ctx, "Starting etcd distributed lock", "instanceID", e.instanceID)
+	go e.runLockLoop(ctx)
+	return nil
+}
+
+func (e *EtcdLocker) runLockLoop(ctx context.Context) {
+	defer close(e.stoppedCh)
+
+	for {
+		select {
+		case <-e.stopCh:
+			e.releaseLock(ctx)
+			return
+		case <-ctx.Done():
+			e.releaseLock(ctx)
+			return
+		default:
+			acquired, err := e.tryAcquireLock(ctx)
+			if err != nil {
+				log.ZWarn(ctx, "Failed to acquire lock", err, "instanceID", e.instanceID)
+				time.Sleep(e.acquireDelay)
+				continue
+			}
+
+			if acquired {
+				// Blocks until the lease keepalive fails or the locker stops.
+				e.runKeepAlive(ctx)
+				time.Sleep(e.acquireDelay)
+			} else {
+				// Blocks until the current owner releases the lock.
+				e.watchLock(ctx)
+			}
+		}
+	}
+}
+
+func (e *EtcdLocker) tryAcquireLock(ctx context.Context) (bool, error) {
+	lease, err := e.client.Grant(ctx, e.leaseTTL)
+	if err != nil {
+		return false, fmt.Errorf("failed to create lease: %w", err)
+	}
+
+	// Atomically create the lock key only if it does not exist yet
+	// (CreateRevision == 0); otherwise read back the current owner.
+	txnResp, err := e.client.Txn(ctx).
+		If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
+		Then(clientv3.OpPut(lockKey, e.instanceID, clientv3.WithLease(lease.ID))).
+		Else(clientv3.OpGet(lockKey)).
+		Commit()
+	if err != nil {
+		e.client.Revoke(ctx, lease.ID)
+		return false, fmt.Errorf("transaction failed: %w", err)
+	}
+
+	if !txnResp.Succeeded {
+		rangeResp := txnResp.Responses[0].GetResponseRange()
+		if len(rangeResp.Kvs) > 0 {
+			currentOwner := string(rangeResp.Kvs[0].Value)
+			log.ZInfo(ctx, "Lock already owned", "instanceID", e.instanceID, "owner", currentOwner)
+		}
+
+		e.client.Revoke(ctx, lease.ID)
+		return false, nil
+	}
+
+	e.leaseID = lease.ID
+	atomic.StoreInt32(&e.isLockOwner, 1)
+	log.ZInfo(ctx, "Successfully acquired lock", "instanceID", e.instanceID, "leaseID", lease.ID)
+	return true, nil
+}
+
+func (e *EtcdLocker) runKeepAlive(ctx context.Context) {
+	keepAliveCh, err := e.client.KeepAlive(ctx, e.leaseID)
+	if err != nil {
+		log.ZError(ctx, "Failed to start lease keepalive", err, "instanceID", e.instanceID)
+		e.releaseLock(ctx)
+		return
+	}
+
+	for {
+		select {
+		case _, ok := <-keepAliveCh:
+			if !ok {
+				log.ZWarn(ctx, "Keepalive channel closed, lock lost", nil, "instanceID", e.instanceID)
+				atomic.StoreInt32(&e.isLockOwner, 0)
+				return
+			}
+
+		case <-ctx.Done():
+			log.ZInfo(ctx, "Context canceled, releasing lock", "instanceID", e.instanceID)
+			e.releaseLock(ctx)
+			return
+
+		case <-e.stopCh:
+			log.ZInfo(ctx, "Stop signal received, releasing lock", "instanceID", e.instanceID)
+			e.releaseLock(ctx)
+			return
+		}
+	}
+}
+
+// watchLock blocks until the lock key is deleted, the context is canceled,
+// or the locker is stopped.
+func (e *EtcdLocker) watchLock(ctx context.Context) {
+	log.ZInfo(ctx, "Starting to watch lock status", "instanceID", e.instanceID)
+
+	watchCtx, cancel := context.WithCancel(ctx)
+	e.watchCancel = cancel
+	defer e.cancelWatch()
+
+	// Watch for changes to the lock key
+	e.watchCh = e.client.Watch(watchCtx, lockKey)
+
+	for {
+		select {
+		case resp, ok := <-e.watchCh:
+			if !ok {
+				log.ZWarn(ctx, "Watch channel closed", nil, "instanceID", e.instanceID)
+				return
+			}
+
+			for _, event := range resp.Events {
+				if event.Type == clientv3.EventTypeDelete {
+					log.ZInfo(ctx, "Lock released, attempting to acquire", "instanceID", e.instanceID)
+					return
+				}
+			}
+		case <-ctx.Done():
+			return
+		case <-e.stopCh:
+			return
+		}
+	}
+}
+
+// releaseLock revokes the lease (which deletes the lock key in etcd) and
+// clears the local ownership state.
+func (e *EtcdLocker) releaseLock(ctx context.Context) {
+	if atomic.LoadInt32(&e.isLockOwner) == 0 {
+		return
+	}
+
+	leaseID := e.leaseID
+	atomic.StoreInt32(&e.isLockOwner, 0)
+	e.leaseID = 0
+
+	if leaseID != 0 {
+		// Use a fresh context: the caller's context may already be canceled.
+		_, err := e.client.Revoke(context.Background(), leaseID)
+		if err != nil {
+			log.ZWarn(ctx, "Failed to revoke lease", err, "instanceID", e.instanceID)
+		} else {
+			log.ZInfo(ctx, "Successfully released lock", "instanceID", e.instanceID)
+		}
+	}
+}
+
+// CheckLockOwnership verifies against etcd that this instance still owns the
+// lock; the local flag alone can be stale if the lease expired.
+func (e *EtcdLocker) CheckLockOwnership(ctx context.Context) (bool, error) {
+	if atomic.LoadInt32(&e.isLockOwner) == 0 {
+		return false, nil
+	}
+
+	resp, err := e.client.Get(ctx, lockKey)
+	if err != nil {
+		return false, fmt.Errorf("failed to check lock status: %w", err)
+	}
+	if len(resp.Kvs) > 0 && string(resp.Kvs[0].Value) == e.instanceID {
+		return true, nil
+	}
+	if atomic.LoadInt32(&e.isLockOwner) == 1 {
+		log.ZWarn(ctx, "Lock ownership lost unexpectedly", nil, "instanceID", e.instanceID)
+		atomic.StoreInt32(&e.isLockOwner, 0)
+	}
+
+	return false, nil
+}
+
+func (e *EtcdLocker) cancelWatch() {
+	if e.watchCancel != nil {
+		e.watchCancel()
+		e.watchCancel = nil
+	}
+}
+
+// Stop releases the lock (if held) and waits for the lock loop to exit.
+// It must be called at most once.
+func (e *EtcdLocker) Stop() {
+	e.cancelWatch()
+	close(e.stopCh)
+	<-e.stoppedCh
+}
+
+func (e *EtcdLocker) IsLockOwner() bool {
+	return atomic.LoadInt32(&e.isLockOwner) == 1
+}
+
+// ExecuteWithLock runs task only if this instance currently owns the lock.
+func (e *EtcdLocker) ExecuteWithLock(ctx context.Context, task func()) {
+	if atomic.LoadInt32(&e.isLockOwner) == 0 {
+		log.ZDebug(ctx, "Instance does not own lock (local check), skipping task execution", "instanceID", e.instanceID)
+		return
+	}
+
+	// Double-check against etcd: the local flag may be stale if the lease
+	// expired between scheduler ticks.
+	isOwner, err := e.CheckLockOwnership(ctx)
+	if err != nil {
+		log.ZWarn(ctx, "Failed to verify lock ownership", err, "instanceID", e.instanceID)
+		return
+	}
+	if !isOwner {
+		log.ZDebug(ctx, "Instance does not own lock (etcd verification), skipping task execution", "instanceID", e.instanceID)
+		return
+	}
+
+	log.ZInfo(ctx, "Starting lock-protected task execution", "instanceID", e.instanceID)
+	task()
+	log.ZInfo(ctx, "Lock-protected task execution completed", "instanceID", e.instanceID)
+}
diff --git a/start-config.yml b/start-config.yml
index 1231b5d0d..da959044d 100644
--- a/start-config.yml
+++ b/start-config.yml
@@ -1,6 +1,6 @@
 serviceBinaries:
   openim-api: 1
-  openim-crontask: 1
+  openim-crontask: 4
   openim-rpc-user: 1
   openim-msggateway: 1
   openim-push: 8
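
---

A minimal way to exercise the locker end to end, sketched below: two lockers stand in for two openim-crontask replicas, and only the one that wins the key race executes the task. It assumes a local etcd reachable at 127.0.0.1:2379 and test access to the cron package; TestLockCompetition, the endpoint, and the 2-second settle delay are illustrative assumptions, not part of the patch.

package cron

import (
	"context"
	"testing"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// TestLockCompetition sketches the expected behavior: of two lockers,
// only the one holding the etcd lock executes the task.
func TestLockCompetition(t *testing.T) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		t.Skipf("etcd not available: %v", err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Two lockers stand in for two openim-crontask replicas.
	a, _ := NewEtcdLocker(cli)
	b, _ := NewEtcdLocker(cli)
	_ = a.Start(ctx)
	_ = b.Start(ctx)
	defer a.Stop()
	defer b.Stop()

	// Give the lock loops time to race for the key.
	time.Sleep(2 * time.Second)

	ran := 0
	task := func() { ran++ }
	a.ExecuteWithLock(ctx, task) // runs only if a owns the lock
	b.ExecuteWithLock(ctx, task) // runs only if b owns the lock

	if ran != 1 {
		t.Fatalf("expected exactly one execution, got %d", ran)
	}
	t.Logf("owner is a: %v, owner is b: %v", a.IsLockOwner(), b.IsLockOwner())
}

With start-config.yml now launching four crontask replicas, this is the mechanism that makes each scheduled job fire once per tick instead of four times.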
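One property worth keeping in mind: ExecuteWithLock re-verifies ownership against etcd just before running the task, which narrows, but cannot fully close, the window in which a lease expires mid-task; under a network partition an old owner can still be mid-execution when a new owner starts. The jobs guarded here are cleanup-style deletions, so a rare overlap should be harmless, but non-idempotent work would additionally need a fencing token checked by the storage layer.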