1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 11:18:50 +08:00

add context parameter for package grpool

This commit is contained in:
John Guo 2021-10-30 18:09:58 +08:00
parent d817047c98
commit 42e27dd14c
6 changed files with 68 additions and 38 deletions

View File

@ -451,7 +451,7 @@ func TestCache_SetConcurrency(t *testing.T) {
pool := grpool.New(4)
go func() {
for {
pool.Add(func() {
pool.Add(ctx, func(ctx context.Context) {
cache.SetIfNotExist(ctx, 1, 11, 10)
})
}
@ -463,7 +463,7 @@ func TestCache_SetConcurrency(t *testing.T) {
go func() {
for {
pool.Add(func() {
pool.Add(ctx, func(ctx context.Context) {
cache.SetIfNotExist(ctx, 1, nil, 10)
})
}

View File

@ -216,7 +216,7 @@ func (l *Logger) print(ctx context.Context, level int, values ...interface{}) {
}
if l.config.Flags&F_ASYNC > 0 {
input.IsAsync = true
err := asyncPool.Add(func() {
err := asyncPool.Add(ctx, func(ctx context.Context) {
input.Next()
})
if err != nil {

View File

@ -8,6 +8,7 @@
package grpool
import (
"context"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
@ -15,6 +16,9 @@ import (
"github.com/gogf/gf/v2/container/gtype"
)
// Func is the pool function which contains context parameter.
type Func func(ctx context.Context)
// Pool manages the goroutines using pool.
type Pool struct {
limit int // Max goroutine count limit.
@ -23,8 +27,15 @@ type Pool struct {
closed *gtype.Bool // Is pool closed or not.
}
type internalPoolItem struct {
Ctx context.Context
Func Func
}
// Default goroutine pool.
var pool = New()
var (
pool = New()
)
// New creates and returns a new goroutine pool object.
// The parameter `limit` is used to limit the max goroutine count,
@ -44,16 +55,16 @@ func New(limit ...int) *Pool {
// Add pushes a new job to the pool using default goroutine pool.
// The job will be executed asynchronously.
func Add(f func()) error {
return pool.Add(f)
func Add(ctx context.Context, f Func) error {
return pool.Add(ctx, f)
}
// AddWithRecover pushes a new job to the pool with specified recover function.
// The optional `recoverFunc` is called when any panic during executing of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error {
return pool.AddWithRecover(userFunc, recoverFunc...)
func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error {
return pool.AddWithRecover(ctx, userFunc, recoverFunc...)
}
// Size returns current goroutine count of default goroutine pool.
@ -68,11 +79,14 @@ func Jobs() int {
// Add pushes a new job to the pool.
// The job will be executed asynchronously.
func (p *Pool) Add(f func()) error {
func (p *Pool) Add(ctx context.Context, f Func) error {
for p.closed.Val() {
return gerror.NewCode(gcode.CodeInvalidOperation, "pool closed")
}
p.list.PushFront(f)
p.list.PushFront(&internalPoolItem{
Ctx: ctx,
Func: f,
})
// Check whether fork new goroutine or not.
var n int
for {
@ -94,8 +108,8 @@ func (p *Pool) Add(f func()) error {
// The optional `recoverFunc` is called when any panic during executing of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func (p *Pool) AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error {
return p.Add(func() {
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error {
return p.Add(ctx, func(ctx context.Context) {
defer func() {
if exception := recover(); exception != nil {
if len(recoverFunc) > 0 && recoverFunc[0] != nil {
@ -107,7 +121,7 @@ func (p *Pool) AddWithRecover(userFunc func(), recoverFunc ...func(err error)) e
}
}
}()
userFunc()
userFunc(ctx)
})
}
@ -135,10 +149,14 @@ func (p *Pool) fork() {
go func() {
defer p.count.Add(-1)
var job interface{}
var (
listItem interface{}
poolItem *internalPoolItem
)
for !p.closed.Val() {
if job = p.list.PopBack(); job != nil {
job.(func())()
if listItem = p.list.PopBack(); listItem != nil {
poolItem = listItem.(*internalPoolItem)
poolItem.Func(poolItem.Ctx)
} else {
return
}

View File

@ -9,24 +9,29 @@
package grpool_test
import (
"context"
"testing"
"github.com/gogf/gf/v2/os/grpool"
)
func increment() {
var (
ctx = context.TODO()
)
func increment(ctx context.Context) {
for i := 0; i < 1000000; i++ {
}
}
func BenchmarkGrpool_1(b *testing.B) {
for i := 0; i < b.N; i++ {
grpool.Add(increment)
grpool.Add(ctx, increment)
}
}
func BenchmarkGoroutine_1(b *testing.B) {
for i := 0; i < b.N; i++ {
go increment()
go increment(ctx)
}
}

View File

@ -19,13 +19,13 @@ var n = 500000
func BenchmarkGrpool2(b *testing.B) {
b.N = n
for i := 0; i < b.N; i++ {
grpool.Add(increment)
grpool.Add(ctx, increment)
}
}
func BenchmarkGoroutine2(b *testing.B) {
b.N = n
for i := 0; i < b.N; i++ {
go increment()
go increment(ctx)
}
}

View File

@ -9,6 +9,7 @@
package grpool_test
import (
"context"
"sync"
"testing"
"time"
@ -20,12 +21,14 @@ import (
func Test_Basic(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
wg := sync.WaitGroup{}
array := garray.NewArray(true)
size := 100
var (
wg = sync.WaitGroup{}
array = garray.NewArray(true)
size = 100
)
wg.Add(size)
for i := 0; i < size; i++ {
grpool.Add(func() {
grpool.Add(ctx, func(ctx context.Context) {
array.Append(1)
wg.Done()
})
@ -40,13 +43,15 @@ func Test_Basic(t *testing.T) {
func Test_Limit1(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
wg := sync.WaitGroup{}
array := garray.NewArray(true)
size := 100
pool := grpool.New(10)
var (
wg = sync.WaitGroup{}
array = garray.NewArray(true)
size = 100
pool = grpool.New(10)
)
wg.Add(size)
for i := 0; i < size; i++ {
pool.Add(func() {
pool.Add(ctx, func(ctx context.Context) {
array.Append(1)
wg.Done()
})
@ -66,7 +71,7 @@ func Test_Limit2(t *testing.T) {
)
wg.Add(size)
for i := 0; i < size; i++ {
pool.Add(func() {
pool.Add(ctx, func(ctx context.Context) {
defer wg.Done()
array.Append(1)
})
@ -78,12 +83,14 @@ func Test_Limit2(t *testing.T) {
func Test_Limit3(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
array := garray.NewArray(true)
size := 1000
pool := grpool.New(100)
var (
array = garray.NewArray(true)
size = 1000
pool = grpool.New(100)
)
t.Assert(pool.Cap(), 100)
for i := 0; i < size; i++ {
pool.Add(func() {
pool.Add(ctx, func(ctx context.Context) {
array.Append(1)
time.Sleep(2 * time.Second)
})
@ -98,20 +105,20 @@ func Test_Limit3(t *testing.T) {
t.Assert(pool.Jobs(), 900)
t.Assert(array.Len(), 100)
t.Assert(pool.IsClosed(), true)
t.AssertNE(pool.Add(func() {}), nil)
t.AssertNE(pool.Add(ctx, func(ctx context.Context) {}), nil)
})
}
func Test_AddWithRecover(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
array := garray.NewArray(true)
grpool.AddWithRecover(func() {
grpool.AddWithRecover(ctx, func(ctx context.Context) {
array.Append(1)
panic(1)
}, func(err error) {
array.Append(1)
})
grpool.AddWithRecover(func() {
grpool.AddWithRecover(ctx, func(ctx context.Context) {
panic(1)
array.Append(1)
})