Build: Implement rate limiting and circuit breaker for API and RPC services.

This commit is contained in:
mo3et 2025-09-22 10:28:35 +08:00
parent 9bd6f3d356
commit 749ec0b445
11 changed files with 64 additions and 16 deletions

View File

@ -42,4 +42,4 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -22,4 +22,4 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -25,7 +25,7 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -36,4 +36,4 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -32,4 +32,4 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -32,4 +32,4 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -35,4 +35,4 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -32,7 +32,7 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached
object: object:
# Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings

View File

@ -32,4 +32,4 @@ circuitBreaker:
window: 3s # Time window size (seconds) window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%) success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached request: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -1,7 +1,10 @@
package api package api
import ( import (
"fmt"
"math"
"net/http" "net/http"
"strconv"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -12,7 +15,6 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
) )
type RateLimiter struct { type RateLimiter struct {
Enable bool `yaml:"enable"` Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"` // time duration per window Window time.Duration `yaml:"window"` // time duration per window
@ -34,8 +36,23 @@ func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc {
) )
return func(c *gin.Context) { return func(c *gin.Context) {
status := limiter.Stat()
c.Header("X-BBR-CPU", strconv.FormatInt(status.CPU, 10))
c.Header("X-BBR-MinRT", strconv.FormatInt(status.MinRt, 10))
c.Header("X-BBR-MaxPass", strconv.FormatInt(status.MaxPass, 10))
c.Header("X-BBR-MaxInFlight", strconv.FormatInt(status.MaxInFlight, 10))
c.Header("X-BBR-InFlight", strconv.FormatInt(status.InFlight, 10))
done, err := limiter.Allow() done, err := limiter.Allow()
if err != nil { if err != nil {
c.Header("X-RateLimit-Policy", "BBR")
c.Header("Retry-After", calculateBBRRetryAfter(status))
c.Header("X-RateLimit-Limit", strconv.FormatInt(status.MaxInFlight, 10))
c.Header("X-RateLimit-Remaining", "0") // There is no concept of remaining quota in BBR.
fmt.Println("rate limited:", err, "path:", c.Request.URL.Path)
log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path) log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path)
c.AbortWithStatus(http.StatusTooManyRequests) c.AbortWithStatus(http.StatusTooManyRequests)
apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later")) apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later"))
@ -47,3 +64,20 @@ func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc {
done(ratelimit.DoneInfo{}) done(ratelimit.DoneInfo{})
} }
} }
// calculateBBRRetryAfter returns the value for the Retry-After header (whole
// seconds, formatted as a decimal string) that accompanies a rate-limited
// response. The value is derived from the CPU load reported in status.
//
// The load ratio is status.CPU divided by the limiter's full-scale CPU value.
// Dividing by a constant (rather than by status.CPU itself) keeps the ratio
// meaningful and avoids a 0/0 NaN when the reported CPU is zero.
func calculateBBRRetryAfter(status bbr.Stat) string {
	// cpuScale is the status.CPU reading that corresponds to 100% load.
	// NOTE(review): the BBR limiter is understood to report CPU in per-mille
	// (0-1000, default threshold 800) — confirm against the vendored version.
	const cpuScale = 1000.0

	loadRatio := float64(status.CPU) / cpuScale
	switch {
	case loadRatio < 0.8:
		return "1"
	case loadRatio < 0.95:
		return "2"
	}

	// Quadratic back-off above 95% load, capped at 5 seconds.
	// NOTE(review): for ratios up to 1.0 the quadratic term truncates to 0, so
	// this branch yields "1", which is shorter than the 0.8-0.95 band above.
	// Confirm whether a larger multiplier or a floor of 2 was intended.
	backoff := 1 + int64(math.Pow(loadRatio-0.95, 2)*50)
	if backoff > 5 {
		backoff = 5
	}
	return strconv.FormatInt(backoff, 10)
}

View File

@ -49,10 +49,17 @@ func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.
resp, err = handler(ctx, req) resp, err = handler(ctx, req)
if err != nil { if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.Internal { if st, ok := status.FromError(err); ok {
breaker.MarkFailed() switch st.Code() {
} else { case codes.OK:
breaker.MarkSuccess() breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
} }
} else { } else {
breaker.MarkSuccess() breaker.MarkSuccess()
@ -79,10 +86,17 @@ func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc
err := handler(srv, ss) err := handler(srv, ss)
if err != nil { if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.Internal { if st, ok := status.FromError(err); ok {
breaker.MarkFailed() switch st.Code() {
} else { case codes.OK:
breaker.MarkSuccess() breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
} }
} else { } else {
breaker.MarkSuccess() breaker.MarkSuccess()