mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge branch 'main' of github.com:openimsdk/open-im-server into v3.8-js-sdk-only
This commit is contained in:
commit
678c23074f
59
.github/workflows/go-build-test.yml
vendored
59
.github/workflows/go-build-test.yml
vendored
@ -89,6 +89,65 @@ jobs:
|
||||
mage start
|
||||
mage check
|
||||
|
||||
go-test:
|
||||
name: Benchmark Test with go ${{ matrix.go_version }} on ${{ matrix.os }}
|
||||
runs-on: ${{ matrix.os }}
|
||||
permissions:
|
||||
contents: write
|
||||
env:
|
||||
SDK_DIR: openim-sdk-core
|
||||
CONFIG_PATH: config/notification.yml
|
||||
# pull-requests: write
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ ubuntu-latest ]
|
||||
go_version: [ "1.22.x" ]
|
||||
|
||||
steps:
|
||||
- name: Checkout Server repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Checkout SDK repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: 'openimsdk/openim-sdk-core'
|
||||
path: ${{ env.SDK_DIR }}
|
||||
|
||||
- name: Set up Go ${{ matrix.go_version }}
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: ${{ matrix.go_version }}
|
||||
|
||||
- name: Get Server dependencies
|
||||
run: |
|
||||
go install github.com/magefile/mage@latest
|
||||
go mod download
|
||||
|
||||
- name: Install yq
|
||||
run: |
|
||||
sudo wget https://github.com/mikefarah/yq/releases/download/v4.34.1/yq_linux_amd64 -O /usr/bin/yq
|
||||
sudo chmod +x /usr/bin/yq
|
||||
|
||||
- name: Modify Server Configuration
|
||||
run: |
|
||||
yq e '.groupCreated.unreadCount = true' -i ${{ env.CONFIG_PATH }}
|
||||
yq e '.friendApplicationApproved.unreadCount = true' -i ${{ env.CONFIG_PATH }}
|
||||
|
||||
- name: Start Server Services
|
||||
run: |
|
||||
docker compose up -d
|
||||
mage build
|
||||
mage start
|
||||
mage check
|
||||
|
||||
- name: Build test SDK core
|
||||
run: |
|
||||
cd ${{ env.SDK_DIR }}
|
||||
go mod tidy
|
||||
cd integration_test
|
||||
mkdir data
|
||||
go run main.go -lgr 0.8 -imf -crg -ckgn -ckcon -sem -ckmsn -u 20 -su 5 -lg 2 -cg 2 -cgm 3 -sm 10 -gm 10 -reg
|
||||
|
||||
dockerfile-test:
|
||||
name: Build and Test Dockerfile
|
||||
runs-on: ubuntu-latest
|
||||
|
@ -3,4 +3,4 @@ prometheus:
|
||||
enable: true
|
||||
# List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly
|
||||
# Because four instances have been launched, four ports need to be specified
|
||||
ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027 ]
|
||||
ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027, 12028, 12029, 12030, 12031, 12032, 12033, 12034, 12035 ]
|
||||
|
@ -12,3 +12,5 @@ rpcRegisterName:
|
||||
|
||||
imAdminUserID: [ imAdmin ]
|
||||
|
||||
# 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time
|
||||
multiLoginPolicy: 1
|
||||
|
33
go.mod
33
go.mod
@ -3,7 +3,7 @@ module github.com/openimsdk/open-im-server/v3
|
||||
go 1.21.2
|
||||
|
||||
require (
|
||||
firebase.google.com/go v3.13.0+incompatible
|
||||
firebase.google.com/go/v4 v4.14.1
|
||||
github.com/dtm-labs/rockscache v0.1.1
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/go-playground/validator/v10 v10.20.0
|
||||
@ -12,13 +12,13 @@ require (
|
||||
github.com/gorilla/websocket v1.5.1
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.25
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.30
|
||||
github.com/openimsdk/tools v0.0.50-alpha.12
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
go.mongodb.org/mongo-driver v1.14.0
|
||||
google.golang.org/api v0.165.0
|
||||
google.golang.org/api v0.170.0
|
||||
google.golang.org/grpc v1.66.2
|
||||
google.golang.org/protobuf v1.34.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
@ -47,12 +47,13 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.112.0 // indirect
|
||||
cloud.google.com/go v0.112.1 // indirect
|
||||
cloud.google.com/go/compute/metadata v0.3.0 // indirect
|
||||
cloud.google.com/go/firestore v1.14.0 // indirect
|
||||
cloud.google.com/go/iam v1.1.5 // indirect
|
||||
cloud.google.com/go/longrunning v0.5.4 // indirect
|
||||
cloud.google.com/go/storage v1.36.0 // indirect
|
||||
cloud.google.com/go/firestore v1.15.0 // indirect
|
||||
cloud.google.com/go/iam v1.1.7 // indirect
|
||||
cloud.google.com/go/longrunning v0.5.5 // indirect
|
||||
cloud.google.com/go/storage v1.40.0 // indirect
|
||||
github.com/MicahParks/keyfunc v1.9.0 // indirect
|
||||
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect
|
||||
github.com/aws/aws-sdk-go-v2 v1.23.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
|
||||
@ -102,7 +103,7 @@ require (
|
||||
github.com/google/go-querystring v1.1.0 // indirect
|
||||
github.com/google/s2a-go v0.1.7 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
|
||||
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
|
||||
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-uuid v1.0.3 // indirect
|
||||
@ -164,11 +165,11 @@ require (
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
|
||||
go.etcd.io/etcd/client/v3 v3.5.13 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
|
||||
go.opentelemetry.io/otel v1.23.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.23.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.23.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
|
||||
go.opentelemetry.io/otel v1.24.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.24.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.24.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/arch v0.7.0 // indirect
|
||||
@ -178,8 +179,8 @@ require (
|
||||
golang.org/x/sys v0.25.0 // indirect
|
||||
golang.org/x/text v0.18.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
google.golang.org/appengine v1.6.8 // indirect
|
||||
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect
|
||||
google.golang.org/appengine/v2 v2.0.2 // indirect
|
||||
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||
gorm.io/gorm v1.25.8 // indirect
|
||||
|
84
go.sum
84
go.sum
@ -1,21 +1,23 @@
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM=
|
||||
cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4=
|
||||
cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM=
|
||||
cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4=
|
||||
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
|
||||
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
|
||||
cloud.google.com/go/firestore v1.14.0 h1:8aLcKnMPoldYU3YHgu4t2exrKhLQkqaXAGqT0ljrFVw=
|
||||
cloud.google.com/go/firestore v1.14.0/go.mod h1:96MVaHLsEhbvkBEdZgfN+AS/GIkco1LRpH9Xp9YZfzQ=
|
||||
cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI=
|
||||
cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8=
|
||||
cloud.google.com/go/longrunning v0.5.4 h1:w8xEcbZodnA2BbW6sVirkkoC+1gP8wS57EUUgGS0GVg=
|
||||
cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI=
|
||||
cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8=
|
||||
cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8=
|
||||
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
|
||||
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
|
||||
cloud.google.com/go/firestore v1.15.0 h1:/k8ppuWOtNuDHt2tsRV42yI21uaGnKDEQnRFeBpbFF8=
|
||||
cloud.google.com/go/firestore v1.15.0/go.mod h1:GWOxFXcv8GZUtYpWHw/w6IuYNux/BtmeVTMmjrm4yhk=
|
||||
cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM=
|
||||
cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA=
|
||||
cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg=
|
||||
cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s=
|
||||
cloud.google.com/go/storage v1.40.0 h1:VEpDQV5CJxFmJ6ueWNsKxcr1QAYOXEgxDa+sBbJahPw=
|
||||
cloud.google.com/go/storage v1.40.0/go.mod h1:Rrj7/hKlG87BLqDJYtwR0fbPld8uJPbQ2ucUMY7Ir0g=
|
||||
firebase.google.com/go/v4 v4.14.1 h1:4qiUETaFRWoFGE1XP5VbcEdtPX93Qs+8B/7KvP2825g=
|
||||
firebase.google.com/go/v4 v4.14.1/go.mod h1:fgk2XshgNDEKaioKco+AouiegSI9oTWVqRaBdTTGBoM=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc=
|
||||
github.com/IBM/sarama v1.43.0/go.mod h1:zlE6HEbC/SMQ9mhEYaF7nNLYOUyrs0obySKCckWP9BM=
|
||||
github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID3+o=
|
||||
github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw=
|
||||
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
|
||||
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
|
||||
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
|
||||
@ -107,8 +109,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
|
||||
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
|
||||
github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=
|
||||
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
|
||||
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
|
||||
@ -159,6 +159,7 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
|
||||
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
@ -167,6 +168,7 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
@ -175,8 +177,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
|
||||
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
@ -188,7 +188,6 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
||||
@ -205,8 +204,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
|
||||
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
|
||||
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
|
||||
github.com/googleapis/gax-go/v2 v2.12.3 h1:5/zPPDvw8Q1SuXjrqrZslrqT7dL/uJT2CQii/cLCKqA=
|
||||
github.com/googleapis/gax-go/v2 v2.12.3/go.mod h1:AKloxT6GtNbaLm8QTNSidHUVsHYcBHwWRvkNFJUQcS4=
|
||||
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
|
||||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
|
||||
@ -322,8 +321,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.25 h1:W8E6gnwt5V6anr/8lYOf5v/Lcsggf7gIAzJbw7YU6So=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.25/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.30 h1:LBIqDzD55cSQy3wX8fgSa3blz8+Cv54ae96/qUMINwM=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.30/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
@ -436,18 +435,18 @@ go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd
|
||||
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
|
||||
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
|
||||
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 h1:UNQQKPfTDe1J81ViolILjTKPr9WetKW6uei2hFgJmFs=
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw=
|
||||
go.opentelemetry.io/otel v1.23.0 h1:Df0pqjqExIywbMCMTxkAwzjLZtRf+bBKLbUcpxO2C9E=
|
||||
go.opentelemetry.io/otel v1.23.0/go.mod h1:YCycw9ZeKhcJFrb34iVSkyT0iczq/zYDtZYFufObyB0=
|
||||
go.opentelemetry.io/otel/metric v1.23.0 h1:pazkx7ss4LFVVYSxYew7L5I6qvLXHA0Ap2pwV+9Cnpo=
|
||||
go.opentelemetry.io/otel/metric v1.23.0/go.mod h1:MqUW2X2a6Q8RN96E2/nqNoT+z9BSms20Jb7Bbp+HiTo=
|
||||
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
|
||||
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
|
||||
go.opentelemetry.io/otel/trace v1.23.0 h1:37Ik5Ib7xfYVb4V1UtnT97T1jI+AoIYkJyPkuL4iJgI=
|
||||
go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk=
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
|
||||
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
|
||||
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
|
||||
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
|
||||
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
|
||||
go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw=
|
||||
go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc=
|
||||
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
|
||||
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
|
||||
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
|
||||
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
|
||||
@ -492,6 +491,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
|
||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
@ -554,19 +554,19 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
|
||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
|
||||
google.golang.org/api v0.165.0 h1:zd5d4JIIIaYYsfVy1HzoXYZ9rWCSBxxAglbczzo7Bgc=
|
||||
google.golang.org/api v0.165.0/go.mod h1:2OatzO7ZDQsoS7IFf3rvsE17/TldiU3F/zxFHeqUB5o=
|
||||
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
|
||||
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
|
||||
google.golang.org/api v0.170.0 h1:zMaruDePM88zxZBG+NG8+reALO2rfLhe/JShitLyT48=
|
||||
google.golang.org/api v0.170.0/go.mod h1:/xql9M2btF85xac/VAm4PsLMTLVGUOpq4BE9R8jyNy8=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
|
||||
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
|
||||
google.golang.org/appengine/v2 v2.0.2 h1:MSqyWy2shDLwG7chbwBJ5uMyw6SNqJzhJHNDwYB0Akk=
|
||||
google.golang.org/appengine/v2 v2.0.2/go.mod h1:PkgRUWz4o1XOvbqtWTkBtCitEJ5Tp4HoVEdMMYQR/8E=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
|
||||
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe h1:USL2DhxfgRchafRvt/wYyyQNzwgL7ZiURcozOE/Pkvo=
|
||||
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro=
|
||||
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y=
|
||||
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
|
||||
@ -587,8 +587,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
@ -62,3 +62,11 @@ func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) {
|
||||
func (o *ConversationApi) GetOwnerConversation(c *gin.Context) {
|
||||
a2r.Call(conversation.ConversationClient.GetOwnerConversation, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) {
|
||||
a2r.Call(conversation.ConversationClient.GetNotNotifyConversationIDs, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
|
||||
a2r.Call(conversation.ConversationClient.GetPinnedConversationIDs, o.Client, c)
|
||||
}
|
||||
|
@ -72,6 +72,10 @@ func (o *FriendApi) GetPaginationBlacks(c *gin.Context) {
|
||||
a2r.Call(relation.FriendClient.GetPaginationBlacks, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *FriendApi) GetSpecifiedBlacks(c *gin.Context) {
|
||||
a2r.Call(relation.FriendClient.GetSpecifiedBlacks, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *FriendApi) RemoveBlack(c *gin.Context) {
|
||||
a2r.Call(relation.FriendClient.RemoveBlack, o.Client, c)
|
||||
}
|
||||
|
@ -67,6 +67,10 @@ func (o *GroupApi) GetGroupUsersReqApplicationList(c *gin.Context) {
|
||||
a2r.Call(group.GroupClient.GetGroupUsersReqApplicationList, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *GroupApi) GetSpecifiedUserGroupRequestInfo(c *gin.Context) {
|
||||
a2r.Call(group.GroupClient.GetSpecifiedUserGroupRequestInfo, o.Client, c)
|
||||
}
|
||||
|
||||
func (o *GroupApi) GetGroupsInfo(c *gin.Context) {
|
||||
a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c)
|
||||
//a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupsInfo))
|
||||
|
204
internal/api/jssdk/jssdk.go
Normal file
204
internal/api/jssdk/jssdk.go
Normal file
@ -0,0 +1,204 @@
|
||||
package jssdk
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/a2r"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"sort"
|
||||
)
|
||||
|
||||
const (
|
||||
maxGetActiveConversation = 500
|
||||
defaultGetActiveConversation = 100
|
||||
)
|
||||
|
||||
func NewJSSdkApi(msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk {
|
||||
return &JSSdk{
|
||||
msg: msg,
|
||||
conv: conv,
|
||||
}
|
||||
}
|
||||
|
||||
type JSSdk struct {
|
||||
msg msg.MsgClient
|
||||
conv conversation.ConversationClient
|
||||
}
|
||||
|
||||
func (x *JSSdk) GetActiveConversations(c *gin.Context) {
|
||||
call(c, x.getActiveConversations)
|
||||
}
|
||||
|
||||
func (x *JSSdk) GetConversations(c *gin.Context) {
|
||||
call(c, x.getConversations)
|
||||
}
|
||||
|
||||
func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
||||
req, err := a2r.ParseRequest[ActiveConversationsReq](ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if req.Count <= 0 || req.Count > maxGetActiveConversation {
|
||||
req.Count = defaultGetActiveConversation
|
||||
}
|
||||
opUserID := mcontext.GetOpUserID(ctx)
|
||||
conversationIDs, err := field(ctx, x.conv.GetConversationIDs,
|
||||
&conversation.GetConversationIDsReq{UserID: opUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(conversationIDs) == 0 {
|
||||
return &ConversationsResp{}, nil
|
||||
}
|
||||
readSeq, err := field(ctx, x.msg.GetHasReadSeqs,
|
||||
&msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
activeConversation, err := field(ctx, x.msg.GetActiveConversation,
|
||||
&msg.GetActiveConversationReq{ConversationIDs: conversationIDs}, (*msg.GetActiveConversationResp).GetConversations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(activeConversation) == 0 {
|
||||
return &ConversationsResp{}, nil
|
||||
}
|
||||
sortConversations := sortActiveConversations{
|
||||
Conversation: activeConversation,
|
||||
}
|
||||
if len(activeConversation) > 1 {
|
||||
pinnedConversationIDs, err := field(ctx, x.conv.GetPinnedConversationIDs,
|
||||
&conversation.GetPinnedConversationIDsReq{UserID: opUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs)
|
||||
}
|
||||
sort.Sort(&sortConversations)
|
||||
sortList := sortConversations.Top(req.Count)
|
||||
conversations, err := field(ctx, x.conv.GetConversations,
|
||||
&conversation.GetConversationsReq{
|
||||
OwnerUserID: opUserID,
|
||||
ConversationIDs: datautil.Slice(sortList, func(c *msg.ActiveConversation) string {
|
||||
return c.ConversationID
|
||||
})}, (*conversation.GetConversationsResp).GetConversations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msgs, err := field(ctx, x.msg.GetSeqMessage,
|
||||
&msg.GetSeqMessageReq{
|
||||
UserID: opUserID,
|
||||
Conversations: datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs {
|
||||
return &msg.ConversationSeqs{
|
||||
ConversationID: c.ConversationID,
|
||||
Seqs: []int64{c.MaxSeq},
|
||||
}
|
||||
}),
|
||||
}, (*msg.GetSeqMessageResp).GetMsgs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
||||
return c.ConversationID
|
||||
})
|
||||
resp := make([]ConversationMsg, 0, len(sortList))
|
||||
for _, c := range sortList {
|
||||
conv, ok := conversationMap[c.ConversationID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
var lastMsg *sdkws.MsgData
|
||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||
lastMsg = msgList.Msgs[0]
|
||||
}
|
||||
resp = append(resp, ConversationMsg{
|
||||
Conversation: conv,
|
||||
LastMsg: lastMsg,
|
||||
MaxSeq: c.MaxSeq,
|
||||
ReadSeq: readSeq[c.ConversationID],
|
||||
})
|
||||
}
|
||||
var unreadCount int64
|
||||
for _, c := range activeConversation {
|
||||
count := c.MaxSeq - readSeq[c.ConversationID]
|
||||
if count > 0 {
|
||||
unreadCount += count
|
||||
}
|
||||
}
|
||||
return &ConversationsResp{
|
||||
Conversations: resp,
|
||||
UnreadCount: unreadCount,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
||||
req, err := a2r.ParseRequest[conversation.GetConversationsReq](ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.OwnerUserID = mcontext.GetOpUserID(ctx)
|
||||
conversations, err := field(ctx, x.conv.GetConversations, req, (*conversation.GetConversationsResp).GetConversations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(conversations) == 0 {
|
||||
return &ConversationsResp{}, nil
|
||||
}
|
||||
req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string {
|
||||
return c.ConversationID
|
||||
})
|
||||
maxSeqs, err := field(ctx, x.msg.GetMaxSeqs,
|
||||
&msg.GetMaxSeqsReq{ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
readSeqs, err := field(ctx, x.msg.GetHasReadSeqs,
|
||||
&msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationSeqs := make([]*msg.ConversationSeqs, 0, len(conversations))
|
||||
for _, c := range conversations {
|
||||
if seq := maxSeqs[c.ConversationID]; seq > 0 {
|
||||
conversationSeqs = append(conversationSeqs, &msg.ConversationSeqs{
|
||||
ConversationID: c.ConversationID,
|
||||
Seqs: []int64{seq},
|
||||
})
|
||||
}
|
||||
}
|
||||
var msgs map[string]*sdkws.PullMsgs
|
||||
if len(conversationSeqs) > 0 {
|
||||
msgs, err = field(ctx, x.msg.GetSeqMessage,
|
||||
&msg.GetSeqMessageReq{UserID: req.OwnerUserID, Conversations: conversationSeqs}, (*msg.GetSeqMessageResp).GetMsgs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
resp := make([]ConversationMsg, 0, len(conversations))
|
||||
for _, c := range conversations {
|
||||
var lastMsg *sdkws.MsgData
|
||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||
lastMsg = msgList.Msgs[0]
|
||||
}
|
||||
resp = append(resp, ConversationMsg{
|
||||
Conversation: c,
|
||||
LastMsg: lastMsg,
|
||||
MaxSeq: maxSeqs[c.ConversationID],
|
||||
ReadSeq: readSeqs[c.ConversationID],
|
||||
})
|
||||
}
|
||||
var unreadCount int64
|
||||
for conversationID, maxSeq := range maxSeqs {
|
||||
count := maxSeq - readSeqs[conversationID]
|
||||
if count > 0 {
|
||||
unreadCount += count
|
||||
}
|
||||
}
|
||||
return &ConversationsResp{
|
||||
Conversations: resp,
|
||||
UnreadCount: unreadCount,
|
||||
}, nil
|
||||
}
|
33
internal/api/jssdk/sort.go
Normal file
33
internal/api/jssdk/sort.go
Normal file
@ -0,0 +1,33 @@
|
||||
package jssdk
|
||||
|
||||
import "github.com/openimsdk/protocol/msg"
|
||||
|
||||
type sortActiveConversations struct {
|
||||
Conversation []*msg.ActiveConversation
|
||||
PinnedConversationIDs map[string]struct{}
|
||||
}
|
||||
|
||||
func (s sortActiveConversations) Top(limit int) []*msg.ActiveConversation {
|
||||
if limit > 0 && len(s.Conversation) > limit {
|
||||
return s.Conversation[:limit]
|
||||
}
|
||||
return s.Conversation
|
||||
}
|
||||
|
||||
func (s sortActiveConversations) Len() int {
|
||||
return len(s.Conversation)
|
||||
}
|
||||
|
||||
func (s sortActiveConversations) Less(i, j int) bool {
|
||||
iv, jv := s.Conversation[i], s.Conversation[j]
|
||||
_, ip := s.PinnedConversationIDs[iv.ConversationID]
|
||||
_, jp := s.PinnedConversationIDs[jv.ConversationID]
|
||||
if ip != jp {
|
||||
return ip
|
||||
}
|
||||
return iv.LastTime > jv.LastTime
|
||||
}
|
||||
|
||||
func (s sortActiveConversations) Swap(i, j int) {
|
||||
s.Conversation[i], s.Conversation[j] = s.Conversation[j], s.Conversation[i]
|
||||
}
|
22
internal/api/jssdk/stu.go
Normal file
22
internal/api/jssdk/stu.go
Normal file
@ -0,0 +1,22 @@
|
||||
package jssdk
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
)
|
||||
|
||||
type ActiveConversationsReq struct {
|
||||
Count int `json:"count"`
|
||||
}
|
||||
|
||||
type ConversationMsg struct {
|
||||
Conversation *conversation.Conversation `json:"conversation"`
|
||||
LastMsg *sdkws.MsgData `json:"lastMsg"`
|
||||
MaxSeq int64 `json:"maxSeq"`
|
||||
ReadSeq int64 `json:"readSeq"`
|
||||
}
|
||||
|
||||
type ConversationsResp struct {
|
||||
UnreadCount int64 `json:"unreadCount"`
|
||||
Conversations []ConversationMsg `json:"conversations"`
|
||||
}
|
26
internal/api/jssdk/tools.go
Normal file
26
internal/api/jssdk/tools.go
Normal file
@ -0,0 +1,26 @@
|
||||
package jssdk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/openimsdk/tools/apiresp"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
|
||||
resp, err := fn(ctx, req)
|
||||
if err != nil {
|
||||
var c C
|
||||
return c, err
|
||||
}
|
||||
return get(resp), nil
|
||||
}
|
||||
|
||||
func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) {
|
||||
resp, err := fn(c)
|
||||
if err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
apiresp.GinSuccess(c, resp)
|
||||
}
|
37
internal/api/jssdk_test.go
Normal file
37
internal/api/jssdk_test.go
Normal file
@ -0,0 +1,37 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestName(t *testing.T) {
|
||||
val := sortActiveConversations{
|
||||
Conversation: []*msg.ActiveConversation{
|
||||
{
|
||||
ConversationID: "100",
|
||||
LastTime: 100,
|
||||
},
|
||||
{
|
||||
ConversationID: "200",
|
||||
LastTime: 200,
|
||||
},
|
||||
{
|
||||
ConversationID: "300",
|
||||
LastTime: 300,
|
||||
},
|
||||
{
|
||||
ConversationID: "400",
|
||||
LastTime: 400,
|
||||
},
|
||||
},
|
||||
//PinnedConversationIDs: map[string]struct{}{
|
||||
// "100": {},
|
||||
// "300": {},
|
||||
//},
|
||||
}
|
||||
sort.Sort(&val)
|
||||
t.Log(val)
|
||||
|
||||
}
|
@ -2,6 +2,8 @@ package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/api/jssdk"
|
||||
|
||||
"github.com/gin-contrib/gzip"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@ -74,6 +76,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
|
||||
u := NewUserApi(*userRpc)
|
||||
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
|
||||
j := jssdk.NewJSSdkApi(messageRpc.Client, conversationRpc.Client)
|
||||
userRouterGroup := r.Group("/user")
|
||||
{
|
||||
userRouterGroup.POST("/user_register", u.UserRegister)
|
||||
@ -115,6 +118,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
friendRouterGroup.POST("/set_friend_remark", f.SetFriendRemark)
|
||||
friendRouterGroup.POST("/add_black", f.AddBlack)
|
||||
friendRouterGroup.POST("/get_black_list", f.GetPaginationBlacks)
|
||||
friendRouterGroup.POST("/get_specified_blacks", f.GetSpecifiedBlacks)
|
||||
friendRouterGroup.POST("/remove_black", f.RemoveBlack)
|
||||
friendRouterGroup.POST("/get_incremental_blacks", f.GetIncrementalBlacks)
|
||||
friendRouterGroup.POST("/import_friend", f.ImportFriends)
|
||||
@ -138,6 +142,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
groupRouterGroup.POST("/get_recv_group_applicationList", g.GetRecvGroupApplicationList)
|
||||
groupRouterGroup.POST("/get_user_req_group_applicationList", g.GetUserReqGroupApplicationList)
|
||||
groupRouterGroup.POST("/get_group_users_req_application_list", g.GetGroupUsersReqApplicationList)
|
||||
groupRouterGroup.POST("/get_specified_user_group_request_info", g.GetSpecifiedUserGroupRequestInfo)
|
||||
groupRouterGroup.POST("/get_groups_info", g.GetGroupsInfo)
|
||||
groupRouterGroup.POST("/kick_group", g.KickGroupMember)
|
||||
groupRouterGroup.POST("/get_group_members_info", g.GetGroupMembersInfo)
|
||||
@ -230,6 +235,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs)
|
||||
conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation)
|
||||
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
|
||||
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
|
||||
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
|
||||
}
|
||||
|
||||
statisticsGroup := r.Group("/statistics")
|
||||
@ -239,6 +246,11 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
statisticsGroup.POST("/group/create", g.GroupCreateCount)
|
||||
statisticsGroup.POST("/group/active", m.GetActiveGroup)
|
||||
}
|
||||
|
||||
jssdk := r.Group("/jssdk")
|
||||
jssdk.POST("/get_conversations", j.GetConversations)
|
||||
jssdk.POST("/get_active_conversations", j.GetActiveConversations)
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
@ -275,7 +287,6 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc {
|
||||
|
||||
// Whitelist api not parse token
|
||||
var Whitelist = []string{
|
||||
"/user/user_register",
|
||||
"/auth/user_token",
|
||||
"/auth/parse_token",
|
||||
}
|
||||
|
@ -321,7 +321,7 @@ func (ws *WsServer) KickUserConn(client *Client) error {
|
||||
}
|
||||
|
||||
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
|
||||
switch ws.msgGatewayConfig.MsgGateway.MultiLoginPolicy {
|
||||
switch ws.msgGatewayConfig.Share.MultiLoginPolicy {
|
||||
case constant.DefalutNotKick:
|
||||
case constant.PCAndOther:
|
||||
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
|
||||
|
@ -22,8 +22,8 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
firebase "firebase.google.com/go"
|
||||
"firebase.google.com/go/messaging"
|
||||
firebase "firebase.google.com/go/v4"
|
||||
"firebase.google.com/go/v4/messaging"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
@ -99,7 +99,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
|
||||
apns := &messaging.APNSConfig{Payload: &messaging.APNSPayload{Aps: &messaging.Aps{Sound: opts.IOSPushSound}}}
|
||||
messageCount := len(messages)
|
||||
if messageCount >= SinglePushCountLimit {
|
||||
response, err := f.fcmMsgCli.SendAll(ctx, messages)
|
||||
response, err := f.fcmMsgCli.SendEach(ctx, messages)
|
||||
if err != nil {
|
||||
Fail = Fail + messageCount
|
||||
// Record push error
|
||||
@ -154,7 +154,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
|
||||
}
|
||||
messageCount := len(messages)
|
||||
if messageCount > 0 {
|
||||
response, err := f.fcmMsgCli.SendAll(ctx, messages)
|
||||
response, err := f.fcmMsgCli.SendEach(ctx, messages)
|
||||
if err != nil {
|
||||
Fail = Fail + messageCount
|
||||
} else {
|
||||
|
@ -64,6 +64,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire),
|
||||
config.Share.Secret,
|
||||
config.RpcConfig.TokenPolicy.Expire,
|
||||
config.Share.MultiLoginPolicy,
|
||||
),
|
||||
config: config,
|
||||
})
|
||||
|
@ -710,3 +710,19 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex
|
||||
|
||||
return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
|
||||
conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) {
|
||||
conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
|
||||
}
|
@ -218,6 +218,7 @@ func (s *groupServer) webhookAfterKickGroupMember(ctx context.Context, after *co
|
||||
CallbackCommand: callbackstruct.CallbackAfterKickGroupCommand,
|
||||
GroupID: req.GroupID,
|
||||
KickedUserIDs: req.KickedUserIDs,
|
||||
Reason: req.Reason,
|
||||
}
|
||||
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackKillGroupMemberResp{}, after)
|
||||
}
|
||||
|
@ -167,11 +167,11 @@ func (g *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.PublicUserInfo, error) {
|
||||
func (g *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.PublicUserInfo, error) {
|
||||
if len(userIDs) == 0 {
|
||||
return map[string]*sdkws.PublicUserInfo{}, nil
|
||||
}
|
||||
users, err := g.user.GetPublicUserInfos(ctx, userIDs, complete)
|
||||
users, err := g.user.GetPublicUserInfos(ctx, userIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -696,7 +696,7 @@ func (g *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup.
|
||||
userIDs = append(userIDs, gr.UserID)
|
||||
}
|
||||
userIDs = datautil.Distinct(userIDs)
|
||||
userMap, err := g.user.GetPublicUserInfoMap(ctx, userIDs, true)
|
||||
userMap, err := g.user.GetPublicUserInfoMap(ctx, userIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1685,36 +1685,51 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(requests) == 0 {
|
||||
return &pbgroup.GetGroupUsersReqApplicationListResp{}, nil
|
||||
}
|
||||
|
||||
groupIDs := datautil.Distinct(datautil.Slice(requests, func(e *model.GroupRequest) string {
|
||||
return e.GroupID
|
||||
}))
|
||||
|
||||
groups, err := g.db.FindGroup(ctx, groupIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
groupMap := datautil.SliceToMap(groups, func(e *model.Group) string {
|
||||
return e.GroupID
|
||||
})
|
||||
|
||||
if ids := datautil.Single(groupIDs, datautil.Keys(groupMap)); len(ids) > 0 {
|
||||
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
|
||||
}
|
||||
|
||||
userMap, err := g.user.GetPublicUserInfoMap(ctx, req.UserIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
owners, err := g.db.FindGroupsOwner(ctx, groupIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := g.PopulateGroupMember(ctx, owners...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ownerMap := datautil.SliceToMap(owners, func(e *model.GroupMember) string {
|
||||
return e.GroupID
|
||||
})
|
||||
|
||||
groupMemberNum, err := g.db.MapGroupMemberNum(ctx, groupIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pbgroup.GetGroupUsersReqApplicationListResp{
|
||||
Total: int64(len(requests)),
|
||||
GroupRequests: datautil.Slice(requests, func(e *model.GroupRequest) *sdkws.GroupRequest {
|
||||
@ -1722,7 +1737,71 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
|
||||
if owner, ok := ownerMap[e.GroupID]; ok {
|
||||
ownerUserID = owner.UserID
|
||||
}
|
||||
return convert.Db2PbGroupRequest(e, nil, convert.Db2PbGroupInfo(groupMap[e.GroupID], ownerUserID, groupMemberNum[e.GroupID]))
|
||||
|
||||
var userInfo *sdkws.PublicUserInfo
|
||||
if user, ok := userMap[e.UserID]; !ok {
|
||||
userInfo = user
|
||||
}
|
||||
|
||||
return convert.Db2PbGroupRequest(e, userInfo, convert.Db2PbGroupInfo(groupMap[e.GroupID], ownerUserID, groupMemberNum[e.GroupID]))
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req *pbgroup.GetSpecifiedUserGroupRequestInfoReq) (*pbgroup.GetSpecifiedUserGroupRequestInfoResp, error) {
|
||||
opUserID := mcontext.GetOpUserID(ctx)
|
||||
|
||||
owners, err := g.db.FindGroupsOwner(ctx, []string{req.GroupID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.UserID != opUserID {
|
||||
req.UserID = mcontext.GetOpUserID(ctx)
|
||||
adminIDs, err := g.db.GetGroupRoleLevelMemberIDs(ctx, req.GroupID, constant.GroupAdmin)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
adminIDs = append(adminIDs, owners[0].UserID)
|
||||
|
||||
if !datautil.Contain(req.UserID, adminIDs...) {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("opUser no permission")
|
||||
}
|
||||
}
|
||||
requests, err := g.db.FindGroupRequests(ctx, req.GroupID, []string{req.UserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(requests) == 0 {
|
||||
return &pbgroup.GetSpecifiedUserGroupRequestInfoResp{}, nil
|
||||
}
|
||||
|
||||
groups, err := g.db.FindGroup(ctx, []string{req.GroupID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
userInfos, err := g.user.GetPublicUserInfos(ctx, []string{req.UserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
groupMemberNum, err := g.db.MapGroupMemberNum(ctx, []string{req.GroupID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp := &pbgroup.GetSpecifiedUserGroupRequestInfoResp{
|
||||
GroupRequests: make([]*sdkws.GroupRequest, 0, len(requests)),
|
||||
}
|
||||
|
||||
for _, request := range requests {
|
||||
resp.GroupRequests = append(resp.GroupRequests, convert.Db2PbGroupRequest(request, userInfos[0], convert.Db2PbGroupInfo(groups[0], owners[0].UserID, groupMemberNum[groups[0].GroupID])))
|
||||
}
|
||||
|
||||
resp.Total = uint32(len(requests))
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m
|
||||
conversationMaxSeqMap[conversation.ConversationID] = conversation.MaxSeq
|
||||
}
|
||||
}
|
||||
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
|
||||
maxSeqs, err := m.MsgDatabase.GetMaxSeqsWithTime(ctx, conversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -63,7 +63,8 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m
|
||||
for conversationID, maxSeq := range maxSeqs {
|
||||
resp.Seqs[conversationID] = &msg.Seqs{
|
||||
HasReadSeq: hasReadSeqs[conversationID],
|
||||
MaxSeq: maxSeq,
|
||||
MaxSeq: maxSeq.Seq,
|
||||
MaxSeqTime: maxSeq.Time,
|
||||
}
|
||||
if v, ok := conversationMaxSeqMap[conversationID]; ok {
|
||||
resp.Seqs[conversationID].MaxSeq = v
|
||||
|
@ -16,10 +16,10 @@ package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
pbmsg "github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
pbmsg "github.com/openimsdk/protocol/msg"
|
||||
"sort"
|
||||
)
|
||||
|
||||
func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) {
|
||||
@ -62,3 +62,25 @@ func (m *msgServer) SetUserConversationsMinSeq(ctx context.Context, req *pbmsg.S
|
||||
}
|
||||
return &pbmsg.SetUserConversationsMinSeqResp{}, nil
|
||||
}
|
||||
|
||||
func (m *msgServer) GetActiveConversation(ctx context.Context, req *pbmsg.GetActiveConversationReq) (*pbmsg.GetActiveConversationResp, error) {
|
||||
res, err := m.MsgDatabase.GetCacheMaxSeqWithTime(ctx, req.ConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversations := make([]*pbmsg.ActiveConversation, 0, len(res))
|
||||
for conversationID, val := range res {
|
||||
conversations = append(conversations, &pbmsg.ActiveConversation{
|
||||
MaxSeq: val.Seq,
|
||||
LastTime: val.Time,
|
||||
ConversationID: conversationID,
|
||||
})
|
||||
}
|
||||
if req.Limit > 0 {
|
||||
sort.Sort(activeConversations(conversations))
|
||||
if len(conversations) > int(req.Limit) {
|
||||
conversations = conversations[:req.Limit]
|
||||
}
|
||||
}
|
||||
return &pbmsg.GetActiveConversationResp{Conversations: conversations}, nil
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
@ -28,3 +29,63 @@ func IsNotFound(err error) bool {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
type activeConversations []*msg.ActiveConversation
|
||||
|
||||
func (s activeConversations) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s activeConversations) Less(i, j int) bool {
|
||||
return s[i].LastTime > s[j].LastTime
|
||||
}
|
||||
|
||||
func (s activeConversations) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
//type seqTime struct {
|
||||
// ConversationID string
|
||||
// Seq int64
|
||||
// Time int64
|
||||
// Unread int64
|
||||
// Pinned bool
|
||||
//}
|
||||
//
|
||||
//func (s seqTime) String() string {
|
||||
// return fmt.Sprintf("<Time_%d,Unread_%d,Pinned_%t>", s.Time, s.Unread, s.Pinned)
|
||||
//}
|
||||
//
|
||||
//type seqTimes []seqTime
|
||||
//
|
||||
//func (s seqTimes) Len() int {
|
||||
// return len(s)
|
||||
//}
|
||||
//
|
||||
//// Less sticky priority, unread priority, time descending
|
||||
//func (s seqTimes) Less(i, j int) bool {
|
||||
// iv, jv := s[i], s[j]
|
||||
// if iv.Pinned && (!jv.Pinned) {
|
||||
// return true
|
||||
// }
|
||||
// if jv.Pinned && (!iv.Pinned) {
|
||||
// return false
|
||||
// }
|
||||
// if iv.Unread > 0 && jv.Unread == 0 {
|
||||
// return true
|
||||
// }
|
||||
// if jv.Unread > 0 && iv.Unread == 0 {
|
||||
// return false
|
||||
// }
|
||||
// return iv.Time > jv.Time
|
||||
//}
|
||||
//
|
||||
//func (s seqTimes) Swap(i, j int) {
|
||||
// s[i], s[j] = s[j], s[i]
|
||||
//}
|
||||
//
|
||||
//type conversationStatus struct {
|
||||
// ConversationID string
|
||||
// Pinned bool
|
||||
// Recv bool
|
||||
//}
|
||||
|
@ -23,13 +23,17 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
||||
"github.com/openimsdk/protocol/relation"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
)
|
||||
|
||||
func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *relation.GetPaginationBlacksReq) (resp *relation.GetPaginationBlacksResp, err error) {
|
||||
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
||||
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
total, blacks, err := s.blackDatabase.FindOwnerBlacks(ctx, req.UserID, req.Pagination)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -55,7 +59,7 @@ func (s *friendServer) IsBlack(ctx context.Context, req *relation.IsBlackReq) (*
|
||||
}
|
||||
|
||||
func (s *friendServer) RemoveBlack(ctx context.Context, req *relation.RemoveBlackReq) (*relation.RemoveBlackResp, error) {
|
||||
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
|
||||
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -64,6 +68,7 @@ func (s *friendServer) RemoveBlack(ctx context.Context, req *relation.RemoveBlac
|
||||
}
|
||||
|
||||
s.notificationSender.BlackDeletedNotification(ctx, req)
|
||||
s.webhookAfterRemoveBlack(ctx, &s.config.WebhooksConfig.AfterRemoveBlack, req)
|
||||
|
||||
return &relation.RemoveBlackResp{}, nil
|
||||
}
|
||||
@ -72,6 +77,11 @@ func (s *friendServer) AddBlack(ctx context.Context, req *relation.AddBlackReq)
|
||||
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.webhookBeforeAddBlack(ctx, &s.config.WebhooksConfig.BeforeAddBlack, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err := s.userRpcClient.GetUsersInfo(ctx, []string{req.OwnerUserID, req.BlackUserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -90,3 +100,53 @@ func (s *friendServer) AddBlack(ctx context.Context, req *relation.AddBlackReq)
|
||||
s.notificationSender.BlackAddedNotification(ctx, req)
|
||||
return &relation.AddBlackResp{}, nil
|
||||
}
|
||||
|
||||
func (s *friendServer) GetSpecifiedBlacks(ctx context.Context, req *relation.GetSpecifiedBlacksReq) (*relation.GetSpecifiedBlacksResp, error) {
|
||||
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(req.UserIDList) == 0 {
|
||||
return nil, errs.ErrArgs.WrapMsg("userIDList is empty")
|
||||
}
|
||||
|
||||
if datautil.Duplicate(req.UserIDList) {
|
||||
return nil, errs.ErrArgs.WrapMsg("userIDList repeated")
|
||||
}
|
||||
|
||||
userMap, err := s.userRpcClient.GetPublicUserInfoMap(ctx, req.UserIDList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blacks, err := s.blackDatabase.FindBlackInfos(ctx, req.OwnerUserID, req.UserIDList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blackMap := datautil.SliceToMap(blacks, func(e *model.Black) string {
|
||||
return e.BlockUserID
|
||||
})
|
||||
|
||||
resp := &relation.GetSpecifiedBlacksResp{
|
||||
Blacks: make([]*sdkws.BlackInfo, 0, len(req.UserIDList)),
|
||||
}
|
||||
|
||||
for _, userID := range req.UserIDList {
|
||||
if black := blackMap[userID]; black != nil {
|
||||
resp.Blacks = append(resp.Blacks,
|
||||
&sdkws.BlackInfo{
|
||||
OwnerUserID: black.OwnerUserID,
|
||||
CreateTime: black.CreateTime.UnixMilli(),
|
||||
BlackUserInfo: userMap[userID],
|
||||
AddSource: black.AddSource,
|
||||
OperatorUserID: black.OperatorUserID,
|
||||
Ex: black.Ex,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
resp.Total = int32(len(resp.Blacks))
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -228,20 +228,23 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res
|
||||
|
||||
// ok.
|
||||
func (s *friendServer) DeleteFriend(ctx context.Context, req *relation.DeleteFriendReq) (resp *relation.DeleteFriendResp, err error) {
|
||||
resp = &relation.DeleteFriendResp{}
|
||||
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
|
||||
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = s.db.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.db.Delete(ctx, req.OwnerUserID, []string{req.FriendUserID}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.notificationSender.FriendDeletedNotification(ctx, req)
|
||||
s.webhookAfterDeleteFriend(ctx, &s.config.WebhooksConfig.AfterDeleteFriend, req)
|
||||
return resp, nil
|
||||
|
||||
return &relation.DeleteFriendResp{}, nil
|
||||
}
|
||||
|
||||
// ok.
|
||||
@ -249,20 +252,24 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFri
|
||||
if err = s.webhookBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig.BeforeSetFriendRemark, req); err != nil && err != servererrs.ErrCallbackContinue {
|
||||
return nil, err
|
||||
}
|
||||
resp = &relation.SetFriendRemarkResp{}
|
||||
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
|
||||
|
||||
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = s.db.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.db.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.webhookAfterSetFriendRemark(ctx, &s.config.WebhooksConfig.AfterSetFriendRemark, req)
|
||||
s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID)
|
||||
return resp, nil
|
||||
|
||||
return &relation.SetFriendRemarkResp{}, nil
|
||||
}
|
||||
|
||||
// ok.
|
||||
@ -309,7 +316,7 @@ func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
|
||||
|
||||
// Get received friend requests (i.e., those initiated by others).
|
||||
func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *relation.GetPaginationFriendsApplyToReq) (resp *relation.GetPaginationFriendsApplyToResp, err error) {
|
||||
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
||||
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -331,18 +338,23 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel
|
||||
|
||||
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *relation.GetPaginationFriendsApplyFromReq) (resp *relation.GetPaginationFriendsApplyFromResp, err error) {
|
||||
resp = &relation.GetPaginationFriendsApplyFromResp{}
|
||||
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
||||
|
||||
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
total, friendRequests, err := s.db.PageFriendRequestFromMe(ctx, req.UserID, req.Pagination)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.Total = int32(total)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -357,31 +369,37 @@ func (s *friendServer) IsFriend(ctx context.Context, req *relation.IsFriendReq)
|
||||
}
|
||||
|
||||
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *relation.GetPaginationFriendsReq) (resp *relation.GetPaginationFriendsResp, err error) {
|
||||
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
||||
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
total, friends, err := s.db.PageOwnerFriends(ctx, req.UserID, req.Pagination)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp = &relation.GetPaginationFriendsResp{}
|
||||
resp.FriendsInfo, err = convert.FriendsDB2Pb(ctx, friends, s.userRpcClient.GetUsersInfoMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.Total = int32(total)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *friendServer) GetFriendIDs(ctx context.Context, req *relation.GetFriendIDsReq) (resp *relation.GetFriendIDsResp, err error) {
|
||||
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
||||
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp = &relation.GetFriendIDsResp{}
|
||||
resp.FriendIDs, err = s.db.FindFriendUserIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -389,35 +407,45 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio
|
||||
if len(req.UserIDList) == 0 {
|
||||
return nil, errs.ErrArgs.WrapMsg("userIDList is empty")
|
||||
}
|
||||
|
||||
if datautil.Duplicate(req.UserIDList) {
|
||||
return nil, errs.ErrArgs.WrapMsg("userIDList repeated")
|
||||
}
|
||||
|
||||
userMap, err := s.userRpcClient.GetUsersInfoMap(ctx, req.UserIDList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
friends, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.UserIDList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blacks, err := s.blackDatabase.FindBlackInfos(ctx, req.OwnerUserID, req.UserIDList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
friendMap := datautil.SliceToMap(friends, func(e *model.Friend) string {
|
||||
return e.FriendUserID
|
||||
})
|
||||
|
||||
blackMap := datautil.SliceToMap(blacks, func(e *model.Black) string {
|
||||
return e.BlockUserID
|
||||
})
|
||||
|
||||
resp := &relation.GetSpecifiedFriendsInfoResp{
|
||||
Infos: make([]*relation.GetSpecifiedFriendsInfoInfo, 0, len(req.UserIDList)),
|
||||
}
|
||||
|
||||
for _, userID := range req.UserIDList {
|
||||
user := userMap[userID]
|
||||
|
||||
if user == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var friendInfo *sdkws.FriendInfo
|
||||
if friend := friendMap[userID]; friend != nil {
|
||||
friendInfo = &sdkws.FriendInfo{
|
||||
@ -430,6 +458,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio
|
||||
IsPinned: friend.IsPinned,
|
||||
}
|
||||
}
|
||||
|
||||
var blackInfo *sdkws.BlackInfo
|
||||
if black := blackMap[userID]; black != nil {
|
||||
blackInfo = &sdkws.BlackInfo{
|
||||
@ -440,12 +469,14 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio
|
||||
Ex: black.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
resp.Infos = append(resp.Infos, &relation.GetSpecifiedFriendsInfoInfo{
|
||||
UserInfo: user,
|
||||
FriendInfo: friendInfo,
|
||||
BlackInfo: blackInfo,
|
||||
})
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -185,7 +185,6 @@ type MsgGateway struct {
|
||||
WebsocketMaxMsgLen int `mapstructure:"websocketMaxMsgLen"`
|
||||
WebsocketTimeout int `mapstructure:"websocketTimeout"`
|
||||
} `mapstructure:"longConnSvr"`
|
||||
MultiLoginPolicy int `mapstructure:"multiLoginPolicy"`
|
||||
}
|
||||
|
||||
type MsgTransfer struct {
|
||||
@ -358,9 +357,10 @@ type AfterConfig struct {
|
||||
}
|
||||
|
||||
type Share struct {
|
||||
Secret string `mapstructure:"secret"`
|
||||
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
|
||||
IMAdminUserID []string `mapstructure:"imAdminUserID"`
|
||||
Secret string `mapstructure:"secret"`
|
||||
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
|
||||
IMAdminUserID []string `mapstructure:"imAdminUserID"`
|
||||
MultiLoginPolicy int `mapstructure:"multiLoginPolicy"`
|
||||
}
|
||||
type RpcRegisterName struct {
|
||||
User string `mapstructure:"user"`
|
||||
|
@ -17,6 +17,8 @@ package cachekey
|
||||
const (
|
||||
ConversationKey = "CONVERSATION:"
|
||||
ConversationIDsKey = "CONVERSATION_IDS:"
|
||||
NotNotifyConversationIDsKey = "NOT_NOTIFY_CONVERSATION_IDS:"
|
||||
PinnedConversationIDsKey = "PINNED_CONVERSATION_IDS:"
|
||||
ConversationIDsHashKey = "CONVERSATION_IDS_HASH:"
|
||||
ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
|
||||
RecvMsgOptKey = "RECV_MSG_OPT:"
|
||||
@ -34,6 +36,14 @@ func GetConversationIDsKey(ownerUserID string) string {
|
||||
return ConversationIDsKey + ownerUserID
|
||||
}
|
||||
|
||||
func GetNotNotifyConversationIDsKey(ownerUserID string) string {
|
||||
return NotNotifyConversationIDsKey + ownerUserID
|
||||
}
|
||||
|
||||
func GetPinnedConversationIDs(ownerUserID string) string {
|
||||
return PinnedConversationIDsKey + ownerUserID
|
||||
}
|
||||
|
||||
func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
|
||||
return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID
|
||||
}
|
||||
|
21
pkg/common/storage/cache/cachekey/group.go
vendored
21
pkg/common/storage/cache/cachekey/group.go
vendored
@ -20,16 +20,17 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
groupExpireTime = time.Second * 60 * 60 * 12
|
||||
GroupInfoKey = "GROUP_INFO:"
|
||||
GroupMemberIDsKey = "GROUP_MEMBER_IDS:"
|
||||
GroupMembersHashKey = "GROUP_MEMBERS_HASH2:"
|
||||
GroupMemberInfoKey = "GROUP_MEMBER_INFO:"
|
||||
JoinedGroupsKey = "JOIN_GROUPS_KEY:"
|
||||
GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
|
||||
GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
|
||||
GroupMemberMaxVersionKey = "GROUP_MEMBER_MAX_VERSION:"
|
||||
GroupJoinMaxVersionKey = "GROUP_JOIN_MAX_VERSION:"
|
||||
groupExpireTime = time.Second * 60 * 60 * 12
|
||||
GroupInfoKey = "GROUP_INFO:"
|
||||
GroupMemberIDsKey = "GROUP_MEMBER_IDS:"
|
||||
GroupMembersHashKey = "GROUP_MEMBERS_HASH2:"
|
||||
GroupMemberInfoKey = "GROUP_MEMBER_INFO:"
|
||||
JoinedGroupsKey = "JOIN_GROUPS_KEY:"
|
||||
GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
|
||||
GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
|
||||
GroupAdminLevelMemberIDsKey = "GROUP_ADMIN_LEVEL_MEMBER_IDS:"
|
||||
GroupMemberMaxVersionKey = "GROUP_MEMBER_MAX_VERSION:"
|
||||
GroupJoinMaxVersionKey = "GROUP_JOIN_MAX_VERSION:"
|
||||
)
|
||||
|
||||
func GetGroupInfoKey(groupID string) string {
|
||||
|
5
pkg/common/storage/cache/conversation.go
vendored
5
pkg/common/storage/cache/conversation.go
vendored
@ -25,6 +25,8 @@ type ConversationCache interface {
|
||||
CloneConversationCache() ConversationCache
|
||||
// get user's conversationIDs from msgCache
|
||||
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
|
||||
GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||
DelConversationIDs(userIDs ...string) ConversationCache
|
||||
|
||||
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
|
||||
@ -54,7 +56,8 @@ type ConversationCache interface {
|
||||
|
||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
|
||||
|
||||
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
|
||||
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
|
||||
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
|
||||
|
||||
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
||||
|
1
pkg/common/storage/cache/group.go
vendored
1
pkg/common/storage/cache/group.go
vendored
@ -16,6 +16,7 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
)
|
||||
|
36
pkg/common/storage/cache/redis/conversation.go
vendored
36
pkg/common/storage/cache/redis/conversation.go
vendored
@ -71,6 +71,14 @@ func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) strin
|
||||
return cachekey.GetConversationIDsKey(ownerUserID)
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID string) string {
|
||||
return cachekey.GetNotNotifyConversationIDsKey(ownerUserID)
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) getPinnedConversationIDsKey(ownerUserID string) string {
|
||||
return cachekey.GetPinnedConversationIDs(ownerUserID)
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
|
||||
return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID)
|
||||
}
|
||||
@ -105,6 +113,18 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) {
|
||||
return getCache(ctx, c.rcClient, c.getNotNotifyConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return c.conversationDB.FindUserIDAllNotNotifyConversationID(ctx, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) {
|
||||
return getCache(ctx, c.rcClient, c.getPinnedConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) {
|
||||
return c.conversationDB.FindUserIDAllPinnedConversationID(ctx, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache {
|
||||
keys := make([]string, 0, len(userIDs))
|
||||
for _, userID := range userIDs {
|
||||
@ -242,6 +262,22 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs ...string) cache.ConversationCache {
|
||||
cache := c.CloneConversationCache()
|
||||
for _, userID := range userIDs {
|
||||
cache.AddKeys(c.getNotNotifyConversationIDsKey(userID))
|
||||
}
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
|
||||
cache := c.CloneConversationCache()
|
||||
for _, userID := range userIDs {
|
||||
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
|
||||
}
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache {
|
||||
cache := c.CloneConversationCache()
|
||||
for _, userID := range userIDs {
|
||||
|
228
pkg/common/storage/cache/redis/seq_conversation.go
vendored
228
pkg/common/storage/cache/redis/seq_conversation.go
vendored
@ -12,6 +12,7 @@ import (
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -57,6 +58,14 @@ func (s *seqConversationCacheRedis) getSingleMaxSeq(ctx context.Context, convers
|
||||
return map[string]int64{conversationID: seq}, nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) getSingleMaxSeqWithTime(ctx context.Context, conversationID string) (map[string]database.SeqTime, error) {
|
||||
seq, err := s.GetMaxSeqWithTime(ctx, conversationID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return map[string]database.SeqTime{conversationID: seq}, nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]int64) error {
|
||||
result := make([]*redis.StringCmd, len(keys))
|
||||
pipe := s.rdb.Pipeline()
|
||||
@ -88,6 +97,46 @@ func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) batchGetMaxSeqWithTime(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]database.SeqTime) error {
|
||||
result := make([]*redis.SliceCmd, len(keys))
|
||||
pipe := s.rdb.Pipeline()
|
||||
for i, key := range keys {
|
||||
result[i] = pipe.HMGet(ctx, key, "CURR", "TIME")
|
||||
}
|
||||
if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) {
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
var notFoundKey []string
|
||||
for i, r := range result {
|
||||
val, err := r.Result()
|
||||
if len(val) != 2 {
|
||||
return errs.WrapMsg(err, "batchGetMaxSeqWithTime invalid result", "key", keys[i], "res", val)
|
||||
}
|
||||
if val[0] == nil {
|
||||
notFoundKey = append(notFoundKey, keys[i])
|
||||
continue
|
||||
}
|
||||
seq, err := s.parseInt64(val[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mill, err := s.parseInt64(val[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
seqs[keyConversationID[keys[i]]] = database.SeqTime{Seq: seq, Time: mill}
|
||||
}
|
||||
for _, key := range notFoundKey {
|
||||
conversationID := keyConversationID[key]
|
||||
seq, err := s.GetMaxSeqWithTime(ctx, conversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
seqs[conversationID] = seq
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
|
||||
switch len(conversationIDs) {
|
||||
case 0:
|
||||
@ -121,11 +170,44 @@ func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversation
|
||||
return seqs, nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
|
||||
switch len(conversationIDs) {
|
||||
case 0:
|
||||
return map[string]database.SeqTime{}, nil
|
||||
case 1:
|
||||
return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0])
|
||||
}
|
||||
keys := make([]string, 0, len(conversationIDs))
|
||||
keyConversationID := make(map[string]string, len(conversationIDs))
|
||||
for _, conversationID := range conversationIDs {
|
||||
key := s.getSeqMallocKey(conversationID)
|
||||
if _, ok := keyConversationID[key]; ok {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
keyConversationID[key] = conversationID
|
||||
}
|
||||
if len(keys) == 1 {
|
||||
return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0])
|
||||
}
|
||||
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
seqs := make(map[string]database.SeqTime, len(conversationIDs))
|
||||
for _, keys := range slotKeys {
|
||||
if err := s.batchGetMaxSeqWithTime(ctx, keys, keyConversationID, seqs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return seqs, nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) string {
|
||||
return cachekey.GetMallocSeqKey(conversationID)
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) {
|
||||
func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) (int64, error) {
|
||||
if lastSeq < currSeq {
|
||||
return 0, errs.New("lastSeq must be greater than currSeq")
|
||||
}
|
||||
@ -138,8 +220,9 @@ local lockValue = ARGV[1]
|
||||
local dataSecond = ARGV[2]
|
||||
local curr_seq = tonumber(ARGV[3])
|
||||
local last_seq = tonumber(ARGV[4])
|
||||
local mallocTime = ARGV[5]
|
||||
if redis.call("EXISTS", key) == 0 then
|
||||
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
|
||||
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime)
|
||||
redis.call("EXPIRE", key, dataSecond)
|
||||
return 1
|
||||
end
|
||||
@ -147,11 +230,11 @@ if redis.call("HGET", key, "LOCK") ~= lockValue then
|
||||
return 2
|
||||
end
|
||||
redis.call("HDEL", key, "LOCK")
|
||||
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
|
||||
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime)
|
||||
redis.call("EXPIRE", key, dataSecond)
|
||||
return 0
|
||||
`
|
||||
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq).Int64()
|
||||
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64()
|
||||
if err != nil {
|
||||
return 0, errs.Wrap(err)
|
||||
}
|
||||
@ -169,6 +252,7 @@ local key = KEYS[1]
|
||||
local size = tonumber(ARGV[1])
|
||||
local lockSecond = ARGV[2]
|
||||
local dataSecond = ARGV[3]
|
||||
local mallocTime = ARGV[4]
|
||||
local result = {}
|
||||
if redis.call("EXISTS", key) == 0 then
|
||||
local lockValue = math.random(0, 999999999)
|
||||
@ -176,6 +260,7 @@ if redis.call("EXISTS", key) == 0 then
|
||||
redis.call("EXPIRE", key, lockSecond)
|
||||
table.insert(result, 1)
|
||||
table.insert(result, lockValue)
|
||||
table.insert(result, mallocTime)
|
||||
return result
|
||||
end
|
||||
if redis.call("HEXISTS", key, "LOCK") == 1 then
|
||||
@ -189,6 +274,12 @@ if size == 0 then
|
||||
table.insert(result, 0)
|
||||
table.insert(result, curr_seq)
|
||||
table.insert(result, last_seq)
|
||||
local setTime = redis.call("HGET", key, "TIME")
|
||||
if setTime then
|
||||
table.insert(result, setTime)
|
||||
else
|
||||
table.insert(result, 0)
|
||||
end
|
||||
return result
|
||||
end
|
||||
local max_seq = curr_seq + size
|
||||
@ -196,21 +287,25 @@ if max_seq > last_seq then
|
||||
local lockValue = math.random(0, 999999999)
|
||||
redis.call("HSET", key, "LOCK", lockValue)
|
||||
redis.call("HSET", key, "CURR", last_seq)
|
||||
redis.call("HSET", key, "TIME", mallocTime)
|
||||
redis.call("EXPIRE", key, lockSecond)
|
||||
table.insert(result, 3)
|
||||
table.insert(result, curr_seq)
|
||||
table.insert(result, last_seq)
|
||||
table.insert(result, lockValue)
|
||||
table.insert(result, mallocTime)
|
||||
return result
|
||||
end
|
||||
redis.call("HSET", key, "CURR", max_seq)
|
||||
redis.call("HSET", key, "TIME", ARGV[4])
|
||||
redis.call("EXPIRE", key, dataSecond)
|
||||
table.insert(result, 0)
|
||||
table.insert(result, curr_seq)
|
||||
table.insert(result, last_seq)
|
||||
table.insert(result, mallocTime)
|
||||
return result
|
||||
`
|
||||
result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second)).Int64Slice()
|
||||
result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice()
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
@ -228,9 +323,9 @@ func (s *seqConversationCacheRedis) wait(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) {
|
||||
func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) {
|
||||
for i := 0; i < 10; i++ {
|
||||
state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq)
|
||||
state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq, mill)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "set seq cache failed", err, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq, "count", i+1)
|
||||
if err := s.wait(ctx); err != nil {
|
||||
@ -267,60 +362,74 @@ func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size in
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
|
||||
seq, _, err := s.mallocTime(ctx, conversationID, size)
|
||||
return seq, err
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversationID string, size int64) (int64, int64, error) {
|
||||
if size < 0 {
|
||||
return 0, errs.New("size must be greater than 0")
|
||||
return 0, 0, errs.New("size must be greater than 0")
|
||||
}
|
||||
key := s.getSeqMallocKey(conversationID)
|
||||
for i := 0; i < 10; i++ {
|
||||
states, err := s.malloc(ctx, key, size)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
switch states[0] {
|
||||
case 0: // success
|
||||
return states[1], nil
|
||||
return states[1], states[3], nil
|
||||
case 1: // not found
|
||||
mallocSize := s.getMallocSize(conversationID, size)
|
||||
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize)
|
||||
return seq, nil
|
||||
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize, states[2])
|
||||
return seq, 0, nil
|
||||
case 2: // locked
|
||||
if err := s.wait(ctx); err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
continue
|
||||
case 3: // exceeded cache max value
|
||||
currSeq := states[1]
|
||||
lastSeq := states[2]
|
||||
mill := states[4]
|
||||
mallocSize := s.getMallocSize(conversationID, size)
|
||||
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
if lastSeq == seq {
|
||||
s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize)
|
||||
return currSeq, nil
|
||||
s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize, mill)
|
||||
return currSeq, states[4], nil
|
||||
} else {
|
||||
log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq)
|
||||
s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize)
|
||||
return seq, nil
|
||||
s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize, mill)
|
||||
return seq, mill, nil
|
||||
}
|
||||
default:
|
||||
log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size)
|
||||
return 0, errs.New(fmt.Sprintf("unknown state: %d", states[0]))
|
||||
return 0, 0, errs.New(fmt.Sprintf("unknown state: %d", states[0]))
|
||||
}
|
||||
}
|
||||
log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size)
|
||||
return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size)
|
||||
return 0, 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size)
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||
return s.Malloc(ctx, conversationID, 0)
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) {
|
||||
seq, mill, err := s.mallocTime(ctx, conversationID, 0)
|
||||
if err != nil {
|
||||
return database.SeqTime{}, err
|
||||
}
|
||||
return database.SeqTime{Seq: seq, Time: mill}, nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
|
||||
keys := make([]string, 0, len(seqs))
|
||||
for conversationID, seq := range seqs {
|
||||
@ -331,3 +440,80 @@ func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[str
|
||||
}
|
||||
return DeleteCacheBySlot(ctx, s.rocks, keys)
|
||||
}
|
||||
|
||||
// GetCacheMaxSeqWithTime only get the existing cache, if there is no cache, no cache will be generated
|
||||
func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
|
||||
if len(conversationIDs) == 0 {
|
||||
return map[string]database.SeqTime{}, nil
|
||||
}
|
||||
key2conversationID := make(map[string]string)
|
||||
keys := make([]string, 0, len(conversationIDs))
|
||||
for _, conversationID := range conversationIDs {
|
||||
key := s.getSeqMallocKey(conversationID)
|
||||
if _, ok := key2conversationID[key]; ok {
|
||||
continue
|
||||
}
|
||||
key2conversationID[key] = conversationID
|
||||
keys = append(keys, key)
|
||||
}
|
||||
slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := make(map[string]database.SeqTime)
|
||||
for _, keys := range slotKeys {
|
||||
if len(keys) == 0 {
|
||||
continue
|
||||
}
|
||||
pipe := s.rdb.Pipeline()
|
||||
cmds := make([]*redis.SliceCmd, 0, len(keys))
|
||||
for _, key := range keys {
|
||||
cmds = append(cmds, pipe.HMGet(ctx, key, "CURR", "TIME"))
|
||||
}
|
||||
if _, err := pipe.Exec(ctx); err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
for i, cmd := range cmds {
|
||||
val, err := cmd.Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(val) != 2 {
|
||||
return nil, errs.WrapMsg(err, "GetCacheMaxSeqWithTime invalid result", "key", keys[i], "res", val)
|
||||
}
|
||||
if val[0] == nil {
|
||||
continue
|
||||
}
|
||||
seq, err := s.parseInt64(val[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mill, err := s.parseInt64(val[1])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationID := key2conversationID[keys[i]]
|
||||
res[conversationID] = database.SeqTime{Seq: seq, Time: mill}
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *seqConversationCacheRedis) parseInt64(val any) (int64, error) {
|
||||
switch v := val.(type) {
|
||||
case nil:
|
||||
return 0, nil
|
||||
case int:
|
||||
return int64(v), nil
|
||||
case int64:
|
||||
return v, nil
|
||||
case string:
|
||||
res, err := strconv.ParseInt(v, 10, 64)
|
||||
if err != nil {
|
||||
return 0, errs.WrapMsg(err, "invalid string not int64", "value", v)
|
||||
}
|
||||
return res, nil
|
||||
default:
|
||||
return 0, errs.New("invalid result not int64", "resType", fmt.Sprintf("%T", v), "value", v)
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
)
|
||||
|
||||
func newTestSeq() *seqConversationCacheRedis {
|
||||
mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))
|
||||
mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@127.0.0.1:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@ -23,7 +23,7 @@ func newTestSeq() *seqConversationCacheRedis {
|
||||
panic(err)
|
||||
}
|
||||
opt := &redis.Options{
|
||||
Addr: "172.16.8.48:16379",
|
||||
Addr: "127.0.0.1:16379",
|
||||
Password: "openIM123",
|
||||
DB: 1,
|
||||
}
|
||||
@ -107,3 +107,37 @@ func TestMinSeq(t *testing.T) {
|
||||
ts := newTestSeq()
|
||||
t.Log(ts.GetMinSeq(context.Background(), "10000000"))
|
||||
}
|
||||
|
||||
func TestMalloc(t *testing.T) {
|
||||
ts := newTestSeq()
|
||||
t.Log(ts.mallocTime(context.Background(), "10000000", 100))
|
||||
}
|
||||
|
||||
func TestHMGET(t *testing.T) {
|
||||
ts := newTestSeq()
|
||||
res, err := ts.GetCacheMaxSeqWithTime(context.Background(), []string{"10000000", "123456"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Log(res)
|
||||
}
|
||||
|
||||
func TestGetMaxSeqWithTime(t *testing.T) {
|
||||
ts := newTestSeq()
|
||||
t.Log(ts.GetMaxSeqWithTime(context.Background(), "10000000"))
|
||||
}
|
||||
|
||||
func TestGetMaxSeqWithTime1(t *testing.T) {
|
||||
ts := newTestSeq()
|
||||
t.Log(ts.GetMaxSeqsWithTime(context.Background(), []string{"10000000", "12345", "111"}))
|
||||
}
|
||||
|
||||
//
|
||||
//func TestHMGET(t *testing.T) {
|
||||
// ts := newTestSeq()
|
||||
// res, err := ts.rdb.HMGet(context.Background(), "MALLOC_SEQ:1", "CURR", "TIME1").Result()
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// t.Log(res)
|
||||
//}
|
||||
|
8
pkg/common/storage/cache/seq_conversation.go
vendored
8
pkg/common/storage/cache/seq_conversation.go
vendored
@ -1,6 +1,9 @@
|
||||
package cache
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
)
|
||||
|
||||
type SeqConversationCache interface {
|
||||
Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
|
||||
@ -9,4 +12,7 @@ type SeqConversationCache interface {
|
||||
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
|
||||
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
|
||||
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
|
||||
GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error)
|
||||
GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error)
|
||||
GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error)
|
||||
}
|
||||
|
@ -35,13 +35,14 @@ type AuthDatabase interface {
|
||||
}
|
||||
|
||||
type authDatabase struct {
|
||||
cache cache.TokenModel
|
||||
accessSecret string
|
||||
accessExpire int64
|
||||
cache cache.TokenModel
|
||||
accessSecret string
|
||||
accessExpire int64
|
||||
multiLoginPolicy int
|
||||
}
|
||||
|
||||
func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire int64) AuthDatabase {
|
||||
return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire}
|
||||
func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire int64, policy int) AuthDatabase {
|
||||
return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, multiLoginPolicy: policy}
|
||||
}
|
||||
|
||||
// If the result is empty.
|
||||
@ -55,15 +56,19 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p
|
||||
|
||||
// Create Token.
|
||||
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
|
||||
// todo: get all platform token
|
||||
tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var deleteTokenKey []string
|
||||
var kickedTokenKey []string
|
||||
for k, v := range tokens {
|
||||
_, err = tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret))
|
||||
t, err := tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret))
|
||||
if err != nil || v != constant.NormalToken {
|
||||
deleteTokenKey = append(deleteTokenKey, k)
|
||||
} else if a.checkKickToken(ctx, platformID, t) {
|
||||
kickedTokenKey = append(kickedTokenKey, k)
|
||||
}
|
||||
}
|
||||
if len(deleteTokenKey) != 0 {
|
||||
@ -72,6 +77,14 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
if len(kickedTokenKey) != 0 {
|
||||
for _, k := range kickedTokenKey {
|
||||
err := a.cache.SetTokenFlagEx(ctx, userID, platformID, k, constant.KickedToken)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire)
|
||||
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
|
||||
@ -85,3 +98,23 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
|
||||
}
|
||||
return tokenString, nil
|
||||
}
|
||||
|
||||
func (a *authDatabase) checkKickToken(ctx context.Context, platformID int, token *tokenverify.Claims) bool {
|
||||
switch a.multiLoginPolicy {
|
||||
case constant.DefalutNotKick:
|
||||
return false
|
||||
case constant.PCAndOther:
|
||||
if constant.PlatformIDToClass(platformID) == constant.TerminalPC ||
|
||||
constant.PlatformIDToClass(token.PlatformID) == constant.TerminalPC {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
case constant.AllLoginButSameTermKick:
|
||||
if platformID == token.PlatformID {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -69,6 +69,10 @@ type ConversationDatabase interface {
|
||||
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error)
|
||||
FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
||||
GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error)
|
||||
// GetNotNotifyConversationIDs gets not notify conversationIDs by userID
|
||||
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||
// GetPinnedConversationIDs gets pinned conversationIDs by userID
|
||||
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||
}
|
||||
|
||||
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||
@ -108,6 +112,10 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
|
||||
}
|
||||
if _, ok := fieldMap["recv_msg_opt"]; ok {
|
||||
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
|
||||
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
||||
}
|
||||
if _, ok := fieldMap["is_pinned"]; ok {
|
||||
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
|
||||
}
|
||||
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
|
||||
}
|
||||
@ -144,6 +152,10 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
|
||||
cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...)
|
||||
if _, ok := args["recv_msg_opt"]; ok {
|
||||
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
|
||||
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
||||
}
|
||||
if _, ok := args["is_pinned"]; ok {
|
||||
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
|
||||
}
|
||||
return cache.ChainExecDel(ctx)
|
||||
}
|
||||
@ -152,14 +164,30 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
|
||||
if err := c.conversationDB.Create(ctx, conversations); err != nil {
|
||||
return err
|
||||
}
|
||||
var userIDs []string
|
||||
var (
|
||||
userIDs []string
|
||||
notNotifyUserIDs []string
|
||||
pinnedUserIDs []string
|
||||
)
|
||||
|
||||
cache := c.cache.CloneConversationCache()
|
||||
for _, conversation := range conversations {
|
||||
cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID)
|
||||
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
|
||||
userIDs = append(userIDs, conversation.OwnerUserID)
|
||||
if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
|
||||
notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID)
|
||||
}
|
||||
if conversation.IsPinned == true {
|
||||
pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID)
|
||||
}
|
||||
}
|
||||
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx)
|
||||
return cache.DelConversationIDs(userIDs...).
|
||||
DelUserConversationIDsHash(userIDs...).
|
||||
DelConversationVersionUserIDs(userIDs...).
|
||||
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
|
||||
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
|
||||
ChainExecDel(ctx)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
|
||||
@ -212,7 +240,10 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner
|
||||
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
|
||||
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||
cache := c.cache.CloneConversationCache()
|
||||
cache = cache.DelConversationVersionUserIDs(ownerUserID)
|
||||
cache = cache.DelConversationVersionUserIDs(ownerUserID).
|
||||
DelConversationNotNotifyMessageUserIDs(ownerUserID).
|
||||
DelConversationPinnedMessageUserIDs(ownerUserID)
|
||||
|
||||
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
|
||||
return e.GroupID, e.GroupID != ""
|
||||
}))
|
||||
@ -353,3 +384,19 @@ func (c *conversationDatabase) GetOwnerConversation(ctx context.Context, ownerUs
|
||||
}
|
||||
return int64(len(conversationIDs)), conversations, nil
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) {
|
||||
conversationIDs, err := c.cache.GetUserNotNotifyConversationIDs(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conversationIDs, nil
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) {
|
||||
conversationIDs, err := c.cache.GetPinnedConversationIDs(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conversationIDs, nil
|
||||
}
|
||||
|
@ -74,6 +74,10 @@ type CommonMsgDatabase interface {
|
||||
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
|
||||
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
|
||||
|
||||
GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error)
|
||||
GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error)
|
||||
GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error)
|
||||
|
||||
//GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error)
|
||||
//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
||||
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||
@ -866,3 +870,16 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin
|
||||
func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) {
|
||||
return db.msgDocDatabase.GetDocIDs(ctx)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
|
||||
return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) {
|
||||
return db.seqConversation.GetMaxSeqWithTime(ctx, conversationID)
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
|
||||
// todo: only the time in the redis cache will be taken, not the message time
|
||||
return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs)
|
||||
}
|
||||
|
@ -27,6 +27,8 @@ type Conversation interface {
|
||||
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
|
||||
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)
|
||||
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
|
||||
FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error)
|
||||
FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error)
|
||||
Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error)
|
||||
FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error)
|
||||
FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error)
|
||||
|
@ -124,6 +124,20 @@ func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userI
|
||||
return mongoutil.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
|
||||
}
|
||||
|
||||
func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) {
|
||||
return mongoutil.Find[string](ctx, c.coll, bson.M{
|
||||
"owner_user_id": userID,
|
||||
"recv_msg_opt": constant.ReceiveNotNotifyMessage,
|
||||
}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
|
||||
}
|
||||
|
||||
func (c *ConversationMgo) FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error) {
|
||||
return mongoutil.Find[string](ctx, c.coll, bson.M{
|
||||
"owner_user_id": userID,
|
||||
"is_pinned": true,
|
||||
}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1}))
|
||||
}
|
||||
|
||||
func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) {
|
||||
return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID})
|
||||
}
|
||||
|
@ -2,6 +2,11 @@ package database
|
||||
|
||||
import "context"
|
||||
|
||||
type SeqTime struct {
|
||||
Seq int64
|
||||
Time int64
|
||||
}
|
||||
|
||||
type SeqConversation interface {
|
||||
Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
|
||||
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
|
||||
|
@ -109,12 +109,12 @@ func (u *UserRpcClient) GetUsersInfoMap(ctx context.Context, userIDs []string) (
|
||||
func (u *UserRpcClient) GetPublicUserInfos(
|
||||
ctx context.Context,
|
||||
userIDs []string,
|
||||
complete bool,
|
||||
) ([]*sdkws.PublicUserInfo, error) {
|
||||
users, err := u.GetUsersInfo(ctx, userIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return datautil.Slice(users, func(e *sdkws.UserInfo) *sdkws.PublicUserInfo {
|
||||
return &sdkws.PublicUserInfo{
|
||||
UserID: e.UserID,
|
||||
@ -127,10 +127,11 @@ func (u *UserRpcClient) GetPublicUserInfos(
|
||||
|
||||
// GetPublicUserInfo retrieves public information for a single user based on the provided user ID.
|
||||
func (u *UserRpcClient) GetPublicUserInfo(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) {
|
||||
users, err := u.GetPublicUserInfos(ctx, []string{userID}, true)
|
||||
users, err := u.GetPublicUserInfos(ctx, []string{userID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return users[0], nil
|
||||
}
|
||||
|
||||
@ -138,12 +139,12 @@ func (u *UserRpcClient) GetPublicUserInfo(ctx context.Context, userID string) (*
|
||||
func (u *UserRpcClient) GetPublicUserInfoMap(
|
||||
ctx context.Context,
|
||||
userIDs []string,
|
||||
complete bool,
|
||||
) (map[string]*sdkws.PublicUserInfo, error) {
|
||||
users, err := u.GetPublicUserInfos(ctx, userIDs, complete)
|
||||
users, err := u.GetPublicUserInfos(ctx, userIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return datautil.SliceToMap(users, func(e *sdkws.PublicUserInfo) string {
|
||||
return e.UserID
|
||||
}), nil
|
||||
|
Loading…
x
Reference in New Issue
Block a user