diff --git a/config/openim-api.yml b/config/openim-api.yml index 103c36f95..df0177d24 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -17,3 +17,13 @@ prometheus: ports: # This address can be accessed via a browser grafanaURL: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 812df90f7..8ac07faae 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -26,3 +26,20 @@ longConnSvr: websocketMaxMsgLen: 4096 # WebSocket connection handshake timeout in seconds websocketTimeout: 10 + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 52d6a805e..c6ea06803 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -6,3 +6,20 @@ prometheus: # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-push.yml b/config/openim-push.yml index 5db5b541a..58784f13d 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -10,10 +10,26 @@ rpc: # It will only take effect when autoSetPorts is set to false. ports: +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached prometheus: # Enable or disable Prometheus monitoring - enable: true + enable: false # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index c42e556c4..09ceef0f4 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -20,3 +20,20 @@ prometheus: tokenPolicy: # Token validity period, in days expire: 90 + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index e722ac2b0..1135ffe6e 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -16,3 +16,20 @@ prometheus: # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index e722ac2b0..1135ffe6e 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -16,3 +16,20 @@ prometheus: # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 252f64c28..154b149e1 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -19,3 +19,20 @@ prometheus: enableHistoryForNewMembers: true + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 17fd3b8f4..ffe8943df 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -20,3 +20,20 @@ prometheus: # Does sending messages require friend verification friendVerify: false + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 7169e6c61..f6fbee695 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -17,6 +17,22 @@ prometheus: # It will only take effect when autoSetPorts is set to false. ports: +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached object: # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 337cacd35..c46db7f37 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -16,3 +16,20 @@ prometheus: # Prometheus listening ports, must be consistent with the number of rpc.ports # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/go.mod b/go.mod index c0c5e36a8..134b0c9fc 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.14 - github.com/openimsdk/tools v0.0.50-alpha.103 + github.com/openimsdk/tools v0.0.50-alpha.104 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.10.0 @@ -135,6 +135,7 @@ require ( github.com/leodido/go-urn v1.4.0 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lithammer/shortuuid v3.0.0+incompatible // indirect + github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de // indirect github.com/magefile/mage v1.15.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -151,6 +152,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect @@ -160,6 +162,8 @@ require ( github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sercand/kuberesolver/v6 v6.0.1 // indirect + github.com/shirou/gopsutil/v3 v3.24.5 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect diff --git a/go.sum b/go.sum index c6de2aa38..24f913f26 100644 --- a/go.sum +++ b/go.sum @@ -303,6 +303,8 @@ github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVk github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= +github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de h1:V53FWzU6KAZVi1tPp5UIsMoUWJ2/PNwYIDXnu7QuBCE= +github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -349,8 +351,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FA github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE= github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.103 h1:jYvI86cWiVu8a8iw1panw+pwIiStuUHF76h3fxA6ESI= -github.com/openimsdk/tools v0.0.50-alpha.103/go.mod h1:qCExFBqXpQBMzZck3XGIFwivBayAn2KNqB3WAd++IJw= +github.com/openimsdk/tools v0.0.50-alpha.104 h1:fmDMqMC+SasN8noGKCT++0BQDqEF2O4ek9bLkiLMeHw= +github.com/openimsdk/tools v0.0.50-alpha.104/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -361,6 +363,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= @@ -397,6 +401,12 @@ github.com/sercand/kuberesolver/v6 v6.0.1 h1:XZUTA0gy/lgDYp/UhEwv7Js24F1j8NJ833Q github.com/sercand/kuberesolver/v6 v6.0.1/go.mod h1:C0tsTuRMONSY+Xf7pv7RMW1/JlewY1+wS8SZE+1lf1s= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= +github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= @@ -548,6 +558,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/api/ratelimit.go b/internal/api/ratelimit.go new file mode 100644 index 000000000..0bbf973de --- /dev/null +++ b/internal/api/ratelimit.go @@ -0,0 +1,83 @@ +package api + +import ( + "fmt" + "math" + "net/http" + "strconv" + "time" + + "github.com/gin-gonic/gin" + + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/ratelimit" + "github.com/openimsdk/tools/stability/ratelimit/bbr" +) + +type RateLimiter struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` // time duration per window + Bucket int `yaml:"bucket"` // bucket number for each window + CPUThreshold int64 `yaml:"cpuThreshold"` // CPU threshold; valid range 0–1000 (1000 = 100%) +} + +func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc { + if !config.Enable { + return func(c *gin.Context) { + c.Next() + } + } + + limiter := bbr.NewBBRLimiter( + bbr.WithWindow(config.Window), + bbr.WithBucket(config.Bucket), + bbr.WithCPUThreshold(config.CPUThreshold), + ) + + return func(c *gin.Context) { + status := limiter.Stat() + + c.Header("X-BBR-CPU", strconv.FormatInt(status.CPU, 10)) + c.Header("X-BBR-MinRT", strconv.FormatInt(status.MinRt, 10)) + c.Header("X-BBR-MaxPass", strconv.FormatInt(status.MaxPass, 10)) + c.Header("X-BBR-MaxInFlight", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-BBR-InFlight", strconv.FormatInt(status.InFlight, 10)) + + done, err := limiter.Allow() + if err != nil { + + c.Header("X-RateLimit-Policy", "BBR") + c.Header("Retry-After", calculateBBRRetryAfter(status)) + c.Header("X-RateLimit-Limit", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-RateLimit-Remaining", "0") // There is no concept of remaining quota in BBR. + + fmt.Println("rate limited:", err, "path:", c.Request.URL.Path) + log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path) + c.AbortWithStatus(http.StatusTooManyRequests) + apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later")) + return + } + + c.Next() + done(ratelimit.DoneInfo{}) + } +} + +func calculateBBRRetryAfter(status bbr.Stat) string { + loadRatio := float64(status.CPU) / float64(status.CPU) + + if loadRatio < 0.8 { + return "1" + } + if loadRatio < 0.95 { + return "2" + } + + backoff := 1 + int64(math.Pow(loadRatio-0.95, 2)*50) + if backoff > 5 { + backoff = 5 + } + return strconv.FormatInt(backoff, 10) +} diff --git a/internal/api/router.go b/internal/api/router.go index 1d3a92dd7..7e1192f21 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -97,6 +97,18 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf case BestSpeed: r.Use(gzip.Gzip(gzip.BestSpeed)) } + + // Use rate limiter middleware + if cfg.API.RateLimiter.Enable { + rl := &RateLimiter{ + Enable: cfg.API.RateLimiter.Enable, + Window: cfg.API.RateLimiter.Window, + Bucket: cfg.API.RateLimiter.Bucket, + CPUThreshold: cfg.API.RateLimiter.CPUThreshold, + } + r.Use(RateLimitMiddleware(rl)) + } + if config.Standalone() { r.Use(func(c *gin.Context) { c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs) diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 7b7dbc89b..1356f90d6 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -81,6 +81,9 @@ func (a *ApiCmd) runE() error { } return startrpc.Start( a.ctx, &a.apiConfig.Discovery, + nil, + nil, + // &a.apiConfig.API.RateLimiter, &prometheus, a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Prometheus.AutoSetPorts, diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index d624e9dae..a6a92ef14 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -57,7 +57,7 @@ func (a *AuthRpcCmd) Exec() error { } func (a *AuthRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.CircuitBreaker, &a.authConfig.RpcConfig.RateLimiter, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, []string{ diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 428c442da..f1ae0c59c 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -58,7 +58,7 @@ func (a *ConversationRpcCmd) Exec() error { } func (a *ConversationRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.CircuitBreaker, &a.conversationConfig.RpcConfig.RateLimiter, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, []string{ diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index af2cbbee9..c666bd021 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -56,6 +56,8 @@ func (a *CronTaskCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( a.ctx, &a.cronTaskConfig.Discovery, + nil, + nil, &prometheus, "", "", true, diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index dd850cf17..661da86c7 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -58,7 +58,7 @@ func (a *FriendRpcCmd) Exec() error { } func (a *FriendRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.CircuitBreaker, &a.relationConfig.RpcConfig.RateLimiter, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, []string{ diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 7a599077f..daff7b499 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -59,7 +59,7 @@ func (a *GroupRpcCmd) Exec() error { } func (a *GroupRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.CircuitBreaker, &a.groupConfig.RpcConfig.RateLimiter, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, []string{ diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index c4049be05..845175cd9 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -59,7 +59,7 @@ func (a *MsgRpcCmd) Exec() error { } func (a *MsgRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.CircuitBreaker, &a.msgConfig.RpcConfig.RateLimiter, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, []string{ diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 83cfb6272..4ca899a5e 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -61,6 +61,8 @@ func (m *MsgGatewayCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( m.ctx, &m.msgGatewayConfig.Discovery, + &m.msgGatewayConfig.MsgGateway.CircuitBreaker, + &m.msgGatewayConfig.MsgGateway.RateLimiter, &prometheus, rpc.ListenIP, rpc.RegisterIP, rpc.AutoSetPorts, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index fe6c27e54..81f40ac9d 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -62,6 +62,8 @@ func (m *MsgTransferCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( m.ctx, &m.msgTransferConfig.Discovery, + &m.msgTransferConfig.MsgTransfer.CircuitBreaker, + &m.msgTransferConfig.MsgTransfer.RateLimiter, &prometheus, "", "", true, diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 7cd3c481e..bcb34ed2c 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -60,7 +60,7 @@ func (a *PushRpcCmd) Exec() error { } func (a *PushRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.CircuitBreaker, &a.pushConfig.RpcConfig.RateLimiter, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, []string{ diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index e567234e4..39d731e0d 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -58,7 +58,7 @@ func (a *ThirdRpcCmd) Exec() error { } func (a *ThirdRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.CircuitBreaker, &a.thirdConfig.RpcConfig.RateLimiter, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, []string{ diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 190f6f892..12abe1bde 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -59,7 +59,7 @@ func (a *UserRpcCmd) Exec() error { } func (a *UserRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.CircuitBreaker, &a.userConfig.RpcConfig.RateLimiter, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, []string{ diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index c5000a6e5..e645c9356 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -143,6 +143,23 @@ type API struct { Ports []int `yaml:"ports"` GrafanaURL string `yaml:"grafanaURL"` } `yaml:"prometheus"` + + RateLimiter RateLimiter `yaml:"rateLimiter"` +} + +type RateLimiter struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` + Bucket int `yaml:"bucket"` + CPUThreshold int64 `yaml:"cpuThreshold"` +} + +type CircuitBreaker struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` + Bucket int `yaml:"bucket"` + Success float64 `yaml:"success"` + Request int64 `yaml:"request"` } type CronTask struct { @@ -217,6 +234,8 @@ type MsgGateway struct { WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketTimeout int `yaml:"websocketTimeout"` } `yaml:"longConnSvr"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type MsgTransfer struct { @@ -225,6 +244,8 @@ type MsgTransfer struct { AutoSetPorts bool `yaml:"autoSetPorts"` Ports []int `yaml:"ports"` } `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Push struct { @@ -255,7 +276,9 @@ type Push struct { BadgeCount bool `yaml:"badgeCount"` Production bool `yaml:"production"` } `yaml:"iosPush"` - FullUserCache bool `yaml:"fullUserCache"` + FullUserCache bool `yaml:"fullUserCache"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Auth struct { @@ -264,28 +287,38 @@ type Auth struct { TokenPolicy struct { Expire int64 `yaml:"expire"` } `yaml:"tokenPolicy"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Conversation struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Friend struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Group struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` - EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Msg struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` - FriendVerify bool `yaml:"friendVerify"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + FriendVerify bool `yaml:"friendVerify"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Third struct { @@ -298,6 +331,8 @@ type Third struct { Kodo Kodo `yaml:"kodo"` Aws Aws `yaml:"aws"` } `yaml:"object"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Cos struct { BucketURL string `yaml:"bucketURL"` @@ -336,8 +371,10 @@ type Aws struct { } type User struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type RPC struct { diff --git a/pkg/common/startrpc/circuitbreaker.go b/pkg/common/startrpc/circuitbreaker.go new file mode 100644 index 000000000..060a3aa8e --- /dev/null +++ b/pkg/common/startrpc/circuitbreaker.go @@ -0,0 +1,107 @@ +package startrpc + +import ( + "context" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/circuitbreaker" + "github.com/openimsdk/tools/stability/circuitbreaker/sre" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type CircuitBreaker struct { + Enable bool `yaml:"enable"` + Success float64 `yaml:"success"` // success rate threshold (0.0-1.0) + Request int64 `yaml:"request"` // request threshold + Bucket int `yaml:"bucket"` // number of buckets + Window time.Duration `yaml:"window"` // time window for statistics +} + +func NewCircuitBreaker(config *CircuitBreaker) circuitbreaker.CircuitBreaker { + if !config.Enable { + return nil + } + + return sre.NewSREBraker( + sre.WithWindow(config.Window), + sre.WithBucket(config.Bucket), + sre.WithSuccess(config.Success), + sre.WithRequest(config.Request), + ) +} + +func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption { + if breaker == nil { + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return handler(ctx, req) + }) + } + + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if err := breaker.Allow(); err != nil { + log.ZWarn(ctx, "rpc circuit breaker open", err, "method", info.FullMethod) + return nil, status.Error(codes.Unavailable, "service unavailable due to circuit breaker") + } + + resp, err = handler(ctx, req) + + if err != nil { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } + } else { + breaker.MarkFailed() + } + } else { + breaker.MarkSuccess() + } + + return resp, err + + }) +} + +func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption { + if breaker == nil { + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) + }) + } + + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if err := breaker.Allow(); err != nil { + log.ZWarn(ss.Context(), "rpc circuit breaker open", err, "method", info.FullMethod) + return status.Error(codes.Unavailable, "service unavailable due to circuit breaker") + } + + err := handler(srv, ss) + + if err != nil { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } + } else { + breaker.MarkFailed() + } + } else { + breaker.MarkSuccess() + } + + return err + }) +} diff --git a/pkg/common/startrpc/ratelimit.go b/pkg/common/startrpc/ratelimit.go new file mode 100644 index 000000000..1c2ac8eae --- /dev/null +++ b/pkg/common/startrpc/ratelimit.go @@ -0,0 +1,70 @@ +package startrpc + +import ( + "context" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/ratelimit" + "github.com/openimsdk/tools/stability/ratelimit/bbr" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type RateLimiter struct { + Enable bool + Window time.Duration + Bucket int + CPUThreshold int64 +} + +func NewRateLimiter(config *RateLimiter) ratelimit.Limiter { + if !config.Enable { + return nil + } + + return bbr.NewBBRLimiter( + bbr.WithWindow(config.Window), + bbr.WithBucket(config.Bucket), + bbr.WithCPUThreshold(config.CPUThreshold), + ) +} + +func UnaryRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption { + if limiter == nil { + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return handler(ctx, req) + }) + } + + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + done, err := limiter.Allow() + if err != nil { + log.ZWarn(ctx, "rpc rate limited", err, "method", info.FullMethod) + return nil, status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err) + } + + defer done(ratelimit.DoneInfo{}) + return handler(ctx, req) + }) +} + +func StreamRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption { + if limiter == nil { + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) + }) + } + + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + done, err := limiter.Allow() + if err != nil { + log.ZWarn(ss.Context(), "rpc rate limited", err, "method", info.FullMethod) + return status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err) + } + defer done(ratelimit.DoneInfo{}) + + return handler(srv, ss) + }) +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 9715f2aac..247d13b6b 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -47,7 +47,7 @@ func init() { prommetrics.RegistryAll() } -func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, +func Start[T any](ctx context.Context, disc *conf.Discovery, circuitBreakerConfig *conf.CircuitBreaker, rateLimiterConfig *conf.RateLimiter, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, @@ -84,6 +84,45 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c } } + if circuitBreakerConfig != nil && circuitBreakerConfig.Enable { + cb := &CircuitBreaker{ + Enable: circuitBreakerConfig.Enable, + Success: circuitBreakerConfig.Success, + Request: circuitBreakerConfig.Request, + Bucket: circuitBreakerConfig.Bucket, + Window: circuitBreakerConfig.Window, + } + + breaker := NewCircuitBreaker(cb) + + options = append(options, + UnaryCircuitBreakerInterceptor(breaker), + StreamCircuitBreakerInterceptor(breaker), + ) + + log.ZInfo(ctx, "RPC circuit breaker enabled", + "service", rpcRegisterName, + "window", circuitBreakerConfig.Window, + "bucket", circuitBreakerConfig.Bucket, + "success", circuitBreakerConfig.Success, + "requestThreshold", circuitBreakerConfig.Request) + } + + if rateLimiterConfig != nil && rateLimiterConfig.Enable { + limiter := NewRateLimiter((*RateLimiter)(rateLimiterConfig)) + + options = append(options, + UnaryRateLimitInterceptor(limiter), + StreamRateLimitInterceptor(limiter), + ) + + log.ZInfo(ctx, "RPC rate limiter enabled", + "service", rpcRegisterName, + "window", rateLimiterConfig.Window, + "bucket", rateLimiterConfig.Bucket, + "cpuThreshold", rateLimiterConfig.CPUThreshold) + } + registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { return err @@ -123,7 +162,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c go func() { sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) select { case <-ctx.Done(): return