diff --git a/.github/workflows/publish-docker-image.yml b/.github/workflows/publish-docker-image.yml index 998a11cf3..4cd3316dd 100644 --- a/.github/workflows/publish-docker-image.yml +++ b/.github/workflows/publish-docker-image.yml @@ -4,42 +4,80 @@ on: push: branches: - release-* - # tags: - # - 'v*' - release: types: [published] - workflow_dispatch: inputs: tag: description: "Tag version to be used for Docker image" required: true - default: "v3.8.0" + default: "v3.8.3" + +env: + GO_VERSION: "1.22" + IMAGE_NAME: "openim-server" + # IMAGE_NAME: ${{ github.event.repository.name }} + DOCKER_BUILDKIT: 1 jobs: - build-and-test: + publish-docker-images: runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.merged == false) }} steps: - - uses: actions/checkout@v4 + - name: Checkout main repository + uses: actions/checkout@v4 with: path: main-repo - # - name: Set up QEMU - # uses: docker/setup-qemu-action@v3.3.0 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3.3.0 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3.8.0 - - - name: Build Docker image - id: build - uses: docker/build-push-action@v5 + id: buildx + uses: docker/setup-buildx-action@v3 with: - context: ./main-repo - load: true - tags: "openim/openim-server:local" - cache-from: type=gha,scope=build - cache-to: type=gha,mode=max,scope=build + driver-opts: network=host + + - name: Extract metadata for Docker + id: meta + uses: docker/metadata-action@v5.6.0 + with: + images: | + ${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }} + ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }} + registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=tag + type=schedule + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern=v{{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + type=sha + + - name: Install skopeo + run: | + sudo apt-get update && sudo apt-get install -y skopeo + + - name: Build multi-arch images as OCI + run: | + mkdir -p /tmp/oci-image /tmp/docker-cache + + # Build multi-architecture image and save in OCI format + docker buildx build \ + --platform linux/amd64,linux/arm64 \ + --output type=oci,dest=/tmp/oci-image/multi-arch.tar \ + --cache-to type=local,dest=/tmp/docker-cache \ + --cache-from type=gha \ + ./main-repo + + # Use skopeo to convert the amd64 image from OCI format to Docker format and load it + skopeo copy --override-arch amd64 oci-archive:/tmp/oci-image/multi-arch.tar docker-daemon:${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local + + # check image + docker image ls | grep openim - name: Checkout compose repository uses: actions/checkout@v4 @@ -52,11 +90,11 @@ jobs: run: | IP=$(hostname -I | awk '{print $1}') echo "The IP Address is: $IP" - echo "::set-output name=ip::$IP" + echo "ip=$IP" >> $GITHUB_OUTPUT - name: Update .env to use the local image run: | - sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=openim/openim-server:local|' ${{ github.workspace }}/compose-repo/.env + sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local|' ${{ github.workspace }}/compose-repo/.env sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env - name: Start services using Docker Compose @@ -66,23 +104,34 @@ jobs: docker compose ps - - name: Extract metadata for Docker (tags, labels) - id: meta - uses: docker/metadata-action@v5.6.0 - with: - images: | - openim/openim-server - ghcr.io/openimsdk/openim-server - registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server - tags: | - type=ref,event=tag - type=schedule - type=ref,event=branch - # type=semver,pattern={{version}} - type=semver,pattern=v{{version}} - type=semver,pattern=release-{{raw}} - type=sha - type=raw,value=${{ github.event.inputs.tag }} + # - name: Check openim-server health + # run: | + # timeout=300 + # interval=30 + # elapsed=0 + # while [[ $elapsed -le $timeout ]]; do + # if ! docker exec openim-server mage check; then + # echo "openim-server is not ready, waiting..." + # sleep $interval + # elapsed=$(($elapsed + $interval)) + # else + # echo "Health check successful" + # exit 0 + # fi + # done + # echo "Health check failed after 5 minutes" + # exit 1 + + # - name: Check openim-chat health + # if: success() + # run: | + # if ! docker exec openim-chat mage check; then + # echo "openim-chat check failed" + # exit 1 + # else + # echo "Health check successful" + # exit 0 + # fi - name: Log in to Docker Hub uses: docker/login-action@v3.3.0 @@ -104,22 +153,27 @@ jobs: username: ${{ secrets.ALIREGISTRY_USERNAME }} password: ${{ secrets.ALIREGISTRY_TOKEN }} - - name: Push Docker images - uses: docker/build-push-action@v5 - with: - context: ./main-repo - push: true - platforms: linux/amd64,linux/arm64 - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha,scope=build - cache-to: type=gha,mode=max,scope=build + - name: Push multi-architecture images + if: success() + run: | + docker buildx build \ + --platform linux/amd64,linux/arm64 \ + $(echo "${{ steps.meta.outputs.tags }}" | sed 's/,/ --tag /g' | sed 's/^/--tag /') \ + --cache-from type=local,src=/tmp/docker-cache \ + --push \ + ./main-repo - name: Verify multi-platform support run: | - images=("openim/openim-server" "ghcr.io/openimsdk/openim-server" "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server") + images=( + "${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}" + "ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}" + "registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}" + ) + for image in "${images[@]}"; do - for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n'); do + for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n' | cut -d':' -f2); do + echo "Verifying multi-arch support for $image:$tag" manifest=$(docker manifest inspect "$image:$tag" || echo "error") if [[ "$manifest" == "error" ]]; then echo "Manifest not found for $image:$tag" @@ -135,5 +189,6 @@ jobs: echo "Multi-platform support check failed for $image:$tag - missing arm64" exit 1 fi + echo "✅ $image:$tag supports both amd64 and arm64 architectures" done done diff --git a/.github/workflows/update-version-file-on-release.yml b/.github/workflows/update-version-file-on-release.yml index b7e2a4409..039ff3d4b 100644 --- a/.github/workflows/update-version-file-on-release.yml +++ b/.github/workflows/update-version-file-on-release.yml @@ -96,13 +96,13 @@ jobs: repo, per_page: 100 }); - + release = releases.data.find(r => r.draft && r.tag_name === tagName); if (!release) { throw new Error(`No release found with tag ${tagName}`); } } - + await github.rest.repos.updateRelease({ owner, repo, @@ -110,10 +110,10 @@ jobs: draft: false, prerelease: release.prerelease }); - + const status = release.draft ? "was draft" : "was already published"; core.info(`Release ${tagName} ensured to be published (${status}).`); - + } catch (error) { core.warning(`Could not find or update release for tag ${tagName}: ${error.message}`); } diff --git a/config/openim-api.yml b/config/openim-api.yml index 103c36f95..df0177d24 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -17,3 +17,13 @@ prometheus: ports: # This address can be accessed via a browser grafanaURL: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 812df90f7..8ac07faae 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -26,3 +26,20 @@ longConnSvr: websocketMaxMsgLen: 4096 # WebSocket connection handshake timeout in seconds websocketTimeout: 10 + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 52d6a805e..c6ea06803 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -6,3 +6,20 @@ prometheus: # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-push.yml b/config/openim-push.yml index 5db5b541a..58784f13d 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -10,10 +10,26 @@ rpc: # It will only take effect when autoSetPorts is set to false. ports: +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached prometheus: # Enable or disable Prometheus monitoring - enable: true + enable: false # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index c42e556c4..09ceef0f4 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -20,3 +20,20 @@ prometheus: tokenPolicy: # Token validity period, in days expire: 90 + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index e722ac2b0..1135ffe6e 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -16,3 +16,20 @@ prometheus: # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index e722ac2b0..1135ffe6e 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -16,3 +16,20 @@ prometheus: # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 252f64c28..154b149e1 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -19,3 +19,20 @@ prometheus: enableHistoryForNewMembers: true + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 17fd3b8f4..e05eeb7e4 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -1,6 +1,6 @@ rpc: # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP - registerIP: + registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 # autoSetPorts indicates whether to automatically set the ports @@ -20,3 +20,20 @@ prometheus: # Does sending messages require friend verification friendVerify: false + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 7169e6c61..f6fbee695 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -17,6 +17,22 @@ prometheus: # It will only take effect when autoSetPorts is set to false. ports: +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached object: # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 337cacd35..c46db7f37 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -16,3 +16,20 @@ prometheus: # Prometheus listening ports, must be consistent with the number of rpc.ports # It will only take effect when autoSetPorts is set to false. ports: + +ratelimiter: + # Whether to enable rate limiting + enable: false + # WindowSize defines time duration per window + window: 20s + # BucketNum defines bucket number for each window + bucket: 500 + # CPU threshold; valid range 0–1000 (1000 = 100%) + cpuThreshold: 850 + +circuitBreaker: + enable: false + window: 5s # Time window size (seconds) + bucket: 100 # Number of buckets + success: 0.6 # Success rate threshold (0.6 means 60%) + request: 500 # Request threshold; circuit breaker evaluation occurs when reached \ No newline at end of file diff --git a/config/webhooks.yml b/config/webhooks.yml index 9fd3eb339..c1645e7c8 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -41,6 +41,9 @@ afterSendGroupMsg: attentionIds: [] # See beforeSendSingleMsg comment. deniedTypes: [] +afterMsgSaveDB: + enable: false + timeout: 5 afterUserOnline: enable: false timeout: 5 diff --git a/go.mod b/go.mod index c0c5e36a8..f8c5fa73c 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.73-alpha.14 - github.com/openimsdk/tools v0.0.50-alpha.103 + github.com/openimsdk/protocol v0.0.73-alpha.17 + github.com/openimsdk/tools v0.0.50-alpha.105 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.10.0 @@ -27,6 +27,7 @@ require ( require github.com/google/uuid v1.6.0 require ( + github.com/IBM/sarama v1.43.0 github.com/fatih/color v1.14.1 github.com/gin-contrib/gzip v1.0.1 github.com/go-redis/redis v6.15.9+incompatible @@ -54,7 +55,6 @@ require ( cloud.google.com/go/iam v1.1.7 // indirect cloud.google.com/go/longrunning v0.5.5 // indirect cloud.google.com/go/storage v1.40.0 // indirect - github.com/IBM/sarama v1.43.0 // indirect github.com/MicahParks/keyfunc v1.9.0 // indirect github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect github.com/aws/aws-sdk-go-v2 v1.32.5 // indirect @@ -135,6 +135,7 @@ require ( github.com/leodido/go-urn v1.4.0 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lithammer/shortuuid v3.0.0+incompatible // indirect + github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de // indirect github.com/magefile/mage v1.15.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -151,6 +152,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect @@ -160,6 +162,8 @@ require ( github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sercand/kuberesolver/v6 v6.0.1 // indirect + github.com/shirou/gopsutil/v3 v3.24.5 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect diff --git a/go.sum b/go.sum index c6de2aa38..ea874f8bc 100644 --- a/go.sum +++ b/go.sum @@ -303,6 +303,8 @@ github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVk github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= +github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de h1:V53FWzU6KAZVi1tPp5UIsMoUWJ2/PNwYIDXnu7QuBCE= +github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -347,10 +349,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE= -github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.103 h1:jYvI86cWiVu8a8iw1panw+pwIiStuUHF76h3fxA6ESI= -github.com/openimsdk/tools v0.0.50-alpha.103/go.mod h1:qCExFBqXpQBMzZck3XGIFwivBayAn2KNqB3WAd++IJw= +github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE= +github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.105 h1:axuCvKXhxY2RGLhpMMFNgBtE0B65T2Sr1JDW3UD9nBs= +github.com/openimsdk/tools v0.0.50-alpha.105/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -361,6 +363,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= @@ -397,6 +401,12 @@ github.com/sercand/kuberesolver/v6 v6.0.1 h1:XZUTA0gy/lgDYp/UhEwv7Js24F1j8NJ833Q github.com/sercand/kuberesolver/v6 v6.0.1/go.mod h1:C0tsTuRMONSY+Xf7pv7RMW1/JlewY1+wS8SZE+1lf1s= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= +github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= @@ -548,6 +558,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 5a191c0ec..78affee44 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) } + +func (o *ConversationApi) DeleteConversations(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client) +} diff --git a/internal/api/ratelimit.go b/internal/api/ratelimit.go new file mode 100644 index 000000000..0bbf973de --- /dev/null +++ b/internal/api/ratelimit.go @@ -0,0 +1,83 @@ +package api + +import ( + "fmt" + "math" + "net/http" + "strconv" + "time" + + "github.com/gin-gonic/gin" + + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/ratelimit" + "github.com/openimsdk/tools/stability/ratelimit/bbr" +) + +type RateLimiter struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` // time duration per window + Bucket int `yaml:"bucket"` // bucket number for each window + CPUThreshold int64 `yaml:"cpuThreshold"` // CPU threshold; valid range 0–1000 (1000 = 100%) +} + +func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc { + if !config.Enable { + return func(c *gin.Context) { + c.Next() + } + } + + limiter := bbr.NewBBRLimiter( + bbr.WithWindow(config.Window), + bbr.WithBucket(config.Bucket), + bbr.WithCPUThreshold(config.CPUThreshold), + ) + + return func(c *gin.Context) { + status := limiter.Stat() + + c.Header("X-BBR-CPU", strconv.FormatInt(status.CPU, 10)) + c.Header("X-BBR-MinRT", strconv.FormatInt(status.MinRt, 10)) + c.Header("X-BBR-MaxPass", strconv.FormatInt(status.MaxPass, 10)) + c.Header("X-BBR-MaxInFlight", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-BBR-InFlight", strconv.FormatInt(status.InFlight, 10)) + + done, err := limiter.Allow() + if err != nil { + + c.Header("X-RateLimit-Policy", "BBR") + c.Header("Retry-After", calculateBBRRetryAfter(status)) + c.Header("X-RateLimit-Limit", strconv.FormatInt(status.MaxInFlight, 10)) + c.Header("X-RateLimit-Remaining", "0") // There is no concept of remaining quota in BBR. + + fmt.Println("rate limited:", err, "path:", c.Request.URL.Path) + log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path) + c.AbortWithStatus(http.StatusTooManyRequests) + apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later")) + return + } + + c.Next() + done(ratelimit.DoneInfo{}) + } +} + +func calculateBBRRetryAfter(status bbr.Stat) string { + loadRatio := float64(status.CPU) / float64(status.CPU) + + if loadRatio < 0.8 { + return "1" + } + if loadRatio < 0.95 { + return "2" + } + + backoff := 1 + int64(math.Pow(loadRatio-0.95, 2)*50) + if backoff > 5 { + backoff = 5 + } + return strconv.FormatInt(backoff, 10) +} diff --git a/internal/api/router.go b/internal/api/router.go index 1d3a92dd7..81787f9a9 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -97,6 +97,18 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf case BestSpeed: r.Use(gzip.Gzip(gzip.BestSpeed)) } + + // Use rate limiter middleware + if cfg.API.RateLimiter.Enable { + rl := &RateLimiter{ + Enable: cfg.API.RateLimiter.Enable, + Window: cfg.API.RateLimiter.Window, + Bucket: cfg.API.RateLimiter.Bucket, + CPUThreshold: cfg.API.RateLimiter.CPUThreshold, + } + r.Use(RateLimitMiddleware(rl)) + } + if config.Standalone() { r.Use(func(c *gin.Context) { c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs) @@ -277,6 +289,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/delete_conversations", c.DeleteConversations) + conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) } { diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index bdb62aece..7b9f4bc0e 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -70,6 +70,7 @@ type Client struct { UserID string `json:"userID"` IsBackground bool `json:"isBackground"` SDKType string `json:"sdkType"` + SDKVersion string `json:"sdkVersion"` Encoder Encoder ctx *UserConnContext longConnServer LongConnServer @@ -97,6 +98,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closedErr = nil c.token = ctx.GetToken() c.SDKType = ctx.GetSDKType() + c.SDKVersion = ctx.GetSDKVersion() c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) c.subLock = new(sync.Mutex) if c.subUserIDs != nil { diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 1e7ab3bb7..3959e1138 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -28,6 +28,7 @@ const ( BackgroundStatus = "isBackground" SendResponse = "isMsgResp" SDKType = "sdkType" + SDKVersion = "sdkVersion" ) const ( diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index d73a96df4..37b5a7cdc 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -15,12 +15,13 @@ package msggateway import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "net/http" "net/url" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/utils/encrypt" "github.com/openimsdk/tools/utils/stringutil" @@ -140,6 +141,10 @@ func (c *UserConnContext) GetToken() string { return c.Req.URL.Query().Get(Token) } +func (c *UserConnContext) GetSDKVersion() string { + return c.Req.URL.Query().Get(SDKVersion) +} + func (c *UserConnContext) GetCompression() bool { compression, exists := c.Query(Compression) if exists && compression == GzipCompressionProtocol { diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index bc7a2fa5f..d490cc8b9 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -13,6 +13,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" pbAuth "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/mcontext" "github.com/go-playground/validator/v10" @@ -64,6 +65,8 @@ type WsServer struct { webhookClient *webhook.Client userClient *rpcli.UserClient authClient *rpcli.AuthClient + + ready atomic.Bool } type kickHandler struct { @@ -93,6 +96,8 @@ func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.C ws.authClient = rpcli.NewAuthClient(authConn) ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn)) ws.disCov = disCov + + ws.ready.Store(true) return nil } @@ -254,6 +259,10 @@ func (ws *WsServer) registerClient(client *Client) { oldClients []*Client ) oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID) + + log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID, + "sdkVersion", client.SDKVersion) + if !userOK { ws.clients.Set(client.UserID, client) log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) @@ -453,6 +462,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { // Create a new connection context connContext := newContext(w, r) + if !ws.ready.Load() { + httpError(connContext, errs.New("ws server not ready")) + return + } + // Check if the current number of online user connections exceeds the maximum limit if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum { // If it exceeds the maximum connection number, return an error via HTTP and stop processing @@ -469,6 +483,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { return } + if ws.authClient == nil { + httpError(connContext, errs.New("auth client is not initialized")) + return + } + // Call the authentication client to parse the Token obtained from the context resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken()) if err != nil { diff --git a/internal/msgtransfer/callback.go b/internal/msgtransfer/callback.go index f0d439779..575efb4ae 100644 --- a/internal/msgtransfer/callback.go +++ b/internal/msgtransfer/callback.go @@ -51,37 +51,24 @@ func GetContent(msg *sdkws.MsgData) string { } } -func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { - if msg.ContentType == constant.Typing { - return - } - +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { if !filterAfterMsg(msg, after) { return } - cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), - RecvID: msg.RecvID, - } - mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) -} - -func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { - if msg.ContentType == constant.Typing { - return + cbReq := &cbapi.CallbackAfterMsgSaveDBReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand), } - if !filterAfterMsg(msg, after) { - return + switch msg.SessionType { + case constant.SingleChatType, constant.NotificationChatType: + cbReq.RecvID = msg.RecvID + case constant.ReadGroupChatType: + cbReq.GroupID = msg.GroupID + default: } - cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), - GroupID: msg.GroupID, - } - - mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg)) } func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8b212774a..8af585ce3 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/mq" "sync" diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 8611af7ea..147bd37b0 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -15,7 +15,6 @@ package msgtransfer import ( - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/mq" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) if err != nil { - log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) + log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) prommetrics.MsgInsertMongoFailedCounter.Inc() } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() @@ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) } for _, msgData := range msgFromMQ.MsgData { - switch msgData.SessionType { - case constant.SingleChatType: - mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) - case constant.ReadGroupChatType: - mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) - } + mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData) } //var seqs []int64 diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index c9bfd3c56..562a76d82 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -37,6 +37,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * } latestMsgDestructTime := time.UnixMilli(req.Timestamp) for i, conversation := range conversations { - if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { + if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 { continue } seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000)) @@ -835,3 +836,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) return nil } + +func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } + if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set") + } + + if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set") + } + + var needDeleteConversationIDs []string + + if len(req.ConversationIDs) == 0 { + deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli() + conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID) + if err != nil { + return nil, err + } + latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: req.OwnerUserID, + ConversationIDs: conversationIDs, + }) + if err != nil { + return nil, err + } + + for conversationID, msg := range latestMsgs.Msgs { + if msg.SendTime < deleteTimeThreshold { + needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID) + } + } + + if len(needDeleteConversationIDs) == 0 { + return &pbconversation.DeleteConversationsResp{}, nil + } + } else { + needDeleteConversationIDs = req.ConversationIDs + } + + if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil { + return nil, err + } + + // c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs) + + return &pbconversation.DeleteConversationsResp{}, nil +} diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 6e865ba42..aa7b7d227 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } + +func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) { + tips := &sdkws.ConversationDeleteTips{ + UserID: userID, + ConversationIDs: conversationIDs, + } + + c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips) +} diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 2c00efd43..49fb04477 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,8 +16,10 @@ package msg import ( "context" + "encoding/base64" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/tools/errs" @@ -28,6 +30,7 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" ) @@ -87,19 +90,19 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf } // Move to msgtransfer -// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { -// if msg.MsgData.ContentType == constant.Typing { -// return -// } -// if !filterAfterMsg(msg, after) { -// return -// } -// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ -// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), -// RecvID: msg.MsgData.RecvID, -// } -// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -// } +func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), + RecvID: msg.MsgData.RecvID, + } + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +} func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -121,21 +124,20 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi }) } -// Move to msgtransfer -// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { -// if msg.MsgData.ContentType == constant.Typing { -// return -// } -// if !filterAfterMsg(msg, after) { -// return -// } -// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ -// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), -// GroupID: msg.MsgData.GroupID, -// } +func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), + GroupID: msg.MsgData.GroupID, + } -// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -// } + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +} func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -204,14 +206,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } -// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { -// keyMsgData := apistruct.KeyMsgData{ -// SendID: msg.SendID, -// RecvID: msg.RecvID, -// GroupID: msg.GroupID, -// } +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } -// return map[string]string{ -// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), -// } -// } + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index d97905bff..18ad9cc56 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -86,7 +86,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, go m.setConversationAtInfo(ctx, req.MsgData) } - // m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} @@ -194,7 +194,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq return nil, err } - // m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) + m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return &pbmsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID, diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index bbc34e71f..ef7cb502f 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -66,4 +66,5 @@ const ( CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand" CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand" CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand" + CallbackAfterMsgSaveDBCommand = "callbackAfterMsgSaveDBCommand" ) diff --git a/pkg/callbackstruct/conversation.go b/pkg/callbackstruct/conversation.go index 14e78094c..d40e914d3 100644 --- a/pkg/callbackstruct/conversation.go +++ b/pkg/callbackstruct/conversation.go @@ -2,42 +2,42 @@ package callbackstruct type CallbackBeforeCreateSingleChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - UserID string `json:"user_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + UserID string `json:"userId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } type CallbackBeforeCreateSingleChatConversationsResp struct { CommonCallbackResp - RecvMsgOpt *int32 `json:"recv_msg_opt"` - IsPinned *bool `json:"is_pinned"` - IsPrivateChat *bool `json:"is_private_chat"` - BurnDuration *int32 `json:"burn_duration"` - GroupAtType *int32 `json:"group_at_type"` - AttachedInfo *string `json:"attached_info"` + RecvMsgOpt *int32 `json:"recvMsgOpt"` + IsPinned *bool `json:"isPinned"` + IsPrivateChat *bool `json:"isPrivateChat"` + BurnDuration *int32 `json:"burnDuration"` + GroupAtType *int32 `json:"groupAtType"` + AttachedInfo *string `json:"attachedInfo"` Ex *string `json:"ex"` } type CallbackAfterCreateSingleChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - UserID string `json:"user_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + UserID string `json:"userId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } @@ -47,42 +47,42 @@ type CallbackAfterCreateSingleChatConversationsResp struct { type CallbackBeforeCreateGroupChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - GroupID string `json:"group_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + GroupID string `json:"groupId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } type CallbackBeforeCreateGroupChatConversationsResp struct { CommonCallbackResp - RecvMsgOpt *int32 `json:"recv_msg_opt"` - IsPinned *bool `json:"is_pinned"` - IsPrivateChat *bool `json:"is_private_chat"` - BurnDuration *int32 `json:"burn_duration"` - GroupAtType *int32 `json:"group_at_type"` - AttachedInfo *string `json:"attached_info"` + RecvMsgOpt *int32 `json:"recvMsgOpt"` + IsPinned *bool `json:"isPinned"` + IsPrivateChat *bool `json:"isPrivateChat"` + BurnDuration *int32 `json:"burnDuration"` + GroupAtType *int32 `json:"groupAtType"` + AttachedInfo *string `json:"attachedInfo"` Ex *string `json:"ex"` } type CallbackAfterCreateGroupChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - GroupID string `json:"group_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + GroupID string `json:"groupId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } diff --git a/pkg/callbackstruct/message.go b/pkg/callbackstruct/message.go index 902fa6110..ef8a8bb28 100644 --- a/pkg/callbackstruct/message.go +++ b/pkg/callbackstruct/message.go @@ -103,3 +103,13 @@ type CallbackSingleMsgReadReq struct { type CallbackSingleMsgReadResp struct { CommonCallbackResp } + +type CallbackAfterMsgSaveDBReq struct { + CommonCallbackReq + RecvID string `json:"recvID"` + GroupID string `json:"groupID"` +} + +type CallbackAfterMsgSaveDBResp struct { + CommonCallbackResp +} diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 7b7dbc89b..1356f90d6 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -81,6 +81,9 @@ func (a *ApiCmd) runE() error { } return startrpc.Start( a.ctx, &a.apiConfig.Discovery, + nil, + nil, + // &a.apiConfig.API.RateLimiter, &prometheus, a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Prometheus.AutoSetPorts, diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index d624e9dae..a6a92ef14 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -57,7 +57,7 @@ func (a *AuthRpcCmd) Exec() error { } func (a *AuthRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.CircuitBreaker, &a.authConfig.RpcConfig.RateLimiter, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, []string{ diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 428c442da..f1ae0c59c 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -58,7 +58,7 @@ func (a *ConversationRpcCmd) Exec() error { } func (a *ConversationRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.CircuitBreaker, &a.conversationConfig.RpcConfig.RateLimiter, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, []string{ diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index af2cbbee9..c666bd021 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -56,6 +56,8 @@ func (a *CronTaskCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( a.ctx, &a.cronTaskConfig.Discovery, + nil, + nil, &prometheus, "", "", true, diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index dd850cf17..661da86c7 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -58,7 +58,7 @@ func (a *FriendRpcCmd) Exec() error { } func (a *FriendRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.CircuitBreaker, &a.relationConfig.RpcConfig.RateLimiter, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, []string{ diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 7a599077f..daff7b499 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -59,7 +59,7 @@ func (a *GroupRpcCmd) Exec() error { } func (a *GroupRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.CircuitBreaker, &a.groupConfig.RpcConfig.RateLimiter, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, []string{ diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index c4049be05..845175cd9 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -59,7 +59,7 @@ func (a *MsgRpcCmd) Exec() error { } func (a *MsgRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.CircuitBreaker, &a.msgConfig.RpcConfig.RateLimiter, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, []string{ diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 83cfb6272..4ca899a5e 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -61,6 +61,8 @@ func (m *MsgGatewayCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( m.ctx, &m.msgGatewayConfig.Discovery, + &m.msgGatewayConfig.MsgGateway.CircuitBreaker, + &m.msgGatewayConfig.MsgGateway.RateLimiter, &prometheus, rpc.ListenIP, rpc.RegisterIP, rpc.AutoSetPorts, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index fe6c27e54..81f40ac9d 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -62,6 +62,8 @@ func (m *MsgTransferCmd) runE() error { var prometheus config.Prometheus return startrpc.Start( m.ctx, &m.msgTransferConfig.Discovery, + &m.msgTransferConfig.MsgTransfer.CircuitBreaker, + &m.msgTransferConfig.MsgTransfer.RateLimiter, &prometheus, "", "", true, diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 7cd3c481e..bcb34ed2c 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -60,7 +60,7 @@ func (a *PushRpcCmd) Exec() error { } func (a *PushRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.CircuitBreaker, &a.pushConfig.RpcConfig.RateLimiter, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, []string{ diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index e567234e4..39d731e0d 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -58,7 +58,7 @@ func (a *ThirdRpcCmd) Exec() error { } func (a *ThirdRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.CircuitBreaker, &a.thirdConfig.RpcConfig.RateLimiter, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, []string{ diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 190f6f892..12abe1bde 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -59,7 +59,7 @@ func (a *UserRpcCmd) Exec() error { } func (a *UserRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.CircuitBreaker, &a.userConfig.RpcConfig.RateLimiter, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, []string{ diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index c5000a6e5..695684d15 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -143,6 +143,23 @@ type API struct { Ports []int `yaml:"ports"` GrafanaURL string `yaml:"grafanaURL"` } `yaml:"prometheus"` + + RateLimiter RateLimiter `yaml:"rateLimiter"` +} + +type RateLimiter struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` + Bucket int `yaml:"bucket"` + CPUThreshold int64 `yaml:"cpuThreshold"` +} + +type CircuitBreaker struct { + Enable bool `yaml:"enable"` + Window time.Duration `yaml:"window"` + Bucket int `yaml:"bucket"` + Success float64 `yaml:"success"` + Request int64 `yaml:"request"` } type CronTask struct { @@ -217,6 +234,8 @@ type MsgGateway struct { WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketTimeout int `yaml:"websocketTimeout"` } `yaml:"longConnSvr"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type MsgTransfer struct { @@ -225,6 +244,8 @@ type MsgTransfer struct { AutoSetPorts bool `yaml:"autoSetPorts"` Ports []int `yaml:"ports"` } `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Push struct { @@ -255,7 +276,9 @@ type Push struct { BadgeCount bool `yaml:"badgeCount"` Production bool `yaml:"production"` } `yaml:"iosPush"` - FullUserCache bool `yaml:"fullUserCache"` + FullUserCache bool `yaml:"fullUserCache"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Auth struct { @@ -264,28 +287,38 @@ type Auth struct { TokenPolicy struct { Expire int64 `yaml:"expire"` } `yaml:"tokenPolicy"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Conversation struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Friend struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Group struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` - EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Msg struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` - FriendVerify bool `yaml:"friendVerify"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + FriendVerify bool `yaml:"friendVerify"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Third struct { @@ -298,6 +331,8 @@ type Third struct { Kodo Kodo `yaml:"kodo"` Aws Aws `yaml:"aws"` } `yaml:"object"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type Cos struct { BucketURL string `yaml:"bucketURL"` @@ -336,8 +371,10 @@ type Aws struct { } type User struct { - RPC RPC `yaml:"rpc"` - Prometheus Prometheus `yaml:"prometheus"` + RPC RPC `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + RateLimiter RateLimiter `yaml:"rateLimiter"` + CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"` } type RPC struct { @@ -436,6 +473,7 @@ type Webhooks struct { BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` + AfterMsgSaveDB AfterConfig `yaml:"afterMsgSaveDB"` AfterUserOnline AfterConfig `yaml:"afterUserOnline"` AfterUserOffline AfterConfig `yaml:"afterUserOffline"` AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"` diff --git a/pkg/common/startrpc/circuitbreaker.go b/pkg/common/startrpc/circuitbreaker.go new file mode 100644 index 000000000..060a3aa8e --- /dev/null +++ b/pkg/common/startrpc/circuitbreaker.go @@ -0,0 +1,107 @@ +package startrpc + +import ( + "context" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/circuitbreaker" + "github.com/openimsdk/tools/stability/circuitbreaker/sre" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type CircuitBreaker struct { + Enable bool `yaml:"enable"` + Success float64 `yaml:"success"` // success rate threshold (0.0-1.0) + Request int64 `yaml:"request"` // request threshold + Bucket int `yaml:"bucket"` // number of buckets + Window time.Duration `yaml:"window"` // time window for statistics +} + +func NewCircuitBreaker(config *CircuitBreaker) circuitbreaker.CircuitBreaker { + if !config.Enable { + return nil + } + + return sre.NewSREBraker( + sre.WithWindow(config.Window), + sre.WithBucket(config.Bucket), + sre.WithSuccess(config.Success), + sre.WithRequest(config.Request), + ) +} + +func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption { + if breaker == nil { + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return handler(ctx, req) + }) + } + + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if err := breaker.Allow(); err != nil { + log.ZWarn(ctx, "rpc circuit breaker open", err, "method", info.FullMethod) + return nil, status.Error(codes.Unavailable, "service unavailable due to circuit breaker") + } + + resp, err = handler(ctx, req) + + if err != nil { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } + } else { + breaker.MarkFailed() + } + } else { + breaker.MarkSuccess() + } + + return resp, err + + }) +} + +func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption { + if breaker == nil { + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) + }) + } + + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if err := breaker.Allow(); err != nil { + log.ZWarn(ss.Context(), "rpc circuit breaker open", err, "method", info.FullMethod) + return status.Error(codes.Unavailable, "service unavailable due to circuit breaker") + } + + err := handler(srv, ss) + + if err != nil { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.OK: + breaker.MarkSuccess() + case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied: + breaker.MarkSuccess() + default: + breaker.MarkFailed() + } + } else { + breaker.MarkFailed() + } + } else { + breaker.MarkSuccess() + } + + return err + }) +} diff --git a/pkg/common/startrpc/ratelimit.go b/pkg/common/startrpc/ratelimit.go new file mode 100644 index 000000000..1c2ac8eae --- /dev/null +++ b/pkg/common/startrpc/ratelimit.go @@ -0,0 +1,70 @@ +package startrpc + +import ( + "context" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/stability/ratelimit" + "github.com/openimsdk/tools/stability/ratelimit/bbr" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type RateLimiter struct { + Enable bool + Window time.Duration + Bucket int + CPUThreshold int64 +} + +func NewRateLimiter(config *RateLimiter) ratelimit.Limiter { + if !config.Enable { + return nil + } + + return bbr.NewBBRLimiter( + bbr.WithWindow(config.Window), + bbr.WithBucket(config.Bucket), + bbr.WithCPUThreshold(config.CPUThreshold), + ) +} + +func UnaryRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption { + if limiter == nil { + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return handler(ctx, req) + }) + } + + return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + done, err := limiter.Allow() + if err != nil { + log.ZWarn(ctx, "rpc rate limited", err, "method", info.FullMethod) + return nil, status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err) + } + + defer done(ratelimit.DoneInfo{}) + return handler(ctx, req) + }) +} + +func StreamRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption { + if limiter == nil { + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) + }) + } + + return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + done, err := limiter.Allow() + if err != nil { + log.ZWarn(ss.Context(), "rpc rate limited", err, "method", info.FullMethod) + return status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err) + } + defer done(ratelimit.DoneInfo{}) + + return handler(srv, ss) + }) +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 9715f2aac..247d13b6b 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -47,7 +47,7 @@ func init() { prommetrics.RegistryAll() } -func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, +func Start[T any](ctx context.Context, disc *conf.Discovery, circuitBreakerConfig *conf.CircuitBreaker, rateLimiterConfig *conf.RateLimiter, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, @@ -84,6 +84,45 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c } } + if circuitBreakerConfig != nil && circuitBreakerConfig.Enable { + cb := &CircuitBreaker{ + Enable: circuitBreakerConfig.Enable, + Success: circuitBreakerConfig.Success, + Request: circuitBreakerConfig.Request, + Bucket: circuitBreakerConfig.Bucket, + Window: circuitBreakerConfig.Window, + } + + breaker := NewCircuitBreaker(cb) + + options = append(options, + UnaryCircuitBreakerInterceptor(breaker), + StreamCircuitBreakerInterceptor(breaker), + ) + + log.ZInfo(ctx, "RPC circuit breaker enabled", + "service", rpcRegisterName, + "window", circuitBreakerConfig.Window, + "bucket", circuitBreakerConfig.Bucket, + "success", circuitBreakerConfig.Success, + "requestThreshold", circuitBreakerConfig.Request) + } + + if rateLimiterConfig != nil && rateLimiterConfig.Enable { + limiter := NewRateLimiter((*RateLimiter)(rateLimiterConfig)) + + options = append(options, + UnaryRateLimitInterceptor(limiter), + StreamRateLimitInterceptor(limiter), + ) + + log.ZInfo(ctx, "RPC rate limiter enabled", + "service", rpcRegisterName, + "window", rateLimiterConfig.Window, + "bucket", rateLimiterConfig.Bucket, + "cpuThreshold", rateLimiterConfig.CPUThreshold) + } + registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { return err @@ -123,7 +162,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c go func() { sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) select { case <-ctx.Done(): return diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index ac3011107..d5ab2264f 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -16,6 +16,7 @@ package cache import ( "context" + relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) @@ -57,7 +58,7 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache - DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache + DelUserPinnedConversations(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 3453b570d..16d67322b 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs return cache } -func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache { +func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache { cache := c.CloneConversationCache() for _, userID := range userIDs { cache.AddKeys(c.getPinnedConversationIDsKey(userID)) diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index 9ad4c319f..3a9967e90 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -220,7 +220,6 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { return errs.Wrap(err) } - if c.localCache != nil { c.removeLocalTokenCache(ctx, key) } diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 27442ca66..d085c7466 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -78,6 +78,8 @@ type ConversationDatabase interface { GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) // FindRandConversation finds random conversations based on the specified timestamp and limit. FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) + + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -120,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := fieldMap["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } @@ -172,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := args["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } return cache.ChainExecDel(ctx) } @@ -203,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat DelUserConversationIDsHash(userIDs...). DelConversationVersionUserIDs(userIDs...). DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). - DelConversationPinnedMessageUserIDs(pinnedUserIDs...). + DelUserPinnedConversations(pinnedUserIDs...). ChainExecDel(ctx) } @@ -259,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs cache := c.cache.CloneConversationCache() cache = cache.DelConversationVersionUserIDs(ownerUserID). DelConversationNotNotifyMessageUserIDs(ownerUserID). - DelConversationPinnedMessageUserIDs(ownerUserID) + DelUserPinnedConversations(ownerUserID) groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" @@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { return c.conversationDB.FindRandConversation(ctx, ts, limit) } + +func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + return c.tx.Transaction(ctx, func(ctx context.Context) error { + err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + cache = cache.DelConversations(userID, conversationIDs...). + DelConversationVersionUserIDs(userID). + DelConversationIDs(userID). + DelUserConversationIDsHash(userID). + DelConversationNotNotifyMessageUserIDs(userID). + DelUserPinnedConversations(userID) + + return cache.ChainExecDel(ctx) + }) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index d612dfc2d..562782346 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -44,4 +44,5 @@ type Conversation interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 89f13ea3d..d5ab046aa 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li } return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) } + +func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + if len(conversationIDs) == 0 { + return nil + } + return mongoutil.IncrVersion(func() error { + err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}}) + return err + }, func() error { + for _, conversationID := range conversationIDs { + if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil { + return err + } + } + return nil + }) +} diff --git a/pkg/common/storage/database/mgo/user.go b/pkg/common/storage/database/mgo/user.go index a08765baf..04ecca1e8 100644 --- a/pkg/common/storage/database/mgo/user.go +++ b/pkg/common/storage/database/mgo/user.go @@ -73,7 +73,7 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo } func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) { - return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}}) + return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": bson.M{"$gte": level}}) } func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) { diff --git a/pkg/common/storage/kafka/config.go b/pkg/common/storage/kafka/config.go new file mode 100644 index 000000000..1c9c4b0a0 --- /dev/null +++ b/pkg/common/storage/kafka/config.go @@ -0,0 +1,33 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +type TLSConfig struct { + EnableTLS bool `yaml:"enableTLS"` + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` +} + +type Config struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + ProducerAck string `yaml:"producerAck"` + CompressType string `yaml:"compressType"` + Addr []string `yaml:"addr"` + TLS TLSConfig `yaml:"tls"` +} diff --git a/pkg/common/storage/kafka/consumer_group.go b/pkg/common/storage/kafka/consumer_group.go new file mode 100644 index 000000000..f0e84bbc9 --- /dev/null +++ b/pkg/common/storage/kafka/consumer_group.go @@ -0,0 +1,68 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "errors" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/log" +) + +type MConsumerGroup struct { + sarama.ConsumerGroup + groupID string + topics []string +} + +func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) { + config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable) + if err != nil { + return nil, err + } + group, err := NewConsumerGroup(config, conf.Addr, groupID) + if err != nil { + return nil, err + } + return &MConsumerGroup{ + ConsumerGroup: group, + groupID: groupID, + topics: topics, + }, nil +} + +func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { + return GetContextWithMQHeader(cMsg.Headers) +} + +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { + for { + err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + if errors.Is(err, context.Canceled) { + return + } + if err != nil { + log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) + } + } +} + +func (mc *MConsumerGroup) Close() error { + return mc.ConsumerGroup.Close() +} diff --git a/pkg/common/storage/kafka/producer.go b/pkg/common/storage/kafka/producer.go new file mode 100644 index 000000000..5f6be29ed --- /dev/null +++ b/pkg/common/storage/kafka/producer.go @@ -0,0 +1,82 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" + "google.golang.org/protobuf/proto" +) + +// Producer represents a Kafka producer. +type Producer struct { + addr []string + topic string + config *sarama.Config + producer sarama.SyncProducer +} + +func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) { + producer, err := NewProducer(config, addr) + if err != nil { + return nil, err + } + return &Producer{ + addr: addr, + topic: topic, + config: config, + producer: producer, + }, nil +} + +// SendMessage sends a message to the Kafka topic configured in the Producer. +func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { + // Marshal the protobuf message + bMsg, err := proto.Marshal(msg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err") + } + if len(bMsg) == 0 { + return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err") + } + + // Prepare Kafka message + kMsg := &sarama.ProducerMessage{ + Topic: p.topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(bMsg), + } + + // Validate message key and value + if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { + return 0, 0, errs.Wrap(errEmptyMsg) + } + + // Attach context metadata as headers + header, err := GetMQHeaderWithContext(ctx) + if err != nil { + return 0, 0, err + } + kMsg.Headers = header + + // Send the message + partition, offset, err := p.producer.SendMessage(kMsg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error") + } + + return partition, offset, nil +} diff --git a/pkg/common/storage/kafka/sarama.go b/pkg/common/storage/kafka/sarama.go new file mode 100644 index 000000000..23220b4d0 --- /dev/null +++ b/pkg/common/storage/kafka/sarama.go @@ -0,0 +1,85 @@ +package kafka + +import ( + "bytes" + "strings" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Version = sarama.V2_0_0_0 + kfk.Consumer.Offsets.Initial = initial + kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable + kfk.Consumer.Return.Errors = false + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) { + cg, err := sarama.NewConsumerGroup(addr, groupID, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf) + } + return cg, nil +} + +func BuildProducerConfig(conf Config) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Producer.Return.Successes = true + kfk.Producer.Return.Errors = true + kfk.Producer.Partitioner = sarama.NewHashPartitioner + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + switch strings.ToLower(conf.ProducerAck) { + case "no_response": + kfk.Producer.RequiredAcks = sarama.NoResponse + case "wait_for_local": + kfk.Producer.RequiredAcks = sarama.WaitForLocal + case "wait_for_all": + kfk.Producer.RequiredAcks = sarama.WaitForAll + default: + kfk.Producer.RequiredAcks = sarama.WaitForAll + } + if conf.CompressType == "" { + kfk.Producer.Compression = sarama.CompressionNone + } else { + if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil { + return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType) + } + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) { + producer, err := sarama.NewSyncProducer(addr, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf) + } + return producer, nil +} diff --git a/pkg/common/storage/kafka/tls.go b/pkg/common/storage/kafka/tls.go new file mode 100644 index 000000000..00c89dcc1 --- /dev/null +++ b/pkg/common/storage/kafka/tls.go @@ -0,0 +1,83 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "os" + + "github.com/openimsdk/tools/errs" +) + +// decryptPEM decrypts a PEM block using a password. +func decryptPEM(data []byte, passphrase []byte) ([]byte, error) { + if len(passphrase) == 0 { + return data, nil + } + b, _ := pem.Decode(data) + d, err := x509.DecryptPEMBlock(b, passphrase) + if err != nil { + return nil, errs.WrapMsg(err, "DecryptPEMBlock failed") + } + return pem.EncodeToMemory(&pem.Block{ + Type: b.Type, + Bytes: d, + }), nil +} + +func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "path", path) + } + return decryptPEM(data, pwd) +} + +// newTLSConfig setup the TLS config from general config file. +func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) { + var tlsConfig tls.Config + if clientCertFile != "" && clientKeyFile != "" { + certPEMBlock, err := os.ReadFile(clientCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile) + } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) + if err != nil { + return nil, err + } + + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + return nil, errs.WrapMsg(err, "X509KeyPair failed") + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if caCertFile != "" { + caCert, err := os.ReadFile(caCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile) + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, errs.New("AppendCertsFromPEM failed") + } + tlsConfig.RootCAs = caCertPool + } + tlsConfig.InsecureSkipVerify = insecureSkipVerify + return &tlsConfig, nil +} diff --git a/pkg/common/storage/kafka/util.go b/pkg/common/storage/kafka/util.go new file mode 100644 index 000000000..61abe5450 --- /dev/null +++ b/pkg/common/storage/kafka/util.go @@ -0,0 +1,34 @@ +package kafka + +import ( + "context" + "errors" + "github.com/IBM/sarama" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mcontext" +) + +var errEmptyMsg = errors.New("kafka binary msg is empty") + +// GetMQHeaderWithContext extracts message queue headers from the context. +func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { + operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) + if err != nil { + return nil, err + } + return []sarama.RecordHeader{ + {Key: []byte(constant.OperationID), Value: []byte(operationID)}, + {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, + {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, + {Key: []byte(constant.ConnID), Value: []byte(connID)}, + }, nil +} + +// GetContextWithMQHeader creates a context from message queue headers. +func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { + var values []string + for _, recordHeader := range header { + values = append(values, string(recordHeader.Value)) + } + return mcontext.WithMustInfoCtx(values) // Attach extracted values to context +} diff --git a/pkg/common/storage/kafka/verify.go b/pkg/common/storage/kafka/verify.go new file mode 100644 index 000000000..0a09eed4e --- /dev/null +++ b/pkg/common/storage/kafka/verify.go @@ -0,0 +1,79 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "fmt" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func CheckTopics(ctx context.Context, conf *Config, topics []string) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + existingTopics, err := cli.Topics() + if err != nil { + return errs.WrapMsg(err, "Failed to list topics") + } + + existingTopicsMap := make(map[string]bool) + for _, t := range existingTopics { + existingTopicsMap[t] = true + } + + for _, topic := range topics { + if !existingTopicsMap[topic] { + return errs.New("topic not exist", "topic", topic).Wrap() + } + } + return nil +} + +func CheckHealth(ctx context.Context, conf *Config) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + // Get broker list + brokers := cli.Brokers() + if len(brokers) == 0 { + return errs.New("no brokers found").Wrap() + } + + // Check if all brokers are reachable + for _, broker := range brokers { + if err := broker.Open(kfk); err != nil { + return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr()) + } + } + + return nil +} diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index 04d772d16..137eb4457 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -109,7 +109,7 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string { case constant.ReadGroupChatType: return "sg_" + ids[0] // super group chat case constant.NotificationChatType: - return "sn_" + ids[0] // server notification chat + return "sn_" + strings.Join(ids, "_") // server notification chat } return "" } diff --git a/tools/seq/internal/seq.go b/tools/seq/internal/seq.go index e90c4a41b..b14ebbf70 100644 --- a/tools/seq/internal/seq.go +++ b/tools/seq/internal/seq.go @@ -72,7 +72,7 @@ func Main(conf string, del time.Duration) error { if err != nil { return err } - + mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName) if err != nil { return err diff --git a/tools/stress-test-v2/main.go b/tools/stress-test-v2/main.go new file mode 100644 index 000000000..8adcbd62c --- /dev/null +++ b/tools/stress-test-v2/main.go @@ -0,0 +1,735 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/sdkws" + pbuser "github.com/openimsdk/protocol/user" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/system/program" +) + +// 1. Create 100K New Users +// 2. Create 100 100K Groups +// 3. Create 1000 999 Groups +// 4. Send message to 100K Groups every second +// 5. Send message to 999 Groups every minute + +var ( + // Use default userIDs List for testing, need to be created. + TestTargetUserList = []string{ + // "", + } + // DefaultGroupID = "" // Use default group ID for testing, need to be created. +) + +var ( + ApiAddress string + + // API method + GetAdminToken = "/auth/get_admin_token" + UserCheck = "/user/account_check" + CreateUser = "/user/user_register" + ImportFriend = "/friend/import_friend" + InviteToGroup = "/group/invite_user_to_group" + GetGroupMemberInfo = "/group/get_group_members_info" + SendMsg = "/msg/send_msg" + CreateGroup = "/group/create_group" + GetUserToken = "/auth/user_token" +) + +const ( + MaxUser = 100000 + Max100KGroup = 100 + Max999Group = 1000 + MaxInviteUserLimit = 999 + + CreateUserTicker = 1 * time.Second + CreateGroupTicker = 1 * time.Second + Create100KGroupTicker = 1 * time.Second + Create999GroupTicker = 1 * time.Second + SendMsgTo100KGroupTicker = 1 * time.Second + SendMsgTo999GroupTicker = 1 * time.Minute +) + +type BaseResp struct { + ErrCode int `json:"errCode"` + ErrMsg string `json:"errMsg"` + Data json.RawMessage `json:"data"` +} + +type StressTest struct { + Conf *conf + AdminUserID string + AdminToken string + DefaultGroupID string + DefaultUserID string + UserCounter int + CreateUserCounter int + Create100kGroupCounter int + Create999GroupCounter int + MsgCounter int + CreatedUsers []string + CreatedGroups []string + Mutex sync.Mutex + Ctx context.Context + Cancel context.CancelFunc + HttpClient *http.Client + Wg sync.WaitGroup + Once sync.Once +} + +type conf struct { + Share config.Share + Api config.API +} + +func initConfig(configDir string) (*config.Share, *config.API, error) { + var ( + share = &config.Share{} + apiConfig = &config.API{} + ) + + err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) + if err != nil { + return nil, nil, err + } + + err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) + if err != nil { + return nil, nil, err + } + + return share, apiConfig, nil +} + +// Post Request +func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { + // Marshal body + jsonBody, err := json.Marshal(reqbody) + if err != nil { + log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) + return nil, err + } + + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("operationID", st.AdminUserID) + if st.AdminToken != "" { + req.Header.Set("token", st.AdminToken) + } + + // log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) + + resp, err := st.HttpClient.Do(req) + if err != nil { + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) + return nil, err + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + log.ZError(ctx, "Failed to read response body", err, "url", url) + return nil, err + } + + var baseResp BaseResp + if err := json.Unmarshal(respBody, &baseResp); err != nil { + log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) + return nil, err + } + + if baseResp.ErrCode != 0 { + err = fmt.Errorf(baseResp.ErrMsg) + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) + return nil, err + } + + return baseResp.Data, nil +} + +func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { + req := auth.GetAdminTokenReq{ + Secret: st.Conf.Share.Secret, + UserID: st.AdminUserID, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) + if err != nil { + return "", err + } + + data := &auth.GetAdminTokenResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return "", err + } + + return data.Token, nil +} + +func (st *StressTest) CheckUser(ctx context.Context, userIDs []string) ([]string, error) { + req := pbuser.AccountCheckReq{ + CheckUserIDs: userIDs, + } + + resp, err := st.PostRequest(ctx, ApiAddress+UserCheck, &req) + if err != nil { + return nil, err + } + + data := &pbuser.AccountCheckResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return nil, err + } + + unRegisteredUserIDs := make([]string, 0) + + for _, res := range data.Results { + if res.AccountStatus == constant.UnRegistered { + unRegisteredUserIDs = append(unRegisteredUserIDs, res.UserID) + } + } + + return unRegisteredUserIDs, nil +} + +func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { + user := &sdkws.UserInfo{ + UserID: userID, + Nickname: userID, + } + + req := pbuser.UserRegisterReq{ + Users: []*sdkws.UserInfo{user}, + } + + _, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) + if err != nil { + return "", err + } + + st.UserCounter++ + return userID, nil +} + +func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) error { + // The method can import a large number of users at once. + var userList []*sdkws.UserInfo + + defer st.Once.Do( + func() { + st.DefaultUserID = userIDs[0] + fmt.Println("Default Send User Created ID:", st.DefaultUserID) + }) + + needUserIDs, err := st.CheckUser(ctx, userIDs) + if err != nil { + return err + } + + for _, userID := range needUserIDs { + user := &sdkws.UserInfo{ + UserID: userID, + Nickname: userID, + } + userList = append(userList, user) + } + + req := pbuser.UserRegisterReq{ + Users: userList, + } + + _, err = st.PostRequest(ctx, ApiAddress+CreateUser, &req) + if err != nil { + return err + } + + st.UserCounter += len(userList) + return nil +} + +func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) { + needInviteUserIDs := make([]string, 0) + + const maxBatchSize = 500 + if len(userIDs) > maxBatchSize { + for i := 0; i < len(userIDs); i += maxBatchSize { + end := min(i+maxBatchSize, len(userIDs)) + batchUserIDs := userIDs[i:end] + + // log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1, + // "batchUserCount", len(batchUserIDs)) + + // Process a single batch + batchReq := group.GetGroupMembersInfoReq{ + GroupID: groupID, + UserIDs: batchUserIDs, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq) + if err != nil { + log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1) + continue + } + + data := &group.GetGroupMembersInfoResp{} + if err := json.Unmarshal(resp, &data); err != nil { + log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1) + continue + } + + // Process the batch results + existingMembers := make(map[string]bool) + for _, member := range data.Members { + existingMembers[member.UserID] = true + } + + for _, userID := range batchUserIDs { + if !existingMembers[userID] { + needInviteUserIDs = append(needInviteUserIDs, userID) + } + } + } + + return needInviteUserIDs, nil + } + + req := group.GetGroupMembersInfoReq{ + GroupID: groupID, + UserIDs: userIDs, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req) + if err != nil { + return nil, err + } + + data := &group.GetGroupMembersInfoResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return nil, err + } + + existingMembers := make(map[string]bool) + for _, member := range data.Members { + existingMembers[member.UserID] = true + } + + for _, userID := range userIDs { + if !existingMembers[userID] { + needInviteUserIDs = append(needInviteUserIDs, userID) + } + } + + return needInviteUserIDs, nil +} + +func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs []string) error { + req := group.InviteUserToGroupReq{ + GroupID: groupID, + InvitedUserIDs: userIDs, + } + _, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) + if err != nil { + return err + } + + return nil +} + +func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error { + contentObj := map[string]any{ + // "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), + "content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")), + } + + req := &apistruct.SendMsgReq{ + SendMsg: apistruct.SendMsg{ + SendID: userID, + SenderNickname: userID, + GroupID: groupID, + ContentType: constant.Text, + SessionType: constant.ReadGroupChatType, + Content: contentObj, + }, + } + + _, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) + if err != nil { + log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) + return err + } + + st.MsgCounter++ + + return nil +} + +// Max userIDs number is 1000 +func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) { + groupInfo := &sdkws.GroupInfo{ + GroupID: groupID, + GroupName: groupID, + GroupType: constant.WorkingGroup, + } + + req := group.CreateGroupReq{ + OwnerUserID: userID, + MemberUserIDs: userIDsList, + GroupInfo: groupInfo, + } + + resp := group.CreateGroupResp{} + + response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) + if err != nil { + return "", err + } + + if err := json.Unmarshal(response, &resp); err != nil { + return "", err + } + + // st.GroupCounter++ + + return resp.GroupInfo.GroupID, nil +} + +func main() { + var configPath string + // defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") + // flag.StringVar(&configPath, "c", defaultConfigDir, "config path") + flag.StringVar(&configPath, "c", "", "config path") + flag.Parse() + + if configPath == "" { + _, _ = fmt.Fprintln(os.Stderr, "config path is empty") + os.Exit(1) + return + } + + fmt.Printf(" Config Path: %s\n", configPath) + + share, apiConfig, err := initConfig(configPath) + if err != nil { + program.ExitWithError(err) + return + } + + ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) + + ctx, cancel := context.WithCancel(context.Background()) + // ch := make(chan struct{}) + + st := &StressTest{ + Conf: &conf{ + Share: *share, + Api: *apiConfig, + }, + AdminUserID: share.IMAdminUser.UserIDs[0], + Ctx: ctx, + Cancel: cancel, + HttpClient: &http.Client{ + Timeout: 50 * time.Second, + }, + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nReceived stop signal, stopping...") + + go func() { + // time.Sleep(5 * time.Second) + fmt.Println("Force exit") + os.Exit(0) + }() + + st.Cancel() + }() + + token, err := st.GetAdminToken(st.Ctx) + if err != nil { + log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) + } + + st.AdminToken = token + fmt.Println("Admin Token:", st.AdminToken) + fmt.Println("ApiAddress:", ApiAddress) + for i := 0; i < MaxUser; i++ { + userID := fmt.Sprintf("v2_StressTest_User_%d", i) + st.CreatedUsers = append(st.CreatedUsers, userID) + st.CreateUserCounter++ + } + + // err = st.CreateUserBatch(st.Ctx, st.CreatedUsers) + // if err != nil { + // log.ZError(ctx, "Create user failed.", err) + // } + + const batchSize = 1000 + totalUsers := len(st.CreatedUsers) + successCount := 0 + + if st.DefaultUserID == "" && len(st.CreatedUsers) > 0 { + st.DefaultUserID = st.CreatedUsers[0] + } + + for i := 0; i < totalUsers; i += batchSize { + end := min(i+batchSize, totalUsers) + + userBatch := st.CreatedUsers[i:end] + log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch)) + + err = st.CreateUserBatch(st.Ctx, userBatch) + if err != nil { + log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1) + } else { + successCount += len(userBatch) + log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1, + "progress", fmt.Sprintf("%d/%d", successCount, totalUsers)) + } + } + + // Execute create 100k group + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + create100kGroupTicker := time.NewTicker(Create100KGroupTicker) + defer create100kGroupTicker.Stop() + + for i := 0; i < Max100KGroup; i++ { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create 100K Group") + return + + case <-create100kGroupTicker.C: + // Create 100K groups + st.Wg.Add(1) + go func(idx int) { + defer st.Wg.Done() + defer func() { + st.Create100kGroupCounter++ + }() + + groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx) + + if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { + log.ZError(st.Ctx, "Create group failed.", err) + // continue + } + + for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { + InviteUserIDs := make([]string, 0) + // ensure TargetUserList is in group + InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) + + startIdx := max(i*MaxInviteUserLimit, 1) + endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) + + for j := startIdx; j < endIdx; j++ { + userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) + InviteUserIDs = append(InviteUserIDs, userCreatedID) + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) + if err != nil { + log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) + continue + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + // Invite To Group + if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) + continue + // os.Exit(1) + // return + } + } + }(i) + } + } + }() + + // create 999 groups + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + create999GroupTicker := time.NewTicker(Create999GroupTicker) + defer create999GroupTicker.Stop() + + for i := 0; i < Max999Group; i++ { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create 999 Group") + return + + case <-create999GroupTicker.C: + // Create 999 groups + st.Wg.Add(1) + go func(idx int) { + defer st.Wg.Done() + defer func() { + st.Create999GroupCounter++ + }() + + groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx) + + if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { + log.ZError(st.Ctx, "Create group failed.", err) + // continue + } + for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { + InviteUserIDs := make([]string, 0) + // ensure TargetUserList is in group + InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) + + startIdx := max(i*MaxInviteUserLimit, 1) + endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) + + for j := startIdx; j < endIdx; j++ { + userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) + InviteUserIDs = append(InviteUserIDs, userCreatedID) + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) + if err != nil { + log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) + continue + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + // Invite To Group + if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) + continue + // os.Exit(1) + // return + } + } + }(i) + } + } + }() + + // Send message to 100K groups + st.Wg.Wait() + fmt.Println("All groups created successfully, starting to send messages...") + log.ZInfo(ctx, "All groups created successfully, starting to send messages...") + + var groups100K []string + var groups999 []string + + for i := 0; i < Max100KGroup; i++ { + groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i) + groups100K = append(groups100K, groupID) + } + + for i := 0; i < Max999Group; i++ { + groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i) + groups999 = append(groups999, groupID) + } + + send100kGroupLimiter := make(chan struct{}, 20) + send999GroupLimiter := make(chan struct{}, 100) + + // execute Send message to 100K groups + go func() { + ticker := time.NewTicker(SendMsgTo100KGroupTicker) + defer ticker.Stop() + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send Message to 100K Group") + return + + case <-ticker.C: + // Send message to 100K groups + for _, groupID := range groups100K { + send100kGroupLimiter <- struct{}{} + go func(groupID string) { + defer func() { <-send100kGroupLimiter }() + if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { + log.ZError(st.Ctx, "Send message to 100K group failed.", err) + } + }(groupID) + } + // log.ZInfo(st.Ctx, "Send message to 100K groups successfully.") + } + } + }() + + // execute Send message to 999 groups + go func() { + ticker := time.NewTicker(SendMsgTo999GroupTicker) + defer ticker.Stop() + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send Message to 999 Group") + return + + case <-ticker.C: + // Send message to 999 groups + for _, groupID := range groups999 { + send999GroupLimiter <- struct{}{} + go func(groupID string) { + defer func() { <-send999GroupLimiter }() + + if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { + log.ZError(st.Ctx, "Send message to 999 group failed.", err) + } + }(groupID) + } + // log.ZInfo(st.Ctx, "Send message to 999 groups successfully.") + } + } + }() + + <-st.Ctx.Done() + fmt.Println("Received signal to exit, shutting down...") +}