feat: implement ratelimit and circuitbreaker in middleware.

This commit is contained in:
mo3et 2025-09-19 11:28:38 +08:00
parent bf0289075b
commit 9bd6f3d356
32 changed files with 533 additions and 24 deletions

View File

@ -17,3 +17,13 @@ prometheus:
ports: ports:
# This address can be accessed via a browser # This address can be accessed via a browser
grafanaURL: grafanaURL:
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800

View File

@ -26,3 +26,20 @@ longConnSvr:
websocketMaxMsgLen: 4096 websocketMaxMsgLen: 4096
# WebSocket connection handshake timeout in seconds # WebSocket connection handshake timeout in seconds
websocketTimeout: 10 websocketTimeout: 10
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -6,3 +6,20 @@ prometheus:
# List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -10,6 +10,22 @@ rpc:
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring

View File

@ -20,3 +20,20 @@ prometheus:
tokenPolicy: tokenPolicy:
# Token validity period, in days # Token validity period, in days
expire: 90 expire: 90
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -16,3 +16,20 @@ prometheus:
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -16,3 +16,20 @@ prometheus:
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -19,3 +19,20 @@ prometheus:
enableHistoryForNewMembers: true enableHistoryForNewMembers: true
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -20,3 +20,20 @@ prometheus:
# Does sending messages require friend verification # Does sending messages require friend verification
friendVerify: false friendVerify: false
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

View File

@ -17,6 +17,22 @@ prometheus:
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached
object: object:
# Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings

View File

@ -16,3 +16,20 @@ prometheus:
# Prometheus listening ports, must be consistent with the number of rpc.ports # Prometheus listening ports, must be consistent with the number of rpc.ports
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: true
# WindowSize defines time duration per window
window: 10s
# BucketNum defines bucket number for each window
bucket: 100
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 800
circuitBreaker:
enable: true
window: 3s # Time window size (seconds)
bucket: 10 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
requestThreshold: 100 # Request threshold; circuit breaker evaluation occurs when reached

4
go.mod
View File

@ -135,6 +135,7 @@ require (
github.com/leodido/go-urn v1.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lithammer/shortuuid v3.0.0+incompatible // indirect github.com/lithammer/shortuuid v3.0.0+incompatible // indirect
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de // indirect
github.com/magefile/mage v1.15.0 // indirect github.com/magefile/mage v1.15.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
@ -151,6 +152,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect
@ -159,6 +161,7 @@ require (
github.com/rs/xid v1.5.0 // indirect github.com/rs/xid v1.5.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.2 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect github.com/spf13/cast v1.6.0 // indirect
@ -210,6 +213,7 @@ require (
) )
require ( require (
github.com/go-kratos/aegis v0.2.0
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect github.com/goccy/go-json v0.10.2 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect

17
go.sum
View File

@ -129,6 +129,8 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
github.com/go-kratos/aegis v0.2.0 h1:dObzCDWn3XVjUkgxyBp6ZeWtx/do0DPZ7LY3yNSJLUQ=
github.com/go-kratos/aegis v0.2.0/go.mod h1:v0R2m73WgEEYB3XYu6aE2WcMwsZkJ/Rzuf5eVccm7bI=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@ -201,6 +203,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
@ -303,6 +306,9 @@ github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVk
github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4= github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de h1:V53FWzU6KAZVi1tPp5UIsMoUWJ2/PNwYIDXnu7QuBCE=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
@ -361,6 +367,9 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
@ -395,6 +404,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.23.2 h1:PAWSuiAszn7IhPMBtXsbSCafej7PqUOvY6YywlQUExU=
github.com/shirou/gopsutil/v3 v3.23.2/go.mod h1:gv0aQw33GLo3pG8SiWKiQrbDzbRY1K80RyZJ7V4Th1M=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
@ -419,6 +430,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
@ -428,8 +440,10 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.563/go.mod
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.563/go.mod h1:uom4Nvi9W+Qkom0exYiJ9VWJjXwyxtPYTkKkaLMlfE0= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.563/go.mod h1:uom4Nvi9W+Qkom0exYiJ9VWJjXwyxtPYTkKkaLMlfE0=
github.com/tencentyun/cos-go-sdk-v5 v0.7.47 h1:uoS4Sob16qEYoapkqJq1D1Vnsy9ira9BfNUMtoFYTI4= github.com/tencentyun/cos-go-sdk-v5 v0.7.47 h1:uoS4Sob16qEYoapkqJq1D1Vnsy9ira9BfNUMtoFYTI4=
github.com/tencentyun/cos-go-sdk-v5 v0.7.47/go.mod h1:DH9US8nB+AJXqwu/AMOrCFN1COv3dpytXuJWHgdg7kE= github.com/tencentyun/cos-go-sdk-v5 v0.7.47/go.mod h1:DH9US8nB+AJXqwu/AMOrCFN1COv3dpytXuJWHgdg7kE=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY= github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
@ -449,6 +463,7 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4= go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4=
@ -541,12 +556,14 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=

49
internal/api/ratelimit.go Normal file
View File

@ -0,0 +1,49 @@
package api
import (
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/go-kratos/aegis/ratelimit"
"github.com/go-kratos/aegis/ratelimit/bbr"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
)
type RateLimiter struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"` // time duration per window
Bucket int `yaml:"bucket"` // bucket number for each window
CPUThreshold int64 `yaml:"cpuThreshold"` // CPU threshold; valid range 01000 (1000 = 100%)
}
func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc {
if !config.Enable {
return func(c *gin.Context) {
c.Next()
}
}
limiter := bbr.NewLimiter(
bbr.WithWindow(config.Window),
bbr.WithBucket(config.Bucket),
bbr.WithCPUThreshold(config.CPUThreshold),
)
return func(c *gin.Context) {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path)
c.AbortWithStatus(http.StatusTooManyRequests)
apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later"))
return
}
c.Next()
done(ratelimit.DoneInfo{})
}
}

View File

@ -97,6 +97,18 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
case BestSpeed: case BestSpeed:
r.Use(gzip.Gzip(gzip.BestSpeed)) r.Use(gzip.Gzip(gzip.BestSpeed))
} }
// Use rate limiter middleware
if cfg.API.RateLimiter.Enable {
rl := &RateLimiter{
Enable: cfg.API.RateLimiter.Enable,
Window: cfg.API.RateLimiter.Window,
Bucket: cfg.API.RateLimiter.Bucket,
CPUThreshold: cfg.API.RateLimiter.CPUThreshold,
}
r.Use(RateLimitMiddleware(rl))
}
if config.Standalone() { if config.Standalone() {
r.Use(func(c *gin.Context) { r.Use(func(c *gin.Context) {
c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs) c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs)

View File

@ -81,6 +81,9 @@ func (a *ApiCmd) runE() error {
} }
return startrpc.Start( return startrpc.Start(
a.ctx, &a.apiConfig.Discovery, a.ctx, &a.apiConfig.Discovery,
nil,
nil,
// &a.apiConfig.API.RateLimiter,
&prometheus, &prometheus,
a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Api.ListenIP, "",
a.apiConfig.API.Prometheus.AutoSetPorts, a.apiConfig.API.Prometheus.AutoSetPorts,

View File

@ -57,7 +57,7 @@ func (a *AuthRpcCmd) Exec() error {
} }
func (a *AuthRpcCmd) runE() error { func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.CircuitBreaker, &a.authConfig.RpcConfig.RateLimiter, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig,
[]string{ []string{

View File

@ -58,7 +58,7 @@ func (a *ConversationRpcCmd) Exec() error {
} }
func (a *ConversationRpcCmd) runE() error { func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.CircuitBreaker, &a.conversationConfig.RpcConfig.RateLimiter, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig,
[]string{ []string{

View File

@ -56,6 +56,8 @@ func (a *CronTaskCmd) runE() error {
var prometheus config.Prometheus var prometheus config.Prometheus
return startrpc.Start( return startrpc.Start(
a.ctx, &a.cronTaskConfig.Discovery, a.ctx, &a.cronTaskConfig.Discovery,
nil,
nil,
&prometheus, &prometheus,
"", "", "", "",
true, true,

View File

@ -58,7 +58,7 @@ func (a *FriendRpcCmd) Exec() error {
} }
func (a *FriendRpcCmd) runE() error { func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.CircuitBreaker, &a.relationConfig.RpcConfig.RateLimiter, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig,
[]string{ []string{

View File

@ -59,7 +59,7 @@ func (a *GroupRpcCmd) Exec() error {
} }
func (a *GroupRpcCmd) runE() error { func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.CircuitBreaker, &a.groupConfig.RpcConfig.RateLimiter, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig,
[]string{ []string{

View File

@ -59,7 +59,7 @@ func (a *MsgRpcCmd) Exec() error {
} }
func (a *MsgRpcCmd) runE() error { func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.CircuitBreaker, &a.msgConfig.RpcConfig.RateLimiter, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig,
[]string{ []string{

View File

@ -61,6 +61,8 @@ func (m *MsgGatewayCmd) runE() error {
var prometheus config.Prometheus var prometheus config.Prometheus
return startrpc.Start( return startrpc.Start(
m.ctx, &m.msgGatewayConfig.Discovery, m.ctx, &m.msgGatewayConfig.Discovery,
&m.msgGatewayConfig.MsgGateway.CircuitBreaker,
&m.msgGatewayConfig.MsgGateway.RateLimiter,
&prometheus, &prometheus,
rpc.ListenIP, rpc.RegisterIP, rpc.ListenIP, rpc.RegisterIP,
rpc.AutoSetPorts, rpc.AutoSetPorts,

View File

@ -62,6 +62,8 @@ func (m *MsgTransferCmd) runE() error {
var prometheus config.Prometheus var prometheus config.Prometheus
return startrpc.Start( return startrpc.Start(
m.ctx, &m.msgTransferConfig.Discovery, m.ctx, &m.msgTransferConfig.Discovery,
&m.msgTransferConfig.MsgTransfer.CircuitBreaker,
&m.msgTransferConfig.MsgTransfer.RateLimiter,
&prometheus, &prometheus,
"", "", "", "",
true, true,

View File

@ -60,7 +60,7 @@ func (a *PushRpcCmd) Exec() error {
} }
func (a *PushRpcCmd) runE() error { func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.CircuitBreaker, &a.pushConfig.RpcConfig.RateLimiter, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig,
[]string{ []string{

View File

@ -58,7 +58,7 @@ func (a *ThirdRpcCmd) Exec() error {
} }
func (a *ThirdRpcCmd) runE() error { func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.CircuitBreaker, &a.thirdConfig.RpcConfig.RateLimiter, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig,
[]string{ []string{

View File

@ -59,7 +59,7 @@ func (a *UserRpcCmd) Exec() error {
} }
func (a *UserRpcCmd) runE() error { func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.CircuitBreaker, &a.userConfig.RpcConfig.RateLimiter, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig,
[]string{ []string{

View File

@ -143,6 +143,23 @@ type API struct {
Ports []int `yaml:"ports"` Ports []int `yaml:"ports"`
GrafanaURL string `yaml:"grafanaURL"` GrafanaURL string `yaml:"grafanaURL"`
} `yaml:"prometheus"` } `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
}
type RateLimiter struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
CPUThreshold int64 `yaml:"cpuThreshold"`
}
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
Success float64 `yaml:"success"`
Request int64 `yaml:"request"`
} }
type CronTask struct { type CronTask struct {
@ -217,6 +234,8 @@ type MsgGateway struct {
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"` WebsocketTimeout int `yaml:"websocketTimeout"`
} `yaml:"longConnSvr"` } `yaml:"longConnSvr"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type MsgTransfer struct { type MsgTransfer struct {
@ -225,6 +244,8 @@ type MsgTransfer struct {
AutoSetPorts bool `yaml:"autoSetPorts"` AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"` Ports []int `yaml:"ports"`
} `yaml:"prometheus"` } `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Push struct { type Push struct {
@ -256,6 +277,8 @@ type Push struct {
Production bool `yaml:"production"` Production bool `yaml:"production"`
} `yaml:"iosPush"` } `yaml:"iosPush"`
FullUserCache bool `yaml:"fullUserCache"` FullUserCache bool `yaml:"fullUserCache"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Auth struct { type Auth struct {
@ -264,28 +287,38 @@ type Auth struct {
TokenPolicy struct { TokenPolicy struct {
Expire int64 `yaml:"expire"` Expire int64 `yaml:"expire"`
} `yaml:"tokenPolicy"` } `yaml:"tokenPolicy"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Conversation struct { type Conversation struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Friend struct { type Friend struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Group struct { type Group struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Msg struct { type Msg struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"` FriendVerify bool `yaml:"friendVerify"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Third struct { type Third struct {
@ -298,6 +331,8 @@ type Third struct {
Kodo Kodo `yaml:"kodo"` Kodo Kodo `yaml:"kodo"`
Aws Aws `yaml:"aws"` Aws Aws `yaml:"aws"`
} `yaml:"object"` } `yaml:"object"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Cos struct { type Cos struct {
BucketURL string `yaml:"bucketURL"` BucketURL string `yaml:"bucketURL"`
@ -338,6 +373,8 @@ type Aws struct {
type User struct { type User struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type RPC struct { type RPC struct {

View File

@ -0,0 +1,93 @@
package startrpc
import (
"context"
"time"
"github.com/go-kratos/aegis/circuitbreaker"
"github.com/go-kratos/aegis/circuitbreaker/sre"
"github.com/openimsdk/tools/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Success float64 `yaml:"success"` // success rate threshold (0.0-1.0)
Request int64 `yaml:"request"` // request threshold
Bucket int `yaml:"bucket"` // number of buckets
Window time.Duration `yaml:"window"` // time window for statistics
}
func NewCircuitBreaker(config *CircuitBreaker) circuitbreaker.CircuitBreaker {
if !config.Enable {
return nil
}
return sre.NewBreaker(
sre.WithWindow(config.Window),
sre.WithBucket(config.Bucket),
sre.WithSuccess(config.Success),
sre.WithRequest(config.Request),
)
}
func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if err := breaker.Allow(); err != nil {
log.ZWarn(ctx, "rpc circuit breaker open", err, "method", info.FullMethod)
return nil, status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
resp, err = handler(ctx, req)
if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.Internal {
breaker.MarkFailed()
} else {
breaker.MarkSuccess()
}
} else {
breaker.MarkSuccess()
}
return resp, err
})
}
func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := breaker.Allow(); err != nil {
log.ZWarn(ss.Context(), "rpc circuit breaker open", err, "method", info.FullMethod)
return status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
err := handler(srv, ss)
if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.Internal {
breaker.MarkFailed()
} else {
breaker.MarkSuccess()
}
} else {
breaker.MarkSuccess()
}
return err
})
}

View File

@ -0,0 +1,70 @@
package startrpc
import (
"context"
"time"
"github.com/go-kratos/aegis/ratelimit"
"github.com/go-kratos/aegis/ratelimit/bbr"
"github.com/openimsdk/tools/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type RateLimiter struct {
Enable bool
Window time.Duration
Bucket int
CPUThreshold int64
}
func NewRateLimiter(config *RateLimiter) ratelimit.Limiter {
if !config.Enable {
return nil
}
return bbr.NewLimiter(
bbr.WithWindow(config.Window),
bbr.WithBucket(config.Bucket),
bbr.WithCPUThreshold(config.CPUThreshold),
)
}
func UnaryRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ctx, "rpc rate limited", err, "method", info.FullMethod)
return nil, status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(ctx, req)
})
}
func StreamRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ss.Context(), "rpc rate limited", err, "method", info.FullMethod)
return status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(srv, ss)
})
}

View File

@ -47,7 +47,7 @@ func init() {
prommetrics.RegistryAll() prommetrics.RegistryAll()
} }
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, func Start[T any](ctx context.Context, disc *conf.Discovery, circuitBreakerConfig *conf.CircuitBreaker, rateLimiterConfig *conf.RateLimiter, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string, watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error,
@ -84,6 +84,45 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
} }
} }
if circuitBreakerConfig != nil && circuitBreakerConfig.Enable {
cb := &CircuitBreaker{
Enable: circuitBreakerConfig.Enable,
Success: circuitBreakerConfig.Success,
Request: circuitBreakerConfig.Request,
Bucket: circuitBreakerConfig.Bucket,
Window: circuitBreakerConfig.Window,
}
breaker := NewCircuitBreaker(cb)
options = append(options,
UnaryCircuitBreakerInterceptor(breaker),
StreamCircuitBreakerInterceptor(breaker),
)
log.ZInfo(ctx, "RPC circuit breaker enabled",
"service", rpcRegisterName,
"window", circuitBreakerConfig.Window,
"bucket", circuitBreakerConfig.Bucket,
"success", circuitBreakerConfig.Success,
"requestThreshold", circuitBreakerConfig.Request)
}
if rateLimiterConfig != nil && rateLimiterConfig.Enable {
limiter := NewRateLimiter((*RateLimiter)(rateLimiterConfig))
options = append(options,
UnaryRateLimitInterceptor(limiter),
StreamRateLimitInterceptor(limiter),
)
log.ZInfo(ctx, "RPC rate limiter enabled",
"service", rpcRegisterName,
"window", rateLimiterConfig.Window,
"bucket", rateLimiterConfig.Bucket,
"cpuThreshold", rateLimiterConfig.CPUThreshold)
}
registerIP, err := network.GetRpcRegisterIP(registerIP) registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil { if err != nil {
return err return err
@ -123,7 +162,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
go func() { go func() {
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return

View File

@ -163,7 +163,8 @@ func (c *ConversationMgo) FindUserID(ctx context.Context, userIDs []string, conv
) )
} }
func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) { func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) {
return mongoutil.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) return mongoutil.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID},
options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
} }
func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) { func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) {