mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
merge: update code from main to v3.8-js-sdk-only. (#2818)
* feat: implement merge milestone PR to target-branch. (#2796) * build: improve workflows logic. (#2801) * fix: improve time condition check mehtod. (#2804) * fix: improve time condition check mehtod. * fix * fix: webhook before online push (#2805) * fix: set own read seq in MongoDB when sender send a message. (#2808) * fix: solve err Notification when setGroupInfo. (#2806) * fix: solve err Notification when setGroupInfo. * build: update checkout version. * fix: update notification contents. * Introducing OpenIM Guru on Gurubase.io (#2788) * feat: support app update service (#2811) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb * fix: cannot modify group member avatars * fix: MemberEnterNotification * fix: MemberEnterNotification * fix: MsgData status * feat: add ApplicationVersion * feat: add ApplicationVersion * feat: add ApplicationVersion --------- Co-authored-by: withchao <withchao@users.noreply.github.com> * feat: ApplicationVersion move chat (#2813) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb * fix: cannot modify group member avatars * fix: MemberEnterNotification * fix: MemberEnterNotification * fix: MsgData status * feat: add ApplicationVersion * feat: ApplicationVersion move chat --------- Co-authored-by: withchao <withchao@users.noreply.github.com> * fix: improve condition check. (#2815) --------- Co-authored-by: Monet Lee <monet_lee@163.com> Co-authored-by: icey-yu <119291641+icey-yu@users.noreply.github.com> Co-authored-by: Kürşat Aktaş <kursat.ce@gmail.com> Co-authored-by: chao <48119764+withchao@users.noreply.github.com> Co-authored-by: withchao <withchao@users.noreply.github.com>
This commit is contained in:
parent
646f1b9194
commit
ee04c156b5
65
.github/workflows/cleanup-after-milestone-prs-merged.yml
vendored
Normal file
65
.github/workflows/cleanup-after-milestone-prs-merged.yml
vendored
Normal file
@ -0,0 +1,65 @@
|
||||
name: Cleanup After Milestone PRs Merged
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- closed
|
||||
|
||||
jobs:
|
||||
handle_pr:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4.2.0
|
||||
|
||||
- name: Get the PR title and extract PR numbers
|
||||
id: extract_pr_numbers
|
||||
run: |
|
||||
# Get the PR title
|
||||
PR_TITLE="${{ github.event.pull_request.title }}"
|
||||
|
||||
echo "PR Title: $PR_TITLE"
|
||||
|
||||
# Extract PR numbers from the title
|
||||
PR_NUMBERS=$(echo "$PR_TITLE" | grep -oE "#[0-9]+" | tr -d '#' | tr '\n' ' ')
|
||||
echo "Extracted PR Numbers: $PR_NUMBERS"
|
||||
|
||||
# Save PR numbers to a file
|
||||
echo "$PR_NUMBERS" > pr_numbers.txt
|
||||
echo "Saved PR Numbers to pr_numbers.txt"
|
||||
|
||||
# Check if the title matches a specific pattern
|
||||
if echo "$PR_TITLE" | grep -qE "^deps: Merge( #[0-9]+)+ PRs into .+"; then
|
||||
echo "proceed=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "proceed=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: Use extracted PR numbers and label PRs
|
||||
if: (steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')) && github.event.pull_request.merged == true
|
||||
run: |
|
||||
# Read the previously saved PR numbers
|
||||
PR_NUMBERS=$(cat pr_numbers.txt)
|
||||
echo "Using extracted PR Numbers: $PR_NUMBERS"
|
||||
|
||||
# Loop through each PR number and add label
|
||||
for PR_NUMBER in $PR_NUMBERS; do
|
||||
echo "Adding 'cherry-picked' label to PR #$PR_NUMBER"
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
https://api.github.com/repos/${{ github.repository }}/issues/$PR_NUMBER/labels \
|
||||
-d '{"labels":["cherry-picked"]}'
|
||||
done
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Delete branch after PR close
|
||||
if: steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')
|
||||
run: |
|
||||
BRANCH_NAME="${{ github.event.pull_request.head.ref }}"
|
||||
echo "Branch to delete: $BRANCH_NAME"
|
||||
git push origin --delete "$BRANCH_NAME"
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
4
.github/workflows/go-build-test.yml
vendored
4
.github/workflows/go-build-test.yml
vendored
@ -2,11 +2,7 @@ name: Go Build Test
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
pull_request:
|
||||
branches:
|
||||
- main
|
||||
paths-ignore:
|
||||
- '**/*.md'
|
||||
|
||||
|
218
.github/workflows/merge-from-milestone.yml
vendored
Normal file
218
.github/workflows/merge-from-milestone.yml
vendored
Normal file
@ -0,0 +1,218 @@
|
||||
name: Create Pre-Release PR from Milestone
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
issues: write
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
milestone_name:
|
||||
description: 'Milestone name to collect closed PRs from'
|
||||
required: true
|
||||
default: 'v3.8.2'
|
||||
target_branch:
|
||||
description: 'Target branch to merge the consolidated PR'
|
||||
required: true
|
||||
default: 'pre-release-v3.8.2'
|
||||
|
||||
env:
|
||||
MILESTONE_NAME: ${{ github.event.inputs.milestone_name || 'v3.8.2' }}
|
||||
TARGET_BRANCH: ${{ github.event.inputs.target_branch || 'pre-release-v3.8.2' }}
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
|
||||
LABEL_NAME: cherry-picked
|
||||
TEMP_DIR: /tmp # Using /tmp as the temporary directory
|
||||
|
||||
jobs:
|
||||
cherry_pick_milestone_prs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Setup temp directory
|
||||
run: |
|
||||
# Create the temporary directory and initialize necessary files
|
||||
mkdir -p ${{ env.TEMP_DIR }}
|
||||
touch ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
touch ${{ env.TEMP_DIR }}/commit_hashes.txt
|
||||
touch ${{ env.TEMP_DIR }}/pr_title.txt
|
||||
touch ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
touch ${{ env.TEMP_DIR }}/created_pr_number.txt
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.BOT_TOKEN }}
|
||||
|
||||
- name: Setup Git User for OpenIM-Robot
|
||||
run: |
|
||||
# Set up Git credentials for the bot
|
||||
git config --global user.email "OpenIM-Robot@users.noreply.github.com"
|
||||
git config --global user.name "OpenIM-Robot"
|
||||
|
||||
- name: Fetch Milestone ID and Filter PR Numbers
|
||||
env:
|
||||
MILESTONE_NAME: ${{ env.MILESTONE_NAME }}
|
||||
run: |
|
||||
# Fetch milestone details and extract milestone ID
|
||||
milestones=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/milestones")
|
||||
milestone_id=$(echo "$milestones" | grep -B3 "\"title\": \"$MILESTONE_NAME\"" | grep '"number":' | head -n1 | grep -o '[0-9]\+')
|
||||
if [ -z "$milestone_id" ]; then
|
||||
echo "Milestone '$MILESTONE_NAME' not found. Exiting."
|
||||
exit 1
|
||||
fi
|
||||
echo "Milestone ID: $milestone_id"
|
||||
echo "MILESTONE_ID=$milestone_id" >> $GITHUB_ENV
|
||||
|
||||
# Fetch issues for the milestone
|
||||
issues=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/issues?milestone=$milestone_id&state=closed&per_page=100")
|
||||
|
||||
> ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
|
||||
# Filter PRs that do not have the 'cherry-picked' label
|
||||
for pr_number in $(echo "$issues" | jq -r '.[] | select(.pull_request != null) | .number'); do
|
||||
labels=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" | jq -r '.[].name')
|
||||
|
||||
if ! echo "$labels" | grep -q "${LABEL_NAME}"; then
|
||||
echo "PR #$pr_number does not have the 'cherry-picked' label. Adding to the list."
|
||||
echo "$pr_number" >> ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
else
|
||||
echo "PR #$pr_number already has the 'cherry-picked' label. Skipping."
|
||||
fi
|
||||
done
|
||||
|
||||
# Sort the filtered PR numbers
|
||||
sort -n ${{ env.TEMP_DIR }}/pr_numbers.txt -o ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
|
||||
echo "Filtered and sorted PR numbers:"
|
||||
cat ${{ env.TEMP_DIR }}/pr_numbers.txt || echo "No closed PR numbers found for milestone."
|
||||
|
||||
- name: Fetch Merge Commits for PRs and Generate Title and Body
|
||||
run: |
|
||||
# Ensure the files are initialized
|
||||
> ${{ env.TEMP_DIR }}/commit_hashes.txt
|
||||
> ${{ env.TEMP_DIR }}/pr_title.txt
|
||||
> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
|
||||
# Write description to the PR body
|
||||
echo "### Description:" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
echo "Merging PRs from milestone \`$MILESTONE_NAME\` into target branch \`$TARGET_BRANCH\`." >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
echo "" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
echo "### Need Merge PRs:" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
|
||||
pr_numbers_in_title=""
|
||||
|
||||
# Process sorted PR numbers and generate commit hashes
|
||||
for pr_number in $(cat ${{ env.TEMP_DIR }}/pr_numbers.txt); do
|
||||
echo "Processing PR #$pr_number"
|
||||
pr_details=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/pulls/$pr_number")
|
||||
pr_title=$(echo "$pr_details" | jq -r '.title')
|
||||
merge_commit=$(echo "$pr_details" | jq -r '.merge_commit_sha')
|
||||
short_commit_hash=$(echo "$merge_commit" | cut -c 1-7)
|
||||
|
||||
# Append PR details to the body
|
||||
echo "- $pr_title: (#$pr_number) ($short_commit_hash)" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
|
||||
if [ "$merge_commit" != "null" ];then
|
||||
echo "$merge_commit" >> ${{ env.TEMP_DIR }}/commit_hashes.txt
|
||||
echo "#$pr_number" >> ${{ env.TEMP_DIR }}/pr_title.txt
|
||||
pr_numbers_in_title="$pr_numbers_in_title #$pr_number"
|
||||
fi
|
||||
done
|
||||
|
||||
commit_hashes=$(cat ${{ env.TEMP_DIR }}/commit_hashes.txt | tr '\n' ' ')
|
||||
first_commit_hash=$(head -n 1 ${{ env.TEMP_DIR }}/commit_hashes.txt)
|
||||
cherry_pick_branch="cherry-pick-${first_commit_hash:0:7}"
|
||||
echo "COMMIT_HASHES=$commit_hashes" >> $GITHUB_ENV
|
||||
echo "CHERRY_PICK_BRANCH=$cherry_pick_branch" >> $GITHUB_ENV
|
||||
echo "pr_numbers_in_title=$pr_numbers_in_title" >> $GITHUB_ENV
|
||||
|
||||
- name: Pull and Cherry-pick Commits, Then Push
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
|
||||
run: |
|
||||
# Fetch and pull the latest changes from the target branch
|
||||
git fetch origin
|
||||
git checkout $TARGET_BRANCH
|
||||
git pull origin $TARGET_BRANCH
|
||||
|
||||
# Create a new branch for cherry-picking
|
||||
git checkout -b $CHERRY_PICK_BRANCH
|
||||
|
||||
# Cherry-pick the commits and handle conflicts
|
||||
for commit_hash in $COMMIT_HASHES; do
|
||||
echo "Attempting to cherry-pick commit $commit_hash"
|
||||
if ! git cherry-pick "$commit_hash" --strategy=recursive -X theirs; then
|
||||
echo "Conflict detected for $commit_hash. Resolving with incoming changes."
|
||||
conflict_files=$(git diff --name-only --diff-filter=U)
|
||||
echo "Conflicting files:"
|
||||
echo "$conflict_files"
|
||||
|
||||
for file in $conflict_files; do
|
||||
if [ -f "$file" ]; then
|
||||
echo "Resolving conflict for $file"
|
||||
git add "$file"
|
||||
else
|
||||
echo "File $file has been deleted. Skipping."
|
||||
git rm "$file"
|
||||
fi
|
||||
done
|
||||
|
||||
echo "Conflicts resolved. Continuing cherry-pick."
|
||||
git cherry-pick --continue
|
||||
else
|
||||
echo "Cherry-pick successful for commit $commit_hash."
|
||||
fi
|
||||
done
|
||||
|
||||
# Push the cherry-pick branch to the repository
|
||||
git remote set-url origin "https://${BOT_TOKEN}@github.com/${{ github.repository }}.git"
|
||||
git push origin $CHERRY_PICK_BRANCH --force
|
||||
|
||||
- name: Create Pull Request
|
||||
run: |
|
||||
# Prepare and create the PR
|
||||
pr_title="deps: Merge ${{ env.pr_numbers_in_title }} PRs into $TARGET_BRANCH"
|
||||
pr_body=$(cat ${{ env.TEMP_DIR }}/pr_body.txt)
|
||||
|
||||
echo "Prepared PR title:"
|
||||
echo "$pr_title"
|
||||
echo "Prepared PR body:"
|
||||
echo "$pr_body"
|
||||
|
||||
# Create the PR using the GitHub API
|
||||
response=$(curl -s -X POST -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
https://api.github.com/repos/${{ github.repository }}/pulls \
|
||||
-d "$(jq -n --arg title "$pr_title" \
|
||||
--arg head "$CHERRY_PICK_BRANCH" \
|
||||
--arg base "$TARGET_BRANCH" \
|
||||
--arg body "$pr_body" \
|
||||
'{title: $title, head: $head, base: $base, body: $body}')")
|
||||
|
||||
pr_number=$(echo "$response" | jq -r '.number')
|
||||
echo "$pr_number" > ${{ env.TEMP_DIR }}/created_pr_number.txt
|
||||
echo "Created PR #$pr_number"
|
||||
|
||||
- name: Add Label to Created Pull Request
|
||||
run: |
|
||||
# Add 'milestone-merge' label to the created PR
|
||||
pr_number=$(cat ${{ env.TEMP_DIR }}/created_pr_number.txt)
|
||||
echo "Adding label to PR #$pr_number"
|
||||
|
||||
curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
-d '{"labels": ["milestone-merge"]}' \
|
||||
"https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels"
|
||||
|
||||
echo "Added 'milestone-merge' label to PR #$pr_number."
|
@ -16,6 +16,7 @@
|
||||
[](https://www.bestpractices.dev/projects/8045)
|
||||
[](https://github.com/openimsdk/open-im-server/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3A%22good+first+issue%22)
|
||||
[](https://golang.org/)
|
||||
[](https://gurubase.io/g/openim)
|
||||
|
||||
|
||||
<p align="center">
|
||||
|
@ -1,20 +1,3 @@
|
||||
# Copyright © 2023 OpenIM. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Determines if a message should be sent. If set to false, it triggers a silent sync without a message. If true, it requires triggering a conversation.
|
||||
# For rpc notification, send twice: once as a message and once as a notification.
|
||||
# The options field 'isNotification' indicates if it's a notification.
|
||||
groupCreated:
|
||||
isSendMsg: true
|
||||
# Reliability level of the message sending.
|
||||
@ -309,9 +292,9 @@ userInfoUpdated:
|
||||
unreadCount: false
|
||||
offlinePush:
|
||||
enable: true
|
||||
title: Remove a blocked user
|
||||
desc: Remove a blocked user
|
||||
ext: Remove a blocked user
|
||||
title: userInfo updated
|
||||
desc: userInfo updated
|
||||
ext: userInfo updated
|
||||
|
||||
userStatusChanged:
|
||||
isSendMsg: false
|
||||
|
2
go.mod
2
go.mod
@ -12,7 +12,7 @@ require (
|
||||
github.com/gorilla/websocket v1.5.1
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.47
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.51
|
||||
github.com/openimsdk/tools v0.0.50-alpha.16
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
|
4
go.sum
4
go.sum
@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.47 h1:FGHnEwsA05GxT3vnz7YH3fbVkuoO3P71ZZgkQQ71MjA=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.51 h1:G5Yjndp/FRyOJWhoQcSF2x2GvYiAIlqN0vjkvjUPycU=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.51/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
|
@ -198,13 +198,6 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
objectGroup.POST("/initiate_form_data", t.InitiateFormData)
|
||||
objectGroup.POST("/complete_form_data", t.CompleteFormData)
|
||||
objectGroup.GET("/*name", t.ObjectRedirect)
|
||||
|
||||
applicationGroup := r.Group("application")
|
||||
applicationGroup.POST("/add_version", t.AddApplicationVersion)
|
||||
applicationGroup.POST("/update_version", t.UpdateApplicationVersion)
|
||||
applicationGroup.POST("/delete_version", t.DeleteApplicationVersion)
|
||||
applicationGroup.POST("/latest_version", t.LatestApplicationVersion)
|
||||
applicationGroup.POST("/page_versions", t.PageApplicationVersion)
|
||||
}
|
||||
// Message
|
||||
msgGroup := r.Group("/msg")
|
||||
@ -297,6 +290,4 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc {
|
||||
var Whitelist = []string{
|
||||
"/auth/get_admin_token",
|
||||
"/auth/parse_token",
|
||||
"/application/latest_version",
|
||||
"/application/page_versions",
|
||||
}
|
||||
|
@ -170,23 +170,3 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) {
|
||||
func (o *ThirdApi) GetPrometheus(c *gin.Context) {
|
||||
c.Redirect(http.StatusFound, o.GrafanaUrl)
|
||||
}
|
||||
|
||||
func (o *ThirdApi) LatestApplicationVersion(c *gin.Context) {
|
||||
a2r.Call(third.ThirdClient.LatestApplicationVersion, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *ThirdApi) AddApplicationVersion(c *gin.Context) {
|
||||
a2r.Call(third.ThirdClient.AddApplicationVersion, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *ThirdApi) UpdateApplicationVersion(c *gin.Context) {
|
||||
a2r.Call(third.ThirdClient.UpdateApplicationVersion, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *ThirdApi) DeleteApplicationVersion(c *gin.Context) {
|
||||
a2r.Call(third.ThirdClient.DeleteApplicationVersion, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *ThirdApi) PageApplicationVersion(c *gin.Context) {
|
||||
a2r.Call(third.ThirdClient.PageApplicationVersion, o.Client, c)
|
||||
}
|
||||
|
@ -128,6 +128,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
|
||||
|
||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH)
|
||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH)
|
||||
go m.historyCH.HandleUserHasReadSeqMessages(m.ctx)
|
||||
err := m.historyCH.redisMessageBatches.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -157,12 +158,14 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
|
||||
// graceful close kafka client.
|
||||
m.cancel()
|
||||
m.historyCH.redisMessageBatches.Close()
|
||||
m.historyCH.Close()
|
||||
m.historyCH.historyConsumerGroup.Close()
|
||||
m.historyMongoCH.historyConsumerGroup.Close()
|
||||
return nil
|
||||
case <-netDone:
|
||||
m.cancel()
|
||||
m.historyCH.redisMessageBatches.Close()
|
||||
m.historyCH.Close()
|
||||
m.historyCH.historyConsumerGroup.Close()
|
||||
m.historyMongoCH.historyConsumerGroup.Close()
|
||||
close(netDone)
|
||||
|
@ -18,8 +18,10 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
@ -40,11 +42,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
size = 500
|
||||
mainDataBuffer = 500
|
||||
subChanBuffer = 50
|
||||
worker = 50
|
||||
interval = 100 * time.Millisecond
|
||||
size = 500
|
||||
mainDataBuffer = 500
|
||||
subChanBuffer = 50
|
||||
worker = 50
|
||||
interval = 100 * time.Millisecond
|
||||
hasReadChanBuffer = 1000
|
||||
)
|
||||
|
||||
type ContextMsg struct {
|
||||
@ -52,14 +55,23 @@ type ContextMsg struct {
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// This structure is used for asynchronously writing the sender’s read sequence (seq) regarding a message into MongoDB.
|
||||
// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10.
|
||||
type userHasReadSeq struct {
|
||||
conversationID string
|
||||
userHasReadMap map[string]int64
|
||||
}
|
||||
|
||||
type OnlineHistoryRedisConsumerHandler struct {
|
||||
historyConsumerGroup *kafka.MConsumerGroup
|
||||
|
||||
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
|
||||
|
||||
msgTransferDatabase controller.MsgTransferDatabase
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
msgTransferDatabase controller.MsgTransferDatabase
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
conversationUserHasReadChan chan *userHasReadSeq
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
|
||||
@ -70,6 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
|
||||
}
|
||||
var och OnlineHistoryRedisConsumerHandler
|
||||
och.msgTransferDatabase = database
|
||||
och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer)
|
||||
och.wg.Add(1)
|
||||
|
||||
b := batcher.New[sarama.ConsumerMessage](
|
||||
batcher.WithSize(size),
|
||||
@ -115,25 +129,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
||||
type seqKey struct {
|
||||
conversationID string
|
||||
userID string
|
||||
}
|
||||
var readSeq map[seqKey]int64
|
||||
|
||||
var conversationID string
|
||||
var userSeqMap map[string]int64
|
||||
for _, msg := range msgs {
|
||||
if msg.message.ContentType != constant.HasReadReceipt {
|
||||
continue
|
||||
}
|
||||
var elem sdkws.NotificationElem
|
||||
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
||||
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
||||
continue
|
||||
}
|
||||
var tips sdkws.MarkAsReadTips
|
||||
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
||||
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
||||
continue
|
||||
}
|
||||
//The conversation ID for each batch of messages processed by the batcher is the same.
|
||||
conversationID = tips.ConversationID
|
||||
if len(tips.Seqs) > 0 {
|
||||
for _, seq := range tips.Seqs {
|
||||
if tips.HasReadSeq < seq {
|
||||
@ -146,26 +160,25 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context,
|
||||
if tips.HasReadSeq < 0 {
|
||||
continue
|
||||
}
|
||||
if readSeq == nil {
|
||||
readSeq = make(map[seqKey]int64)
|
||||
if userSeqMap == nil {
|
||||
userSeqMap = make(map[string]int64)
|
||||
}
|
||||
key := seqKey{
|
||||
conversationID: tips.ConversationID,
|
||||
userID: tips.MarkAsReadUserID,
|
||||
}
|
||||
if readSeq[key] > tips.HasReadSeq {
|
||||
|
||||
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
|
||||
continue
|
||||
}
|
||||
readSeq[key] = tips.HasReadSeq
|
||||
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
|
||||
}
|
||||
if readSeq == nil {
|
||||
if userSeqMap == nil {
|
||||
return
|
||||
}
|
||||
for key, seq := range readSeq {
|
||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
|
||||
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
|
||||
}
|
||||
if len(conversationID) == 0 {
|
||||
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
|
||||
}
|
||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
|
||||
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
|
||||
@ -250,12 +263,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
||||
}
|
||||
if len(storageMessageList) > 0 {
|
||||
msg := storageMessageList[0]
|
||||
lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
|
||||
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
||||
log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
||||
return
|
||||
}
|
||||
log.ZInfo(ctx, "BatchInsertChat2Cache end")
|
||||
err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||
prommetrics.SeqSetFailedCounter.Inc()
|
||||
}
|
||||
och.conversationUserHasReadChan <- &userHasReadSeq{
|
||||
conversationID: conversationID,
|
||||
userHasReadMap: userSeqMap,
|
||||
}
|
||||
|
||||
if isNewConversation {
|
||||
switch msg.SessionType {
|
||||
@ -308,7 +330,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
||||
storageMessageList = append(storageMessageList, msg.message)
|
||||
}
|
||||
if len(storageMessageList) > 0 {
|
||||
lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
|
||||
"storageList", storageMessageList)
|
||||
@ -323,6 +345,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
||||
och.toPushTopic(ctx, key, conversationID, storageList)
|
||||
}
|
||||
}
|
||||
func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) {
|
||||
defer och.wg.Done()
|
||||
|
||||
for msg := range och.conversationUserHasReadChan {
|
||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil {
|
||||
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap)
|
||||
}
|
||||
}
|
||||
|
||||
log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages")
|
||||
}
|
||||
func (och *OnlineHistoryRedisConsumerHandler) Close() {
|
||||
close(och.conversationUserHasReadChan)
|
||||
och.wg.Wait()
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
||||
for _, v := range msgs {
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
)
|
||||
|
||||
func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
||||
@ -70,7 +69,7 @@ func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *
|
||||
|
||||
func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error {
|
||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||
if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
|
||||
if msg.ContentType == constant.Typing {
|
||||
return nil
|
||||
}
|
||||
req := callbackstruct.CallbackBeforePushReq{
|
||||
|
@ -1026,7 +1026,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
|
||||
}
|
||||
num := len(update)
|
||||
if req.GroupInfoForSet.Notification != "" {
|
||||
num--
|
||||
num -= 3
|
||||
func() {
|
||||
conversation := &pbconversation.ConversationReq{
|
||||
ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID),
|
||||
@ -1133,8 +1133,9 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI
|
||||
}
|
||||
|
||||
num := len(updatedData)
|
||||
|
||||
if req.Notification != nil {
|
||||
num--
|
||||
num -= 3
|
||||
|
||||
if req.Notification.Value != "" {
|
||||
func() {
|
||||
@ -1219,7 +1220,7 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
|
||||
}
|
||||
}
|
||||
|
||||
if newOwner.MuteEndTime != time.Unix(0, 0) {
|
||||
if newOwner.MuteEndTime.After(time.Now()) {
|
||||
if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{
|
||||
GroupID: group.GroupID,
|
||||
UserID: req.NewOwnerUserID}); err != nil {
|
||||
@ -1810,7 +1811,6 @@ func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req
|
||||
}
|
||||
|
||||
if req.UserID != opUserID {
|
||||
req.UserID = mcontext.GetOpUserID(ctx)
|
||||
adminIDs, err := g.db.GetGroupRoleLevelMemberIDs(ctx, req.GroupID, constant.GroupAdmin)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -1819,10 +1819,11 @@ func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req
|
||||
adminIDs = append(adminIDs, owners[0].UserID)
|
||||
adminIDs = append(adminIDs, g.config.Share.IMAdminUserID...)
|
||||
|
||||
if !datautil.Contain(req.UserID, adminIDs...) {
|
||||
if !datautil.Contain(opUserID, adminIDs...) {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("opUser no permission")
|
||||
}
|
||||
}
|
||||
|
||||
requests, err := g.db.FindGroupRequests(ctx, req.GroupID, []string{req.UserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1,117 +0,0 @@
|
||||
package third
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/protocol/third"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"time"
|
||||
)
|
||||
|
||||
func IsNotFound(err error) bool {
|
||||
switch errs.Unwrap(err) {
|
||||
case redis.Nil, mongo.ErrNoDocuments:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (t *thirdServer) db2pbApplication(val *model.Application) *third.ApplicationVersion {
|
||||
return &third.ApplicationVersion{
|
||||
Id: val.ID.Hex(),
|
||||
Platform: val.Platform,
|
||||
Version: val.Version,
|
||||
Url: val.Url,
|
||||
Text: val.Text,
|
||||
Force: val.Force,
|
||||
Latest: val.Latest,
|
||||
CreateTime: val.CreateTime.UnixMilli(),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *thirdServer) LatestApplicationVersion(ctx context.Context, req *third.LatestApplicationVersionReq) (*third.LatestApplicationVersionResp, error) {
|
||||
res, err := t.applicationDatabase.LatestVersion(ctx, req.Platform)
|
||||
if err == nil {
|
||||
return &third.LatestApplicationVersionResp{Version: t.db2pbApplication(res)}, nil
|
||||
} else if IsNotFound(err) {
|
||||
return &third.LatestApplicationVersionResp{}, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (t *thirdServer) AddApplicationVersion(ctx context.Context, req *third.AddApplicationVersionReq) (*third.AddApplicationVersionResp, error) {
|
||||
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
val := &model.Application{
|
||||
ID: primitive.NewObjectID(),
|
||||
Platform: req.Platform,
|
||||
Version: req.Version,
|
||||
Url: req.Url,
|
||||
Text: req.Text,
|
||||
Force: req.Force,
|
||||
Latest: req.Latest,
|
||||
CreateTime: time.Now(),
|
||||
}
|
||||
if err := t.applicationDatabase.AddVersion(ctx, val); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.AddApplicationVersionResp{}, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) UpdateApplicationVersion(ctx context.Context, req *third.UpdateApplicationVersionReq) (*third.UpdateApplicationVersionResp, error) {
|
||||
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
oid, err := primitive.ObjectIDFromHex(req.Id)
|
||||
if err != nil {
|
||||
return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error())
|
||||
}
|
||||
update := make(map[string]any)
|
||||
putUpdate(update, "platform", req.Platform)
|
||||
putUpdate(update, "version", req.Version)
|
||||
putUpdate(update, "url", req.Url)
|
||||
putUpdate(update, "text", req.Text)
|
||||
putUpdate(update, "force", req.Force)
|
||||
putUpdate(update, "latest", req.Latest)
|
||||
if err := t.applicationDatabase.UpdateVersion(ctx, oid, update); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.UpdateApplicationVersionResp{}, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) DeleteApplicationVersion(ctx context.Context, req *third.DeleteApplicationVersionReq) (*third.DeleteApplicationVersionResp, error) {
|
||||
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ids := make([]primitive.ObjectID, 0, len(req.Id))
|
||||
for _, id := range req.Id {
|
||||
oid, err := primitive.ObjectIDFromHex(id)
|
||||
if err != nil {
|
||||
return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error())
|
||||
}
|
||||
ids = append(ids, oid)
|
||||
}
|
||||
if err := t.applicationDatabase.DeleteVersion(ctx, ids); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.DeleteApplicationVersionResp{}, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) PageApplicationVersion(ctx context.Context, req *third.PageApplicationVersionReq) (*third.PageApplicationVersionResp, error) {
|
||||
total, res, err := t.applicationDatabase.PageVersion(ctx, req.Platform, req.Pagination)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.PageApplicationVersionResp{
|
||||
Total: total,
|
||||
Versions: datautil.Slice(res, t.db2pbApplication),
|
||||
}, nil
|
||||
}
|
@ -38,13 +38,12 @@ import (
|
||||
)
|
||||
|
||||
type thirdServer struct {
|
||||
thirdDatabase controller.ThirdDatabase
|
||||
s3dataBase controller.S3Database
|
||||
userRpcClient rpcclient.UserRpcClient
|
||||
defaultExpire time.Duration
|
||||
config *Config
|
||||
minio *minio.Minio
|
||||
applicationDatabase controller.ApplicationDatabase
|
||||
thirdDatabase controller.ThirdDatabase
|
||||
s3dataBase controller.S3Database
|
||||
userRpcClient rpcclient.UserRpcClient
|
||||
defaultExpire time.Duration
|
||||
config *Config
|
||||
minio *minio.Minio
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@ -75,10 +74,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
applicationMgo, err := mgo.NewApplicationMgo(mgocli.GetDB())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Select the oss method according to the profile policy
|
||||
enable := config.RpcConfig.Object.Enable
|
||||
@ -104,13 +99,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
}
|
||||
localcache.InitLocalCache(&config.LocalCacheConfig)
|
||||
third.RegisterThirdServer(server, &thirdServer{
|
||||
thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb),
|
||||
userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID),
|
||||
s3dataBase: controller.NewS3Database(rdb, o, s3db),
|
||||
defaultExpire: time.Hour * 24 * 7,
|
||||
config: config,
|
||||
minio: minioCli,
|
||||
applicationDatabase: controller.NewApplicationDatabase(applicationMgo, redis.NewApplicationRedisCache(applicationMgo, rdb)),
|
||||
thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb),
|
||||
userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID),
|
||||
s3dataBase: controller.NewS3Database(rdb, o, s3db),
|
||||
defaultExpire: time.Hour * 24 * 7,
|
||||
config: config,
|
||||
minio: minioCli,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
11
pkg/common/storage/cache/application.go
vendored
11
pkg/common/storage/cache/application.go
vendored
@ -1,11 +0,0 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
)
|
||||
|
||||
type ApplicationCache interface {
|
||||
LatestVersion(ctx context.Context, platform string) (*model.Application, error)
|
||||
DeleteCache(ctx context.Context, platforms []string) error
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
package cachekey
|
||||
|
||||
const (
|
||||
ApplicationLatestVersion = "APPLICATION_LATEST_VERSION:"
|
||||
)
|
||||
|
||||
func GetApplicationLatestVersionKey(platform string) string {
|
||||
return ApplicationLatestVersion + platform
|
||||
}
|
43
pkg/common/storage/cache/redis/application.go
vendored
43
pkg/common/storage/cache/redis/application.go
vendored
@ -1,43 +0,0 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewApplicationRedisCache(db database.Application, rdb redis.UniversalClient) *ApplicationRedisCache {
|
||||
return &ApplicationRedisCache{
|
||||
db: db,
|
||||
rcClient: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
|
||||
deleter: NewBatchDeleterRedis(rdb, GetRocksCacheOptions(), nil),
|
||||
expireTime: time.Hour * 24 * 7,
|
||||
}
|
||||
}
|
||||
|
||||
type ApplicationRedisCache struct {
|
||||
db database.Application
|
||||
rcClient *rockscache.Client
|
||||
deleter *BatchDeleterRedis
|
||||
expireTime time.Duration
|
||||
}
|
||||
|
||||
func (a *ApplicationRedisCache) LatestVersion(ctx context.Context, platform string) (*model.Application, error) {
|
||||
return getCache(ctx, a.rcClient, cachekey.GetApplicationLatestVersionKey(platform), a.expireTime, func(ctx context.Context) (*model.Application, error) {
|
||||
return a.db.LatestVersion(ctx, platform)
|
||||
})
|
||||
}
|
||||
|
||||
func (a *ApplicationRedisCache) DeleteCache(ctx context.Context, platforms []string) error {
|
||||
if len(platforms) == 0 {
|
||||
return nil
|
||||
}
|
||||
return a.deleter.ExecDelWithKeys(ctx, datautil.Slice(platforms, func(platform string) string {
|
||||
return cachekey.GetApplicationLatestVersionKey(platform)
|
||||
}))
|
||||
}
|
@ -1,69 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/db/pagination"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
type ApplicationDatabase interface {
|
||||
LatestVersion(ctx context.Context, platform string) (*model.Application, error)
|
||||
AddVersion(ctx context.Context, val *model.Application) error
|
||||
UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error
|
||||
DeleteVersion(ctx context.Context, id []primitive.ObjectID) error
|
||||
PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error)
|
||||
}
|
||||
|
||||
func NewApplicationDatabase(db database.Application, cache cache.ApplicationCache) ApplicationDatabase {
|
||||
return &applicationDatabase{db: db, cache: cache}
|
||||
}
|
||||
|
||||
type applicationDatabase struct {
|
||||
db database.Application
|
||||
cache cache.ApplicationCache
|
||||
}
|
||||
|
||||
func (a *applicationDatabase) LatestVersion(ctx context.Context, platform string) (*model.Application, error) {
|
||||
return a.cache.LatestVersion(ctx, platform)
|
||||
}
|
||||
|
||||
func (a *applicationDatabase) AddVersion(ctx context.Context, val *model.Application) error {
|
||||
if err := a.db.AddVersion(ctx, val); err != nil {
|
||||
return err
|
||||
}
|
||||
return a.cache.DeleteCache(ctx, []string{val.Platform})
|
||||
}
|
||||
|
||||
func (a *applicationDatabase) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error {
|
||||
platforms, err := a.db.FindPlatform(ctx, []primitive.ObjectID{id})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.db.UpdateVersion(ctx, id, update); err != nil {
|
||||
return err
|
||||
}
|
||||
if p, ok := update["platform"]; ok {
|
||||
if val, ok := p.(string); ok {
|
||||
platforms = append(platforms, val)
|
||||
}
|
||||
}
|
||||
return a.cache.DeleteCache(ctx, platforms)
|
||||
}
|
||||
|
||||
func (a *applicationDatabase) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error {
|
||||
platforms, err := a.db.FindPlatform(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.db.DeleteVersion(ctx, id); err != nil {
|
||||
return err
|
||||
}
|
||||
return a.cache.DeleteCache(ctx, platforms)
|
||||
}
|
||||
|
||||
func (a *applicationDatabase) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) {
|
||||
return a.db.PageVersion(ctx, platforms, page)
|
||||
}
|
@ -24,8 +24,11 @@ type MsgTransferDatabase interface {
|
||||
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
|
||||
|
||||
// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
|
||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
|
||||
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, userHasReadMap map[string]int64, err error)
|
||||
|
||||
SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error
|
||||
|
||||
SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error
|
||||
|
||||
// to mq
|
||||
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
@ -219,18 +222,18 @@ func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conv
|
||||
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
||||
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) {
|
||||
lenList := len(msgs)
|
||||
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
|
||||
return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
|
||||
return 0, false, nil, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
|
||||
}
|
||||
if lenList < 1 {
|
||||
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
|
||||
return 0, false, nil, errs.New("no messages to insert", "minCount", 1).Wrap()
|
||||
}
|
||||
currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
|
||||
if err != nil {
|
||||
log.ZError(ctx, "storage.seq.Malloc", err)
|
||||
return 0, false, err
|
||||
return 0, false, nil, err
|
||||
}
|
||||
isNew = currentMaxSeq == 0
|
||||
lastMaxSeq := currentMaxSeq
|
||||
@ -248,25 +251,25 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
|
||||
} else {
|
||||
prommetrics.MsgInsertRedisSuccessCounter.Inc()
|
||||
}
|
||||
err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||
prommetrics.SeqSetFailedCounter.Inc()
|
||||
}
|
||||
return lastMaxSeq, isNew, errs.Wrap(err)
|
||||
return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||||
func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||||
for userID, seq := range userSeqMap {
|
||||
if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil {
|
||||
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
|
||||
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
|
||||
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||||
for userID, seq := range userSeqMap {
|
||||
if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||
|
@ -1,17 +0,0 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/db/pagination"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
type Application interface {
|
||||
LatestVersion(ctx context.Context, platform string) (*model.Application, error)
|
||||
AddVersion(ctx context.Context, val *model.Application) error
|
||||
UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error
|
||||
DeleteVersion(ctx context.Context, id []primitive.ObjectID) error
|
||||
PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error)
|
||||
FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error)
|
||||
}
|
@ -1,82 +0,0 @@
|
||||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"github.com/openimsdk/tools/db/pagination"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
func NewApplicationMgo(db *mongo.Database) (*ApplicationMgo, error) {
|
||||
coll := db.Collection("application")
|
||||
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "platform", Value: 1},
|
||||
{Key: "version", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetUnique(true),
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "latest", Value: -1},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ApplicationMgo{coll: coll}, nil
|
||||
}
|
||||
|
||||
type ApplicationMgo struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (a *ApplicationMgo) sort() any {
|
||||
return bson.D{{"latest", -1}, {"_id", -1}}
|
||||
}
|
||||
|
||||
func (a *ApplicationMgo) LatestVersion(ctx context.Context, platform string) (*model.Application, error) {
|
||||
return mongoutil.FindOne[*model.Application](ctx, a.coll, bson.M{"platform": platform}, options.FindOne().SetSort(a.sort()))
|
||||
}
|
||||
|
||||
func (a *ApplicationMgo) AddVersion(ctx context.Context, val *model.Application) error {
|
||||
if val.ID.IsZero() {
|
||||
val.ID = primitive.NewObjectID()
|
||||
}
|
||||
return mongoutil.InsertMany(ctx, a.coll, []*model.Application{val})
|
||||
}
|
||||
|
||||
func (a *ApplicationMgo) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error {
|
||||
if len(update) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mongoutil.UpdateOne(ctx, a.coll, bson.M{"_id": id}, bson.M{"$set": update}, true)
|
||||
}
|
||||
|
||||
func (a *ApplicationMgo) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error {
|
||||
if len(id) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mongoutil.DeleteMany(ctx, a.coll, bson.M{"_id": bson.M{"$in": id}})
|
||||
}
|
||||
|
||||
func (a *ApplicationMgo) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) {
|
||||
filter := bson.M{}
|
||||
if len(platforms) > 0 {
|
||||
filter["platform"] = bson.M{"$in": platforms}
|
||||
}
|
||||
return mongoutil.FindPage[*model.Application](ctx, a.coll, filter, page, options.Find().SetSort(a.sort()))
|
||||
}
|
||||
|
||||
func (a *ApplicationMgo) FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) {
|
||||
if len(id) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return mongoutil.Find[string](ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}, options.Find().SetProjection(bson.M{"_id": 0, "platform": 1}))
|
||||
}
|
@ -8,6 +8,7 @@ import (
|
||||
type Application struct {
|
||||
ID primitive.ObjectID `bson:"_id"`
|
||||
Platform string `bson:"platform"`
|
||||
Hot bool `bson:"hot"`
|
||||
Version string `bson:"version"`
|
||||
Url string `bson:"url"`
|
||||
Text string `bson:"text"`
|
||||
|
Loading…
x
Reference in New Issue
Block a user