diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 7068441c1..22033e49e 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -42,4 +42,4 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file + request: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index eaa711925..817a118e0 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -22,4 +22,4 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file + request: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-push.yml b/config/openim-push.yml index 71167a3b8..e522a6003 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -25,7 +25,7 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached + request: 100 # Request threshold; circuit breaker evaluation occurs when reached prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index 5a5a2b6e9..1941f4155 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -36,4 +36,4 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file + request: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index 8bd59edad..35e25fbf7 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -32,4 +32,4 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file + request: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index 8bd59edad..35e25fbf7 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -32,4 +32,4 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file + request: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index b0d805232..179a3ad69 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -35,4 +35,4 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file + request: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index a6341ce12..09b8aad5b 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -32,7 +32,7 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached + request: 100 # Request threshold; circuit breaker evaluation occurs when reached object: # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 5e36735a1..19df87935 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -32,4 +32,4 @@ circuitBreaker: window: 3s # Time window size (seconds) bucket: 10 # Number of buckets success: 0.6 # Success rate threshold (0.6 means 60%) - requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file + request: 100 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/internal/api/ratelimit.go b/internal/api/ratelimit.go index 77171e31b..b34be21c9 100644 --- a/internal/api/ratelimit.go +++ b/internal/api/ratelimit.go @@ -1,7 +1,10 @@ package api import ( + "fmt" + "math" "net/http" + "strconv" "time" "github.com/gin-gonic/gin" @@ -12,7 +15,6 @@ import ( "github.com/openimsdk/tools/log" ) - type RateLimiter struct { Enable bool `yaml:"enable"` Window time.Duration `yaml:"window"` // time duration per window @@ -34,8 +36,23 @@ func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc { ) return func(c *gin.Context) { + status := limiter.Stat() + + c.Header("X-BBR-CPU", strconv.FormatInt(status.CPU, 10)) + c.Header("X-BBR-MinRT", strconv.FormatInt(status.MinRt, 10)) + c.Header("X-BBR-MaxPass", strconv.FormatInt(status.MaxPass, 10)) + c.Header("X-BBR-MaxInFlight", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-BBR-InFlight", strconv.FormatInt(status.InFlight, 10)) + done, err := limiter.Allow() if err != nil { + + c.Header("X-RateLimit-Policy", "BBR") + c.Header("Retry-After", calculateBBRRetryAfter(status)) + c.Header("X-RateLimit-Limit", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-RateLimit-Remaining", "0") // There is no concept of remaining quota in BBR. + + fmt.Println("rate limited:", err, "path:", c.Request.URL.Path) log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path) c.AbortWithStatus(http.StatusTooManyRequests) apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later")) @@ -47,3 +64,20 @@ func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc { done(ratelimit.DoneInfo{}) } } + +func calculateBBRRetryAfter(status bbr.Stat) string { + loadRatio := float64(status.CPU) / float64(status.CPU) + + if loadRatio < 0.8 { + return "1" + } + if loadRatio < 0.95 { + return "2" + } + + backoff := 1 + int64(math.Pow(loadRatio-0.95, 2)*50) + if backoff > 5 { + backoff = 5 + } + return strconv.FormatInt(backoff, 10) +} diff --git a/pkg/common/startrpc/circuitbreaker.go b/pkg/common/startrpc/circuitbreaker.go index 0a4903b99..0f261a683 100644 --- a/pkg/common/startrpc/circuitbreaker.go +++ b/pkg/common/startrpc/circuitbreaker.go @@ -49,10 +49,17 @@ func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc. resp, err = handler(ctx, req) if err != nil { - if st, ok := status.FromError(err); ok && st.Code() == codes.Internal { - breaker.MarkFailed() + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } } else { - breaker.MarkSuccess() + breaker.MarkFailed() } } else { breaker.MarkSuccess() @@ -79,10 +86,17 @@ func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc err := handler(srv, ss) if err != nil { - if st, ok := status.FromError(err); ok && st.Code() == codes.Internal { - breaker.MarkFailed() + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } } else { - breaker.MarkSuccess() + breaker.MarkFailed() } } else { breaker.MarkSuccess()