fix: prevent panic on double close of data channel in batcher

- Add an alreadyClosed flag to track whether the data channel was closed unexpectedly
- Skip closing the data channel in the deferred cleanup if it is already closed
- Add a unit test, TestBatcher_UnexpectedChannelClose, to verify the fix
buvidk 2026-01-06 17:20:18 +08:00
parent fbca49d431
commit 3bcb1ce65d
2 changed files with 58 additions and 2 deletions

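Background for the change: in Go, closing a channel that is already closed panics with "close of closed channel". The scheduler's deferred cleanup unconditionally closed b.data, so a channel that had already been closed elsewhere made the defer panic. The minimal sketch below, written against a hypothetical consume function rather than the batcher types, shows the failure mode and the flag-guard pattern this commit applies.

// Minimal sketch (not the batcher code): a consumer whose deferred cleanup
// normally closes its input channel, guarded by a flag so a channel that was
// already closed from the outside is not closed a second time.
package main

import "fmt"

func consume(data chan int, done chan struct{}) {
    alreadyClosed := false
    defer func() {
        if !alreadyClosed {
            close(data) // only reached when the consumer still owns the close
        }
        close(done)
    }()
    for {
        v, ok := <-data
        if !ok {
            // Someone else closed the channel; closing it again in the defer
            // would panic with "close of closed channel".
            alreadyClosed = true
            return
        }
        fmt.Println("got", v)
    }
}

func main() {
    data := make(chan int, 4)
    done := make(chan struct{})
    go consume(data, done)
    data <- 1
    data <- 2
    close(data) // simulate the unexpected external close
    <-done      // the consumer exits cleanly instead of panicking
}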

@@ -151,12 +151,15 @@ func (b *Batcher[T]) Put(ctx context.Context, data *T) error {
 func (b *Batcher[T]) scheduler() {
     ticker := time.NewTicker(b.config.interval)
+    alreadyClosed := false
     defer func() {
         ticker.Stop()
         for _, ch := range b.chArrays {
             close(ch)
         }
-        close(b.data)
+        if !alreadyClosed {
+            close(b.data)
+        }
         b.wait.Done()
     }()
@@ -169,6 +172,7 @@ func (b *Batcher[T]) scheduler() {
         case data, ok := <-b.data:
             if !ok {
                 // If the data channel is closed unexpectedly
+                alreadyClosed = true
                 return
             }
             if data == nil {

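The boolean flag is sufficient here because close(b.data) is only ever issued from the scheduler's deferred cleanup, inside a single goroutine. When a channel can be closed from more than one code path, a sync.Once guard is a common alternative; the sketch below (an illustration with a hypothetical closer type, not what this commit does) shows that pattern.

// Alternative sketch: route every close through a sync.Once so the first call
// closes the channel and later calls become no-ops instead of panicking.
package main

import (
    "fmt"
    "sync"
)

type closer struct {
    data chan string
    once sync.Once
}

func (c *closer) closeData() {
    c.once.Do(func() { close(c.data) })
}

func main() {
    c := &closer{data: make(chan string, 1)}
    c.closeData()
    c.closeData() // no-op on the second call
    _, ok := <-c.data
    fmt.Println("channel closed:", !ok) // prints: channel closed: true
}

Note that sync.Once only helps when every close goes through the guarded method; it would not cover the scenario exercised in the test below, where b.data is closed directly from outside the scheduler, which is exactly the case the alreadyClosed flag handles.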

@@ -3,9 +3,10 @@ package batcher
 import (
     "context"
     "fmt"
-    "github.com/openimsdk/tools/utils/stringutil"
     "testing"
     "time"
+
+    "github.com/openimsdk/tools/utils/stringutil"
 )

 func TestBatcher(t *testing.T) {
@@ -64,3 +65,54 @@ func TestBatcher(t *testing.T) {
         t.Error("Data channel should be empty after closing")
     }
 }
+
+func TestBatcher_UnexpectedChannelClose(t *testing.T) {
+    config := Config{
+        size:     100,
+        buffer:   10,
+        worker:   2,
+        interval: 50 * time.Millisecond,
+    }
+
+    b := New[string](
+        WithSize(config.size),
+        WithBuffer(config.buffer),
+        WithWorker(config.worker),
+        WithInterval(config.interval),
+    )
+
+    processedCount := 0
+    b.Do = func(ctx context.Context, channelID int, vals *Msg[string]) {
+        processedCount += len(vals.Val())
+    }
+
+    b.Sharding = func(key string) int {
+        hashCode := stringutil.GetHashCode(key)
+        return int(hashCode) % config.worker
+    }
+
+    b.Key = func(data *string) string {
+        return *data
+    }
+
+    err := b.Start()
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Put some data
+    for i := 0; i < 50; i++ {
+        data := fmt.Sprintf("data%d", i)
+        if err := b.Put(context.Background(), &data); err != nil {
+            t.Fatal(err)
+        }
+    }
+
+    // Simulate unexpected channel close by closing the data channel directly
+    // This tests the alreadyClosed flag logic to prevent double close panic
+    close(b.data)
+
+    // Wait for scheduler to detect the closed channel and exit gracefully
+    b.wait.Wait()
+
+    t.Logf("Processed %d items before unexpected close", processedCount)
+    t.Log("Batcher handled unexpected channel close gracefully without panic")
+}
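Because the test lives in package batcher, it can close the unexported b.data channel and wait on b.wait directly; without the alreadyClosed guard, the scheduler's deferred close(b.data) would then panic with "close of closed channel" and the test would fail. One caveat, stated as an assumption about the implementation: if the Do callback runs concurrently on the two worker goroutines, the plain processedCount counter is unsynchronized and go test -race could flag it; an atomic counter (or a single worker) would make the count race-free.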