mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 20:52:11 +08:00 
			
		
		
		
	* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * monolithic * fix: DeleteDoc crash * fix: DeleteDoc crash * fix: monolithic * fix: monolithic * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: mq * fix: mq * fix: user msg timestamp * fix: mq * 1 * 1 * 1 * 1 * 1 * 1 * 1 * seq read config * seq read config * 1 * 1 * fix: the source message of the reference is withdrawn, and the referenced message is deleted * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1
		
			
				
	
	
		
			212 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			212 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package redis
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 
 | |
| 	"github.com/openimsdk/tools/errs"
 | |
| 	"github.com/openimsdk/tools/log"
 | |
| 	"github.com/redis/go-redis/v9"
 | |
| 	"golang.org/x/sync/errgroup"
 | |
| )
 | |
| 
 | |
// Defaults used by ProcessKeysBySlot when no Option overrides them.
const (
	// defaultBatchSize is the maximum number of keys handed to one processFunc call.
	defaultBatchSize = 50
	// defaultConcurrentLimit is the limit passed to errgroup's SetLimit, i.e.
	// the number of batches that may be processed at the same time.
	defaultConcurrentLimit = 3
)
 | |
| 
 | |
| // RedisShardManager is a class for sharding and processing keys
 | |
| type RedisShardManager struct {
 | |
| 	redisClient redis.UniversalClient
 | |
| 	config      *Config
 | |
| }
 | |
// Config carries the tunables read by ProcessKeysBySlot.
type Config struct {
	// batchSize is the maximum number of keys passed to one processFunc call.
	batchSize int
	// continueOnError, when true, makes a failing batch get logged instead of
	// aborting the whole run with that error.
	continueOnError bool
	// concurrentLimit is handed to errgroup's SetLimit to bound how many
	// batches run concurrently.
	concurrentLimit int
}
 | |
| 
 | |
| // Option is a function type for configuring Config
 | |
| type Option func(c *Config)
 | |
| 
 | |
| //// NewRedisShardManager creates a new RedisShardManager instance
 | |
| //func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager {
 | |
| //	config := &Config{
 | |
| //		batchSize:       defaultBatchSize, // Default batch size is 50 keys
 | |
| //		continueOnError: false,
 | |
| //		concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3
 | |
| //	}
 | |
| //	for _, opt := range opts {
 | |
| //		opt(config)
 | |
| //	}
 | |
| //	rsm := &RedisShardManager{
 | |
| //		redisClient: redisClient,
 | |
| //		config:      config,
 | |
| //	}
 | |
| //	return rsm
 | |
| //}
 | |
| //
 | |
| //// WithBatchSize sets the number of keys to process per batch
 | |
| //func WithBatchSize(size int) Option {
 | |
| //	return func(c *Config) {
 | |
| //		c.batchSize = size
 | |
| //	}
 | |
| //}
 | |
| //
 | |
| //// WithContinueOnError sets whether to continue processing on error
 | |
| //func WithContinueOnError(continueOnError bool) Option {
 | |
| //	return func(c *Config) {
 | |
| //		c.continueOnError = continueOnError
 | |
| //	}
 | |
| //}
 | |
| //
 | |
| //// WithConcurrentLimit sets the concurrency limit
 | |
| //func WithConcurrentLimit(limit int) Option {
 | |
| //	return func(c *Config) {
 | |
| //		c.concurrentLimit = limit
 | |
| //	}
 | |
| //}
 | |
| //
 | |
| //// ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function.
 | |
| //func (rsm *RedisShardManager) ProcessKeysBySlot(
 | |
| //	ctx context.Context,
 | |
| //	keys []string,
 | |
| //	processFunc func(ctx context.Context, slot int64, keys []string) error,
 | |
| //) error {
 | |
| //
 | |
| //	// Group keys by slot
 | |
| //	slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys)
 | |
| //	if err != nil {
 | |
| //		return err
 | |
| //	}
 | |
| //
 | |
| //	g, ctx := errgroup.WithContext(ctx)
 | |
| //	g.SetLimit(rsm.config.concurrentLimit)
 | |
| //
 | |
| //	// Process keys in each slot using the provided function
 | |
| //	for slot, singleSlotKeys := range slots {
 | |
| //		batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize)
 | |
| //		for _, batch := range batches {
 | |
| //			slot, batch := slot, batch // Avoid closure capture issue
 | |
| //			g.Go(func() error {
 | |
| //				err := processFunc(ctx, slot, batch)
 | |
| //				if err != nil {
 | |
| //					log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch)
 | |
| //					if !rsm.config.continueOnError {
 | |
| //						return err
 | |
| //					}
 | |
| //				}
 | |
| //				return nil
 | |
| //			})
 | |
| //		}
 | |
| //	}
 | |
| //
 | |
| //	if err := g.Wait(); err != nil {
 | |
| //		return err
 | |
| //	}
 | |
| //	return nil
 | |
| //}
 | |
| 
 | |
| // groupKeysBySlot groups keys by their Redis cluster hash slots.
 | |
| func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
 | |
| 	slots := make(map[int64][]string)
 | |
| 	clusterClient, isCluster := redisClient.(*redis.ClusterClient)
 | |
| 	if isCluster && len(keys) > 1 {
 | |
| 		pipe := clusterClient.Pipeline()
 | |
| 		cmds := make([]*redis.IntCmd, len(keys))
 | |
| 		for i, key := range keys {
 | |
| 			cmds[i] = pipe.ClusterKeySlot(ctx, key)
 | |
| 		}
 | |
| 		_, err := pipe.Exec(ctx)
 | |
| 		if err != nil {
 | |
| 			return nil, errs.WrapMsg(err, "get slot err")
 | |
| 		}
 | |
| 
 | |
| 		for i, cmd := range cmds {
 | |
| 			slot, err := cmd.Result()
 | |
| 			if err != nil {
 | |
| 				log.ZWarn(ctx, "some key get slot err", err, "key", keys[i])
 | |
| 				return nil, errs.WrapMsg(err, "get slot err", "key", keys[i])
 | |
| 			}
 | |
| 			slots[slot] = append(slots[slot], keys[i])
 | |
| 		}
 | |
| 	} else {
 | |
| 		// If not a cluster client, put all keys in the same slot (0)
 | |
| 		slots[0] = keys
 | |
| 	}
 | |
| 
 | |
| 	return slots, nil
 | |
| }
 | |
| 
 | |
// splitIntoBatches partitions keys into consecutive batches of at most
// batchSize keys each, preserving key order.
//
// A non-positive batchSize cannot make progress (zero would append empty
// batches forever and a negative value would panic when slicing), so it is
// treated as "do not split": all keys come back as one batch.
//
// The result always holds at least one batch; for empty keys that batch is
// empty. Batches are sub-slices of keys, not copies. Every full batch has its
// capacity clipped to its length so that appending to one batch cannot
// overwrite the keys of the next.
func splitIntoBatches(keys []string, batchSize int) [][]string {
	if batchSize <= 0 {
		return [][]string{keys}
	}

	// len/batchSize+1 is an upper bound on the batch count and cannot overflow.
	batches := make([][]string, 0, len(keys)/batchSize+1)
	for len(keys) > batchSize {
		batches = append(batches, keys[:batchSize:batchSize])
		keys = keys[batchSize:]
	}
	return append(batches, keys)
}
 | |
| 
 | |
| // ProcessKeysBySlot groups keys by their Redis cluster hash slots and processes them using the provided function.
 | |
| func ProcessKeysBySlot(
 | |
| 	ctx context.Context,
 | |
| 	redisClient redis.UniversalClient,
 | |
| 	keys []string,
 | |
| 	processFunc func(ctx context.Context, slot int64, keys []string) error,
 | |
| 	opts ...Option,
 | |
| ) error {
 | |
| 
 | |
| 	config := &Config{
 | |
| 		batchSize:       defaultBatchSize,
 | |
| 		continueOnError: false,
 | |
| 		concurrentLimit: defaultConcurrentLimit,
 | |
| 	}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(config)
 | |
| 	}
 | |
| 
 | |
| 	// Group keys by slot
 | |
| 	slots, err := groupKeysBySlot(ctx, redisClient, keys)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	g, ctx := errgroup.WithContext(ctx)
 | |
| 	g.SetLimit(config.concurrentLimit)
 | |
| 
 | |
| 	// Process keys in each slot using the provided function
 | |
| 	for slot, singleSlotKeys := range slots {
 | |
| 		batches := splitIntoBatches(singleSlotKeys, config.batchSize)
 | |
| 		for _, batch := range batches {
 | |
| 			slot, batch := slot, batch // Avoid closure capture issue
 | |
| 			g.Go(func() error {
 | |
| 				err := processFunc(ctx, slot, batch)
 | |
| 				if err != nil {
 | |
| 					log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch)
 | |
| 					if !config.continueOnError {
 | |
| 						return err
 | |
| 					}
 | |
| 				}
 | |
| 				return nil
 | |
| 			})
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err := g.Wait(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func DeleteCacheBySlot(ctx context.Context, rcClient *rocksCacheClient, keys []string) error {
 | |
| 	switch len(keys) {
 | |
| 	case 0:
 | |
| 		return nil
 | |
| 	case 1:
 | |
| 		return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys)
 | |
| 	default:
 | |
| 		return ProcessKeysBySlot(ctx, rcClient.GetRedis(), keys, func(ctx context.Context, slot int64, keys []string) error {
 | |
| 			return rcClient.GetClient().TagAsDeletedBatch2(ctx, keys)
 | |
| 		})
 | |
| 	}
 | |
| }
 |