diff --git a/config/openim-api.yml b/config/openim-api.yml index 103c36f95..df0177d24 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -17,3 +17,13 @@ prometheus: ports: # This address can be accessed via a browser grafanaURL: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 812df90f7..8ac07faae 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -26,3 +26,20 @@ longConnSvr: websocketMaxMsgLen: 4096 # WebSocket connection handshake timeout in seconds websocketTimeout: 10 + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 52d6a805e..c6ea06803 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -6,3 +6,20 @@ prometheus: # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-push.yml b/config/openim-push.yml index 5db5b541a..58784f13d 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -10,10 +10,26 @@ rpc: # It will only take effect when autoSetPorts is set to false. ports: +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached prometheus: # Enable or disable Prometheus monitoring - enable: true + enable: false # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index c42e556c4..09ceef0f4 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -20,3 +20,20 @@ prometheus: tokenPolicy: # Token validity period, in days expire: 90 + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index e722ac2b0..1135ffe6e 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -16,3 +16,20 @@ prometheus: # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index e722ac2b0..1135ffe6e 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -16,3 +16,20 @@ prometheus: # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 252f64c28..154b149e1 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -19,3 +19,20 @@ prometheus: enableHistoryForNewMembers: true + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 17fd3b8f4..ffe8943df 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -20,3 +20,20 @@ prometheus: # Does sending messages require friend verification friendVerify: false + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 7169e6c61..f6fbee695 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -17,6 +17,22 @@ prometheus: # It will only take effect when autoSetPorts is set to false. ports: +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached object: # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 337cacd35..c46db7f37 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -16,3 +16,20 @@ prometheus: # Prometheus listening ports, must be consistent with the number of rpc.ports # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/go.mod b/go.mod index c0c5e36a8..de5c4f92f 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.73-alpha.14 - github.com/openimsdk/tools v0.0.50-alpha.103 + github.com/openimsdk/protocol v0.0.73-alpha.17 + github.com/openimsdk/tools v0.0.50-alpha.105 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.10.0 @@ -135,6 +135,7 @@ require ( github.com/leodido/go-urn v1.4.0 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lithammer/shortuuid v3.0.0+incompatible // indirect + github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de // indirect github.com/magefile/mage v1.15.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -151,6 +152,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect @@ -160,6 +162,8 @@ require ( github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sercand/kuberesolver/v6 v6.0.1 // indirect + github.com/shirou/gopsutil/v3 v3.24.5 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect diff --git a/go.sum b/go.sum index c6de2aa38..ea874f8bc 100644 --- a/go.sum +++ b/go.sum @@ -303,6 +303,8 @@ github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVk github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= +github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de h1:V53FWzU6KAZVi1tPp5UIsMoUWJ2/PNwYIDXnu7QuBCE= +github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -347,10 +349,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE= -github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.103 h1:jYvI86cWiVu8a8iw1panw+pwIiStuUHF76h3fxA6ESI= -github.com/openimsdk/tools v0.0.50-alpha.103/go.mod h1:qCExFBqXpQBMzZck3XGIFwivBayAn2KNqB3WAd++IJw= +github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE= +github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.105 h1:axuCvKXhxY2RGLhpMMFNgBtE0B65T2Sr1JDW3UD9nBs= +github.com/openimsdk/tools v0.0.50-alpha.105/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -361,6 +363,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= @@ -397,6 +401,12 @@ github.com/sercand/kuberesolver/v6 v6.0.1 h1:XZUTA0gy/lgDYp/UhEwv7Js24F1j8NJ833Q github.com/sercand/kuberesolver/v6 v6.0.1/go.mod h1:C0tsTuRMONSY+Xf7pv7RMW1/JlewY1+wS8SZE+1lf1s= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= +github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= @@ -548,6 +558,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index e222d893a..bc550d55a 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -62,3 +62,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) } + +func (o *ConversationApi) DeleteConversations(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client) +} diff --git a/internal/api/ratelimit.go b/internal/api/ratelimit.go new file mode 100644 index 000000000..0bbf973de --- /dev/null +++ b/internal/api/ratelimit.go @@ -0,0 +1,83 @@ +package api + +import ( + "fmt" + "math" + "net/http" + "strconv" + "time" + + "github.com/gin-gonic/gin" + + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/ratelimit" + "github.com/openimsdk/tools/stability/ratelimit/bbr" +) + +type RateLimiter struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` // time duration per window + Bucket int `yaml:"bucket"` // bucket number for each window + CPUThreshold int64 `yaml:"cpuThreshold"` // CPU threshold; valid range 0–1000 (1000 = 100%) +} + +func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc { + if !config.Enable { + return func(c *gin.Context) { + c.Next() + } + } + + limiter := bbr.NewBBRLimiter( + bbr.WithWindow(config.Window), + bbr.WithBucket(config.Bucket), + bbr.WithCPUThreshold(config.CPUThreshold), + ) + + return func(c *gin.Context) { + status := limiter.Stat() + + c.Header("X-BBR-CPU", strconv.FormatInt(status.CPU, 10)) + c.Header("X-BBR-MinRT", strconv.FormatInt(status.MinRt, 10)) + c.Header("X-BBR-MaxPass", strconv.FormatInt(status.MaxPass, 10)) + c.Header("X-BBR-MaxInFlight", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-BBR-InFlight", strconv.FormatInt(status.InFlight, 10)) + + done, err := limiter.Allow() + if err != nil { + + c.Header("X-RateLimit-Policy", "BBR") + c.Header("Retry-After", calculateBBRRetryAfter(status)) + c.Header("X-RateLimit-Limit", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-RateLimit-Remaining", "0") // There is no concept of remaining quota in BBR. + + fmt.Println("rate limited:", err, "path:", c.Request.URL.Path) + log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path) + c.AbortWithStatus(http.StatusTooManyRequests) + apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later")) + return + } + + c.Next() + done(ratelimit.DoneInfo{}) + } +} + +func calculateBBRRetryAfter(status bbr.Stat) string { + loadRatio := float64(status.CPU) / float64(status.CPU) + + if loadRatio < 0.8 { + return "1" + } + if loadRatio < 0.95 { + return "2" + } + + backoff := 1 + int64(math.Pow(loadRatio-0.95, 2)*50) + if backoff > 5 { + backoff = 5 + } + return strconv.FormatInt(backoff, 10) +} diff --git a/internal/api/router.go b/internal/api/router.go index 1d3a92dd7..b3bad136e 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -97,6 +97,18 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf case BestSpeed: r.Use(gzip.Gzip(gzip.BestSpeed)) } + + // Use rate limiter middleware + if cfg.API.RateLimiter.Enable { + rl := &RateLimiter{ + Enable: cfg.API.RateLimiter.Enable, + Window: cfg.API.RateLimiter.Window, + Bucket: cfg.API.RateLimiter.Bucket, + CPUThreshold: cfg.API.RateLimiter.CPUThreshold, + } + r.Use(RateLimitMiddleware(rl)) + } + if config.Standalone() { r.Use(func(c *gin.Context) { c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs) @@ -277,6 +289,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/delete_conversations", c.DeleteConversations) } { diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 504167e01..3d20923b5 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -13,6 +13,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" pbAuth "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/mcontext" "github.com/go-playground/validator/v10" @@ -64,6 +65,8 @@ type WsServer struct { webhookClient *webhook.Client userClient *rpcli.UserClient authClient *rpcli.AuthClient + + ready atomic.Bool } type kickHandler struct { @@ -93,6 +96,8 @@ func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.C ws.authClient = rpcli.NewAuthClient(authConn) ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn)) ws.disCov = disCov + + ws.ready.Store(true) return nil } @@ -437,6 +442,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { // Create a new connection context connContext := newContext(w, r) + if !ws.ready.Load() { + httpError(connContext, errs.New("ws server not ready")) + return + } + // Check if the current number of online user connections exceeds the maximum limit if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum { // If it exceeds the maximum connection number, return an error via HTTP and stop processing @@ -453,6 +463,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { return } + if ws.authClient == nil { + httpError(connContext, errs.New("auth client is not initialized")) + return + } + // Call the authentication client to parse the Token obtained from the context resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken()) if err != nil { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 7a6a010c3..b9231c978 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/mq" "sync" diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 6d2c27e92..6b65bea18 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -23,6 +23,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -781,7 +782,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * } latestMsgDestructTime := time.UnixMilli(req.Timestamp) for i, conversation := range conversations { - if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { + if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 { continue } seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000)) @@ -821,3 +822,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) return nil } + +func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } + if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set") + } + + if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set") + } + + var needDeleteConversationIDs []string + + if len(req.ConversationIDs) == 0 { + deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli() + conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID) + if err != nil { + return nil, err + } + latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: req.OwnerUserID, + ConversationIDs: conversationIDs, + }) + if err != nil { + return nil, err + } + + for conversationID, msg := range latestMsgs.Msgs { + if msg.SendTime < deleteTimeThreshold { + needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID) + } + } + + if len(needDeleteConversationIDs) == 0 { + return &pbconversation.DeleteConversationsResp{}, nil + } + } else { + needDeleteConversationIDs = req.ConversationIDs + } + + if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil { + return nil, err + } + + // c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs) + + return &pbconversation.DeleteConversationsResp{}, nil +} diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 95cd6d90f..dc1466ec8 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -59,3 +59,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } + +func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) { + tips := &sdkws.ConversationDeleteTips{ + UserID: userID, + ConversationIDs: conversationIDs, + } + + c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips) +} diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 6c64a95e1..40b080664 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -67,6 +67,9 @@ func (a *ApiCmd) runE() error { } return startrpc.Start( a.ctx, &a.apiConfig.Discovery, + nil, + nil, + // &a.apiConfig.API.RateLimiter, &prometheus, a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Prometheus.AutoSetPorts, diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index b6165cc16..e38a322d4 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -43,7 +43,7 @@ func (a *AuthRpcCmd) Exec() error { } func (a *AuthRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.CircuitBreaker, &a.authConfig.RpcConfig.RateLimiter, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, []string{ diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 652665d2d..e9dbdac4c 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -44,7 +44,7 @@ func (a *ConversationRpcCmd) Exec() error { } func (a *ConversationRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.CircuitBreaker, &a.conversationConfig.RpcConfig.RateLimiter, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, []string{ diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index 811ad6cb3..96798c809 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -42,6 +42,8 @@ func (a *CronTaskCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( a.ctx, &a.cronTaskConfig.Discovery, + nil, + nil, &prometheus, "", "", true, diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index b2dc684b9..fe0491ab0 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -44,7 +44,7 @@ func (a *FriendRpcCmd) Exec() error { } func (a *FriendRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.CircuitBreaker, &a.relationConfig.RpcConfig.RateLimiter, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, []string{ diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 04dea3472..91d1958b5 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -45,7 +45,7 @@ func (a *GroupRpcCmd) Exec() error { } func (a *GroupRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.CircuitBreaker, &a.groupConfig.RpcConfig.RateLimiter, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, []string{ diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index f30b1b326..376bd6888 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -45,7 +45,7 @@ func (a *MsgRpcCmd) Exec() error { } func (a *MsgRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.CircuitBreaker, &a.msgConfig.RpcConfig.RateLimiter, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, []string{ diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 2fb4288b1..59c0b83fb 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -47,6 +47,8 @@ func (m *MsgGatewayCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( m.ctx, &m.msgGatewayConfig.Discovery, + &m.msgGatewayConfig.MsgGateway.CircuitBreaker, + &m.msgGatewayConfig.MsgGateway.RateLimiter, &prometheus, rpc.ListenIP, rpc.RegisterIP, rpc.AutoSetPorts, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 7892c8db7..a3c9d6871 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -48,6 +48,8 @@ func (m *MsgTransferCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( m.ctx, &m.msgTransferConfig.Discovery, + &m.msgTransferConfig.MsgTransfer.CircuitBreaker, + &m.msgTransferConfig.MsgTransfer.RateLimiter, &prometheus, "", "", true, diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 391d2cc87..3e4e908c2 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -46,7 +46,7 @@ func (a *PushRpcCmd) Exec() error { } func (a *PushRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.CircuitBreaker, &a.pushConfig.RpcConfig.RateLimiter, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, []string{ diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 94011832d..aadf39848 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -44,7 +44,7 @@ func (a *ThirdRpcCmd) Exec() error { } func (a *ThirdRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.CircuitBreaker, &a.thirdConfig.RpcConfig.RateLimiter, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, []string{ diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index b38ebfd37..43e583ddc 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -45,7 +45,7 @@ func (a *UserRpcCmd) Exec() error { } func (a *UserRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.CircuitBreaker, &a.userConfig.RpcConfig.RateLimiter, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, []string{ diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 50b06be01..4f74eeee7 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -129,6 +129,23 @@ type API struct { Ports []int `yaml:"ports"` GrafanaURL string `yaml:"grafanaURL"` } `yaml:"prometheus"` + + RateLimiter RateLimiter `yaml:"rateLimiter"` +} + +type RateLimiter struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` + Bucket int `yaml:"bucket"` + CPUThreshold int64 `yaml:"cpuThreshold"` +} + +type CircuitBreaker struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` + Bucket int `yaml:"bucket"` + Success float64 `yaml:"success"` + Request int64 `yaml:"request"` } type CronTask struct { @@ -203,6 +220,8 @@ type MsgGateway struct { WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketTimeout int `yaml:"websocketTimeout"` } `yaml:"longConnSvr"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type MsgTransfer struct { @@ -211,6 +230,8 @@ type MsgTransfer struct { AutoSetPorts bool `yaml:"autoSetPorts"` Ports []int `yaml:"ports"` } `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Push struct { @@ -241,7 +262,9 @@ type Push struct { BadgeCount bool `yaml:"badgeCount"` Production bool `yaml:"production"` } `yaml:"iosPush"` - FullUserCache bool `yaml:"fullUserCache"` + FullUserCache bool `yaml:"fullUserCache"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Auth struct { @@ -250,28 +273,38 @@ type Auth struct { TokenPolicy struct { Expire int64 `yaml:"expire"` } `yaml:"tokenPolicy"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Conversation struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Friend struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Group struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` - EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Msg struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` - FriendVerify bool `yaml:"friendVerify"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + FriendVerify bool `yaml:"friendVerify"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Third struct { @@ -284,6 +317,8 @@ type Third struct { Kodo Kodo `yaml:"kodo"` Aws Aws `yaml:"aws"` } `yaml:"object"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Cos struct { BucketURL string `yaml:"bucketURL"` @@ -322,8 +357,10 @@ type Aws struct { } type User struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type RPC struct { diff --git a/pkg/common/startrpc/circuitbreaker.go b/pkg/common/startrpc/circuitbreaker.go new file mode 100644 index 000000000..060a3aa8e --- /dev/null +++ b/pkg/common/startrpc/circuitbreaker.go @@ -0,0 +1,107 @@ +package startrpc + +import ( + "context" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/circuitbreaker" + "github.com/openimsdk/tools/stability/circuitbreaker/sre" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type CircuitBreaker struct { + Enable bool `yaml:"enable"` + Success float64 `yaml:"success"` // success rate threshold (0.0-1.0) + Request int64 `yaml:"request"` // request threshold + Bucket int `yaml:"bucket"` // number of buckets + Window time.Duration `yaml:"window"` // time window for statistics +} + +func NewCircuitBreaker(config *CircuitBreaker) circuitbreaker.CircuitBreaker { + if !config.Enable { + return nil + } + + return sre.NewSREBraker( + sre.WithWindow(config.Window), + sre.WithBucket(config.Bucket), + sre.WithSuccess(config.Success), + sre.WithRequest(config.Request), + ) +} + +func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption { + if breaker == nil { + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return handler(ctx, req) + }) + } + + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if err := breaker.Allow(); err != nil { + log.ZWarn(ctx, "rpc circuit breaker open", err, "method", info.FullMethod) + return nil, status.Error(codes.Unavailable, "service unavailable due to circuit breaker") + } + + resp, err = handler(ctx, req) + + if err != nil { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } + } else { + breaker.MarkFailed() + } + } else { + breaker.MarkSuccess() + } + + return resp, err + + }) +} + +func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption { + if breaker == nil { + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) + }) + } + + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if err := breaker.Allow(); err != nil { + log.ZWarn(ss.Context(), "rpc circuit breaker open", err, "method", info.FullMethod) + return status.Error(codes.Unavailable, "service unavailable due to circuit breaker") + } + + err := handler(srv, ss) + + if err != nil { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } + } else { + breaker.MarkFailed() + } + } else { + breaker.MarkSuccess() + } + + return err + }) +} diff --git a/pkg/common/startrpc/ratelimit.go b/pkg/common/startrpc/ratelimit.go new file mode 100644 index 000000000..1c2ac8eae --- /dev/null +++ b/pkg/common/startrpc/ratelimit.go @@ -0,0 +1,70 @@ +package startrpc + +import ( + "context" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/ratelimit" + "github.com/openimsdk/tools/stability/ratelimit/bbr" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type RateLimiter struct { + Enable bool + Window time.Duration + Bucket int + CPUThreshold int64 +} + +func NewRateLimiter(config *RateLimiter) ratelimit.Limiter { + if !config.Enable { + return nil + } + + return bbr.NewBBRLimiter( + bbr.WithWindow(config.Window), + bbr.WithBucket(config.Bucket), + bbr.WithCPUThreshold(config.CPUThreshold), + ) +} + +func UnaryRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption { + if limiter == nil { + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return handler(ctx, req) + }) + } + + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + done, err := limiter.Allow() + if err != nil { + log.ZWarn(ctx, "rpc rate limited", err, "method", info.FullMethod) + return nil, status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err) + } + + defer done(ratelimit.DoneInfo{}) + return handler(ctx, req) + }) +} + +func StreamRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption { + if limiter == nil { + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) + }) + } + + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + done, err := limiter.Allow() + if err != nil { + log.ZWarn(ss.Context(), "rpc rate limited", err, "method", info.FullMethod) + return status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err) + } + defer done(ratelimit.DoneInfo{}) + + return handler(srv, ss) + }) +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 4cb03d685..03afd2dc9 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -33,7 +33,7 @@ func init() { prommetrics.RegistryAll() } -func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, +func Start[T any](ctx context.Context, disc *conf.Discovery, circuitBreakerConfig *conf.CircuitBreaker, rateLimiterConfig *conf.RateLimiter, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, @@ -70,6 +70,45 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c } } + if circuitBreakerConfig != nil && circuitBreakerConfig.Enable { + cb := &CircuitBreaker{ + Enable: circuitBreakerConfig.Enable, + Success: circuitBreakerConfig.Success, + Request: circuitBreakerConfig.Request, + Bucket: circuitBreakerConfig.Bucket, + Window: circuitBreakerConfig.Window, + } + + breaker := NewCircuitBreaker(cb) + + options = append(options, + UnaryCircuitBreakerInterceptor(breaker), + StreamCircuitBreakerInterceptor(breaker), + ) + + log.ZInfo(ctx, "RPC circuit breaker enabled", + "service", rpcRegisterName, + "window", circuitBreakerConfig.Window, + "bucket", circuitBreakerConfig.Bucket, + "success", circuitBreakerConfig.Success, + "requestThreshold", circuitBreakerConfig.Request) + } + + if rateLimiterConfig != nil && rateLimiterConfig.Enable { + limiter := NewRateLimiter((*RateLimiter)(rateLimiterConfig)) + + options = append(options, + UnaryRateLimitInterceptor(limiter), + StreamRateLimitInterceptor(limiter), + ) + + log.ZInfo(ctx, "RPC rate limiter enabled", + "service", rpcRegisterName, + "window", rateLimiterConfig.Window, + "bucket", rateLimiterConfig.Bucket, + "cpuThreshold", rateLimiterConfig.CPUThreshold) + } + registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { return err @@ -109,7 +148,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c go func() { sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) select { case <-ctx.Done(): return diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index 20506e197..0108a8baa 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -2,6 +2,7 @@ package cache import ( "context" + relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) @@ -43,7 +44,7 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache - DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache + DelUserPinnedConversations(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 3453b570d..16d67322b 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs return cache } -func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache { +func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache { cache := c.CloneConversationCache() for _, userID := range userIDs { cache.AddKeys(c.getPinnedConversationIDsKey(userID)) diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index c20b4113c..508797bbb 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -64,6 +64,8 @@ type ConversationDatabase interface { GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) // FindRandConversation finds random conversations based on the specified timestamp and limit. FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) + + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -106,7 +108,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := fieldMap["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } @@ -158,7 +160,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := args["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } return cache.ChainExecDel(ctx) } @@ -189,7 +191,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat DelUserConversationIDsHash(userIDs...). DelConversationVersionUserIDs(userIDs...). DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). - DelConversationPinnedMessageUserIDs(pinnedUserIDs...). + DelUserPinnedConversations(pinnedUserIDs...). ChainExecDel(ctx) } @@ -245,7 +247,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs cache := c.cache.CloneConversationCache() cache = cache.DelConversationVersionUserIDs(ownerUserID). DelConversationNotNotifyMessageUserIDs(ownerUserID). - DelConversationPinnedMessageUserIDs(ownerUserID) + DelUserPinnedConversations(ownerUserID) groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" @@ -415,3 +417,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { return c.conversationDB.FindRandConversation(ctx, ts, limit) } + +func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + return c.tx.Transaction(ctx, func(ctx context.Context) error { + err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + cache = cache.DelConversations(userID, conversationIDs...). + DelConversationVersionUserIDs(userID). + DelConversationIDs(userID). + DelUserConversationIDsHash(userID). + DelConversationNotNotifyMessageUserIDs(userID). + DelUserPinnedConversations(userID) + + return cache.ChainExecDel(ctx) + }) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 6042b8aa1..4f5358504 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -30,4 +30,5 @@ type Conversation interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index dac51b06d..c7db196e8 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -294,3 +294,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li } return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) } + +func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + if len(conversationIDs) == 0 { + return nil + } + return mongoutil.IncrVersion(func() error { + err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}}) + return err + }, func() error { + for _, conversationID := range conversationIDs { + if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil { + return err + } + } + return nil + }) +}