diff --git a/pkg/tools/batcher/batcher.go b/pkg/tools/batcher/batcher.go index 93a31ed8f..25ddf4e88 100644 --- a/pkg/tools/batcher/batcher.go +++ b/pkg/tools/batcher/batcher.go @@ -151,12 +151,15 @@ func (b *Batcher[T]) Put(ctx context.Context, data *T) error { func (b *Batcher[T]) scheduler() { ticker := time.NewTicker(b.config.interval) + alreadyClosed := false defer func() { ticker.Stop() for _, ch := range b.chArrays { close(ch) } - close(b.data) + if !alreadyClosed { + close(b.data) + } b.wait.Done() }() @@ -169,6 +172,7 @@ func (b *Batcher[T]) scheduler() { case data, ok := <-b.data: if !ok { // If the data channel is closed unexpectedly + alreadyClosed = true return } if data == nil { diff --git a/pkg/tools/batcher/batcher_test.go b/pkg/tools/batcher/batcher_test.go index 90e028449..1535f8de2 100644 --- a/pkg/tools/batcher/batcher_test.go +++ b/pkg/tools/batcher/batcher_test.go @@ -3,9 +3,10 @@ package batcher import ( "context" "fmt" - "github.com/openimsdk/tools/utils/stringutil" "testing" "time" + + "github.com/openimsdk/tools/utils/stringutil" ) func TestBatcher(t *testing.T) { @@ -64,3 +65,54 @@ func TestBatcher(t *testing.T) { t.Error("Data channel should be empty after closing") } } + +func TestBatcher_UnexpectedChannelClose(t *testing.T) { + config := Config{ + size: 100, + buffer: 10, + worker: 2, + interval: 50 * time.Millisecond, + } + + b := New[string]( + WithSize(config.size), + WithBuffer(config.buffer), + WithWorker(config.worker), + WithInterval(config.interval), + ) + + processedCount := 0 + b.Do = func(ctx context.Context, channelID int, vals *Msg[string]) { + processedCount += len(vals.Val()) + } + b.Sharding = func(key string) int { + hashCode := stringutil.GetHashCode(key) + return int(hashCode) % config.worker + } + b.Key = func(data *string) string { + return *data + } + + err := b.Start() + if err != nil { + t.Fatal(err) + } + + // Put some data + for i := 0; i < 50; i++ { + data := fmt.Sprintf("data%d", i) + if err := b.Put(context.Background(), &data); err != nil { + t.Fatal(err) + } + } + + // Simulate unexpected channel close by closing the data channel directly + // This tests the alreadyClosed flag logic to prevent double close panic + close(b.data) + + // Wait for scheduler to detect the closed channel and exit gracefully + b.wait.Wait() + + t.Logf("Processed %d items before unexpected close", processedCount) + t.Log("Batcher handled unexpected channel close gracefully without panic") +}