1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 11:18:50 +08:00

add supervisor for package grpool (#2252)

* add supervisor for package grpool

* up
This commit is contained in:
John Guo 2022-11-08 19:00:16 +08:00 committed by GitHub
parent 582c6eaef9
commit b000aa3dfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 42 deletions

View File

@ -9,11 +9,14 @@ package grpool
import (
"context"
"time"
"github.com/gogf/gf/v2/container/glist"
"github.com/gogf/gf/v2/container/gtype"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/os/gtimer"
"github.com/gogf/gf/v2/util/grand"
)
// Func is the pool function which contains context parameter.
@ -26,15 +29,20 @@ type RecoverFunc func(ctx context.Context, err error)
type Pool struct {
limit int // Max goroutine count limit.
count *gtype.Int // Current running goroutine count.
list *glist.List // Job list for asynchronous job adding purpose.
list *glist.List // List for asynchronous job adding purpose.
closed *gtype.Bool // Is pool closed or not.
}
type internalPoolItem struct {
type localPoolItem struct {
Ctx context.Context
Func Func
}
// Lower and upper bounds for the randomized interval at which the pool
// supervisor timer runs (a random duration in this range is chosen in New
// and passed to gtimer.Add).
const (
	minTimerDuration = 500 * time.Millisecond
	maxTimerDuration = 1500 * time.Millisecond
)
// Default goroutine pool.
var (
pool = New()
@ -53,6 +61,8 @@ func New(limit ...int) *Pool {
if len(limit) > 0 && limit[0] > 0 {
p.limit = limit[0]
}
timerDuration := grand.D(minTimerDuration, maxTimerDuration)
gtimer.Add(context.Background(), timerDuration, p.supervisor)
return p
}
@ -66,8 +76,8 @@ func Add(ctx context.Context, f Func) error {
// The optional `recoverFunc` is called when any panic occurs during the execution of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...RecoverFunc) error {
return pool.AddWithRecover(ctx, userFunc, recoverFunc...)
func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error {
return pool.AddWithRecover(ctx, userFunc, recoverFunc)
}
// Size returns current goroutine count of default goroutine pool.
@ -84,42 +94,68 @@ func Jobs() int {
// The job will be executed asynchronously.
func (p *Pool) Add(ctx context.Context, f Func) error {
for p.closed.Val() {
return gerror.NewCode(gcode.CodeInvalidOperation, "pool closed")
return gerror.NewCode(
gcode.CodeInvalidOperation,
"goroutine pool is already closed",
)
}
p.list.PushFront(&internalPoolItem{
p.list.PushFront(&localPoolItem{
Ctx: ctx,
Func: f,
})
// Check and fork new worker.
p.checkAndFork()
return nil
}
// checkAndFork checks and creates a new goroutine worker.
// Note that the worker dies if the job function panics and the job has no recover handling.
func (p *Pool) checkAndFork() {
// Check whether fork new goroutine or not.
var n int
for {
n = p.count.Val()
if p.limit != -1 && n >= p.limit {
// No need fork new goroutine.
return nil
return
}
if p.count.Cas(n, n+1) {
// Use CAS to guarantee atomicity.
break
}
}
p.fork()
return nil
// Create job function in goroutine.
go func() {
defer p.count.Add(-1)
var (
listItem interface{}
poolItem *localPoolItem
)
for !p.closed.Val() {
listItem = p.list.PopBack()
if listItem == nil {
return
}
poolItem = listItem.(*localPoolItem)
poolItem.Func(poolItem.Ctx)
}
}()
}
// AddWithRecover pushes a new job to the pool with specified recover function.
// The optional `recoverFunc` is called when any panic occurs during the execution of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...RecoverFunc) error {
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error {
return p.Add(ctx, func(ctx context.Context) {
defer func() {
if exception := recover(); exception != nil {
if len(recoverFunc) > 0 && recoverFunc[0] != nil {
if recoverFunc != nil {
if v, ok := exception.(error); ok && gerror.HasStack(v) {
recoverFunc[0](ctx, v)
recoverFunc(ctx, v)
} else {
recoverFunc[0](ctx, gerror.Newf(`%+v`, exception))
recoverFunc(ctx, gerror.Newf(`%+v`, exception))
}
}
}
@ -146,27 +182,6 @@ func (p *Pool) Jobs() int {
return p.list.Size()
}
// fork creates a new goroutine worker that drains jobs from the pool's list.
// Note that the worker dies if a job function panics and the job itself has
// no recover handling.
func (p *Pool) fork() {
	go func() {
		// Always decrement the running-worker counter when this worker
		// exits, whether it returns normally or dies from a panic.
		defer p.count.Add(-1)
		var (
			listItem interface{}
			poolItem *internalPoolItem
		)
		// Keep consuming jobs until the pool is closed or the list is empty.
		for !p.closed.Val() {
			if listItem = p.list.PopBack(); listItem != nil {
				poolItem = listItem.(*internalPoolItem)
				poolItem.Func(poolItem.Ctx)
			} else {
				// No pending job: let this worker exit.
				return
			}
		}
	}()
}
// IsClosed returns if pool is closed.
func (p *Pool) IsClosed() bool {
return p.closed.Val()

View File

@ -0,0 +1,30 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package grpool
import (
"context"
"github.com/gogf/gf/v2/os/gtimer"
)
// supervisor checks the job list and forks new worker goroutines to handle
// jobs when there are pending jobs but no running workers in the pool.
// It is registered in New as a repeating gtimer job with a randomized
// interval between minTimerDuration and maxTimerDuration.
func (p *Pool) supervisor(ctx context.Context) {
	if p.IsClosed() {
		// NOTE(review): gtimer.Exit presumably terminates this timer job
		// permanently once the pool is closed, so no explicit return is
		// needed here — confirm against the gtimer package documentation.
		gtimer.Exit()
	}
	if p.list.Size() > 0 && p.count.Val() == 0 {
		// Jobs are queued but no worker is alive (workers may have died
		// from panicking job functions); fork workers to drain the list.
		var number = p.list.Size()
		if p.limit > 0 {
			// When a limit is configured, fork up to the limit. This may
			// start more workers than there are queued jobs; surplus
			// workers exit immediately when they find the list empty.
			number = p.limit
		}
		for i := 0; i < number; i++ {
			p.checkAndFork()
		}
	}
}

View File

@ -4,8 +4,6 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// go test *.go -bench=".*" -count=1
package grpool_test
import (
@ -22,19 +20,23 @@ import (
func Test_Basic(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
var (
err error
wg = sync.WaitGroup{}
array = garray.NewArray(true)
size = 100
)
wg.Add(size)
for i := 0; i < size; i++ {
grpool.Add(ctx, func(ctx context.Context) {
err = grpool.Add(ctx, func(ctx context.Context) {
array.Append(1)
wg.Done()
})
t.AssertNil(err)
}
wg.Wait()
time.Sleep(100 * time.Millisecond)
t.Assert(array.Len(), size)
t.Assert(grpool.Jobs(), 0)
t.Assert(grpool.Size(), 0)
@ -64,6 +66,7 @@ func Test_Limit1(t *testing.T) {
func Test_Limit2(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
var (
err error
wg = sync.WaitGroup{}
array = garray.NewArray(true)
size = 100
@ -71,10 +74,11 @@ func Test_Limit2(t *testing.T) {
)
wg.Add(size)
for i := 0; i < size; i++ {
pool.Add(ctx, func(ctx context.Context) {
err = pool.Add(ctx, func(ctx context.Context) {
defer wg.Done()
array.Append(1)
})
t.AssertNil(err)
}
wg.Wait()
t.Assert(array.Len(), size)
@ -111,18 +115,25 @@ func Test_Limit3(t *testing.T) {
func Test_AddWithRecover(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
array := garray.NewArray(true)
grpool.AddWithRecover(ctx, func(ctx context.Context) {
var (
err error
array = garray.NewArray(true)
)
err = grpool.AddWithRecover(ctx, func(ctx context.Context) {
array.Append(1)
panic(1)
}, func(ctx context.Context, err error) {
array.Append(1)
})
grpool.AddWithRecover(ctx, func(ctx context.Context) {
t.AssertNil(err)
err = grpool.AddWithRecover(ctx, func(ctx context.Context) {
panic(1)
array.Append(1)
})
}, nil)
t.AssertNil(err)
time.Sleep(500 * time.Millisecond)
t.Assert(array.Len(), 2)
})
}