Merge 3bcb1ce65d38f3424cc52479cedde37be95a3cf2 into fbca49d4319eee0d40395d087b792b0351c069de

This commit is contained in:
buvidk1234 2026-01-06 17:40:02 +08:00 committed by GitHub
commit b4835ae29a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 58 additions and 2 deletions

View File

@ -151,12 +151,15 @@ func (b *Batcher[T]) Put(ctx context.Context, data *T) error {
func (b *Batcher[T]) scheduler() {
ticker := time.NewTicker(b.config.interval)
alreadyClosed := false
defer func() {
ticker.Stop()
for _, ch := range b.chArrays {
close(ch)
}
close(b.data)
if !alreadyClosed {
close(b.data)
}
b.wait.Done()
}()
@ -169,6 +172,7 @@ func (b *Batcher[T]) scheduler() {
case data, ok := <-b.data:
if !ok {
// If the data channel is closed unexpectedly
alreadyClosed = true
return
}
if data == nil {

View File

@ -3,9 +3,10 @@ package batcher
import (
"context"
"fmt"
"github.com/openimsdk/tools/utils/stringutil"
"testing"
"time"
"github.com/openimsdk/tools/utils/stringutil"
)
func TestBatcher(t *testing.T) {
@ -64,3 +65,54 @@ func TestBatcher(t *testing.T) {
t.Error("Data channel should be empty after closing")
}
}
func TestBatcher_UnexpectedChannelClose(t *testing.T) {
config := Config{
size: 100,
buffer: 10,
worker: 2,
interval: 50 * time.Millisecond,
}
b := New[string](
WithSize(config.size),
WithBuffer(config.buffer),
WithWorker(config.worker),
WithInterval(config.interval),
)
processedCount := 0
b.Do = func(ctx context.Context, channelID int, vals *Msg[string]) {
processedCount += len(vals.Val())
}
b.Sharding = func(key string) int {
hashCode := stringutil.GetHashCode(key)
return int(hashCode) % config.worker
}
b.Key = func(data *string) string {
return *data
}
err := b.Start()
if err != nil {
t.Fatal(err)
}
// Put some data
for i := 0; i < 50; i++ {
data := fmt.Sprintf("data%d", i)
if err := b.Put(context.Background(), &data); err != nil {
t.Fatal(err)
}
}
// Simulate unexpected channel close by closing the data channel directly
// This tests the alreadyClosed flag logic to prevent double close panic
close(b.data)
// Wait for scheduler to detect the closed channel and exit gracefully
b.wait.Wait()
t.Logf("Processed %d items before unexpected close", processedCount)
t.Log("Batcher handled unexpected channel close gracefully without panic")
}