mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 03:13:15 +08:00 
			
		
		
		
	* refactor: cmd update. * refactor: msg transfer refactor. * refactor: msg transfer refactor. * refactor: msg transfer refactor. * fix: read prometheus port when flag set to enable and prevent failure during startup. * fix: notification has counted unread counts bug fix. * fix: merge opensource code into local. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * refactor: delete message and message batch use lua. * fix: add protective measures against memory overflow.
		
			
				
	
	
		
			67 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			67 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package batcher
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"github.com/openimsdk/tools/utils/stringutil"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
func TestBatcher(t *testing.T) {
 | 
						|
	config := Config{
 | 
						|
		size:     1000,
 | 
						|
		buffer:   10,
 | 
						|
		worker:   10,
 | 
						|
		interval: 5 * time.Millisecond,
 | 
						|
	}
 | 
						|
 | 
						|
	b := New[string](
 | 
						|
		WithSize(config.size),
 | 
						|
		WithBuffer(config.buffer),
 | 
						|
		WithWorker(config.worker),
 | 
						|
		WithInterval(config.interval),
 | 
						|
		WithSyncWait(true),
 | 
						|
	)
 | 
						|
 | 
						|
	// Mock Do function to simply print values for demonstration
 | 
						|
	b.Do = func(ctx context.Context, channelID int, vals *Msg[string]) {
 | 
						|
		t.Logf("Channel %d Processed batch: %v", channelID, vals)
 | 
						|
	}
 | 
						|
	b.OnComplete = func(lastMessage *string, totalCount int) {
 | 
						|
		t.Logf("Completed processing with last message: %v, total count: %d", *lastMessage, totalCount)
 | 
						|
	}
 | 
						|
	b.Sharding = func(key string) int {
 | 
						|
		hashCode := stringutil.GetHashCode(key)
 | 
						|
		return int(hashCode) % config.worker
 | 
						|
	}
 | 
						|
	b.Key = func(data *string) string {
 | 
						|
		return *data
 | 
						|
	}
 | 
						|
 | 
						|
	err := b.Start()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Test normal data processing
 | 
						|
	for i := 0; i < 10000; i++ {
 | 
						|
		data := "data" + fmt.Sprintf("%d", i)
 | 
						|
		if err := b.Put(context.Background(), &data); err != nil {
 | 
						|
			t.Fatal(err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	time.Sleep(time.Duration(1) * time.Second)
 | 
						|
	start := time.Now()
 | 
						|
	// Wait for all processing to finish
 | 
						|
	b.Close()
 | 
						|
 | 
						|
	elapsed := time.Since(start)
 | 
						|
	t.Logf("Close took %s", elapsed)
 | 
						|
 | 
						|
	if len(b.data) != 0 {
 | 
						|
		t.Error("Data channel should be empty after closing")
 | 
						|
	}
 | 
						|
}
 |