mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-27 05:52:29 +08:00 
			
		
		
		
	Merge branch 'openimsdk:main' into main
This commit is contained in:
		
						commit
						8653d19242
					
				
							
								
								
									
										65
									
								
								.github/workflows/cleanup-after-milestone-prs-merged.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								.github/workflows/cleanup-after-milestone-prs-merged.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,65 @@ | |||||||
|  | name: Cleanup After Milestone PRs Merged | ||||||
|  | 
 | ||||||
|  | on: | ||||||
|  |   pull_request: | ||||||
|  |     types: | ||||||
|  |       - closed | ||||||
|  | 
 | ||||||
|  | jobs: | ||||||
|  |   handle_pr: | ||||||
|  |     runs-on: ubuntu-latest | ||||||
|  | 
 | ||||||
|  |     steps: | ||||||
|  |     - name: Checkout repository | ||||||
|  |       uses: actions/checkout@v4.2.0 | ||||||
|  | 
 | ||||||
|  |     - name: Get the PR title and extract PR numbers | ||||||
|  |       id: extract_pr_numbers | ||||||
|  |       run: | | ||||||
|  |         # Get the PR title | ||||||
|  |         PR_TITLE="${{ github.event.pull_request.title }}" | ||||||
|  | 
 | ||||||
|  |         echo "PR Title: $PR_TITLE" | ||||||
|  | 
 | ||||||
|  |         # Extract PR numbers from the title | ||||||
|  |         PR_NUMBERS=$(echo "$PR_TITLE" | grep -oE "#[0-9]+" | tr -d '#' | tr '\n' ' ') | ||||||
|  |         echo "Extracted PR Numbers: $PR_NUMBERS" | ||||||
|  | 
 | ||||||
|  |         # Save PR numbers to a file | ||||||
|  |         echo "$PR_NUMBERS" > pr_numbers.txt | ||||||
|  |         echo "Saved PR Numbers to pr_numbers.txt" | ||||||
|  | 
 | ||||||
|  |         # Check if the title matches a specific pattern | ||||||
|  |         if echo "$PR_TITLE" | grep -qE "^deps: Merge( #[0-9]+)+ PRs into .+"; then | ||||||
|  |           echo "proceed=true" >> $GITHUB_OUTPUT | ||||||
|  |         else | ||||||
|  |           echo "proceed=false" >> $GITHUB_OUTPUT | ||||||
|  |         fi | ||||||
|  | 
 | ||||||
|  |     - name: Use extracted PR numbers and label PRs | ||||||
|  |       if: (steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')) && github.event.pull_request.merged == true | ||||||
|  |       run: | | ||||||
|  |         # Read the previously saved PR numbers | ||||||
|  |         PR_NUMBERS=$(cat pr_numbers.txt) | ||||||
|  |         echo "Using extracted PR Numbers: $PR_NUMBERS" | ||||||
|  | 
 | ||||||
|  |         # Loop through each PR number and add label | ||||||
|  |         for PR_NUMBER in $PR_NUMBERS; do | ||||||
|  |           echo "Adding 'cherry-picked' label to PR #$PR_NUMBER" | ||||||
|  |           curl -X POST \ | ||||||
|  |             -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ | ||||||
|  |             -H "Accept: application/vnd.github+json" \ | ||||||
|  |             https://api.github.com/repos/${{ github.repository }}/issues/$PR_NUMBER/labels \ | ||||||
|  |             -d '{"labels":["cherry-picked"]}' | ||||||
|  |         done | ||||||
|  |       env: | ||||||
|  |         GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} | ||||||
|  | 
 | ||||||
|  |     - name: Delete branch after PR close | ||||||
|  |       if: steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge') | ||||||
|  |       run: | | ||||||
|  |         BRANCH_NAME="${{ github.event.pull_request.head.ref }}" | ||||||
|  |         echo "Branch to delete: $BRANCH_NAME" | ||||||
|  |         git push origin --delete "$BRANCH_NAME" | ||||||
|  |       env: | ||||||
|  |         GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} | ||||||
							
								
								
									
										4
									
								
								.github/workflows/go-build-test.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								.github/workflows/go-build-test.yml
									
									
									
									
										vendored
									
									
								
							| @ -2,11 +2,7 @@ name: Go Build Test | |||||||
| 
 | 
 | ||||||
| on: | on: | ||||||
|   push: |   push: | ||||||
|     branches: |  | ||||||
|       - main |  | ||||||
|   pull_request: |   pull_request: | ||||||
|     branches: |  | ||||||
|       - main |  | ||||||
|     paths-ignore: |     paths-ignore: | ||||||
|       - '**/*.md' |       - '**/*.md' | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										218
									
								
								.github/workflows/merge-from-milestone.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										218
									
								
								.github/workflows/merge-from-milestone.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,218 @@ | |||||||
|  | name: Create Pre-Release PR from Milestone | ||||||
|  | 
 | ||||||
|  | permissions: | ||||||
|  |   contents: write | ||||||
|  |   pull-requests: write | ||||||
|  |   issues: write | ||||||
|  | 
 | ||||||
|  | on: | ||||||
|  |   workflow_dispatch: | ||||||
|  |     inputs: | ||||||
|  |       milestone_name: | ||||||
|  |         description: 'Milestone name to collect closed PRs from' | ||||||
|  |         required: true | ||||||
|  |         default: 'v3.8.2' | ||||||
|  |       target_branch: | ||||||
|  |         description: 'Target branch to merge the consolidated PR' | ||||||
|  |         required: true | ||||||
|  |         default: 'pre-release-v3.8.2' | ||||||
|  | 
 | ||||||
|  | env: | ||||||
|  |   MILESTONE_NAME: ${{ github.event.inputs.milestone_name || 'v3.8.2' }} | ||||||
|  |   TARGET_BRANCH: ${{ github.event.inputs.target_branch || 'pre-release-v3.8.2' }} | ||||||
|  |   GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} | ||||||
|  |   BOT_TOKEN: ${{ secrets.BOT_TOKEN }} | ||||||
|  |   LABEL_NAME: cherry-picked | ||||||
|  |   TEMP_DIR: /tmp  # Using /tmp as the temporary directory | ||||||
|  | 
 | ||||||
|  | jobs: | ||||||
|  |   cherry_pick_milestone_prs: | ||||||
|  |     runs-on: ubuntu-latest | ||||||
|  |     steps: | ||||||
|  |       - name: Setup temp directory | ||||||
|  |         run: | | ||||||
|  |           # Create the temporary directory and initialize necessary files | ||||||
|  |           mkdir -p ${{ env.TEMP_DIR }} | ||||||
|  |           touch ${{ env.TEMP_DIR }}/pr_numbers.txt | ||||||
|  |           touch ${{ env.TEMP_DIR }}/commit_hashes.txt | ||||||
|  |           touch ${{ env.TEMP_DIR }}/pr_title.txt | ||||||
|  |           touch ${{ env.TEMP_DIR }}/pr_body.txt | ||||||
|  |           touch ${{ env.TEMP_DIR }}/created_pr_number.txt | ||||||
|  | 
 | ||||||
|  |       - name: Checkout repository | ||||||
|  |         uses: actions/checkout@v4 | ||||||
|  |         with: | ||||||
|  |           fetch-depth: 0 | ||||||
|  |           token: ${{ secrets.BOT_TOKEN }} | ||||||
|  | 
 | ||||||
|  |       - name: Setup Git User for OpenIM-Robot | ||||||
|  |         run: | | ||||||
|  |           # Set up Git credentials for the bot | ||||||
|  |           git config --global user.email "OpenIM-Robot@users.noreply.github.com" | ||||||
|  |           git config --global user.name "OpenIM-Robot" | ||||||
|  | 
 | ||||||
|  |       - name: Fetch Milestone ID and Filter PR Numbers | ||||||
|  |         env: | ||||||
|  |           MILESTONE_NAME: ${{ env.MILESTONE_NAME }} | ||||||
|  |         run: | | ||||||
|  |           # Fetch milestone details and extract milestone ID | ||||||
|  |           milestones=$(curl -s -H "Authorization: token $BOT_TOKEN" \ | ||||||
|  |             -H "Accept: application/vnd.github+json" \ | ||||||
|  |             "https://api.github.com/repos/${{ github.repository }}/milestones") | ||||||
|  |           milestone_id=$(echo "$milestones" | grep -B3 "\"title\": \"$MILESTONE_NAME\"" | grep '"number":' | head -n1 | grep -o '[0-9]\+') | ||||||
|  |           if [ -z "$milestone_id" ]; then | ||||||
|  |             echo "Milestone '$MILESTONE_NAME' not found. Exiting." | ||||||
|  |             exit 1 | ||||||
|  |           fi | ||||||
|  |           echo "Milestone ID: $milestone_id" | ||||||
|  |           echo "MILESTONE_ID=$milestone_id" >> $GITHUB_ENV | ||||||
|  | 
 | ||||||
|  |           # Fetch issues for the milestone | ||||||
|  |           issues=$(curl -s -H "Authorization: token $BOT_TOKEN" \ | ||||||
|  |                 -H "Accept: application/vnd.github+json" \ | ||||||
|  |                 "https://api.github.com/repos/${{ github.repository }}/issues?milestone=$milestone_id&state=closed&per_page=100") | ||||||
|  | 
 | ||||||
|  |           > ${{ env.TEMP_DIR }}/pr_numbers.txt | ||||||
|  | 
 | ||||||
|  |           # Filter PRs that do not have the 'cherry-picked' label | ||||||
|  |           for pr_number in $(echo "$issues" | jq -r '.[] | select(.pull_request != null) | .number'); do | ||||||
|  |             labels=$(curl -s -H "Authorization: token $BOT_TOKEN" \ | ||||||
|  |               -H "Accept: application/vnd.github+json" \ | ||||||
|  |               "https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" | jq -r '.[].name') | ||||||
|  | 
 | ||||||
|  |             if ! echo "$labels" | grep -q "${LABEL_NAME}"; then | ||||||
|  |               echo "PR #$pr_number does not have the 'cherry-picked' label. Adding to the list." | ||||||
|  |               echo "$pr_number" >> ${{ env.TEMP_DIR }}/pr_numbers.txt | ||||||
|  |             else | ||||||
|  |               echo "PR #$pr_number already has the 'cherry-picked' label. Skipping." | ||||||
|  |             fi | ||||||
|  |           done | ||||||
|  | 
 | ||||||
|  |           # Sort the filtered PR numbers | ||||||
|  |           sort -n ${{ env.TEMP_DIR }}/pr_numbers.txt -o ${{ env.TEMP_DIR }}/pr_numbers.txt | ||||||
|  | 
 | ||||||
|  |           echo "Filtered and sorted PR numbers:" | ||||||
|  |           cat ${{ env.TEMP_DIR }}/pr_numbers.txt || echo "No closed PR numbers found for milestone." | ||||||
|  | 
 | ||||||
|  |       - name: Fetch Merge Commits for PRs and Generate Title and Body | ||||||
|  |         run: | | ||||||
|  |           # Ensure the files are initialized | ||||||
|  |           > ${{ env.TEMP_DIR }}/commit_hashes.txt | ||||||
|  |           > ${{ env.TEMP_DIR }}/pr_title.txt | ||||||
|  |           > ${{ env.TEMP_DIR }}/pr_body.txt | ||||||
|  | 
 | ||||||
|  |           # Write description to the PR body | ||||||
|  |           echo "### Description:" >> ${{ env.TEMP_DIR }}/pr_body.txt | ||||||
|  |           echo "Merging PRs from milestone \`$MILESTONE_NAME\` into target branch \`$TARGET_BRANCH\`." >> ${{ env.TEMP_DIR }}/pr_body.txt | ||||||
|  |           echo "" >> ${{ env.TEMP_DIR }}/pr_body.txt | ||||||
|  |           echo "### Need Merge PRs:" >> ${{ env.TEMP_DIR }}/pr_body.txt | ||||||
|  | 
 | ||||||
|  |           pr_numbers_in_title="" | ||||||
|  | 
 | ||||||
|  |           # Process sorted PR numbers and generate commit hashes | ||||||
|  |           for pr_number in $(cat ${{ env.TEMP_DIR }}/pr_numbers.txt); do | ||||||
|  |             echo "Processing PR #$pr_number" | ||||||
|  |             pr_details=$(curl -s -H "Authorization: token $BOT_TOKEN" \ | ||||||
|  |               -H "Accept: application/vnd.github+json" \ | ||||||
|  |               "https://api.github.com/repos/${{ github.repository }}/pulls/$pr_number") | ||||||
|  |             pr_title=$(echo "$pr_details" | jq -r '.title') | ||||||
|  |             merge_commit=$(echo "$pr_details" | jq -r '.merge_commit_sha') | ||||||
|  |             short_commit_hash=$(echo "$merge_commit" | cut -c 1-7) | ||||||
|  | 
 | ||||||
|  |             # Append PR details to the body | ||||||
|  |             echo "- $pr_title: (#$pr_number) ($short_commit_hash)" >> ${{ env.TEMP_DIR }}/pr_body.txt | ||||||
|  | 
 | ||||||
|  |             if [ "$merge_commit" != "null" ];then | ||||||
|  |               echo "$merge_commit" >> ${{ env.TEMP_DIR }}/commit_hashes.txt | ||||||
|  |               echo "#$pr_number" >> ${{ env.TEMP_DIR }}/pr_title.txt | ||||||
|  |               pr_numbers_in_title="$pr_numbers_in_title #$pr_number" | ||||||
|  |             fi | ||||||
|  |           done | ||||||
|  | 
 | ||||||
|  |           commit_hashes=$(cat ${{ env.TEMP_DIR }}/commit_hashes.txt | tr '\n' ' ') | ||||||
|  |           first_commit_hash=$(head -n 1 ${{ env.TEMP_DIR }}/commit_hashes.txt) | ||||||
|  |           cherry_pick_branch="cherry-pick-${first_commit_hash:0:7}" | ||||||
|  |           echo "COMMIT_HASHES=$commit_hashes" >> $GITHUB_ENV | ||||||
|  |           echo "CHERRY_PICK_BRANCH=$cherry_pick_branch" >> $GITHUB_ENV | ||||||
|  |           echo "pr_numbers_in_title=$pr_numbers_in_title" >> $GITHUB_ENV | ||||||
|  | 
 | ||||||
|  |       - name: Pull and Cherry-pick Commits, Then Push | ||||||
|  |         env: | ||||||
|  |           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} | ||||||
|  |           BOT_TOKEN: ${{ secrets.BOT_TOKEN }} | ||||||
|  |         run: | | ||||||
|  |           # Fetch and pull the latest changes from the target branch | ||||||
|  |           git fetch origin | ||||||
|  |           git checkout $TARGET_BRANCH | ||||||
|  |           git pull origin $TARGET_BRANCH | ||||||
|  | 
 | ||||||
|  |           # Create a new branch for cherry-picking | ||||||
|  |           git checkout -b $CHERRY_PICK_BRANCH | ||||||
|  | 
 | ||||||
|  |           # Cherry-pick the commits and handle conflicts | ||||||
|  |           for commit_hash in $COMMIT_HASHES; do | ||||||
|  |             echo "Attempting to cherry-pick commit $commit_hash" | ||||||
|  |             if ! git cherry-pick "$commit_hash" --strategy=recursive -X theirs; then | ||||||
|  |               echo "Conflict detected for $commit_hash. Resolving with incoming changes." | ||||||
|  |               conflict_files=$(git diff --name-only --diff-filter=U) | ||||||
|  |               echo "Conflicting files:" | ||||||
|  |               echo "$conflict_files" | ||||||
|  | 
 | ||||||
|  |               for file in $conflict_files; do | ||||||
|  |                 if [ -f "$file" ]; then | ||||||
|  |                   echo "Resolving conflict for $file" | ||||||
|  |                   git add "$file" | ||||||
|  |                 else | ||||||
|  |                   echo "File $file has been deleted. Skipping." | ||||||
|  |                   git rm "$file" | ||||||
|  |                 fi | ||||||
|  |               done | ||||||
|  | 
 | ||||||
|  |               echo "Conflicts resolved. Continuing cherry-pick." | ||||||
|  |               git cherry-pick --continue | ||||||
|  |             else | ||||||
|  |               echo "Cherry-pick successful for commit $commit_hash." | ||||||
|  |             fi | ||||||
|  |           done | ||||||
|  | 
 | ||||||
|  |           # Push the cherry-pick branch to the repository | ||||||
|  |           git remote set-url origin "https://${BOT_TOKEN}@github.com/${{ github.repository }}.git" | ||||||
|  |           git push origin $CHERRY_PICK_BRANCH --force | ||||||
|  | 
 | ||||||
|  |       - name: Create Pull Request | ||||||
|  |         run: | | ||||||
|  |           # Prepare and create the PR | ||||||
|  |           pr_title="deps: Merge ${{ env.pr_numbers_in_title }} PRs into $TARGET_BRANCH" | ||||||
|  |           pr_body=$(cat ${{ env.TEMP_DIR }}/pr_body.txt) | ||||||
|  | 
 | ||||||
|  |           echo "Prepared PR title:" | ||||||
|  |           echo "$pr_title" | ||||||
|  |           echo "Prepared PR body:" | ||||||
|  |           echo "$pr_body" | ||||||
|  | 
 | ||||||
|  |           # Create the PR using the GitHub API | ||||||
|  |           response=$(curl -s -X POST -H "Authorization: token $BOT_TOKEN" \ | ||||||
|  |             -H "Accept: application/vnd.github+json" \ | ||||||
|  |             https://api.github.com/repos/${{ github.repository }}/pulls \ | ||||||
|  |             -d "$(jq -n --arg title "$pr_title" \ | ||||||
|  |               --arg head "$CHERRY_PICK_BRANCH" \ | ||||||
|  |               --arg base "$TARGET_BRANCH" \ | ||||||
|  |               --arg body "$pr_body" \ | ||||||
|  |               '{title: $title, head: $head, base: $base, body: $body}')") | ||||||
|  | 
 | ||||||
|  |           pr_number=$(echo "$response" | jq -r '.number') | ||||||
|  |           echo "$pr_number" > ${{ env.TEMP_DIR }}/created_pr_number.txt | ||||||
|  |           echo "Created PR #$pr_number" | ||||||
|  | 
 | ||||||
|  |       - name: Add Label to Created Pull Request | ||||||
|  |         run: | | ||||||
|  |           # Add 'milestone-merge' label to the created PR | ||||||
|  |           pr_number=$(cat ${{ env.TEMP_DIR }}/created_pr_number.txt) | ||||||
|  |           echo "Adding label to PR #$pr_number" | ||||||
|  | 
 | ||||||
|  |           curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \ | ||||||
|  |             -H "Accept: application/vnd.github+json" \ | ||||||
|  |             -d '{"labels": ["milestone-merge"]}' \ | ||||||
|  |             "https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" | ||||||
|  | 
 | ||||||
|  |           echo "Added 'milestone-merge' label to PR #$pr_number." | ||||||
| @ -1,20 +1,3 @@ | |||||||
| # Copyright © 2023 OpenIM. All rights reserved. |  | ||||||
| # |  | ||||||
| # Licensed under the Apache License, Version 2.0 (the "License"); |  | ||||||
| # you may not use this file except in compliance with the License. |  | ||||||
| # You may obtain a copy of the License at |  | ||||||
| # |  | ||||||
| #     http://www.apache.org/licenses/LICENSE-2.0 |  | ||||||
| # |  | ||||||
| # Unless required by applicable law or agreed to in writing, software |  | ||||||
| # distributed under the License is distributed on an "AS IS" BASIS, |  | ||||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |  | ||||||
| # See the License for the specific language governing permissions and |  | ||||||
| # limitations under the License. |  | ||||||
| 
 |  | ||||||
| # Determines if a message should be sent. If set to false, it triggers a silent sync without a message. If true, it requires triggering a conversation. |  | ||||||
| # For rpc notification, send twice: once as a message and once as a notification. |  | ||||||
| # The options field 'isNotification' indicates if it's a notification. |  | ||||||
| groupCreated: | groupCreated: | ||||||
|   isSendMsg: true |   isSendMsg: true | ||||||
| # Reliability level of the message sending. | # Reliability level of the message sending. | ||||||
| @ -309,9 +292,9 @@ userInfoUpdated: | |||||||
|   unreadCount: false |   unreadCount: false | ||||||
|   offlinePush: |   offlinePush: | ||||||
|     enable: true |     enable: true | ||||||
|     title: Remove a blocked user |     title: userInfo updated | ||||||
|     desc: Remove a blocked user |     desc: userInfo updated | ||||||
|     ext: Remove a blocked user |     ext: userInfo updated | ||||||
| 
 | 
 | ||||||
| userStatusChanged: | userStatusChanged: | ||||||
|   isSendMsg: false |   isSendMsg: false | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ require ( | |||||||
| 	github.com/gorilla/websocket v1.5.1 | 	github.com/gorilla/websocket v1.5.1 | ||||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||||
| 	github.com/mitchellh/mapstructure v1.5.0 | 	github.com/mitchellh/mapstructure v1.5.0 | ||||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.46 | 	github.com/openimsdk/protocol v0.0.72-alpha.47 | ||||||
| 	github.com/openimsdk/tools v0.0.50-alpha.16 | 	github.com/openimsdk/tools v0.0.50-alpha.16 | ||||||
| 	github.com/pkg/errors v0.9.1 // indirect | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
| 	github.com/prometheus/client_golang v1.18.0 | 	github.com/prometheus/client_golang v1.18.0 | ||||||
|  | |||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | |||||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||||
| github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= | github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= | ||||||
| github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.46 h1:1LZlfEHLzw1F4afFmqBczmXKJWm5rUQ+yr8rJ4oyEAc= | github.com/openimsdk/protocol v0.0.72-alpha.47 h1:FGHnEwsA05GxT3vnz7YH3fbVkuoO3P71ZZgkQQ71MjA= | ||||||
| github.com/openimsdk/protocol v0.0.72-alpha.46/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= | github.com/openimsdk/protocol v0.0.72-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= | github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= | github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= | ||||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||||
|  | |||||||
| @ -198,6 +198,13 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En | |||||||
| 		objectGroup.POST("/initiate_form_data", t.InitiateFormData) | 		objectGroup.POST("/initiate_form_data", t.InitiateFormData) | ||||||
| 		objectGroup.POST("/complete_form_data", t.CompleteFormData) | 		objectGroup.POST("/complete_form_data", t.CompleteFormData) | ||||||
| 		objectGroup.GET("/*name", t.ObjectRedirect) | 		objectGroup.GET("/*name", t.ObjectRedirect) | ||||||
|  | 
 | ||||||
|  | 		applicationGroup := r.Group("application") | ||||||
|  | 		applicationGroup.POST("/add_version", t.AddApplicationVersion) | ||||||
|  | 		applicationGroup.POST("/update_version", t.UpdateApplicationVersion) | ||||||
|  | 		applicationGroup.POST("/delete_version", t.DeleteApplicationVersion) | ||||||
|  | 		applicationGroup.POST("/latest_version", t.LatestApplicationVersion) | ||||||
|  | 		applicationGroup.POST("/page_versions", t.PageApplicationVersion) | ||||||
| 	} | 	} | ||||||
| 	// Message | 	// Message | ||||||
| 	msgGroup := r.Group("/msg") | 	msgGroup := r.Group("/msg") | ||||||
| @ -290,4 +297,6 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc { | |||||||
| var Whitelist = []string{ | var Whitelist = []string{ | ||||||
| 	"/auth/get_admin_token", | 	"/auth/get_admin_token", | ||||||
| 	"/auth/parse_token", | 	"/auth/parse_token", | ||||||
|  | 	"/application/latest_version", | ||||||
|  | 	"/application/page_versions", | ||||||
| } | } | ||||||
|  | |||||||
| @ -170,3 +170,23 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) { | |||||||
| func (o *ThirdApi) GetPrometheus(c *gin.Context) { | func (o *ThirdApi) GetPrometheus(c *gin.Context) { | ||||||
| 	c.Redirect(http.StatusFound, o.GrafanaUrl) | 	c.Redirect(http.StatusFound, o.GrafanaUrl) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (o *ThirdApi) LatestApplicationVersion(c *gin.Context) { | ||||||
|  | 	a2r.Call(third.ThirdClient.LatestApplicationVersion, o.Client, c) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (o *ThirdApi) AddApplicationVersion(c *gin.Context) { | ||||||
|  | 	a2r.Call(third.ThirdClient.AddApplicationVersion, o.Client, c) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (o *ThirdApi) UpdateApplicationVersion(c *gin.Context) { | ||||||
|  | 	a2r.Call(third.ThirdClient.UpdateApplicationVersion, o.Client, c) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (o *ThirdApi) DeleteApplicationVersion(c *gin.Context) { | ||||||
|  | 	a2r.Call(third.ThirdClient.DeleteApplicationVersion, o.Client, c) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (o *ThirdApi) PageApplicationVersion(c *gin.Context) { | ||||||
|  | 	a2r.Call(third.ThirdClient.PageApplicationVersion, o.Client, c) | ||||||
|  | } | ||||||
|  | |||||||
| @ -128,6 +128,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error { | |||||||
| 
 | 
 | ||||||
| 	go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) | 	go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) | ||||||
| 	go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) | 	go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) | ||||||
|  | 	go m.historyCH.HandleUserHasReadSeqMessages(m.ctx) | ||||||
| 	err := m.historyCH.redisMessageBatches.Start() | 	err := m.historyCH.redisMessageBatches.Start() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| @ -157,12 +158,14 @@ func (m *MsgTransfer) Start(index int, config *Config) error { | |||||||
| 		// graceful close kafka client. | 		// graceful close kafka client. | ||||||
| 		m.cancel() | 		m.cancel() | ||||||
| 		m.historyCH.redisMessageBatches.Close() | 		m.historyCH.redisMessageBatches.Close() | ||||||
|  | 		m.historyCH.Close() | ||||||
| 		m.historyCH.historyConsumerGroup.Close() | 		m.historyCH.historyConsumerGroup.Close() | ||||||
| 		m.historyMongoCH.historyConsumerGroup.Close() | 		m.historyMongoCH.historyConsumerGroup.Close() | ||||||
| 		return nil | 		return nil | ||||||
| 	case <-netDone: | 	case <-netDone: | ||||||
| 		m.cancel() | 		m.cancel() | ||||||
| 		m.historyCH.redisMessageBatches.Close() | 		m.historyCH.redisMessageBatches.Close() | ||||||
|  | 		m.historyCH.Close() | ||||||
| 		m.historyCH.historyConsumerGroup.Close() | 		m.historyCH.historyConsumerGroup.Close() | ||||||
| 		m.historyMongoCH.historyConsumerGroup.Close() | 		m.historyMongoCH.historyConsumerGroup.Close() | ||||||
| 		close(netDone) | 		close(netDone) | ||||||
|  | |||||||
| @ -18,8 +18,10 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"strings" | 	"strings" | ||||||
|  | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/IBM/sarama" | 	"github.com/IBM/sarama" | ||||||
| @ -40,11 +42,12 @@ import ( | |||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	size           = 500 | 	size              = 500 | ||||||
| 	mainDataBuffer = 500 | 	mainDataBuffer    = 500 | ||||||
| 	subChanBuffer  = 50 | 	subChanBuffer     = 50 | ||||||
| 	worker         = 50 | 	worker            = 50 | ||||||
| 	interval       = 100 * time.Millisecond | 	interval          = 100 * time.Millisecond | ||||||
|  | 	hasReadChanBuffer = 1000 | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type ContextMsg struct { | type ContextMsg struct { | ||||||
| @ -52,14 +55,23 @@ type ContextMsg struct { | |||||||
| 	ctx     context.Context | 	ctx     context.Context | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // This structure is used for asynchronously writing the sender’s read sequence (seq) regarding a message into MongoDB. | ||||||
|  | // For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10. | ||||||
|  | type userHasReadSeq struct { | ||||||
|  | 	conversationID string | ||||||
|  | 	userHasReadMap map[string]int64 | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type OnlineHistoryRedisConsumerHandler struct { | type OnlineHistoryRedisConsumerHandler struct { | ||||||
| 	historyConsumerGroup *kafka.MConsumerGroup | 	historyConsumerGroup *kafka.MConsumerGroup | ||||||
| 
 | 
 | ||||||
| 	redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] | 	redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] | ||||||
| 
 | 
 | ||||||
| 	msgTransferDatabase   controller.MsgTransferDatabase | 	msgTransferDatabase         controller.MsgTransferDatabase | ||||||
| 	conversationRpcClient *rpcclient.ConversationRpcClient | 	conversationRpcClient       *rpcclient.ConversationRpcClient | ||||||
| 	groupRpcClient        *rpcclient.GroupRpcClient | 	groupRpcClient              *rpcclient.GroupRpcClient | ||||||
|  | 	conversationUserHasReadChan chan *userHasReadSeq | ||||||
|  | 	wg                          sync.WaitGroup | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase, | func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase, | ||||||
| @ -70,6 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont | |||||||
| 	} | 	} | ||||||
| 	var och OnlineHistoryRedisConsumerHandler | 	var och OnlineHistoryRedisConsumerHandler | ||||||
| 	och.msgTransferDatabase = database | 	och.msgTransferDatabase = database | ||||||
|  | 	och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer) | ||||||
|  | 	och.wg.Add(1) | ||||||
| 
 | 
 | ||||||
| 	b := batcher.New[sarama.ConsumerMessage]( | 	b := batcher.New[sarama.ConsumerMessage]( | ||||||
| 		batcher.WithSize(size), | 		batcher.WithSize(size), | ||||||
| @ -115,25 +129,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { | func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { | ||||||
| 	type seqKey struct { | 
 | ||||||
| 		conversationID string | 	var conversationID string | ||||||
| 		userID         string | 	var userSeqMap map[string]int64 | ||||||
| 	} |  | ||||||
| 	var readSeq map[seqKey]int64 |  | ||||||
| 	for _, msg := range msgs { | 	for _, msg := range msgs { | ||||||
| 		if msg.message.ContentType != constant.HasReadReceipt { | 		if msg.message.ContentType != constant.HasReadReceipt { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		var elem sdkws.NotificationElem | 		var elem sdkws.NotificationElem | ||||||
| 		if err := json.Unmarshal(msg.message.Content, &elem); err != nil { | 		if err := json.Unmarshal(msg.message.Content, &elem); err != nil { | ||||||
| 			log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) | 			log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		var tips sdkws.MarkAsReadTips | 		var tips sdkws.MarkAsReadTips | ||||||
| 		if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { | 		if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { | ||||||
| 			log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) | 			log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  | 		//The conversation ID for each batch of messages processed by the batcher is the same. | ||||||
|  | 		conversationID = tips.ConversationID | ||||||
| 		if len(tips.Seqs) > 0 { | 		if len(tips.Seqs) > 0 { | ||||||
| 			for _, seq := range tips.Seqs { | 			for _, seq := range tips.Seqs { | ||||||
| 				if tips.HasReadSeq < seq { | 				if tips.HasReadSeq < seq { | ||||||
| @ -146,26 +160,25 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, | |||||||
| 		if tips.HasReadSeq < 0 { | 		if tips.HasReadSeq < 0 { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if readSeq == nil { | 		if userSeqMap == nil { | ||||||
| 			readSeq = make(map[seqKey]int64) | 			userSeqMap = make(map[string]int64) | ||||||
| 		} | 		} | ||||||
| 		key := seqKey{ | 
 | ||||||
| 			conversationID: tips.ConversationID, | 		if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { | ||||||
| 			userID:         tips.MarkAsReadUserID, |  | ||||||
| 		} |  | ||||||
| 		if readSeq[key] > tips.HasReadSeq { |  | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		readSeq[key] = tips.HasReadSeq | 		userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq | ||||||
| 	} | 	} | ||||||
| 	if readSeq == nil { | 	if userSeqMap == nil { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	for key, seq := range readSeq { | 	if len(conversationID) == 0 { | ||||||
| 		if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { | 		log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) | ||||||
| 			log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
|  | 	if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { | ||||||
|  | 		log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { | func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { | ||||||
| @ -250,12 +263,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key | |||||||
| 	} | 	} | ||||||
| 	if len(storageMessageList) > 0 { | 	if len(storageMessageList) > 0 { | ||||||
| 		msg := storageMessageList[0] | 		msg := storageMessageList[0] | ||||||
| 		lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) | 		lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) | ||||||
| 		if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) { | 		if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) { | ||||||
| 			log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) | 			log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| 		log.ZInfo(ctx, "BatchInsertChat2Cache end") | 		log.ZInfo(ctx, "BatchInsertChat2Cache end") | ||||||
|  | 		err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) | ||||||
|  | 			prommetrics.SeqSetFailedCounter.Inc() | ||||||
|  | 		} | ||||||
|  | 		och.conversationUserHasReadChan <- &userHasReadSeq{ | ||||||
|  | 			conversationID: conversationID, | ||||||
|  | 			userHasReadMap: userSeqMap, | ||||||
|  | 		} | ||||||
| 
 | 
 | ||||||
| 		if isNewConversation { | 		if isNewConversation { | ||||||
| 			switch msg.SessionType { | 			switch msg.SessionType { | ||||||
| @ -308,7 +330,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con | |||||||
| 		storageMessageList = append(storageMessageList, msg.message) | 		storageMessageList = append(storageMessageList, msg.message) | ||||||
| 	} | 	} | ||||||
| 	if len(storageMessageList) > 0 { | 	if len(storageMessageList) > 0 { | ||||||
| 		lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) | 		lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, | 			log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, | ||||||
| 				"storageList", storageMessageList) | 				"storageList", storageMessageList) | ||||||
| @ -323,6 +345,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con | |||||||
| 		och.toPushTopic(ctx, key, conversationID, storageList) | 		och.toPushTopic(ctx, key, conversationID, storageList) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) { | ||||||
|  | 	defer och.wg.Done() | ||||||
|  | 
 | ||||||
|  | 	for msg := range och.conversationUserHasReadChan { | ||||||
|  | 		if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil { | ||||||
|  | 			log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages") | ||||||
|  | } | ||||||
|  | func (och *OnlineHistoryRedisConsumerHandler) Close() { | ||||||
|  | 	close(och.conversationUserHasReadChan) | ||||||
|  | 	och.wg.Wait() | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { | func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { | ||||||
| 	for _, v := range msgs { | 	for _, v := range msgs { | ||||||
|  | |||||||
| @ -24,7 +24,6 @@ import ( | |||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| 	"github.com/openimsdk/tools/mcontext" | 	"github.com/openimsdk/tools/mcontext" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { | func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { | ||||||
| @ -70,7 +69,7 @@ func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before * | |||||||
| 
 | 
 | ||||||
| func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error { | func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error { | ||||||
| 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | 	return webhook.WithCondition(ctx, before, func(ctx context.Context) error { | ||||||
| 		if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { | 		if msg.ContentType == constant.Typing { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		req := callbackstruct.CallbackBeforePushReq{ | 		req := callbackstruct.CallbackBeforePushReq{ | ||||||
|  | |||||||
| @ -1026,7 +1026,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf | |||||||
| 	} | 	} | ||||||
| 	num := len(update) | 	num := len(update) | ||||||
| 	if req.GroupInfoForSet.Notification != "" { | 	if req.GroupInfoForSet.Notification != "" { | ||||||
| 		num-- | 		num -= 3 | ||||||
| 		func() { | 		func() { | ||||||
| 			conversation := &pbconversation.ConversationReq{ | 			conversation := &pbconversation.ConversationReq{ | ||||||
| 				ConversationID:   msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID), | 				ConversationID:   msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID), | ||||||
| @ -1133,8 +1133,9 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	num := len(updatedData) | 	num := len(updatedData) | ||||||
|  | 
 | ||||||
| 	if req.Notification != nil { | 	if req.Notification != nil { | ||||||
| 		num-- | 		num -= 3 | ||||||
| 
 | 
 | ||||||
| 		if req.Notification.Value != "" { | 		if req.Notification.Value != "" { | ||||||
| 			func() { | 			func() { | ||||||
| @ -1219,7 +1220,7 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if newOwner.MuteEndTime != time.Unix(0, 0) { | 	if newOwner.MuteEndTime.After(time.Now()) { | ||||||
| 		if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{ | 		if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{ | ||||||
| 			GroupID: group.GroupID, | 			GroupID: group.GroupID, | ||||||
| 			UserID:  req.NewOwnerUserID}); err != nil { | 			UserID:  req.NewOwnerUserID}); err != nil { | ||||||
|  | |||||||
							
								
								
									
										117
									
								
								internal/rpc/third/application.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										117
									
								
								internal/rpc/third/application.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,117 @@ | |||||||
|  | package third | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
|  | 	"github.com/openimsdk/protocol/third" | ||||||
|  | 	"github.com/openimsdk/tools/errs" | ||||||
|  | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
|  | 	"github.com/redis/go-redis/v9" | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func IsNotFound(err error) bool { | ||||||
|  | 	switch errs.Unwrap(err) { | ||||||
|  | 	case redis.Nil, mongo.ErrNoDocuments: | ||||||
|  | 		return true | ||||||
|  | 	default: | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *thirdServer) db2pbApplication(val *model.Application) *third.ApplicationVersion { | ||||||
|  | 	return &third.ApplicationVersion{ | ||||||
|  | 		Id:         val.ID.Hex(), | ||||||
|  | 		Platform:   val.Platform, | ||||||
|  | 		Version:    val.Version, | ||||||
|  | 		Url:        val.Url, | ||||||
|  | 		Text:       val.Text, | ||||||
|  | 		Force:      val.Force, | ||||||
|  | 		Latest:     val.Latest, | ||||||
|  | 		CreateTime: val.CreateTime.UnixMilli(), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *thirdServer) LatestApplicationVersion(ctx context.Context, req *third.LatestApplicationVersionReq) (*third.LatestApplicationVersionResp, error) { | ||||||
|  | 	res, err := t.applicationDatabase.LatestVersion(ctx, req.Platform) | ||||||
|  | 	if err == nil { | ||||||
|  | 		return &third.LatestApplicationVersionResp{Version: t.db2pbApplication(res)}, nil | ||||||
|  | 	} else if IsNotFound(err) { | ||||||
|  | 		return &third.LatestApplicationVersionResp{}, nil | ||||||
|  | 	} else { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *thirdServer) AddApplicationVersion(ctx context.Context, req *third.AddApplicationVersionReq) (*third.AddApplicationVersionResp, error) { | ||||||
|  | 	if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	val := &model.Application{ | ||||||
|  | 		ID:         primitive.NewObjectID(), | ||||||
|  | 		Platform:   req.Platform, | ||||||
|  | 		Version:    req.Version, | ||||||
|  | 		Url:        req.Url, | ||||||
|  | 		Text:       req.Text, | ||||||
|  | 		Force:      req.Force, | ||||||
|  | 		Latest:     req.Latest, | ||||||
|  | 		CreateTime: time.Now(), | ||||||
|  | 	} | ||||||
|  | 	if err := t.applicationDatabase.AddVersion(ctx, val); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &third.AddApplicationVersionResp{}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *thirdServer) UpdateApplicationVersion(ctx context.Context, req *third.UpdateApplicationVersionReq) (*third.UpdateApplicationVersionResp, error) { | ||||||
|  | 	if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	oid, err := primitive.ObjectIDFromHex(req.Id) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) | ||||||
|  | 	} | ||||||
|  | 	update := make(map[string]any) | ||||||
|  | 	putUpdate(update, "platform", req.Platform) | ||||||
|  | 	putUpdate(update, "version", req.Version) | ||||||
|  | 	putUpdate(update, "url", req.Url) | ||||||
|  | 	putUpdate(update, "text", req.Text) | ||||||
|  | 	putUpdate(update, "force", req.Force) | ||||||
|  | 	putUpdate(update, "latest", req.Latest) | ||||||
|  | 	if err := t.applicationDatabase.UpdateVersion(ctx, oid, update); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &third.UpdateApplicationVersionResp{}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *thirdServer) DeleteApplicationVersion(ctx context.Context, req *third.DeleteApplicationVersionReq) (*third.DeleteApplicationVersionResp, error) { | ||||||
|  | 	if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	ids := make([]primitive.ObjectID, 0, len(req.Id)) | ||||||
|  | 	for _, id := range req.Id { | ||||||
|  | 		oid, err := primitive.ObjectIDFromHex(id) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) | ||||||
|  | 		} | ||||||
|  | 		ids = append(ids, oid) | ||||||
|  | 	} | ||||||
|  | 	if err := t.applicationDatabase.DeleteVersion(ctx, ids); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &third.DeleteApplicationVersionResp{}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *thirdServer) PageApplicationVersion(ctx context.Context, req *third.PageApplicationVersionReq) (*third.PageApplicationVersionResp, error) { | ||||||
|  | 	total, res, err := t.applicationDatabase.PageVersion(ctx, req.Platform, req.Pagination) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &third.PageApplicationVersionResp{ | ||||||
|  | 		Total:    total, | ||||||
|  | 		Versions: datautil.Slice(res, t.db2pbApplication), | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
| @ -38,12 +38,13 @@ import ( | |||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type thirdServer struct { | type thirdServer struct { | ||||||
| 	thirdDatabase controller.ThirdDatabase | 	thirdDatabase       controller.ThirdDatabase | ||||||
| 	s3dataBase    controller.S3Database | 	s3dataBase          controller.S3Database | ||||||
| 	userRpcClient rpcclient.UserRpcClient | 	userRpcClient       rpcclient.UserRpcClient | ||||||
| 	defaultExpire time.Duration | 	defaultExpire       time.Duration | ||||||
| 	config        *Config | 	config              *Config | ||||||
| 	minio         *minio.Minio | 	minio               *minio.Minio | ||||||
|  | 	applicationDatabase controller.ApplicationDatabase | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type Config struct { | type Config struct { | ||||||
| @ -74,6 +75,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	applicationMgo, err := mgo.NewApplicationMgo(mgocli.GetDB()) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// Select the oss method according to the profile policy | 	// Select the oss method according to the profile policy | ||||||
| 	enable := config.RpcConfig.Object.Enable | 	enable := config.RpcConfig.Object.Enable | ||||||
| 	var ( | 	var ( | ||||||
| @ -98,12 +104,13 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 	} | 	} | ||||||
| 	localcache.InitLocalCache(&config.LocalCacheConfig) | 	localcache.InitLocalCache(&config.LocalCacheConfig) | ||||||
| 	third.RegisterThirdServer(server, &thirdServer{ | 	third.RegisterThirdServer(server, &thirdServer{ | ||||||
| 		thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), | 		thirdDatabase:       controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), | ||||||
| 		userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), | 		userRpcClient:       rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), | ||||||
| 		s3dataBase:    controller.NewS3Database(rdb, o, s3db), | 		s3dataBase:          controller.NewS3Database(rdb, o, s3db), | ||||||
| 		defaultExpire: time.Hour * 24 * 7, | 		defaultExpire:       time.Hour * 24 * 7, | ||||||
| 		config:        config, | 		config:              config, | ||||||
| 		minio:         minioCli, | 		minio:               minioCli, | ||||||
|  | 		applicationDatabase: controller.NewApplicationDatabase(applicationMgo, redis.NewApplicationRedisCache(applicationMgo, rdb)), | ||||||
| 	}) | 	}) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | |||||||
| @ -82,3 +82,11 @@ func checkValidObjectName(objectName string) error { | |||||||
| func (t *thirdServer) IsManagerUserID(opUserID string) bool { | func (t *thirdServer) IsManagerUserID(opUserID string) bool { | ||||||
| 	return authverify.IsManagerUserID(opUserID, t.config.Share.IMAdminUserID) | 	return authverify.IsManagerUserID(opUserID, t.config.Share.IMAdminUserID) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func putUpdate[T any](update map[string]any, name string, val interface{ GetValuePtr() *T }) { | ||||||
|  | 	ptrVal := val.GetValuePtr() | ||||||
|  | 	if ptrVal == nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	update[name] = *ptrVal | ||||||
|  | } | ||||||
|  | |||||||
							
								
								
									
										11
									
								
								pkg/common/storage/cache/application.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								pkg/common/storage/cache/application.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,11 @@ | |||||||
|  | package cache | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type ApplicationCache interface { | ||||||
|  | 	LatestVersion(ctx context.Context, platform string) (*model.Application, error) | ||||||
|  | 	DeleteCache(ctx context.Context, platforms []string) error | ||||||
|  | } | ||||||
							
								
								
									
										9
									
								
								pkg/common/storage/cache/cachekey/application.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								pkg/common/storage/cache/cachekey/application.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,9 @@ | |||||||
|  | package cachekey | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	ApplicationLatestVersion = "APPLICATION_LATEST_VERSION:" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func GetApplicationLatestVersionKey(platform string) string { | ||||||
|  | 	return ApplicationLatestVersion + platform | ||||||
|  | } | ||||||
							
								
								
									
										43
									
								
								pkg/common/storage/cache/redis/application.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										43
									
								
								pkg/common/storage/cache/redis/application.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,43 @@ | |||||||
|  | package redis | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"github.com/dtm-labs/rockscache" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
|  | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
|  | 	"github.com/redis/go-redis/v9" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func NewApplicationRedisCache(db database.Application, rdb redis.UniversalClient) *ApplicationRedisCache { | ||||||
|  | 	return &ApplicationRedisCache{ | ||||||
|  | 		db:         db, | ||||||
|  | 		rcClient:   rockscache.NewClient(rdb, *GetRocksCacheOptions()), | ||||||
|  | 		deleter:    NewBatchDeleterRedis(rdb, GetRocksCacheOptions(), nil), | ||||||
|  | 		expireTime: time.Hour * 24 * 7, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type ApplicationRedisCache struct { | ||||||
|  | 	db         database.Application | ||||||
|  | 	rcClient   *rockscache.Client | ||||||
|  | 	deleter    *BatchDeleterRedis | ||||||
|  | 	expireTime time.Duration | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationRedisCache) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { | ||||||
|  | 	return getCache(ctx, a.rcClient, cachekey.GetApplicationLatestVersionKey(platform), a.expireTime, func(ctx context.Context) (*model.Application, error) { | ||||||
|  | 		return a.db.LatestVersion(ctx, platform) | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationRedisCache) DeleteCache(ctx context.Context, platforms []string) error { | ||||||
|  | 	if len(platforms) == 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	return a.deleter.ExecDelWithKeys(ctx, datautil.Slice(platforms, func(platform string) string { | ||||||
|  | 		return cachekey.GetApplicationLatestVersionKey(platform) | ||||||
|  | 	})) | ||||||
|  | } | ||||||
							
								
								
									
										69
									
								
								pkg/common/storage/controller/application.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								pkg/common/storage/controller/application.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,69 @@ | |||||||
|  | package controller | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
|  | 	"github.com/openimsdk/tools/db/pagination" | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type ApplicationDatabase interface { | ||||||
|  | 	LatestVersion(ctx context.Context, platform string) (*model.Application, error) | ||||||
|  | 	AddVersion(ctx context.Context, val *model.Application) error | ||||||
|  | 	UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error | ||||||
|  | 	DeleteVersion(ctx context.Context, id []primitive.ObjectID) error | ||||||
|  | 	PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewApplicationDatabase(db database.Application, cache cache.ApplicationCache) ApplicationDatabase { | ||||||
|  | 	return &applicationDatabase{db: db, cache: cache} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type applicationDatabase struct { | ||||||
|  | 	db    database.Application | ||||||
|  | 	cache cache.ApplicationCache | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *applicationDatabase) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { | ||||||
|  | 	return a.cache.LatestVersion(ctx, platform) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *applicationDatabase) AddVersion(ctx context.Context, val *model.Application) error { | ||||||
|  | 	if err := a.db.AddVersion(ctx, val); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return a.cache.DeleteCache(ctx, []string{val.Platform}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *applicationDatabase) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { | ||||||
|  | 	platforms, err := a.db.FindPlatform(ctx, []primitive.ObjectID{id}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if err := a.db.UpdateVersion(ctx, id, update); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if p, ok := update["platform"]; ok { | ||||||
|  | 		if val, ok := p.(string); ok { | ||||||
|  | 			platforms = append(platforms, val) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return a.cache.DeleteCache(ctx, platforms) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *applicationDatabase) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { | ||||||
|  | 	platforms, err := a.db.FindPlatform(ctx, id) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if err := a.db.DeleteVersion(ctx, id); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return a.cache.DeleteCache(ctx, platforms) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *applicationDatabase) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { | ||||||
|  | 	return a.db.PageVersion(ctx, platforms, page) | ||||||
|  | } | ||||||
| @ -24,8 +24,11 @@ type MsgTransferDatabase interface { | |||||||
| 	DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error | 	DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error | ||||||
| 
 | 
 | ||||||
| 	// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. | 	// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. | ||||||
| 	BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) | 	BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, userHasReadMap map[string]int64, err error) | ||||||
| 	SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error | 
 | ||||||
|  | 	SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error | ||||||
|  | 
 | ||||||
|  | 	SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error | ||||||
| 
 | 
 | ||||||
| 	// to mq | 	// to mq | ||||||
| 	MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) | 	MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) | ||||||
| @ -219,18 +222,18 @@ func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conv | |||||||
| 	return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) | 	return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { | func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) { | ||||||
| 	lenList := len(msgs) | 	lenList := len(msgs) | ||||||
| 	if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { | 	if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { | ||||||
| 		return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() | 		return 0, false, nil, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() | ||||||
| 	} | 	} | ||||||
| 	if lenList < 1 { | 	if lenList < 1 { | ||||||
| 		return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() | 		return 0, false, nil, errs.New("no messages to insert", "minCount", 1).Wrap() | ||||||
| 	} | 	} | ||||||
| 	currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) | 	currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.ZError(ctx, "storage.seq.Malloc", err) | 		log.ZError(ctx, "storage.seq.Malloc", err) | ||||||
| 		return 0, false, err | 		return 0, false, nil, err | ||||||
| 	} | 	} | ||||||
| 	isNew = currentMaxSeq == 0 | 	isNew = currentMaxSeq == 0 | ||||||
| 	lastMaxSeq := currentMaxSeq | 	lastMaxSeq := currentMaxSeq | ||||||
| @ -248,25 +251,25 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver | |||||||
| 	} else { | 	} else { | ||||||
| 		prommetrics.MsgInsertRedisSuccessCounter.Inc() | 		prommetrics.MsgInsertRedisSuccessCounter.Inc() | ||||||
| 	} | 	} | ||||||
| 	err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) | 	return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err) | ||||||
| 	if err != nil { |  | ||||||
| 		log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) |  | ||||||
| 		prommetrics.SeqSetFailedCounter.Inc() |  | ||||||
| 	} |  | ||||||
| 	return lastMaxSeq, isNew, errs.Wrap(err) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { | func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { | ||||||
| 	for userID, seq := range userSeqMap { | 	for userID, seq := range userSeqMap { | ||||||
| 		if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { | 		if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { | func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { | ||||||
| 	return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) | 	for userID, seq := range userSeqMap { | ||||||
|  | 		if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { | func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { | ||||||
|  | |||||||
							
								
								
									
										17
									
								
								pkg/common/storage/database/application.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								pkg/common/storage/database/application.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,17 @@ | |||||||
|  | package database | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
|  | 	"github.com/openimsdk/tools/db/pagination" | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type Application interface { | ||||||
|  | 	LatestVersion(ctx context.Context, platform string) (*model.Application, error) | ||||||
|  | 	AddVersion(ctx context.Context, val *model.Application) error | ||||||
|  | 	UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error | ||||||
|  | 	DeleteVersion(ctx context.Context, id []primitive.ObjectID) error | ||||||
|  | 	PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) | ||||||
|  | 	FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) | ||||||
|  | } | ||||||
							
								
								
									
										82
									
								
								pkg/common/storage/database/mgo/application.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										82
									
								
								pkg/common/storage/database/mgo/application.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,82 @@ | |||||||
|  | package mgo | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
|  | 	"github.com/openimsdk/tools/db/mongoutil" | ||||||
|  | 	"github.com/openimsdk/tools/db/pagination" | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson" | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo/options" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func NewApplicationMgo(db *mongo.Database) (*ApplicationMgo, error) { | ||||||
|  | 	coll := db.Collection("application") | ||||||
|  | 	_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ | ||||||
|  | 		{ | ||||||
|  | 			Keys: bson.D{ | ||||||
|  | 				{Key: "platform", Value: 1}, | ||||||
|  | 				{Key: "version", Value: 1}, | ||||||
|  | 			}, | ||||||
|  | 			Options: options.Index().SetUnique(true), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			Keys: bson.D{ | ||||||
|  | 				{Key: "latest", Value: -1}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &ApplicationMgo{coll: coll}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type ApplicationMgo struct { | ||||||
|  | 	coll *mongo.Collection | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationMgo) sort() any { | ||||||
|  | 	return bson.D{{"latest", -1}, {"_id", -1}} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationMgo) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { | ||||||
|  | 	return mongoutil.FindOne[*model.Application](ctx, a.coll, bson.M{"platform": platform}, options.FindOne().SetSort(a.sort())) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationMgo) AddVersion(ctx context.Context, val *model.Application) error { | ||||||
|  | 	if val.ID.IsZero() { | ||||||
|  | 		val.ID = primitive.NewObjectID() | ||||||
|  | 	} | ||||||
|  | 	return mongoutil.InsertMany(ctx, a.coll, []*model.Application{val}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationMgo) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { | ||||||
|  | 	if len(update) == 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	return mongoutil.UpdateOne(ctx, a.coll, bson.M{"_id": id}, bson.M{"$set": update}, true) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationMgo) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { | ||||||
|  | 	if len(id) == 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	return mongoutil.DeleteMany(ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationMgo) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { | ||||||
|  | 	filter := bson.M{} | ||||||
|  | 	if len(platforms) > 0 { | ||||||
|  | 		filter["platform"] = bson.M{"$in": platforms} | ||||||
|  | 	} | ||||||
|  | 	return mongoutil.FindPage[*model.Application](ctx, a.coll, filter, page, options.Find().SetSort(a.sort())) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *ApplicationMgo) FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) { | ||||||
|  | 	if len(id) == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	return mongoutil.Find[string](ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}, options.Find().SetProjection(bson.M{"_id": 0, "platform": 1})) | ||||||
|  | } | ||||||
							
								
								
									
										17
									
								
								pkg/common/storage/model/application.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								pkg/common/storage/model/application.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,17 @@ | |||||||
|  | package model | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type Application struct { | ||||||
|  | 	ID         primitive.ObjectID `bson:"_id"` | ||||||
|  | 	Platform   string             `bson:"platform"` | ||||||
|  | 	Version    string             `bson:"version"` | ||||||
|  | 	Url        string             `bson:"url"` | ||||||
|  | 	Text       string             `bson:"text"` | ||||||
|  | 	Force      bool               `bson:"force"` | ||||||
|  | 	Latest     bool               `bson:"latest"` | ||||||
|  | 	CreateTime time.Time          `bson:"create_time"` | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user