diff --git a/pkg/tools/batcher/batcher.go b/pkg/tools/batcher/batcher.go index 93a31ed8f..bbaa2ca93 100644 --- a/pkg/tools/batcher/batcher.go +++ b/pkg/tools/batcher/batcher.go @@ -151,12 +151,21 @@ func (b *Batcher[T]) Put(ctx context.Context, data *T) error { func (b *Batcher[T]) scheduler() { ticker := time.NewTicker(b.config.interval) + // Track whether b.data was closed by an external caller so the + // cleanup below does not close it a second time. The only routes + // out of this function that leave b.data open are the nil-message + // and ticker paths; the ok == false branch means someone already + // closed the channel, and calling close(b.data) again there would + // panic with "close of closed channel" (#3653). + externallyClosed := false defer func() { ticker.Stop() for _, ch := range b.chArrays { close(ch) } - close(b.data) + if !externallyClosed { + close(b.data) + } b.wait.Done() }() @@ -169,6 +178,7 @@ func (b *Batcher[T]) scheduler() { case data, ok := <-b.data: if !ok { // If the data channel is closed unexpectedly + externallyClosed = true return } if data == nil {