mirror of
https://github.com/gogf/gf.git
synced 2025-04-05 11:18:50 +08:00
os/gcron: add graceful shutdown support (#3625)
This commit is contained in:
parent
f8272bc5f4
commit
8824b8b0fe
@ -120,3 +120,8 @@ func Start(name ...string) {
|
||||
func Stop(name ...string) {
|
||||
defaultCron.Stop(name...)
|
||||
}
|
||||
|
||||
// StopGracefully Blocks and waits all current running jobs done.
|
||||
func StopGracefully() {
|
||||
defaultCron.StopGracefully()
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ package gcron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/container/garray"
|
||||
@ -19,10 +20,11 @@ import (
|
||||
|
||||
// Cron stores all the cron job entries.
|
||||
type Cron struct {
|
||||
idGen *gtype.Int64 // Used for unique name generation.
|
||||
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
|
||||
entries *gmap.StrAnyMap // All timed task entries.
|
||||
logger glog.ILogger // Logger, it is nil in default.
|
||||
idGen *gtype.Int64 // Used for unique name generation.
|
||||
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
|
||||
entries *gmap.StrAnyMap // All timed task entries.
|
||||
logger glog.ILogger // Logger, it is nil in default.
|
||||
jobWaiter sync.WaitGroup // Graceful shutdown when cron jobs are stopped.
|
||||
}
|
||||
|
||||
// New returns a new Cron object with default settings.
|
||||
@ -187,6 +189,12 @@ func (c *Cron) Stop(name ...string) {
|
||||
}
|
||||
}
|
||||
|
||||
// StopGracefully Blocks and waits all current running jobs done.
|
||||
func (c *Cron) StopGracefully() {
|
||||
c.status.Set(StatusStopped)
|
||||
c.jobWaiter.Wait()
|
||||
}
|
||||
|
||||
// Remove deletes scheduled task which named `name`.
|
||||
func (c *Cron) Remove(name string) {
|
||||
if v := c.entries.Get(name); v != nil {
|
||||
|
@ -152,7 +152,9 @@ func (e *Entry) checkAndRun(ctx context.Context) {
|
||||
e.Close()
|
||||
|
||||
case StatusReady, StatusRunning:
|
||||
e.cron.jobWaiter.Add(1)
|
||||
defer func() {
|
||||
e.cron.jobWaiter.Done()
|
||||
if exception := recover(); exception != nil {
|
||||
// Exception caught, it logs the error content to logger in default behavior.
|
||||
e.logErrorf(ctx,
|
||||
|
@ -8,8 +8,12 @@ package gcron_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gcron"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
)
|
||||
@ -21,3 +25,24 @@ func ExampleCronAddSingleton() {
|
||||
})
|
||||
select {}
|
||||
}
|
||||
|
||||
func ExampleCronGracefulShutdown() {
|
||||
_, err := gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
|
||||
g.Log().Debug(ctx, "Every 2s job start")
|
||||
time.Sleep(5 * time.Second)
|
||||
g.Log().Debug(ctx, "Every 2s job after 5 second end")
|
||||
}, "MyCronJob")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
sig := <-quit
|
||||
glog.Printf(ctx, "Signal received: %s, stopping cron", sig)
|
||||
|
||||
glog.Print(ctx, "Waiting for all cron jobs to complete...")
|
||||
gcron.StopGracefully()
|
||||
glog.Print(ctx, "All cron jobs completed")
|
||||
}
|
||||
|
@ -9,12 +9,16 @@ package gcron_test
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/container/garray"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gcron"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/test/gtest"
|
||||
)
|
||||
|
||||
@ -277,3 +281,41 @@ func TestCron_DelayAddTimes(t *testing.T) {
|
||||
t.Assert(cron.Size(), 0)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCron_JobWaiter(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var err error
|
||||
s1 := garray.New(true)
|
||||
s2 := garray.New(true)
|
||||
_, err = gcron.Add(ctx, "* * * * * *", func(ctx context.Context) {
|
||||
g.Log().Debug(ctx, "Every second")
|
||||
s1.Append(struct{}{})
|
||||
}, "MyFirstCronJob")
|
||||
t.Assert(err, nil)
|
||||
_, err = gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
|
||||
g.Log().Debug(ctx, "Every 2s job start")
|
||||
time.Sleep(3 * time.Second)
|
||||
s2.Append(struct{}{})
|
||||
g.Log().Debug(ctx, "Every 2s job after 3 second end")
|
||||
}, "MySecondCronJob")
|
||||
t.Assert(err, nil)
|
||||
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
time.Sleep(4 * time.Second) // Ensure that the job is triggered twice
|
||||
glog.Print(ctx, "Sending SIGINT")
|
||||
quit <- syscall.SIGINT // Send SIGINT
|
||||
}()
|
||||
|
||||
sig := <-quit
|
||||
glog.Printf(ctx, "Signal received: %s, stopping cron", sig)
|
||||
|
||||
glog.Print(ctx, "Waiting for all cron jobs to complete...")
|
||||
gcron.StopGracefully()
|
||||
glog.Print(ctx, "All cron jobs completed")
|
||||
t.Assert(s1.Len(), 4)
|
||||
t.Assert(s2.Len(), 2)
|
||||
})
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user