mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-11 23:47:32 +08:00
Merge branch 'main' of https://github.com/openimsdk/open-im-server into build/service-limit
This commit is contained in:
commit
abcd4b948c
155
.github/workflows/publish-docker-image.yml
vendored
155
.github/workflows/publish-docker-image.yml
vendored
@ -4,42 +4,80 @@ on:
|
|||||||
push:
|
push:
|
||||||
branches:
|
branches:
|
||||||
- release-*
|
- release-*
|
||||||
# tags:
|
|
||||||
# - 'v*'
|
|
||||||
|
|
||||||
release:
|
release:
|
||||||
types: [published]
|
types: [published]
|
||||||
|
|
||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
inputs:
|
inputs:
|
||||||
tag:
|
tag:
|
||||||
description: "Tag version to be used for Docker image"
|
description: "Tag version to be used for Docker image"
|
||||||
required: true
|
required: true
|
||||||
default: "v3.8.0"
|
default: "v3.8.3"
|
||||||
|
|
||||||
|
env:
|
||||||
|
GO_VERSION: "1.22"
|
||||||
|
IMAGE_NAME: "openim-server"
|
||||||
|
# IMAGE_NAME: ${{ github.event.repository.name }}
|
||||||
|
DOCKER_BUILDKIT: 1
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build-and-test:
|
publish-docker-images:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.merged == false) }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- name: Checkout main repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
with:
|
with:
|
||||||
path: main-repo
|
path: main-repo
|
||||||
|
|
||||||
# - name: Set up QEMU
|
- name: Set up QEMU
|
||||||
# uses: docker/setup-qemu-action@v3.3.0
|
uses: docker/setup-qemu-action@v3.3.0
|
||||||
|
|
||||||
- name: Set up Docker Buildx
|
- name: Set up Docker Buildx
|
||||||
uses: docker/setup-buildx-action@v3.8.0
|
id: buildx
|
||||||
|
uses: docker/setup-buildx-action@v3
|
||||||
- name: Build Docker image
|
|
||||||
id: build
|
|
||||||
uses: docker/build-push-action@v5
|
|
||||||
with:
|
with:
|
||||||
context: ./main-repo
|
driver-opts: network=host
|
||||||
load: true
|
|
||||||
tags: "openim/openim-server:local"
|
- name: Extract metadata for Docker
|
||||||
cache-from: type=gha,scope=build
|
id: meta
|
||||||
cache-to: type=gha,mode=max,scope=build
|
uses: docker/metadata-action@v5.6.0
|
||||||
|
with:
|
||||||
|
images: |
|
||||||
|
${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}
|
||||||
|
ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}
|
||||||
|
registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}
|
||||||
|
tags: |
|
||||||
|
type=ref,event=tag
|
||||||
|
type=schedule
|
||||||
|
type=ref,event=branch
|
||||||
|
type=ref,event=pr
|
||||||
|
type=semver,pattern={{version}}
|
||||||
|
type=semver,pattern=v{{version}}
|
||||||
|
type=semver,pattern={{major}}.{{minor}}
|
||||||
|
type=semver,pattern={{major}}
|
||||||
|
type=sha
|
||||||
|
|
||||||
|
- name: Install skopeo
|
||||||
|
run: |
|
||||||
|
sudo apt-get update && sudo apt-get install -y skopeo
|
||||||
|
|
||||||
|
- name: Build multi-arch images as OCI
|
||||||
|
run: |
|
||||||
|
mkdir -p /tmp/oci-image /tmp/docker-cache
|
||||||
|
|
||||||
|
# Build multi-architecture image and save in OCI format
|
||||||
|
docker buildx build \
|
||||||
|
--platform linux/amd64,linux/arm64 \
|
||||||
|
--output type=oci,dest=/tmp/oci-image/multi-arch.tar \
|
||||||
|
--cache-to type=local,dest=/tmp/docker-cache \
|
||||||
|
--cache-from type=gha \
|
||||||
|
./main-repo
|
||||||
|
|
||||||
|
# Use skopeo to convert the amd64 image from OCI format to Docker format and load it
|
||||||
|
skopeo copy --override-arch amd64 oci-archive:/tmp/oci-image/multi-arch.tar docker-daemon:${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local
|
||||||
|
|
||||||
|
# check image
|
||||||
|
docker image ls | grep openim
|
||||||
|
|
||||||
- name: Checkout compose repository
|
- name: Checkout compose repository
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
@ -52,11 +90,11 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
IP=$(hostname -I | awk '{print $1}')
|
IP=$(hostname -I | awk '{print $1}')
|
||||||
echo "The IP Address is: $IP"
|
echo "The IP Address is: $IP"
|
||||||
echo "::set-output name=ip::$IP"
|
echo "ip=$IP" >> $GITHUB_OUTPUT
|
||||||
|
|
||||||
- name: Update .env to use the local image
|
- name: Update .env to use the local image
|
||||||
run: |
|
run: |
|
||||||
sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=openim/openim-server:local|' ${{ github.workspace }}/compose-repo/.env
|
sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local|' ${{ github.workspace }}/compose-repo/.env
|
||||||
sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env
|
sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env
|
||||||
|
|
||||||
- name: Start services using Docker Compose
|
- name: Start services using Docker Compose
|
||||||
@ -66,23 +104,34 @@ jobs:
|
|||||||
|
|
||||||
docker compose ps
|
docker compose ps
|
||||||
|
|
||||||
- name: Extract metadata for Docker (tags, labels)
|
# - name: Check openim-server health
|
||||||
id: meta
|
# run: |
|
||||||
uses: docker/metadata-action@v5.6.0
|
# timeout=300
|
||||||
with:
|
# interval=30
|
||||||
images: |
|
# elapsed=0
|
||||||
openim/openim-server
|
# while [[ $elapsed -le $timeout ]]; do
|
||||||
ghcr.io/openimsdk/openim-server
|
# if ! docker exec openim-server mage check; then
|
||||||
registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server
|
# echo "openim-server is not ready, waiting..."
|
||||||
tags: |
|
# sleep $interval
|
||||||
type=ref,event=tag
|
# elapsed=$(($elapsed + $interval))
|
||||||
type=schedule
|
# else
|
||||||
type=ref,event=branch
|
# echo "Health check successful"
|
||||||
# type=semver,pattern={{version}}
|
# exit 0
|
||||||
type=semver,pattern=v{{version}}
|
# fi
|
||||||
type=semver,pattern=release-{{raw}}
|
# done
|
||||||
type=sha
|
# echo "Health check failed after 5 minutes"
|
||||||
type=raw,value=${{ github.event.inputs.tag }}
|
# exit 1
|
||||||
|
|
||||||
|
# - name: Check openim-chat health
|
||||||
|
# if: success()
|
||||||
|
# run: |
|
||||||
|
# if ! docker exec openim-chat mage check; then
|
||||||
|
# echo "openim-chat check failed"
|
||||||
|
# exit 1
|
||||||
|
# else
|
||||||
|
# echo "Health check successful"
|
||||||
|
# exit 0
|
||||||
|
# fi
|
||||||
|
|
||||||
- name: Log in to Docker Hub
|
- name: Log in to Docker Hub
|
||||||
uses: docker/login-action@v3.3.0
|
uses: docker/login-action@v3.3.0
|
||||||
@ -104,22 +153,27 @@ jobs:
|
|||||||
username: ${{ secrets.ALIREGISTRY_USERNAME }}
|
username: ${{ secrets.ALIREGISTRY_USERNAME }}
|
||||||
password: ${{ secrets.ALIREGISTRY_TOKEN }}
|
password: ${{ secrets.ALIREGISTRY_TOKEN }}
|
||||||
|
|
||||||
- name: Push Docker images
|
- name: Push multi-architecture images
|
||||||
uses: docker/build-push-action@v5
|
if: success()
|
||||||
with:
|
run: |
|
||||||
context: ./main-repo
|
docker buildx build \
|
||||||
push: true
|
--platform linux/amd64,linux/arm64 \
|
||||||
platforms: linux/amd64,linux/arm64
|
$(echo "${{ steps.meta.outputs.tags }}" | sed 's/,/ --tag /g' | sed 's/^/--tag /') \
|
||||||
tags: ${{ steps.meta.outputs.tags }}
|
--cache-from type=local,src=/tmp/docker-cache \
|
||||||
labels: ${{ steps.meta.outputs.labels }}
|
--push \
|
||||||
cache-from: type=gha,scope=build
|
./main-repo
|
||||||
cache-to: type=gha,mode=max,scope=build
|
|
||||||
|
|
||||||
- name: Verify multi-platform support
|
- name: Verify multi-platform support
|
||||||
run: |
|
run: |
|
||||||
images=("openim/openim-server" "ghcr.io/openimsdk/openim-server" "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server")
|
images=(
|
||||||
|
"${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}"
|
||||||
|
"ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}"
|
||||||
|
"registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}"
|
||||||
|
)
|
||||||
|
|
||||||
for image in "${images[@]}"; do
|
for image in "${images[@]}"; do
|
||||||
for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n'); do
|
for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n' | cut -d':' -f2); do
|
||||||
|
echo "Verifying multi-arch support for $image:$tag"
|
||||||
manifest=$(docker manifest inspect "$image:$tag" || echo "error")
|
manifest=$(docker manifest inspect "$image:$tag" || echo "error")
|
||||||
if [[ "$manifest" == "error" ]]; then
|
if [[ "$manifest" == "error" ]]; then
|
||||||
echo "Manifest not found for $image:$tag"
|
echo "Manifest not found for $image:$tag"
|
||||||
@ -135,5 +189,6 @@ jobs:
|
|||||||
echo "Multi-platform support check failed for $image:$tag - missing arm64"
|
echo "Multi-platform support check failed for $image:$tag - missing arm64"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
echo "✅ $image:$tag supports both amd64 and arm64 architectures"
|
||||||
done
|
done
|
||||||
done
|
done
|
||||||
|
|||||||
@ -41,6 +41,9 @@ afterSendGroupMsg:
|
|||||||
attentionIds: []
|
attentionIds: []
|
||||||
# See beforeSendSingleMsg comment.
|
# See beforeSendSingleMsg comment.
|
||||||
deniedTypes: []
|
deniedTypes: []
|
||||||
|
afterMsgSaveDB:
|
||||||
|
enable: false
|
||||||
|
timeout: 5
|
||||||
afterUserOnline:
|
afterUserOnline:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
|
|||||||
4
go.mod
4
go.mod
@ -12,8 +12,8 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.73-alpha.14
|
github.com/openimsdk/protocol v0.0.73-alpha.17
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.104
|
github.com/openimsdk/tools v0.0.50-alpha.105
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
github.com/stretchr/testify v1.10.0
|
github.com/stretchr/testify v1.10.0
|
||||||
|
|||||||
8
go.sum
8
go.sum
@ -349,10 +349,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
|
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
|
||||||
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE=
|
github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE=
|
||||||
github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.104 h1:fmDMqMC+SasN8noGKCT++0BQDqEF2O4ek9bLkiLMeHw=
|
github.com/openimsdk/tools v0.0.50-alpha.105 h1:axuCvKXhxY2RGLhpMMFNgBtE0B65T2Sr1JDW3UD9nBs=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.104/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4=
|
github.com/openimsdk/tools v0.0.50-alpha.105/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||||
|
|||||||
@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
|
|||||||
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
|
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
|
||||||
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
|
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *ConversationApi) DeleteConversations(c *gin.Context) {
|
||||||
|
a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client)
|
||||||
|
}
|
||||||
|
|||||||
@ -289,6 +289,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
|
|||||||
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
|
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
|
||||||
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
|
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
|
||||||
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
|
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
|
||||||
|
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|||||||
@ -70,6 +70,7 @@ type Client struct {
|
|||||||
UserID string `json:"userID"`
|
UserID string `json:"userID"`
|
||||||
IsBackground bool `json:"isBackground"`
|
IsBackground bool `json:"isBackground"`
|
||||||
SDKType string `json:"sdkType"`
|
SDKType string `json:"sdkType"`
|
||||||
|
SDKVersion string `json:"sdkVersion"`
|
||||||
Encoder Encoder
|
Encoder Encoder
|
||||||
ctx *UserConnContext
|
ctx *UserConnContext
|
||||||
longConnServer LongConnServer
|
longConnServer LongConnServer
|
||||||
@ -97,6 +98,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
|
|||||||
c.closedErr = nil
|
c.closedErr = nil
|
||||||
c.token = ctx.GetToken()
|
c.token = ctx.GetToken()
|
||||||
c.SDKType = ctx.GetSDKType()
|
c.SDKType = ctx.GetSDKType()
|
||||||
|
c.SDKVersion = ctx.GetSDKVersion()
|
||||||
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
|
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
|
||||||
c.subLock = new(sync.Mutex)
|
c.subLock = new(sync.Mutex)
|
||||||
if c.subUserIDs != nil {
|
if c.subUserIDs != nil {
|
||||||
|
|||||||
@ -28,6 +28,7 @@ const (
|
|||||||
BackgroundStatus = "isBackground"
|
BackgroundStatus = "isBackground"
|
||||||
SendResponse = "isMsgResp"
|
SendResponse = "isMsgResp"
|
||||||
SDKType = "sdkType"
|
SDKType = "sdkType"
|
||||||
|
SDKVersion = "sdkVersion"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@ -15,12 +15,13 @@
|
|||||||
package msggateway
|
package msggateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/tools/utils/encrypt"
|
"github.com/openimsdk/tools/utils/encrypt"
|
||||||
"github.com/openimsdk/tools/utils/stringutil"
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
@ -140,6 +141,10 @@ func (c *UserConnContext) GetToken() string {
|
|||||||
return c.Req.URL.Query().Get(Token)
|
return c.Req.URL.Query().Get(Token)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *UserConnContext) GetSDKVersion() string {
|
||||||
|
return c.Req.URL.Query().Get(SDKVersion)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *UserConnContext) GetCompression() bool {
|
func (c *UserConnContext) GetCompression() bool {
|
||||||
compression, exists := c.Query(Compression)
|
compression, exists := c.Query(Compression)
|
||||||
if exists && compression == GzipCompressionProtocol {
|
if exists && compression == GzipCompressionProtocol {
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||||
pbAuth "github.com/openimsdk/protocol/auth"
|
pbAuth "github.com/openimsdk/protocol/auth"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
|
||||||
"github.com/go-playground/validator/v10"
|
"github.com/go-playground/validator/v10"
|
||||||
@ -64,6 +65,8 @@ type WsServer struct {
|
|||||||
webhookClient *webhook.Client
|
webhookClient *webhook.Client
|
||||||
userClient *rpcli.UserClient
|
userClient *rpcli.UserClient
|
||||||
authClient *rpcli.AuthClient
|
authClient *rpcli.AuthClient
|
||||||
|
|
||||||
|
ready atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type kickHandler struct {
|
type kickHandler struct {
|
||||||
@ -93,6 +96,8 @@ func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.C
|
|||||||
ws.authClient = rpcli.NewAuthClient(authConn)
|
ws.authClient = rpcli.NewAuthClient(authConn)
|
||||||
ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn))
|
ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn))
|
||||||
ws.disCov = disCov
|
ws.disCov = disCov
|
||||||
|
|
||||||
|
ws.ready.Store(true)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,6 +259,10 @@ func (ws *WsServer) registerClient(client *Client) {
|
|||||||
oldClients []*Client
|
oldClients []*Client
|
||||||
)
|
)
|
||||||
oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID)
|
oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID)
|
||||||
|
|
||||||
|
log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID,
|
||||||
|
"sdkVersion", client.SDKVersion)
|
||||||
|
|
||||||
if !userOK {
|
if !userOK {
|
||||||
ws.clients.Set(client.UserID, client)
|
ws.clients.Set(client.UserID, client)
|
||||||
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
|
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
|
||||||
@ -453,6 +462,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Create a new connection context
|
// Create a new connection context
|
||||||
connContext := newContext(w, r)
|
connContext := newContext(w, r)
|
||||||
|
|
||||||
|
if !ws.ready.Load() {
|
||||||
|
httpError(connContext, errs.New("ws server not ready"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Check if the current number of online user connections exceeds the maximum limit
|
// Check if the current number of online user connections exceeds the maximum limit
|
||||||
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
|
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
|
||||||
// If it exceeds the maximum connection number, return an error via HTTP and stop processing
|
// If it exceeds the maximum connection number, return an error via HTTP and stop processing
|
||||||
@ -469,6 +483,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ws.authClient == nil {
|
||||||
|
httpError(connContext, errs.New("auth client is not initialized"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Call the authentication client to parse the Token obtained from the context
|
// Call the authentication client to parse the Token obtained from the context
|
||||||
resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken())
|
resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -51,37 +51,24 @@ func GetContent(msg *sdkws.MsgData) string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
||||||
if msg.ContentType == constant.Typing {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !filterAfterMsg(msg, after) {
|
if !filterAfterMsg(msg, after) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
cbReq := &cbapi.CallbackAfterMsgSaveDBReq{
|
||||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand),
|
||||||
RecvID: msg.RecvID,
|
|
||||||
}
|
|
||||||
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
|
||||||
if msg.ContentType == constant.Typing {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !filterAfterMsg(msg, after) {
|
switch msg.SessionType {
|
||||||
return
|
case constant.SingleChatType, constant.NotificationChatType:
|
||||||
|
cbReq.RecvID = msg.RecvID
|
||||||
|
case constant.ReadGroupChatType:
|
||||||
|
cbReq.GroupID = msg.GroupID
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg))
|
||||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
|
||||||
GroupID: msg.GroupID,
|
|
||||||
}
|
|
||||||
|
|
||||||
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
||||||
|
|||||||
@ -18,6 +18,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/openimsdk/tools/mq"
|
"github.com/openimsdk/tools/mq"
|
||||||
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
package msgtransfer
|
package msgtransfer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/openimsdk/protocol/constant"
|
|
||||||
"github.com/openimsdk/tools/mq"
|
"github.com/openimsdk/tools/mq"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
@ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
|
|||||||
log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
|
log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
|
||||||
err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
|
err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
|
log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
|
||||||
prommetrics.MsgInsertMongoFailedCounter.Inc()
|
prommetrics.MsgInsertMongoFailedCounter.Inc()
|
||||||
} else {
|
} else {
|
||||||
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
||||||
@ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, msgData := range msgFromMQ.MsgData {
|
for _, msgData := range msgFromMQ.MsgData {
|
||||||
switch msgData.SessionType {
|
mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData)
|
||||||
case constant.SingleChatType:
|
|
||||||
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
|
|
||||||
case constant.ReadGroupChatType:
|
|
||||||
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//var seqs []int64
|
//var seqs []int64
|
||||||
|
|||||||
@ -37,6 +37,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
|
"github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
|
|||||||
}
|
}
|
||||||
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
|
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
|
||||||
for i, conversation := range conversations {
|
for i, conversation := range conversations {
|
||||||
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
|
if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
|
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
|
||||||
@ -835,3 +836,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
|
|||||||
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
|
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) {
|
||||||
|
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 {
|
||||||
|
return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set")
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 {
|
||||||
|
return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set")
|
||||||
|
}
|
||||||
|
|
||||||
|
var needDeleteConversationIDs []string
|
||||||
|
|
||||||
|
if len(req.ConversationIDs) == 0 {
|
||||||
|
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
|
||||||
|
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
|
||||||
|
UserID: req.OwnerUserID,
|
||||||
|
ConversationIDs: conversationIDs,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for conversationID, msg := range latestMsgs.Msgs {
|
||||||
|
if msg.SendTime < deleteTimeThreshold {
|
||||||
|
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(needDeleteConversationIDs) == 0 {
|
||||||
|
return &pbconversation.DeleteConversationsResp{}, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
needDeleteConversationIDs = req.ConversationIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
|
||||||
|
|
||||||
|
return &pbconversation.DeleteConversationsResp{}, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
|
|||||||
|
|
||||||
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
|
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) {
|
||||||
|
tips := &sdkws.ConversationDeleteTips{
|
||||||
|
UserID: userID,
|
||||||
|
ConversationIDs: conversationIDs,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips)
|
||||||
|
}
|
||||||
|
|||||||
@ -16,8 +16,10 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
|
|
||||||
@ -28,6 +30,7 @@ import (
|
|||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -87,19 +90,19 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Move to msgtransfer
|
// Move to msgtransfer
|
||||||
// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
||||||
// if msg.MsgData.ContentType == constant.Typing {
|
if msg.MsgData.ContentType == constant.Typing {
|
||||||
// return
|
return
|
||||||
// }
|
}
|
||||||
// if !filterAfterMsg(msg, after) {
|
if !filterAfterMsg(msg, after) {
|
||||||
// return
|
return
|
||||||
// }
|
}
|
||||||
// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
||||||
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
||||||
// RecvID: msg.MsgData.RecvID,
|
RecvID: msg.MsgData.RecvID,
|
||||||
// }
|
}
|
||||||
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
||||||
// }
|
}
|
||||||
|
|
||||||
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
|
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
|
||||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
@ -121,21 +124,20 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move to msgtransfer
|
func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
||||||
// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
|
if msg.MsgData.ContentType == constant.Typing {
|
||||||
// if msg.MsgData.ContentType == constant.Typing {
|
return
|
||||||
// return
|
}
|
||||||
// }
|
if !filterAfterMsg(msg, after) {
|
||||||
// if !filterAfterMsg(msg, after) {
|
return
|
||||||
// return
|
}
|
||||||
// }
|
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
||||||
// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
||||||
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
GroupID: msg.MsgData.GroupID,
|
||||||
// GroupID: msg.MsgData.GroupID,
|
}
|
||||||
// }
|
|
||||||
|
|
||||||
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
|
||||||
// }
|
}
|
||||||
|
|
||||||
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
|
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
|
||||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
@ -204,14 +206,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft
|
|||||||
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
|
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
|
||||||
}
|
}
|
||||||
|
|
||||||
// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
||||||
// keyMsgData := apistruct.KeyMsgData{
|
keyMsgData := apistruct.KeyMsgData{
|
||||||
// SendID: msg.SendID,
|
SendID: msg.SendID,
|
||||||
// RecvID: msg.RecvID,
|
RecvID: msg.RecvID,
|
||||||
// GroupID: msg.GroupID,
|
GroupID: msg.GroupID,
|
||||||
// }
|
}
|
||||||
|
|
||||||
// return map[string]string{
|
return map[string]string{
|
||||||
// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
|
|||||||
@ -86,7 +86,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq,
|
|||||||
go m.setConversationAtInfo(ctx, req.MsgData)
|
go m.setConversationAtInfo(ctx, req.MsgData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
||||||
|
|
||||||
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
|
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
|
||||||
resp = &pbmsg.SendMsgResp{}
|
resp = &pbmsg.SendMsgResp{}
|
||||||
@ -194,7 +194,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
||||||
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
|
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
|
||||||
return &pbmsg.SendMsgResp{
|
return &pbmsg.SendMsgResp{
|
||||||
ServerMsgID: req.MsgData.ServerMsgID,
|
ServerMsgID: req.MsgData.ServerMsgID,
|
||||||
|
|||||||
@ -66,4 +66,5 @@ const (
|
|||||||
CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand"
|
CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand"
|
||||||
CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand"
|
CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand"
|
||||||
CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand"
|
CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand"
|
||||||
|
CallbackAfterMsgSaveDBCommand = "callbackAfterMsgSaveDBCommand"
|
||||||
)
|
)
|
||||||
|
|||||||
@ -2,42 +2,42 @@ package callbackstruct
|
|||||||
|
|
||||||
type CallbackBeforeCreateSingleChatConversationsReq struct {
|
type CallbackBeforeCreateSingleChatConversationsReq struct {
|
||||||
CallbackCommand `json:"callbackCommand"`
|
CallbackCommand `json:"callbackCommand"`
|
||||||
OwnerUserID string `json:"owner_user_id"`
|
OwnerUserID string `json:"ownerUserId"`
|
||||||
ConversationID string `json:"conversation_id"`
|
ConversationID string `json:"conversationId"`
|
||||||
ConversationType int32 `json:"conversation_type"`
|
ConversationType int32 `json:"conversationType"`
|
||||||
UserID string `json:"user_id"`
|
UserID string `json:"userId"`
|
||||||
RecvMsgOpt int32 `json:"recv_msg_opt"`
|
RecvMsgOpt int32 `json:"recvMsgOpt"`
|
||||||
IsPinned bool `json:"is_pinned"`
|
IsPinned bool `json:"isPinned"`
|
||||||
IsPrivateChat bool `json:"is_private_chat"`
|
IsPrivateChat bool `json:"isPrivateChat"`
|
||||||
BurnDuration int32 `json:"burn_duration"`
|
BurnDuration int32 `json:"burnDuration"`
|
||||||
GroupAtType int32 `json:"group_at_type"`
|
GroupAtType int32 `json:"groupAtType"`
|
||||||
AttachedInfo string `json:"attached_info"`
|
AttachedInfo string `json:"attachedInfo"`
|
||||||
Ex string `json:"ex"`
|
Ex string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeCreateSingleChatConversationsResp struct {
|
type CallbackBeforeCreateSingleChatConversationsResp struct {
|
||||||
CommonCallbackResp
|
CommonCallbackResp
|
||||||
RecvMsgOpt *int32 `json:"recv_msg_opt"`
|
RecvMsgOpt *int32 `json:"recvMsgOpt"`
|
||||||
IsPinned *bool `json:"is_pinned"`
|
IsPinned *bool `json:"isPinned"`
|
||||||
IsPrivateChat *bool `json:"is_private_chat"`
|
IsPrivateChat *bool `json:"isPrivateChat"`
|
||||||
BurnDuration *int32 `json:"burn_duration"`
|
BurnDuration *int32 `json:"burnDuration"`
|
||||||
GroupAtType *int32 `json:"group_at_type"`
|
GroupAtType *int32 `json:"groupAtType"`
|
||||||
AttachedInfo *string `json:"attached_info"`
|
AttachedInfo *string `json:"attachedInfo"`
|
||||||
Ex *string `json:"ex"`
|
Ex *string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackAfterCreateSingleChatConversationsReq struct {
|
type CallbackAfterCreateSingleChatConversationsReq struct {
|
||||||
CallbackCommand `json:"callbackCommand"`
|
CallbackCommand `json:"callbackCommand"`
|
||||||
OwnerUserID string `json:"owner_user_id"`
|
OwnerUserID string `json:"ownerUserId"`
|
||||||
ConversationID string `json:"conversation_id"`
|
ConversationID string `json:"conversationId"`
|
||||||
ConversationType int32 `json:"conversation_type"`
|
ConversationType int32 `json:"conversationType"`
|
||||||
UserID string `json:"user_id"`
|
UserID string `json:"userId"`
|
||||||
RecvMsgOpt int32 `json:"recv_msg_opt"`
|
RecvMsgOpt int32 `json:"recvMsgOpt"`
|
||||||
IsPinned bool `json:"is_pinned"`
|
IsPinned bool `json:"isPinned"`
|
||||||
IsPrivateChat bool `json:"is_private_chat"`
|
IsPrivateChat bool `json:"isPrivateChat"`
|
||||||
BurnDuration int32 `json:"burn_duration"`
|
BurnDuration int32 `json:"burnDuration"`
|
||||||
GroupAtType int32 `json:"group_at_type"`
|
GroupAtType int32 `json:"groupAtType"`
|
||||||
AttachedInfo string `json:"attached_info"`
|
AttachedInfo string `json:"attachedInfo"`
|
||||||
Ex string `json:"ex"`
|
Ex string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,42 +47,42 @@ type CallbackAfterCreateSingleChatConversationsResp struct {
|
|||||||
|
|
||||||
type CallbackBeforeCreateGroupChatConversationsReq struct {
|
type CallbackBeforeCreateGroupChatConversationsReq struct {
|
||||||
CallbackCommand `json:"callbackCommand"`
|
CallbackCommand `json:"callbackCommand"`
|
||||||
OwnerUserID string `json:"owner_user_id"`
|
OwnerUserID string `json:"ownerUserId"`
|
||||||
ConversationID string `json:"conversation_id"`
|
ConversationID string `json:"conversationId"`
|
||||||
ConversationType int32 `json:"conversation_type"`
|
ConversationType int32 `json:"conversationType"`
|
||||||
GroupID string `json:"group_id"`
|
GroupID string `json:"groupId"`
|
||||||
RecvMsgOpt int32 `json:"recv_msg_opt"`
|
RecvMsgOpt int32 `json:"recvMsgOpt"`
|
||||||
IsPinned bool `json:"is_pinned"`
|
IsPinned bool `json:"isPinned"`
|
||||||
IsPrivateChat bool `json:"is_private_chat"`
|
IsPrivateChat bool `json:"isPrivateChat"`
|
||||||
BurnDuration int32 `json:"burn_duration"`
|
BurnDuration int32 `json:"burnDuration"`
|
||||||
GroupAtType int32 `json:"group_at_type"`
|
GroupAtType int32 `json:"groupAtType"`
|
||||||
AttachedInfo string `json:"attached_info"`
|
AttachedInfo string `json:"attachedInfo"`
|
||||||
Ex string `json:"ex"`
|
Ex string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeCreateGroupChatConversationsResp struct {
|
type CallbackBeforeCreateGroupChatConversationsResp struct {
|
||||||
CommonCallbackResp
|
CommonCallbackResp
|
||||||
RecvMsgOpt *int32 `json:"recv_msg_opt"`
|
RecvMsgOpt *int32 `json:"recvMsgOpt"`
|
||||||
IsPinned *bool `json:"is_pinned"`
|
IsPinned *bool `json:"isPinned"`
|
||||||
IsPrivateChat *bool `json:"is_private_chat"`
|
IsPrivateChat *bool `json:"isPrivateChat"`
|
||||||
BurnDuration *int32 `json:"burn_duration"`
|
BurnDuration *int32 `json:"burnDuration"`
|
||||||
GroupAtType *int32 `json:"group_at_type"`
|
GroupAtType *int32 `json:"groupAtType"`
|
||||||
AttachedInfo *string `json:"attached_info"`
|
AttachedInfo *string `json:"attachedInfo"`
|
||||||
Ex *string `json:"ex"`
|
Ex *string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackAfterCreateGroupChatConversationsReq struct {
|
type CallbackAfterCreateGroupChatConversationsReq struct {
|
||||||
CallbackCommand `json:"callbackCommand"`
|
CallbackCommand `json:"callbackCommand"`
|
||||||
OwnerUserID string `json:"owner_user_id"`
|
OwnerUserID string `json:"ownerUserId"`
|
||||||
ConversationID string `json:"conversation_id"`
|
ConversationID string `json:"conversationId"`
|
||||||
ConversationType int32 `json:"conversation_type"`
|
ConversationType int32 `json:"conversationType"`
|
||||||
GroupID string `json:"group_id"`
|
GroupID string `json:"groupId"`
|
||||||
RecvMsgOpt int32 `json:"recv_msg_opt"`
|
RecvMsgOpt int32 `json:"recvMsgOpt"`
|
||||||
IsPinned bool `json:"is_pinned"`
|
IsPinned bool `json:"isPinned"`
|
||||||
IsPrivateChat bool `json:"is_private_chat"`
|
IsPrivateChat bool `json:"isPrivateChat"`
|
||||||
BurnDuration int32 `json:"burn_duration"`
|
BurnDuration int32 `json:"burnDuration"`
|
||||||
GroupAtType int32 `json:"group_at_type"`
|
GroupAtType int32 `json:"groupAtType"`
|
||||||
AttachedInfo string `json:"attached_info"`
|
AttachedInfo string `json:"attachedInfo"`
|
||||||
Ex string `json:"ex"`
|
Ex string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -103,3 +103,13 @@ type CallbackSingleMsgReadReq struct {
|
|||||||
type CallbackSingleMsgReadResp struct {
|
type CallbackSingleMsgReadResp struct {
|
||||||
CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CallbackAfterMsgSaveDBReq struct {
|
||||||
|
CommonCallbackReq
|
||||||
|
RecvID string `json:"recvID"`
|
||||||
|
GroupID string `json:"groupID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CallbackAfterMsgSaveDBResp struct {
|
||||||
|
CommonCallbackResp
|
||||||
|
}
|
||||||
|
|||||||
@ -473,6 +473,7 @@ type Webhooks struct {
|
|||||||
BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"`
|
BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"`
|
||||||
BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"`
|
BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"`
|
||||||
AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"`
|
AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"`
|
||||||
|
AfterMsgSaveDB AfterConfig `yaml:"afterMsgSaveDB"`
|
||||||
AfterUserOnline AfterConfig `yaml:"afterUserOnline"`
|
AfterUserOnline AfterConfig `yaml:"afterUserOnline"`
|
||||||
AfterUserOffline AfterConfig `yaml:"afterUserOffline"`
|
AfterUserOffline AfterConfig `yaml:"afterUserOffline"`
|
||||||
AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"`
|
AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"`
|
||||||
|
|||||||
3
pkg/common/storage/cache/conversation.go
vendored
3
pkg/common/storage/cache/conversation.go
vendored
@ -16,6 +16,7 @@ package cache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -57,7 +58,7 @@ type ConversationCache interface {
|
|||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
|
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
|
||||||
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
|
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
|
||||||
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
|
DelUserPinnedConversations(userIDs ...string) ConversationCache
|
||||||
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
|
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
|
||||||
|
|
||||||
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
||||||
|
|||||||
@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
|
|||||||
return cache
|
return cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
|
func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache {
|
||||||
cache := c.CloneConversationCache()
|
cache := c.CloneConversationCache()
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
|
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
|
||||||
|
|||||||
@ -78,6 +78,8 @@ type ConversationDatabase interface {
|
|||||||
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||||
// FindRandConversation finds random conversations based on the specified timestamp and limit.
|
// FindRandConversation finds random conversations based on the specified timestamp and limit.
|
||||||
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
|
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
|
||||||
|
|
||||||
|
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||||
@ -120,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
|
|||||||
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
||||||
}
|
}
|
||||||
if _, ok := fieldMap["is_pinned"]; ok {
|
if _, ok := fieldMap["is_pinned"]; ok {
|
||||||
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
|
cache = cache.DelUserPinnedConversations(userIDs...)
|
||||||
}
|
}
|
||||||
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
|
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
|
||||||
}
|
}
|
||||||
@ -172,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
|
|||||||
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
||||||
}
|
}
|
||||||
if _, ok := args["is_pinned"]; ok {
|
if _, ok := args["is_pinned"]; ok {
|
||||||
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
|
cache = cache.DelUserPinnedConversations(userIDs...)
|
||||||
}
|
}
|
||||||
return cache.ChainExecDel(ctx)
|
return cache.ChainExecDel(ctx)
|
||||||
}
|
}
|
||||||
@ -203,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
|
|||||||
DelUserConversationIDsHash(userIDs...).
|
DelUserConversationIDsHash(userIDs...).
|
||||||
DelConversationVersionUserIDs(userIDs...).
|
DelConversationVersionUserIDs(userIDs...).
|
||||||
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
|
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
|
||||||
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
|
DelUserPinnedConversations(pinnedUserIDs...).
|
||||||
ChainExecDel(ctx)
|
ChainExecDel(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
|
|||||||
cache := c.cache.CloneConversationCache()
|
cache := c.cache.CloneConversationCache()
|
||||||
cache = cache.DelConversationVersionUserIDs(ownerUserID).
|
cache = cache.DelConversationVersionUserIDs(ownerUserID).
|
||||||
DelConversationNotNotifyMessageUserIDs(ownerUserID).
|
DelConversationNotNotifyMessageUserIDs(ownerUserID).
|
||||||
DelConversationPinnedMessageUserIDs(ownerUserID)
|
DelUserPinnedConversations(ownerUserID)
|
||||||
|
|
||||||
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
|
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
|
||||||
return e.GroupID, e.GroupID != ""
|
return e.GroupID, e.GroupID != ""
|
||||||
@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
|
|||||||
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
|
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
|
||||||
return c.conversationDB.FindRandConversation(ctx, ts, limit)
|
return c.conversationDB.FindRandConversation(ctx, ts, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
|
||||||
|
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||||
|
err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cache := c.cache.CloneConversationCache()
|
||||||
|
cache = cache.DelConversations(userID, conversationIDs...).
|
||||||
|
DelConversationVersionUserIDs(userID).
|
||||||
|
DelConversationIDs(userID).
|
||||||
|
DelUserConversationIDsHash(userID).
|
||||||
|
DelConversationNotNotifyMessageUserIDs(userID).
|
||||||
|
DelUserPinnedConversations(userID)
|
||||||
|
|
||||||
|
return cache.ChainExecDel(ctx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -44,4 +44,5 @@ type Conversation interface {
|
|||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
||||||
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
|
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
|
||||||
|
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
|
|||||||
}
|
}
|
||||||
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
|
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
|
||||||
|
if len(conversationIDs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return mongoutil.IncrVersion(func() error {
|
||||||
|
err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}})
|
||||||
|
return err
|
||||||
|
}, func() error {
|
||||||
|
for _, conversationID := range conversationIDs {
|
||||||
|
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -73,7 +73,7 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) {
|
func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) {
|
||||||
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}})
|
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": bson.M{"$gte": level}})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) {
|
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) {
|
||||||
|
|||||||
@ -109,7 +109,7 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string {
|
|||||||
case constant.ReadGroupChatType:
|
case constant.ReadGroupChatType:
|
||||||
return "sg_" + ids[0] // super group chat
|
return "sg_" + ids[0] // super group chat
|
||||||
case constant.NotificationChatType:
|
case constant.NotificationChatType:
|
||||||
return "sn_" + ids[0] // server notification chat
|
return "sn_" + strings.Join(ids, "_") // server notification chat
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user