mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			255 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			255 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package grpcpool provides a pool of grpc clients
 | 
						|
package getcdv3
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"google.golang.org/grpc"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	// ErrClosed is the error when the client pool is closed
 | 
						|
	ErrClosed = errors.New("grpc pool: client pool is closed")
 | 
						|
	// ErrTimeout is the error when the client pool timed out
 | 
						|
	ErrTimeout = errors.New("grpc pool: client pool timed out")
 | 
						|
	// ErrAlreadyClosed is the error when the client conn was already closed
 | 
						|
	ErrAlreadyClosed = errors.New("grpc pool: the connection was already closed")
 | 
						|
	// ErrFullPool is the error when the pool is already full
 | 
						|
	ErrFullPool = errors.New("grpc pool: closing a ClientConn into a full pool")
 | 
						|
)
 | 
						|
 | 
						|
// Factory is a function type creating a grpc client
 | 
						|
type Factory func(schema, etcdaddr, servicename string) (*grpc.ClientConn, error)
 | 
						|
 | 
						|
// FactoryWithContext is a function type creating a grpc client
 | 
						|
// that accepts the context parameter that could be passed from
 | 
						|
// Get or NewWithContext method.
 | 
						|
type FactoryWithContext func(context.Context) (*grpc.ClientConn, error)
 | 
						|
 | 
						|
// Pool is the grpc client pool
 | 
						|
type Pool struct {
 | 
						|
	clients         chan ClientConn
 | 
						|
	factory         FactoryWithContext
 | 
						|
	idleTimeout     time.Duration
 | 
						|
	maxLifeDuration time.Duration
 | 
						|
	mu              sync.RWMutex
 | 
						|
}
 | 
						|
 | 
						|
// ClientConn is the wrapper for a grpc client conn
 | 
						|
type ClientConn struct {
 | 
						|
	*grpc.ClientConn
 | 
						|
	pool          *Pool
 | 
						|
	timeUsed      time.Time
 | 
						|
	timeInitiated time.Time
 | 
						|
	unhealthy     bool
 | 
						|
}
 | 
						|
 | 
						|
// New creates a new clients pool with the given initial and maximum capacity,
 | 
						|
// and the timeout for the idle clients. Returns an error if the initial
 | 
						|
// clients could not be created
 | 
						|
func New(factory Factory, schema, etcdaddr, servicename string, init, capacity int, idleTimeout time.Duration,
 | 
						|
	maxLifeDuration ...time.Duration) (*Pool, error) {
 | 
						|
	return NewWithContext(context.Background(), func(ctx context.Context) (*grpc.ClientConn, error) { return factory(schema, etcdaddr, servicename) },
 | 
						|
		init, capacity, idleTimeout, maxLifeDuration...)
 | 
						|
}
 | 
						|
 | 
						|
// NewWithContext creates a new clients pool with the given initial and maximum
 | 
						|
// capacity, and the timeout for the idle clients. The context parameter would
 | 
						|
// be passed to the factory method during initialization. Returns an error if the
 | 
						|
// initial clients could not be created.
 | 
						|
func NewWithContext(ctx context.Context, factory FactoryWithContext, init, capacity int, idleTimeout time.Duration,
 | 
						|
	maxLifeDuration ...time.Duration) (*Pool, error) {
 | 
						|
 | 
						|
	if capacity <= 0 {
 | 
						|
		capacity = 1
 | 
						|
	}
 | 
						|
	if init < 0 {
 | 
						|
		init = 0
 | 
						|
	}
 | 
						|
	if init > capacity {
 | 
						|
		init = capacity
 | 
						|
	}
 | 
						|
	p := &Pool{
 | 
						|
		clients:     make(chan ClientConn, capacity),
 | 
						|
		factory:     factory,
 | 
						|
		idleTimeout: idleTimeout,
 | 
						|
	}
 | 
						|
	if len(maxLifeDuration) > 0 {
 | 
						|
		p.maxLifeDuration = maxLifeDuration[0]
 | 
						|
	}
 | 
						|
	for i := 0; i < init; i++ {
 | 
						|
		c, err := factory(ctx)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		p.clients <- ClientConn{
 | 
						|
			ClientConn:    c,
 | 
						|
			pool:          p,
 | 
						|
			timeUsed:      time.Now(),
 | 
						|
			timeInitiated: time.Now(),
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Fill the rest of the pool with empty clients
 | 
						|
	for i := 0; i < capacity-init; i++ {
 | 
						|
		p.clients <- ClientConn{
 | 
						|
			pool: p,
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return p, nil
 | 
						|
}
 | 
						|
 | 
						|
func (p *Pool) getClients() chan ClientConn {
 | 
						|
	p.mu.RLock()
 | 
						|
	defer p.mu.RUnlock()
 | 
						|
 | 
						|
	return p.clients
 | 
						|
}
 | 
						|
 | 
						|
// Close empties the pool calling Close on all its clients.
 | 
						|
// You can call Close while there are outstanding clients.
 | 
						|
// The pool channel is then closed, and Get will not be allowed anymore
 | 
						|
func (p *Pool) Close() {
 | 
						|
	p.mu.Lock()
 | 
						|
	clients := p.clients
 | 
						|
	p.clients = nil
 | 
						|
	p.mu.Unlock()
 | 
						|
 | 
						|
	if clients == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	close(clients)
 | 
						|
	for client := range clients {
 | 
						|
		if client.ClientConn == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		client.ClientConn.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// IsClosed returns true if the client pool is closed.
 | 
						|
func (p *Pool) IsClosed() bool {
 | 
						|
	return p == nil || p.getClients() == nil
 | 
						|
}
 | 
						|
 | 
						|
// Get will return the next available client. If capacity
 | 
						|
// has not been reached, it will create a new one using the factory. Otherwise,
 | 
						|
// it will wait till the next client becomes available or a timeout.
 | 
						|
// A timeout of 0 is an indefinite wait
 | 
						|
func (p *Pool) Get(ctx context.Context) (*ClientConn, error) {
 | 
						|
	clients := p.getClients()
 | 
						|
	if clients == nil {
 | 
						|
		return nil, ErrClosed
 | 
						|
	}
 | 
						|
 | 
						|
	wrapper := ClientConn{
 | 
						|
		pool: p,
 | 
						|
	}
 | 
						|
	select {
 | 
						|
	case wrapper = <-clients:
 | 
						|
		// All good
 | 
						|
	case <-ctx.Done():
 | 
						|
		return nil, ErrTimeout // it would better returns ctx.Err()
 | 
						|
	}
 | 
						|
 | 
						|
	// If the wrapper was idle too long, close the connection and create a new
 | 
						|
	// one. It's safe to assume that there isn't any newer client as the client
 | 
						|
	// we fetched is the first in the channel
 | 
						|
	idleTimeout := p.idleTimeout
 | 
						|
	if wrapper.ClientConn != nil && idleTimeout > 0 &&
 | 
						|
		wrapper.timeUsed.Add(idleTimeout).Before(time.Now()) {
 | 
						|
 | 
						|
		wrapper.ClientConn.Close()
 | 
						|
		wrapper.ClientConn = nil
 | 
						|
	}
 | 
						|
 | 
						|
	var err error
 | 
						|
	if wrapper.ClientConn == nil {
 | 
						|
		wrapper.ClientConn, err = p.factory(ctx)
 | 
						|
		if err != nil {
 | 
						|
			// If there was an error, we want to put back a placeholder
 | 
						|
			// client in the channel
 | 
						|
			clients <- ClientConn{
 | 
						|
				pool: p,
 | 
						|
			}
 | 
						|
		}
 | 
						|
		// This is a new connection, reset its initiated time
 | 
						|
		wrapper.timeInitiated = time.Now()
 | 
						|
	}
 | 
						|
 | 
						|
	return &wrapper, err
 | 
						|
}
 | 
						|
 | 
						|
// Unhealthy marks the client conn as unhealthy, so that the connection
 | 
						|
// gets reset when closed
 | 
						|
func (c *ClientConn) Unhealthy() {
 | 
						|
	c.unhealthy = true
 | 
						|
}
 | 
						|
 | 
						|
// Close returns a ClientConn to the pool. It is safe to call multiple time,
 | 
						|
// but will return an error after first time
 | 
						|
func (c *ClientConn) Close() error {
 | 
						|
	if c == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if c.ClientConn == nil {
 | 
						|
		return ErrAlreadyClosed
 | 
						|
	}
 | 
						|
	if c.pool.IsClosed() {
 | 
						|
		return ErrClosed
 | 
						|
	}
 | 
						|
	// If the wrapper connection has become too old, we want to recycle it. To
 | 
						|
	// clarify the logic: if the sum of the initialization time and the max
 | 
						|
	// duration is before Now(), it means the initialization is so old adding
 | 
						|
	// the maximum duration couldn't put in the future. This sum therefore
 | 
						|
	// corresponds to the cut-off point: if it's in the future we still have
 | 
						|
	// time, if it's in the past it's too old
 | 
						|
	maxDuration := c.pool.maxLifeDuration
 | 
						|
	if maxDuration > 0 && c.timeInitiated.Add(maxDuration).Before(time.Now()) {
 | 
						|
		c.Unhealthy()
 | 
						|
	}
 | 
						|
 | 
						|
	// We're cloning the wrapper so we can set ClientConn to nil in the one
 | 
						|
	// used by the user
 | 
						|
	wrapper := ClientConn{
 | 
						|
		pool:       c.pool,
 | 
						|
		ClientConn: c.ClientConn,
 | 
						|
		timeUsed:   time.Now(),
 | 
						|
	}
 | 
						|
	if c.unhealthy {
 | 
						|
		wrapper.ClientConn.Close()
 | 
						|
		wrapper.ClientConn = nil
 | 
						|
	} else {
 | 
						|
		wrapper.timeInitiated = c.timeInitiated
 | 
						|
	}
 | 
						|
	select {
 | 
						|
	case c.pool.clients <- wrapper:
 | 
						|
		// All good
 | 
						|
	default:
 | 
						|
		return ErrFullPool
 | 
						|
	}
 | 
						|
 | 
						|
	c.ClientConn = nil // Mark as closed
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Capacity returns the capacity
 | 
						|
func (p *Pool) Capacity() int {
 | 
						|
	if p.IsClosed() {
 | 
						|
		return 0
 | 
						|
	}
 | 
						|
	return cap(p.clients)
 | 
						|
}
 | 
						|
 | 
						|
// Available returns the number of currently unused clients
 | 
						|
func (p *Pool) Available() int {
 | 
						|
	if p.IsClosed() {
 | 
						|
		return 0
 | 
						|
	}
 | 
						|
	return len(p.clients)
 | 
						|
}
 |