diff --git a/.github/workflows/cleanup-after-milestone-prs-merged.yml b/.github/workflows/cleanup-after-milestone-prs-merged.yml new file mode 100644 index 000000000..8a3e381d6 --- /dev/null +++ b/.github/workflows/cleanup-after-milestone-prs-merged.yml @@ -0,0 +1,65 @@ +name: Cleanup After Milestone PRs Merged + +on: + pull_request: + types: + - closed + +jobs: + handle_pr: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4.2.0 + + - name: Get the PR title and extract PR numbers + id: extract_pr_numbers + run: | + # Get the PR title + PR_TITLE="${{ github.event.pull_request.title }}" + + echo "PR Title: $PR_TITLE" + + # Extract PR numbers from the title + PR_NUMBERS=$(echo "$PR_TITLE" | grep -oE "#[0-9]+" | tr -d '#' | tr '\n' ' ') + echo "Extracted PR Numbers: $PR_NUMBERS" + + # Save PR numbers to a file + echo "$PR_NUMBERS" > pr_numbers.txt + echo "Saved PR Numbers to pr_numbers.txt" + + # Check if the title matches a specific pattern + if echo "$PR_TITLE" | grep -qE "^deps: Merge( #[0-9]+)+ PRs into .+"; then + echo "proceed=true" >> $GITHUB_OUTPUT + else + echo "proceed=false" >> $GITHUB_OUTPUT + fi + + - name: Use extracted PR numbers and label PRs + if: (steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')) && github.event.pull_request.merged == true + run: | + # Read the previously saved PR numbers + PR_NUMBERS=$(cat pr_numbers.txt) + echo "Using extracted PR Numbers: $PR_NUMBERS" + + # Loop through each PR number and add label + for PR_NUMBER in $PR_NUMBERS; do + echo "Adding 'cherry-picked' label to PR #$PR_NUMBER" + curl -X POST \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github+json" \ + https://api.github.com/repos/${{ github.repository }}/issues/$PR_NUMBER/labels \ + -d '{"labels":["cherry-picked"]}' + done + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Delete branch after PR close + if: steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge') + run: | + BRANCH_NAME="${{ github.event.pull_request.head.ref }}" + echo "Branch to delete: $BRANCH_NAME" + git push origin --delete "$BRANCH_NAME" + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/go-build-test.yml b/.github/workflows/go-build-test.yml index 1ed8f0397..5b37bf47d 100644 --- a/.github/workflows/go-build-test.yml +++ b/.github/workflows/go-build-test.yml @@ -2,11 +2,7 @@ name: Go Build Test on: push: - branches: - - main pull_request: - branches: - - main paths-ignore: - '**/*.md' diff --git a/.github/workflows/merge-from-milestone.yml b/.github/workflows/merge-from-milestone.yml new file mode 100644 index 000000000..44b4f81f4 --- /dev/null +++ b/.github/workflows/merge-from-milestone.yml @@ -0,0 +1,218 @@ +name: Create Pre-Release PR from Milestone + +permissions: + contents: write + pull-requests: write + issues: write + +on: + workflow_dispatch: + inputs: + milestone_name: + description: 'Milestone name to collect closed PRs from' + required: true + default: 'v3.8.2' + target_branch: + description: 'Target branch to merge the consolidated PR' + required: true + default: 'pre-release-v3.8.2' + +env: + MILESTONE_NAME: ${{ github.event.inputs.milestone_name || 'v3.8.2' }} + TARGET_BRANCH: ${{ github.event.inputs.target_branch || 'pre-release-v3.8.2' }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + BOT_TOKEN: ${{ secrets.BOT_TOKEN }} + LABEL_NAME: cherry-picked + TEMP_DIR: /tmp # Using /tmp as the temporary directory + +jobs: + cherry_pick_milestone_prs: + runs-on: ubuntu-latest + steps: + - name: Setup temp directory + run: | + # Create the temporary directory and initialize necessary files + mkdir -p ${{ env.TEMP_DIR }} + touch ${{ env.TEMP_DIR }}/pr_numbers.txt + touch ${{ env.TEMP_DIR }}/commit_hashes.txt + touch ${{ env.TEMP_DIR }}/pr_title.txt + touch ${{ env.TEMP_DIR }}/pr_body.txt + touch ${{ env.TEMP_DIR }}/created_pr_number.txt + + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + token: ${{ secrets.BOT_TOKEN }} + + - name: Setup Git User for OpenIM-Robot + run: | + # Set up Git credentials for the bot + git config --global user.email "OpenIM-Robot@users.noreply.github.com" + git config --global user.name "OpenIM-Robot" + + - name: Fetch Milestone ID and Filter PR Numbers + env: + MILESTONE_NAME: ${{ env.MILESTONE_NAME }} + run: | + # Fetch milestone details and extract milestone ID + milestones=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/milestones") + milestone_id=$(echo "$milestones" | grep -B3 "\"title\": \"$MILESTONE_NAME\"" | grep '"number":' | head -n1 | grep -o '[0-9]\+') + if [ -z "$milestone_id" ]; then + echo "Milestone '$MILESTONE_NAME' not found. Exiting." + exit 1 + fi + echo "Milestone ID: $milestone_id" + echo "MILESTONE_ID=$milestone_id" >> $GITHUB_ENV + + # Fetch issues for the milestone + issues=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/issues?milestone=$milestone_id&state=closed&per_page=100") + + > ${{ env.TEMP_DIR }}/pr_numbers.txt + + # Filter PRs that do not have the 'cherry-picked' label + for pr_number in $(echo "$issues" | jq -r '.[] | select(.pull_request != null) | .number'); do + labels=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" | jq -r '.[].name') + + if ! echo "$labels" | grep -q "${LABEL_NAME}"; then + echo "PR #$pr_number does not have the 'cherry-picked' label. Adding to the list." + echo "$pr_number" >> ${{ env.TEMP_DIR }}/pr_numbers.txt + else + echo "PR #$pr_number already has the 'cherry-picked' label. Skipping." + fi + done + + # Sort the filtered PR numbers + sort -n ${{ env.TEMP_DIR }}/pr_numbers.txt -o ${{ env.TEMP_DIR }}/pr_numbers.txt + + echo "Filtered and sorted PR numbers:" + cat ${{ env.TEMP_DIR }}/pr_numbers.txt || echo "No closed PR numbers found for milestone." + + - name: Fetch Merge Commits for PRs and Generate Title and Body + run: | + # Ensure the files are initialized + > ${{ env.TEMP_DIR }}/commit_hashes.txt + > ${{ env.TEMP_DIR }}/pr_title.txt + > ${{ env.TEMP_DIR }}/pr_body.txt + + # Write description to the PR body + echo "### Description:" >> ${{ env.TEMP_DIR }}/pr_body.txt + echo "Merging PRs from milestone \`$MILESTONE_NAME\` into target branch \`$TARGET_BRANCH\`." >> ${{ env.TEMP_DIR }}/pr_body.txt + echo "" >> ${{ env.TEMP_DIR }}/pr_body.txt + echo "### Need Merge PRs:" >> ${{ env.TEMP_DIR }}/pr_body.txt + + pr_numbers_in_title="" + + # Process sorted PR numbers and generate commit hashes + for pr_number in $(cat ${{ env.TEMP_DIR }}/pr_numbers.txt); do + echo "Processing PR #$pr_number" + pr_details=$(curl -s -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + "https://api.github.com/repos/${{ github.repository }}/pulls/$pr_number") + pr_title=$(echo "$pr_details" | jq -r '.title') + merge_commit=$(echo "$pr_details" | jq -r '.merge_commit_sha') + short_commit_hash=$(echo "$merge_commit" | cut -c 1-7) + + # Append PR details to the body + echo "- $pr_title: (#$pr_number) ($short_commit_hash)" >> ${{ env.TEMP_DIR }}/pr_body.txt + + if [ "$merge_commit" != "null" ];then + echo "$merge_commit" >> ${{ env.TEMP_DIR }}/commit_hashes.txt + echo "#$pr_number" >> ${{ env.TEMP_DIR }}/pr_title.txt + pr_numbers_in_title="$pr_numbers_in_title #$pr_number" + fi + done + + commit_hashes=$(cat ${{ env.TEMP_DIR }}/commit_hashes.txt | tr '\n' ' ') + first_commit_hash=$(head -n 1 ${{ env.TEMP_DIR }}/commit_hashes.txt) + cherry_pick_branch="cherry-pick-${first_commit_hash:0:7}" + echo "COMMIT_HASHES=$commit_hashes" >> $GITHUB_ENV + echo "CHERRY_PICK_BRANCH=$cherry_pick_branch" >> $GITHUB_ENV + echo "pr_numbers_in_title=$pr_numbers_in_title" >> $GITHUB_ENV + + - name: Pull and Cherry-pick Commits, Then Push + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + BOT_TOKEN: ${{ secrets.BOT_TOKEN }} + run: | + # Fetch and pull the latest changes from the target branch + git fetch origin + git checkout $TARGET_BRANCH + git pull origin $TARGET_BRANCH + + # Create a new branch for cherry-picking + git checkout -b $CHERRY_PICK_BRANCH + + # Cherry-pick the commits and handle conflicts + for commit_hash in $COMMIT_HASHES; do + echo "Attempting to cherry-pick commit $commit_hash" + if ! git cherry-pick "$commit_hash" --strategy=recursive -X theirs; then + echo "Conflict detected for $commit_hash. Resolving with incoming changes." + conflict_files=$(git diff --name-only --diff-filter=U) + echo "Conflicting files:" + echo "$conflict_files" + + for file in $conflict_files; do + if [ -f "$file" ]; then + echo "Resolving conflict for $file" + git add "$file" + else + echo "File $file has been deleted. Skipping." + git rm "$file" + fi + done + + echo "Conflicts resolved. Continuing cherry-pick." + git cherry-pick --continue + else + echo "Cherry-pick successful for commit $commit_hash." + fi + done + + # Push the cherry-pick branch to the repository + git remote set-url origin "https://${BOT_TOKEN}@github.com/${{ github.repository }}.git" + git push origin $CHERRY_PICK_BRANCH --force + + - name: Create Pull Request + run: | + # Prepare and create the PR + pr_title="deps: Merge ${{ env.pr_numbers_in_title }} PRs into $TARGET_BRANCH" + pr_body=$(cat ${{ env.TEMP_DIR }}/pr_body.txt) + + echo "Prepared PR title:" + echo "$pr_title" + echo "Prepared PR body:" + echo "$pr_body" + + # Create the PR using the GitHub API + response=$(curl -s -X POST -H "Authorization: token $BOT_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + https://api.github.com/repos/${{ github.repository }}/pulls \ + -d "$(jq -n --arg title "$pr_title" \ + --arg head "$CHERRY_PICK_BRANCH" \ + --arg base "$TARGET_BRANCH" \ + --arg body "$pr_body" \ + '{title: $title, head: $head, base: $base, body: $body}')") + + pr_number=$(echo "$response" | jq -r '.number') + echo "$pr_number" > ${{ env.TEMP_DIR }}/created_pr_number.txt + echo "Created PR #$pr_number" + + - name: Add Label to Created Pull Request + run: | + # Add 'milestone-merge' label to the created PR + pr_number=$(cat ${{ env.TEMP_DIR }}/created_pr_number.txt) + echo "Adding label to PR #$pr_number" + + curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \ + -H "Accept: application/vnd.github+json" \ + -d '{"labels": ["milestone-merge"]}' \ + "https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" + + echo "Added 'milestone-merge' label to PR #$pr_number." diff --git a/config/notification.yml b/config/notification.yml index 85ca91af1..ba5ca1c21 100644 --- a/config/notification.yml +++ b/config/notification.yml @@ -1,20 +1,3 @@ -# Copyright © 2023 OpenIM. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Determines if a message should be sent. If set to false, it triggers a silent sync without a message. If true, it requires triggering a conversation. -# For rpc notification, send twice: once as a message and once as a notification. -# The options field 'isNotification' indicates if it's a notification. groupCreated: isSendMsg: true # Reliability level of the message sending. @@ -309,9 +292,9 @@ userInfoUpdated: unreadCount: false offlinePush: enable: true - title: Remove a blocked user - desc: Remove a blocked user - ext: Remove a blocked user + title: userInfo updated + desc: userInfo updated + ext: userInfo updated userStatusChanged: isSendMsg: false diff --git a/go.mod b/go.mod index b982bc7d0..5e5a8b5be 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.46 + github.com/openimsdk/protocol v0.0.72-alpha.47 github.com/openimsdk/tools v0.0.50-alpha.16 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index df9cf5194..53109c890 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.46 h1:1LZlfEHLzw1F4afFmqBczmXKJWm5rUQ+yr8rJ4oyEAc= -github.com/openimsdk/protocol v0.0.72-alpha.46/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.47 h1:FGHnEwsA05GxT3vnz7YH3fbVkuoO3P71ZZgkQQ71MjA= +github.com/openimsdk/protocol v0.0.72-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/router.go b/internal/api/router.go index 560516d30..17c998912 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -198,6 +198,13 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En objectGroup.POST("/initiate_form_data", t.InitiateFormData) objectGroup.POST("/complete_form_data", t.CompleteFormData) objectGroup.GET("/*name", t.ObjectRedirect) + + applicationGroup := r.Group("application") + applicationGroup.POST("/add_version", t.AddApplicationVersion) + applicationGroup.POST("/update_version", t.UpdateApplicationVersion) + applicationGroup.POST("/delete_version", t.DeleteApplicationVersion) + applicationGroup.POST("/latest_version", t.LatestApplicationVersion) + applicationGroup.POST("/page_versions", t.PageApplicationVersion) } // Message msgGroup := r.Group("/msg") @@ -290,4 +297,6 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc { var Whitelist = []string{ "/auth/get_admin_token", "/auth/parse_token", + "/application/latest_version", + "/application/page_versions", } diff --git a/internal/api/third.go b/internal/api/third.go index 6baa70ee5..56661ba89 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -170,3 +170,23 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) { func (o *ThirdApi) GetPrometheus(c *gin.Context) { c.Redirect(http.StatusFound, o.GrafanaUrl) } + +func (o *ThirdApi) LatestApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.LatestApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) AddApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.AddApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) UpdateApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.UpdateApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) DeleteApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.DeleteApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) PageApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.PageApplicationVersion, o.Client, c) +} diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 7dc2ebeea..f11cfde1a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -128,6 +128,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error { go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) + go m.historyCH.HandleUserHasReadSeqMessages(m.ctx) err := m.historyCH.redisMessageBatches.Start() if err != nil { return err @@ -157,12 +158,14 @@ func (m *MsgTransfer) Start(index int, config *Config) error { // graceful close kafka client. m.cancel() m.historyCH.redisMessageBatches.Close() + m.historyCH.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() return nil case <-netDone: m.cancel() m.historyCH.redisMessageBatches.Close() + m.historyCH.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() close(netDone) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b0078649c..84453c8df 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,8 +18,10 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "strconv" "strings" + "sync" "time" "github.com/IBM/sarama" @@ -40,11 +42,12 @@ import ( ) const ( - size = 500 - mainDataBuffer = 500 - subChanBuffer = 50 - worker = 50 - interval = 100 * time.Millisecond + size = 500 + mainDataBuffer = 500 + subChanBuffer = 50 + worker = 50 + interval = 100 * time.Millisecond + hasReadChanBuffer = 1000 ) type ContextMsg struct { @@ -52,14 +55,23 @@ type ContextMsg struct { ctx context.Context } +// This structure is used for asynchronously writing the sender’s read sequence (seq) regarding a message into MongoDB. +// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10. +type userHasReadSeq struct { + conversationID string + userHasReadMap map[string]int64 +} + type OnlineHistoryRedisConsumerHandler struct { historyConsumerGroup *kafka.MConsumerGroup redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] - msgTransferDatabase controller.MsgTransferDatabase - conversationRpcClient *rpcclient.ConversationRpcClient - groupRpcClient *rpcclient.GroupRpcClient + msgTransferDatabase controller.MsgTransferDatabase + conversationRpcClient *rpcclient.ConversationRpcClient + groupRpcClient *rpcclient.GroupRpcClient + conversationUserHasReadChan chan *userHasReadSeq + wg sync.WaitGroup } func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase, @@ -70,6 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont } var och OnlineHistoryRedisConsumerHandler och.msgTransferDatabase = database + och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer) + och.wg.Add(1) b := batcher.New[sarama.ConsumerMessage]( batcher.WithSize(size), @@ -115,25 +129,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID } func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { - type seqKey struct { - conversationID string - userID string - } - var readSeq map[seqKey]int64 + + var conversationID string + var userSeqMap map[string]int64 for _, msg := range msgs { if msg.message.ContentType != constant.HasReadReceipt { continue } var elem sdkws.NotificationElem if err := json.Unmarshal(msg.message.Content, &elem); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) continue } var tips sdkws.MarkAsReadTips if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) continue } + //The conversation ID for each batch of messages processed by the batcher is the same. + conversationID = tips.ConversationID if len(tips.Seqs) > 0 { for _, seq := range tips.Seqs { if tips.HasReadSeq < seq { @@ -146,26 +160,25 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, if tips.HasReadSeq < 0 { continue } - if readSeq == nil { - readSeq = make(map[seqKey]int64) + if userSeqMap == nil { + userSeqMap = make(map[string]int64) } - key := seqKey{ - conversationID: tips.ConversationID, - userID: tips.MarkAsReadUserID, - } - if readSeq[key] > tips.HasReadSeq { + + if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { continue } - readSeq[key] = tips.HasReadSeq + userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq } - if readSeq == nil { + if userSeqMap == nil { return } - for key, seq := range readSeq { - if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { - log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) - } + if len(conversationID) == 0 { + log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) } + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { + log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap) + } + } func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { @@ -250,12 +263,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if len(storageMessageList) > 0 { msg := storageMessageList[0] - lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) { - log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) + log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) return } log.ZInfo(ctx, "BatchInsertChat2Cache end") + err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap) + if err != nil { + log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) + prommetrics.SeqSetFailedCounter.Inc() + } + och.conversationUserHasReadChan <- &userHasReadSeq{ + conversationID: conversationID, + userHasReadMap: userSeqMap, + } if isNewConversation { switch msg.SessionType { @@ -308,7 +330,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con storageMessageList = append(storageMessageList, msg.message) } if len(storageMessageList) > 0 { - lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) + lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) if err != nil { log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageMessageList) @@ -323,6 +345,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con och.toPushTopic(ctx, key, conversationID, storageList) } } +func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) { + defer och.wg.Done() + + for msg := range och.conversationUserHasReadChan { + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil { + log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap) + } + } + + log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages") +} +func (och *OnlineHistoryRedisConsumerHandler) Close() { + close(och.conversationUserHasReadChan) + och.wg.Wait() +} func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { diff --git a/internal/push/callback.go b/internal/push/callback.go index 889729582..f8e17bb8c 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -24,7 +24,6 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils/datautil" ) func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { @@ -70,7 +69,7 @@ func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before * func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { - if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { + if msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforePushReq{ diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index ef917d539..30ac19a76 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1026,7 +1026,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf } num := len(update) if req.GroupInfoForSet.Notification != "" { - num-- + num -= 3 func() { conversation := &pbconversation.ConversationReq{ ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID), @@ -1133,8 +1133,9 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI } num := len(updatedData) + if req.Notification != nil { - num-- + num -= 3 if req.Notification.Value != "" { func() { @@ -1219,7 +1220,7 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans } } - if newOwner.MuteEndTime != time.Unix(0, 0) { + if newOwner.MuteEndTime.After(time.Now()) { if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{ GroupID: group.GroupID, UserID: req.NewOwnerUserID}); err != nil { diff --git a/internal/rpc/third/application.go b/internal/rpc/third/application.go new file mode 100644 index 000000000..a6556055c --- /dev/null +++ b/internal/rpc/third/application.go @@ -0,0 +1,117 @@ +package third + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/datautil" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "time" +) + +func IsNotFound(err error) bool { + switch errs.Unwrap(err) { + case redis.Nil, mongo.ErrNoDocuments: + return true + default: + return false + } +} + +func (t *thirdServer) db2pbApplication(val *model.Application) *third.ApplicationVersion { + return &third.ApplicationVersion{ + Id: val.ID.Hex(), + Platform: val.Platform, + Version: val.Version, + Url: val.Url, + Text: val.Text, + Force: val.Force, + Latest: val.Latest, + CreateTime: val.CreateTime.UnixMilli(), + } +} + +func (t *thirdServer) LatestApplicationVersion(ctx context.Context, req *third.LatestApplicationVersionReq) (*third.LatestApplicationVersionResp, error) { + res, err := t.applicationDatabase.LatestVersion(ctx, req.Platform) + if err == nil { + return &third.LatestApplicationVersionResp{Version: t.db2pbApplication(res)}, nil + } else if IsNotFound(err) { + return &third.LatestApplicationVersionResp{}, nil + } else { + return nil, err + } +} + +func (t *thirdServer) AddApplicationVersion(ctx context.Context, req *third.AddApplicationVersionReq) (*third.AddApplicationVersionResp, error) { + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err + } + val := &model.Application{ + ID: primitive.NewObjectID(), + Platform: req.Platform, + Version: req.Version, + Url: req.Url, + Text: req.Text, + Force: req.Force, + Latest: req.Latest, + CreateTime: time.Now(), + } + if err := t.applicationDatabase.AddVersion(ctx, val); err != nil { + return nil, err + } + return &third.AddApplicationVersionResp{}, nil +} + +func (t *thirdServer) UpdateApplicationVersion(ctx context.Context, req *third.UpdateApplicationVersionReq) (*third.UpdateApplicationVersionResp, error) { + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err + } + oid, err := primitive.ObjectIDFromHex(req.Id) + if err != nil { + return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) + } + update := make(map[string]any) + putUpdate(update, "platform", req.Platform) + putUpdate(update, "version", req.Version) + putUpdate(update, "url", req.Url) + putUpdate(update, "text", req.Text) + putUpdate(update, "force", req.Force) + putUpdate(update, "latest", req.Latest) + if err := t.applicationDatabase.UpdateVersion(ctx, oid, update); err != nil { + return nil, err + } + return &third.UpdateApplicationVersionResp{}, nil +} + +func (t *thirdServer) DeleteApplicationVersion(ctx context.Context, req *third.DeleteApplicationVersionReq) (*third.DeleteApplicationVersionResp, error) { + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err + } + ids := make([]primitive.ObjectID, 0, len(req.Id)) + for _, id := range req.Id { + oid, err := primitive.ObjectIDFromHex(id) + if err != nil { + return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) + } + ids = append(ids, oid) + } + if err := t.applicationDatabase.DeleteVersion(ctx, ids); err != nil { + return nil, err + } + return &third.DeleteApplicationVersionResp{}, nil +} + +func (t *thirdServer) PageApplicationVersion(ctx context.Context, req *third.PageApplicationVersionReq) (*third.PageApplicationVersionResp, error) { + total, res, err := t.applicationDatabase.PageVersion(ctx, req.Platform, req.Pagination) + if err != nil { + return nil, err + } + return &third.PageApplicationVersionResp{ + Total: total, + Versions: datautil.Slice(res, t.db2pbApplication), + }, nil +} diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 0eeaaa314..c6b588d8d 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -38,12 +38,13 @@ import ( ) type thirdServer struct { - thirdDatabase controller.ThirdDatabase - s3dataBase controller.S3Database - userRpcClient rpcclient.UserRpcClient - defaultExpire time.Duration - config *Config - minio *minio.Minio + thirdDatabase controller.ThirdDatabase + s3dataBase controller.S3Database + userRpcClient rpcclient.UserRpcClient + defaultExpire time.Duration + config *Config + minio *minio.Minio + applicationDatabase controller.ApplicationDatabase } type Config struct { @@ -74,6 +75,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + applicationMgo, err := mgo.NewApplicationMgo(mgocli.GetDB()) + if err != nil { + return err + } + // Select the oss method according to the profile policy enable := config.RpcConfig.Object.Enable var ( @@ -98,12 +104,13 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } localcache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ - thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), - userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), - s3dataBase: controller.NewS3Database(rdb, o, s3db), - defaultExpire: time.Hour * 24 * 7, - config: config, - minio: minioCli, + thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), + userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), + s3dataBase: controller.NewS3Database(rdb, o, s3db), + defaultExpire: time.Hour * 24 * 7, + config: config, + minio: minioCli, + applicationDatabase: controller.NewApplicationDatabase(applicationMgo, redis.NewApplicationRedisCache(applicationMgo, rdb)), }) return nil } diff --git a/internal/rpc/third/tool.go b/internal/rpc/third/tool.go index ac4be3968..4e22ffbf9 100644 --- a/internal/rpc/third/tool.go +++ b/internal/rpc/third/tool.go @@ -82,3 +82,11 @@ func checkValidObjectName(objectName string) error { func (t *thirdServer) IsManagerUserID(opUserID string) bool { return authverify.IsManagerUserID(opUserID, t.config.Share.IMAdminUserID) } + +func putUpdate[T any](update map[string]any, name string, val interface{ GetValuePtr() *T }) { + ptrVal := val.GetValuePtr() + if ptrVal == nil { + return + } + update[name] = *ptrVal +} diff --git a/pkg/common/storage/cache/application.go b/pkg/common/storage/cache/application.go new file mode 100644 index 000000000..588732ec8 --- /dev/null +++ b/pkg/common/storage/cache/application.go @@ -0,0 +1,11 @@ +package cache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +type ApplicationCache interface { + LatestVersion(ctx context.Context, platform string) (*model.Application, error) + DeleteCache(ctx context.Context, platforms []string) error +} diff --git a/pkg/common/storage/cache/cachekey/application.go b/pkg/common/storage/cache/cachekey/application.go new file mode 100644 index 000000000..032adba3c --- /dev/null +++ b/pkg/common/storage/cache/cachekey/application.go @@ -0,0 +1,9 @@ +package cachekey + +const ( + ApplicationLatestVersion = "APPLICATION_LATEST_VERSION:" +) + +func GetApplicationLatestVersionKey(platform string) string { + return ApplicationLatestVersion + platform +} diff --git a/pkg/common/storage/cache/redis/application.go b/pkg/common/storage/cache/redis/application.go new file mode 100644 index 000000000..4a7a4ced6 --- /dev/null +++ b/pkg/common/storage/cache/redis/application.go @@ -0,0 +1,43 @@ +package redis + +import ( + "context" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/utils/datautil" + "github.com/redis/go-redis/v9" + "time" +) + +func NewApplicationRedisCache(db database.Application, rdb redis.UniversalClient) *ApplicationRedisCache { + return &ApplicationRedisCache{ + db: db, + rcClient: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + deleter: NewBatchDeleterRedis(rdb, GetRocksCacheOptions(), nil), + expireTime: time.Hour * 24 * 7, + } +} + +type ApplicationRedisCache struct { + db database.Application + rcClient *rockscache.Client + deleter *BatchDeleterRedis + expireTime time.Duration +} + +func (a *ApplicationRedisCache) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { + return getCache(ctx, a.rcClient, cachekey.GetApplicationLatestVersionKey(platform), a.expireTime, func(ctx context.Context) (*model.Application, error) { + return a.db.LatestVersion(ctx, platform) + }) +} + +func (a *ApplicationRedisCache) DeleteCache(ctx context.Context, platforms []string) error { + if len(platforms) == 0 { + return nil + } + return a.deleter.ExecDelWithKeys(ctx, datautil.Slice(platforms, func(platform string) string { + return cachekey.GetApplicationLatestVersionKey(platform) + })) +} diff --git a/pkg/common/storage/controller/application.go b/pkg/common/storage/controller/application.go new file mode 100644 index 000000000..72bca07ef --- /dev/null +++ b/pkg/common/storage/controller/application.go @@ -0,0 +1,69 @@ +package controller + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type ApplicationDatabase interface { + LatestVersion(ctx context.Context, platform string) (*model.Application, error) + AddVersion(ctx context.Context, val *model.Application) error + UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error + DeleteVersion(ctx context.Context, id []primitive.ObjectID) error + PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) +} + +func NewApplicationDatabase(db database.Application, cache cache.ApplicationCache) ApplicationDatabase { + return &applicationDatabase{db: db, cache: cache} +} + +type applicationDatabase struct { + db database.Application + cache cache.ApplicationCache +} + +func (a *applicationDatabase) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { + return a.cache.LatestVersion(ctx, platform) +} + +func (a *applicationDatabase) AddVersion(ctx context.Context, val *model.Application) error { + if err := a.db.AddVersion(ctx, val); err != nil { + return err + } + return a.cache.DeleteCache(ctx, []string{val.Platform}) +} + +func (a *applicationDatabase) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { + platforms, err := a.db.FindPlatform(ctx, []primitive.ObjectID{id}) + if err != nil { + return err + } + if err := a.db.UpdateVersion(ctx, id, update); err != nil { + return err + } + if p, ok := update["platform"]; ok { + if val, ok := p.(string); ok { + platforms = append(platforms, val) + } + } + return a.cache.DeleteCache(ctx, platforms) +} + +func (a *applicationDatabase) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { + platforms, err := a.db.FindPlatform(ctx, id) + if err != nil { + return err + } + if err := a.db.DeleteVersion(ctx, id); err != nil { + return err + } + return a.cache.DeleteCache(ctx, platforms) +} + +func (a *applicationDatabase) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { + return a.db.PageVersion(ctx, platforms, page) +} diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index c6013dbc1..1ecd786aa 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -24,8 +24,11 @@ type MsgTransferDatabase interface { DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error // BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. - BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) - SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, userHasReadMap map[string]int64, err error) + + SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error + + SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error // to mq MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) @@ -219,18 +222,18 @@ func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conv return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) } -func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { +func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) { lenList := len(msgs) if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { - return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() + return 0, false, nil, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() } if lenList < 1 { - return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() + return 0, false, nil, errs.New("no messages to insert", "minCount", 1).Wrap() } currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) if err != nil { log.ZError(ctx, "storage.seq.Malloc", err) - return 0, false, err + return 0, false, nil, err } isNew = currentMaxSeq == 0 lastMaxSeq := currentMaxSeq @@ -248,25 +251,25 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver } else { prommetrics.MsgInsertRedisSuccessCounter.Inc() } - err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) - if err != nil { - log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) - prommetrics.SeqSetFailedCounter.Inc() - } - return lastMaxSeq, isNew, errs.Wrap(err) + return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err) } -func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { +func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { for userID, seq := range userSeqMap { - if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { + if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { return err } } return nil } -func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) +func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { + for userID, seq := range userSeqMap { + if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil } func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { diff --git a/pkg/common/storage/database/application.go b/pkg/common/storage/database/application.go new file mode 100644 index 000000000..c98ae74c8 --- /dev/null +++ b/pkg/common/storage/database/application.go @@ -0,0 +1,17 @@ +package database + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type Application interface { + LatestVersion(ctx context.Context, platform string) (*model.Application, error) + AddVersion(ctx context.Context, val *model.Application) error + UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error + DeleteVersion(ctx context.Context, id []primitive.ObjectID) error + PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) + FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) +} diff --git a/pkg/common/storage/database/mgo/application.go b/pkg/common/storage/database/mgo/application.go new file mode 100644 index 000000000..e59c0560a --- /dev/null +++ b/pkg/common/storage/database/mgo/application.go @@ -0,0 +1,82 @@ +package mgo + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/pagination" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewApplicationMgo(db *mongo.Database) (*ApplicationMgo, error) { + coll := db.Collection("application") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "platform", Value: 1}, + {Key: "version", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "latest", Value: -1}, + }, + }, + }) + if err != nil { + return nil, err + } + return &ApplicationMgo{coll: coll}, nil +} + +type ApplicationMgo struct { + coll *mongo.Collection +} + +func (a *ApplicationMgo) sort() any { + return bson.D{{"latest", -1}, {"_id", -1}} +} + +func (a *ApplicationMgo) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { + return mongoutil.FindOne[*model.Application](ctx, a.coll, bson.M{"platform": platform}, options.FindOne().SetSort(a.sort())) +} + +func (a *ApplicationMgo) AddVersion(ctx context.Context, val *model.Application) error { + if val.ID.IsZero() { + val.ID = primitive.NewObjectID() + } + return mongoutil.InsertMany(ctx, a.coll, []*model.Application{val}) +} + +func (a *ApplicationMgo) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { + if len(update) == 0 { + return nil + } + return mongoutil.UpdateOne(ctx, a.coll, bson.M{"_id": id}, bson.M{"$set": update}, true) +} + +func (a *ApplicationMgo) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { + if len(id) == 0 { + return nil + } + return mongoutil.DeleteMany(ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}) +} + +func (a *ApplicationMgo) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { + filter := bson.M{} + if len(platforms) > 0 { + filter["platform"] = bson.M{"$in": platforms} + } + return mongoutil.FindPage[*model.Application](ctx, a.coll, filter, page, options.Find().SetSort(a.sort())) +} + +func (a *ApplicationMgo) FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) { + if len(id) == 0 { + return nil, nil + } + return mongoutil.Find[string](ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}, options.Find().SetProjection(bson.M{"_id": 0, "platform": 1})) +} diff --git a/pkg/common/storage/model/application.go b/pkg/common/storage/model/application.go new file mode 100644 index 000000000..f5bae2be6 --- /dev/null +++ b/pkg/common/storage/model/application.go @@ -0,0 +1,17 @@ +package model + +import ( + "go.mongodb.org/mongo-driver/bson/primitive" + "time" +) + +type Application struct { + ID primitive.ObjectID `bson:"_id"` + Platform string `bson:"platform"` + Version string `bson:"version"` + Url string `bson:"url"` + Text string `bson:"text"` + Force bool `bson:"force"` + Latest bool `bson:"latest"` + CreateTime time.Time `bson:"create_time"` +}