mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-06-27 13:03:54 +08:00
* pkg/tools/batcher: stop scheduler panicking when b.data is closed externally
scheduler()'s defer unconditionally calls close(b.data). If the channel
was closed by the caller (or an upstream producer) instead of via the
normal Close()-sends-nil path, the receive on b.data returns ok == false,
scheduler returns, and the deferred close(b.data) then fires on an
already-closed channel:
panic: close of closed channel
reliably reproducible under the #3653 steps (manually closing b.data
while Start() is running).
Track whether we observed the external-close via a local
`externallyClosed` flag set in the `ok == false` branch. The defer
only closes b.data when that flag is false, i.e. when the scheduler
exited through the nil-message or ticker paths and still owns the
channel. No behaviour change on the graceful Close() path.
Fixes #3653
* ci: retrigger after transient gha outage
Signed-off-by: SAY-5 <say.apm35@gmail.com>
---------
Signed-off-by: SAY-5 <say.apm35@gmail.com>
(cherry picked from commit b3a7342a42ca7daf3e01e275fb0e7d166fb16a58)
283 lines
5.9 KiB
Go
283 lines
5.9 KiB
Go
package batcher
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/openimsdk/tools/errs"
|
|
"github.com/openimsdk/tools/utils/idutil"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
DefaultDataChanSize = 1000
|
|
DefaultSize = 100
|
|
DefaultBuffer = 100
|
|
DefaultWorker = 5
|
|
DefaultInterval = time.Second
|
|
)
|
|
|
|
type Config struct {
|
|
size int // Number of message aggregations
|
|
buffer int // The number of caches running in a single coroutine
|
|
dataBuffer int // The size of the main data channel
|
|
worker int // Number of coroutines processed in parallel
|
|
interval time.Duration // Time of message aggregations
|
|
syncWait bool // Whether to wait synchronously after distributing messages have been consumed
|
|
}
|
|
|
|
type Option func(c *Config)
|
|
|
|
func WithSize(s int) Option {
|
|
return func(c *Config) {
|
|
c.size = s
|
|
}
|
|
}
|
|
|
|
func WithBuffer(b int) Option {
|
|
return func(c *Config) {
|
|
c.buffer = b
|
|
}
|
|
}
|
|
|
|
func WithWorker(w int) Option {
|
|
return func(c *Config) {
|
|
c.worker = w
|
|
}
|
|
}
|
|
|
|
func WithInterval(i time.Duration) Option {
|
|
return func(c *Config) {
|
|
c.interval = i
|
|
}
|
|
}
|
|
|
|
func WithSyncWait(wait bool) Option {
|
|
return func(c *Config) {
|
|
c.syncWait = wait
|
|
}
|
|
}
|
|
|
|
func WithDataBuffer(size int) Option {
|
|
return func(c *Config) {
|
|
c.dataBuffer = size
|
|
}
|
|
}
|
|
|
|
type Batcher[T any] struct {
|
|
config *Config
|
|
|
|
globalCtx context.Context
|
|
cancel context.CancelFunc
|
|
Do func(ctx context.Context, channelID int, val *Msg[T])
|
|
OnComplete func(lastMessage *T, totalCount int)
|
|
Sharding func(key string) int
|
|
Key func(data *T) string
|
|
HookFunc func(triggerID string, messages map[string][]*T, totalCount int, lastMessage *T)
|
|
data chan *T
|
|
chArrays []chan *Msg[T]
|
|
wait sync.WaitGroup
|
|
counter sync.WaitGroup
|
|
}
|
|
|
|
func emptyOnComplete[T any](*T, int) {}
|
|
func emptyHookFunc[T any](string, map[string][]*T, int, *T) {
|
|
}
|
|
|
|
func New[T any](opts ...Option) *Batcher[T] {
|
|
b := &Batcher[T]{
|
|
OnComplete: emptyOnComplete[T],
|
|
HookFunc: emptyHookFunc[T],
|
|
}
|
|
config := &Config{
|
|
size: DefaultSize,
|
|
buffer: DefaultBuffer,
|
|
worker: DefaultWorker,
|
|
interval: DefaultInterval,
|
|
}
|
|
for _, opt := range opts {
|
|
opt(config)
|
|
}
|
|
b.config = config
|
|
b.data = make(chan *T, DefaultDataChanSize)
|
|
b.globalCtx, b.cancel = context.WithCancel(context.Background())
|
|
|
|
b.chArrays = make([]chan *Msg[T], b.config.worker)
|
|
for i := 0; i < b.config.worker; i++ {
|
|
b.chArrays[i] = make(chan *Msg[T], b.config.buffer)
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (b *Batcher[T]) Worker() int {
|
|
return b.config.worker
|
|
}
|
|
|
|
func (b *Batcher[T]) Start() error {
|
|
if b.Sharding == nil {
|
|
return errs.New("Sharding function is required").Wrap()
|
|
}
|
|
if b.Do == nil {
|
|
return errs.New("Do function is required").Wrap()
|
|
}
|
|
if b.Key == nil {
|
|
return errs.New("Key function is required").Wrap()
|
|
}
|
|
b.wait.Add(b.config.worker)
|
|
for i := 0; i < b.config.worker; i++ {
|
|
go b.run(i, b.chArrays[i])
|
|
}
|
|
b.wait.Add(1)
|
|
go b.scheduler()
|
|
return nil
|
|
}
|
|
|
|
func (b *Batcher[T]) Put(ctx context.Context, data *T) error {
|
|
if data == nil {
|
|
return errs.New("data can not be nil").Wrap()
|
|
}
|
|
select {
|
|
case <-b.globalCtx.Done():
|
|
return errs.New("data channel is closed").Wrap()
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case b.data <- data:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (b *Batcher[T]) scheduler() {
|
|
ticker := time.NewTicker(b.config.interval)
|
|
// Track whether b.data was closed by an external caller so the
|
|
// cleanup below does not close it a second time. The only routes
|
|
// out of this function that leave b.data open are the nil-message
|
|
// and ticker paths; the ok == false branch means someone already
|
|
// closed the channel, and calling close(b.data) again there would
|
|
// panic with "close of closed channel" (#3653).
|
|
externallyClosed := false
|
|
defer func() {
|
|
ticker.Stop()
|
|
for _, ch := range b.chArrays {
|
|
close(ch)
|
|
}
|
|
if !externallyClosed {
|
|
close(b.data)
|
|
}
|
|
b.wait.Done()
|
|
}()
|
|
|
|
vals := make(map[string][]*T)
|
|
count := 0
|
|
var lastAny *T
|
|
|
|
for {
|
|
select {
|
|
case data, ok := <-b.data:
|
|
if !ok {
|
|
// If the data channel is closed unexpectedly
|
|
externallyClosed = true
|
|
return
|
|
}
|
|
if data == nil {
|
|
if count > 0 {
|
|
b.distributeMessage(vals, count, lastAny)
|
|
}
|
|
return
|
|
}
|
|
|
|
key := b.Key(data)
|
|
vals[key] = append(vals[key], data)
|
|
lastAny = data
|
|
|
|
count++
|
|
if count >= b.config.size {
|
|
|
|
b.distributeMessage(vals, count, lastAny)
|
|
vals = make(map[string][]*T)
|
|
count = 0
|
|
}
|
|
|
|
case <-ticker.C:
|
|
if count > 0 {
|
|
|
|
b.distributeMessage(vals, count, lastAny)
|
|
vals = make(map[string][]*T)
|
|
count = 0
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type Msg[T any] struct {
|
|
key string
|
|
triggerID string
|
|
val []*T
|
|
}
|
|
|
|
func (m Msg[T]) Key() string {
|
|
return m.key
|
|
}
|
|
|
|
func (m Msg[T]) TriggerID() string {
|
|
return m.triggerID
|
|
}
|
|
|
|
func (m Msg[T]) Val() []*T {
|
|
return m.val
|
|
}
|
|
|
|
func (m Msg[T]) String() string {
|
|
var sb strings.Builder
|
|
sb.WriteString("Key: ")
|
|
sb.WriteString(m.key)
|
|
sb.WriteString(", Values: [")
|
|
for i, v := range m.val {
|
|
if i > 0 {
|
|
sb.WriteString(", ")
|
|
}
|
|
sb.WriteString(fmt.Sprintf("%v", *v))
|
|
}
|
|
sb.WriteString("]")
|
|
return sb.String()
|
|
}
|
|
|
|
func (b *Batcher[T]) distributeMessage(messages map[string][]*T, totalCount int, lastMessage *T) {
|
|
triggerID := idutil.OperationIDGenerator()
|
|
b.HookFunc(triggerID, messages, totalCount, lastMessage)
|
|
for key, data := range messages {
|
|
if b.config.syncWait {
|
|
b.counter.Add(1)
|
|
}
|
|
channelID := b.Sharding(key)
|
|
b.chArrays[channelID] <- &Msg[T]{key: key, triggerID: triggerID, val: data}
|
|
}
|
|
if b.config.syncWait {
|
|
b.counter.Wait()
|
|
}
|
|
b.OnComplete(lastMessage, totalCount)
|
|
}
|
|
|
|
func (b *Batcher[T]) run(channelID int, ch <-chan *Msg[T]) {
|
|
defer b.wait.Done()
|
|
for {
|
|
select {
|
|
case messages, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
b.Do(context.Background(), channelID, messages)
|
|
if b.config.syncWait {
|
|
b.counter.Done()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Batcher[T]) Close() {
|
|
b.cancel() // Signal to stop put data
|
|
b.data <- nil
|
|
//wait all goroutines exit
|
|
b.wait.Wait()
|
|
}
|