Merge pull request #695 from wangchuxiao-dev/dev2

Dev2
This commit is contained in:
WangchuXiao 2023-07-28 20:17:51 +08:00 committed by GitHub
commit 6713c4df52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 452 additions and 109 deletions

View File

@ -35,18 +35,20 @@ jobs:
sudo make build sudo make build
# docker.io/openim/openim-server:latest # docker.io/openim/openim-server:latest
- name: Log in to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Extract metadata (tags, labels) for Docker - name: Extract metadata (tags, labels) for Docker
id: meta id: meta
uses: docker/metadata-action@v4.6.0 uses: docker/metadata-action@v4.6.0
with: with:
images: openim/openim-server images: openim/openim-server
- name: Log in to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
=======
>>>>>>> upstream/main
- name: Build and push Docker image - name: Build and push Docker image
uses: docker/build-push-action@v4 uses: docker/build-push-action@v4
with: with:
@ -56,6 +58,12 @@ jobs:
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta.outputs.labels }}
# registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server:latest # registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server:latest
- name: Extract metadata (tags, labels) for Docker
id: meta2
uses: docker/metadata-action@v4.6.0
with:
images: registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server
- name: Log in to AliYun Docker Hub - name: Log in to AliYun Docker Hub
uses: docker/login-action@v2 uses: docker/login-action@v2
with: with:
@ -63,21 +71,21 @@ jobs:
username: ${{ secrets.ALIREGISTRY_USERNAME }} username: ${{ secrets.ALIREGISTRY_USERNAME }}
password: ${{ secrets.ALIREGISTRY_TOKEN }} password: ${{ secrets.ALIREGISTRY_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
id: meta2
uses: docker/metadata-action@v4.6.0
with:
images: registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server
- name: Build and push Docker image - name: Build and push Docker image
uses: docker/build-push-action@v4 uses: docker/build-push-action@v4
with: with:
context: . context: .
push: ${{ github.event_name != 'pull_request' }} push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta2.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta2.outputs.labels }}
# ghcr.io/openimsdk/openim-server:latest
- name: Extract metadata (tags, labels) for Docker
id: meta3
uses: docker/metadata-action@v4.6.0
with:
images: ghcr.io/openimsdk/openim-server
# ghcr.io/openim/openim-server:latest
- name: Log in to GitHub Container Registry - name: Log in to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v2
with: with:
@ -85,52 +93,10 @@ jobs:
username: ${{ github.actor }} username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
id: meta3
uses: docker/metadata-action@v4.6.0
with:
images: openim/openim-server
- name: Build and push Docker image - name: Build and push Docker image
uses: docker/build-push-action@v4 uses: docker/build-push-action@v4
with: with:
context: . context: .
push: ${{ github.event_name != 'pull_request' }} push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta3.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta3.outputs.labels }}
# name: OpenIM Build Docker Images
# on:
# push:
# tags:
# - v*
# jobs:
# build:
# runs-on: ubuntu-latest
# strategy:
# matrix:
# bin:
# - openim-server
# steps:
# - name: Checkout
# uses: actions/checkout@v3
# - name: Setup Docker Buildx
# uses: docker/setup-buildx-action@v2
# - name: Login to GitHub Container Registry
# uses: docker/login-action@v2
# with:
# registry: ghcr.io
# username: ${{ github.repository_owner }}
# password: ${{ secrets.GITHUB_TOKEN }}
# - name: Docker metadata
# id: metadata
# uses: docker/metadata-action@v4
# with:
# images: ghcr.io/${{ github.repository_owner }}/openim-${{ matrix.bin }}
# - name: Build and release Docker images
# uses: docker/build-push-action@v3
# with:
# platforms: linux/386,linux/amd64,linux/arm64/v8
# target: ${{ matrix.bin }}
# tags: ${{ steps.metadata.outputs.tags }}
# push: true

View File

@ -480,6 +480,9 @@ checksum:
release: release:
footer: | footer: |
## Welcome to the {{ .Tag }} release of [chat](https://github.com/OpenIMSDK/chat)!🎉🎉!
**Full Changelog**: https://github.com/OpenIMSDK/Open-IM-Server/compare/{{ .PreviousTag }}...{{ .Tag }} **Full Changelog**: https://github.com/OpenIMSDK/Open-IM-Server/compare/{{ .PreviousTag }}...{{ .Tag }}
## Helping out ## Helping out

View File

@ -22,7 +22,8 @@ RUN /bin/sh -c "make build"
# Production Stage # Production Stage
FROM alpine FROM alpine
RUN apk --no-cache add tzdata RUN echo "https://mirrors.aliyun.com/alpine/v3.4/main" > /etc/apk/repositories && \
apk --no-cache add tzdata ca-certificates
# Set directory to map logs, config files, scripts, and SDK # Set directory to map logs, config files, scripts, and SDK
VOLUME ["/Open-IM-Server/logs", "/Open-IM-Server/config", "/Open-IM-Server/scripts", "/Open-IM-Server/db/sdk"] VOLUME ["/Open-IM-Server/logs", "/Open-IM-Server/config", "/Open-IM-Server/scripts", "/Open-IM-Server/db/sdk"]

View File

@ -126,10 +126,10 @@ api:
# Configuration for Aliyun OSS # Configuration for Aliyun OSS
object: object:
enable: "minio" enable: "minio"
apiURL: http://113.99.98.99:10002/object/ apiURL: http://127.0.0.1:10002/object/
minio: minio:
bucket: "openim" bucket: "openim"
endpoint: http://113.99.98.99:10005 endpoint: http://127.0.0.1:10005
accessKeyID: root accessKeyID: root
secretAccessKey: openIM123 secretAccessKey: openIM123
sessionToken: "" sessionToken: ""
@ -371,4 +371,4 @@ prometheus:
conversationPrometheusPort: [ 20230 ] conversationPrometheusPort: [ 20230 ]
rtcPrometheusPort: [ 21300 ] rtcPrometheusPort: [ 21300 ]
thirdPrometheusPort: [ 21301 ] thirdPrometheusPort: [ 21301 ]
messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ]

View File

@ -24,7 +24,7 @@ When pulling OpenIM's Docker images, you can choose the most suitable source bas
``` ```
bashCopy code bashCopy code
docker pull ghcr.io/openim/openim-server:latest docker pull ghcr.io/openimsdk/openim-server:latest
``` ```
- Pull from Alibaba Cloud: - Pull from Alibaba Cloud:
@ -47,7 +47,7 @@ When pulling OpenIM's Docker images, you can choose the most suitable source bas
``` ```
bashCopy code bashCopy code
docker pull ghcr.io/openim/openim-chat:latest docker pull ghcr.io/openimsdk/openim-chat:latest
``` ```
- Pull from Alibaba Cloud: - Pull from Alibaba Cloud:

2
go.mod
View File

@ -37,7 +37,7 @@ require (
require github.com/google/uuid v1.3.0 require github.com/google/uuid v1.3.0
require ( require (
github.com/OpenIMSDK/protocol v0.0.1 github.com/OpenIMSDK/protocol v0.0.2
github.com/OpenIMSDK/tools v0.0.5 github.com/OpenIMSDK/tools v0.0.5
github.com/aliyun/aliyun-oss-go-sdk v2.2.7+incompatible github.com/aliyun/aliyun-oss-go-sdk v2.2.7+incompatible
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible

4
go.sum
View File

@ -16,8 +16,8 @@ cloud.google.com/go/storage v1.28.1/go.mod h1:Qnisd4CqDdo6BGs2AD5LLnEsmSQ80wQ5og
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OpenIMSDK/protocol v0.0.1 h1:Q6J1jCU00dfqmguxw2XI+IGcVfBAkb5Tz8LgvyeNkk0= github.com/OpenIMSDK/protocol v0.0.2 h1:O53/WiqLCHF9aWPLI32GPF82hn7suM8PkhrtL89Klrw=
github.com/OpenIMSDK/protocol v0.0.1/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/protocol v0.0.2/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.5 h1:yBVHJ3EpIDcp8VFKPjuGr6MQvFa3t4JByZ+vmeC06/Q= github.com/OpenIMSDK/tools v0.0.5 h1:yBVHJ3EpIDcp8VFKPjuGr6MQvFa3t4JByZ+vmeC06/Q=
github.com/OpenIMSDK/tools v0.0.5/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/OpenIMSDK/tools v0.0.5/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=

View File

@ -79,6 +79,10 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
userRouterGroup.POST("/get_users", ParseToken, u.GetUsers) userRouterGroup.POST("/get_users", ParseToken, u.GetUsers)
userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus) userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus)
userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail) userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail)
userRouterGroup.POST("/subscribe_users_status", ParseToken, u.UnSubscriberStatus)
userRouterGroup.POST("/unsubscribe_users_status", ParseToken, u.UnSubscriberStatus)
userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus)
} }
// friend routing group // friend routing group
friendRouterGroup := r.Group("/friend", ParseToken) friendRouterGroup := r.Group("/friend", ParseToken)

View File

@ -62,6 +62,7 @@ func (u *UserApi) GetUsers(c *gin.Context) {
a2r.Call(user.UserClient.GetPaginationUsers, u.Client, c) a2r.Call(user.UserClient.GetPaginationUsers, u.Client, c)
} }
// GetUsersOnlineStatus Get user online status.
func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
var req msggateway.GetUsersOnlineStatusReq var req msggateway.GetUsersOnlineStatusReq
if err := c.BindJSON(&req); err != nil { if err := c.BindJSON(&req); err != nil {
@ -95,13 +96,13 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
wsResult = append(wsResult, reply.SuccessResult...) wsResult = append(wsResult, reply.SuccessResult...)
} }
} }
// 遍历 api 请求体中的 userIDs // Traversing the userIDs in the api request body
for _, v1 := range req.UserIDs { for _, v1 := range req.UserIDs {
flag = false flag = false
res := new(msggateway.GetUsersOnlineStatusResp_SuccessResult) res := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
// 遍历从各个网关中获取的在线结果 // Iterate through the online results fetched from various gateways
for _, v2 := range wsResult { for _, v2 := range wsResult {
// 如果匹配上说明在线,反之 // If matches the above description on the line, and vice versa
if v2.UserID == v1 { if v2.UserID == v1 {
flag = true flag = true
res.UserID = v1 res.UserID = v1
@ -123,6 +124,7 @@ func (u *UserApi) UserRegisterCount(c *gin.Context) {
a2r.Call(user.UserClient.UserRegisterCount, u.Client, c) a2r.Call(user.UserClient.UserRegisterCount, u.Client, c)
} }
// GetUsersOnlineTokenDetail Get user online token details.
func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) {
var wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult var wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult
var respResult []*msggateway.SingleDetail var respResult []*msggateway.SingleDetail
@ -182,3 +184,17 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) {
apiresp.GinSuccess(c, respResult) apiresp.GinSuccess(c, respResult)
} }
// SubscriberStatus Presence status of subscribed users.
func (u *UserApi) SubscriberStatus(c *gin.Context) {
a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c)
}
// UnSubscriberStatus Unsubscribe a user's presence.
func (u *UserApi) UnSubscriberStatus(c *gin.Context) {
a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c)
}
func (u *UserApi) GetUserStatus(c *gin.Context) {
a2r.Call(user.UserClient.GetUserStatus, u.Client, c)
}

View File

@ -1040,7 +1040,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou
groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationTb.GroupRequestModel) string { groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationTb.GroupRequestModel) string {
return e.GroupID return e.GroupID
})) }))
groups, err := s.GroupDatabase.FindGroup(ctx, groupIDs) groups, err := s.GroupDatabase.FindNotDismissedGroup(ctx, groupIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -17,13 +17,12 @@ package user
import ( import (
"context" "context"
"errors" "errors"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/tools/log"
"strings" "strings"
"time" "time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@ -60,6 +59,10 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
if err != nil { if err != nil {
return err return err
} }
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil { if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil {
return err return err
} }
@ -72,7 +75,8 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
} }
userDB := relation.NewUserGorm(db) userDB := relation.NewUserGorm(db)
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()) cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db)) userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db), userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client) friendRpcClient := rpcclient.NewFriendRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client)
u := &userServer{ u := &userServer{
@ -235,6 +239,7 @@ func (s *userServer) GetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.Ge
return &pbuser.GetGlobalRecvMessageOptResp{GlobalRecvMsgOpt: user[0].GlobalRecvMsgOpt}, nil return &pbuser.GetGlobalRecvMessageOptResp{GlobalRecvMsgOpt: user[0].GlobalRecvMsgOpt}, nil
} }
// GetAllUserID Get user account by page.
func (s *userServer) GetAllUserID(ctx context.Context, req *pbuser.GetAllUserIDReq) (resp *pbuser.GetAllUserIDResp, err error) { func (s *userServer) GetAllUserID(ctx context.Context, req *pbuser.GetAllUserIDReq) (resp *pbuser.GetAllUserIDResp, err error) {
userIDs, err := s.UserDatabase.GetAllUserID(ctx, req.Pagination.PageNumber, req.Pagination.ShowNumber) userIDs, err := s.UserDatabase.GetAllUserID(ctx, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil { if err != nil {
@ -243,6 +248,31 @@ func (s *userServer) GetAllUserID(ctx context.Context, req *pbuser.GetAllUserIDR
return &pbuser.GetAllUserIDResp{UserIDs: userIDs}, nil return &pbuser.GetAllUserIDResp{UserIDs: userIDs}, nil
} }
// SubscribeOrCancelUsersStatus Subscribe online or cancel online users.
func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (resp *pbuser.SubscribeOrCancelUsersStatusResp, err error) { func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (resp *pbuser.SubscribeOrCancelUsersStatusResp, err error) {
panic("implement me") err = s.UserDatabase.SubscribeOrCancelUsersStatus(ctx, req.UserID, req.UserIDs, req.Genre)
if err != nil {
return nil, err
}
//var status map[string][]string
//TODO 获取用户在线列表,返回订阅的用户的在线列表
return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil
}
func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp, err error) {
//TODO 是否加一个参数校验-判断req.userID的数量每一个获取加一个限制一次请求限制500
onlineStatusList, err := s.UserDatabase.GetUserStatus(ctx, req.UserIDs)
if err != nil {
return nil, err
}
return &pbuser.GetUserStatusResp{StatusList: onlineStatusList}, nil
}
func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, err error) {
err = s.UserDatabase.SetUserStatus(ctx, req.StatusList)
if err != nil {
return nil, err
}
return &pbuser.SetUserStatusResp{}, nil
} }

View File

@ -82,10 +82,12 @@ func InitMsgTool() (*MsgTool, error) {
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
userDB := relation.NewUserGorm(db) userDB := relation.NewUserGorm(db)
msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase()) msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase())
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
userDatabase := controller.NewUserDatabase( userDatabase := controller.NewUserDatabase(
userDB, userDB,
cache.NewUserCacheRedis(rdb, relation.NewUserGorm(db), cache.GetDefaultOpt()), cache.NewUserCacheRedis(rdb, relation.NewUserGorm(db), cache.GetDefaultOpt()),
tx.NewGorm(db), tx.NewGorm(db),
userMongoDB,
) )
groupDatabase := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()) groupDatabase := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase())
conversationDatabase := controller.NewConversationDatabase( conversationDatabase := controller.NewConversationDatabase(

View File

@ -16,6 +16,9 @@ package cache
import ( import (
"context" "context"
"encoding/json"
"github.com/OpenIMSDK/protocol/user"
"strconv"
"time" "time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
@ -25,9 +28,12 @@ import (
) )
const ( const (
userExpireTime = time.Second * 60 * 60 * 12 userExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:" userInfoKey = "USER_INFO:"
userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
olineStatusKey = "ONLINE_STATUS:"
userOlineStatusExpireTime = time.Second * 60 * 60 * 24
statusMod = 500
) )
type UserCache interface { type UserCache interface {
@ -38,10 +44,13 @@ type UserCache interface {
DelUsersInfo(userIDs ...string) UserCache DelUsersInfo(userIDs ...string) UserCache
GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error)
DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache
GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error)
SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error
} }
type UserCacheRedis struct { type UserCacheRedis struct {
metaCache metaCache
rdb redis.UniversalClient
userDB relationTb.UserModelInterface userDB relationTb.UserModelInterface
expireTime time.Duration expireTime time.Duration
rcClient *rockscache.Client rcClient *rockscache.Client
@ -54,6 +63,7 @@ func NewUserCacheRedis(
) UserCache { ) UserCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
return &UserCacheRedis{ return &UserCacheRedis{
rdb: rdb,
metaCache: NewMetaCacheRedis(rcClient), metaCache: NewMetaCacheRedis(rcClient),
userDB: userDB, userDB: userDB,
expireTime: userExpireTime, expireTime: userExpireTime,
@ -63,6 +73,7 @@ func NewUserCacheRedis(
func (u *UserCacheRedis) NewCache() UserCache { func (u *UserCacheRedis) NewCache() UserCache {
return &UserCacheRedis{ return &UserCacheRedis{
rdb: u.rdb,
metaCache: NewMetaCacheRedis(u.rcClient, u.metaCache.GetPreDelKeys()...), metaCache: NewMetaCacheRedis(u.rcClient, u.metaCache.GetPreDelKeys()...),
userDB: u.userDB, userDB: u.userDB,
expireTime: u.expireTime, expireTime: u.expireTime,
@ -145,3 +156,71 @@ func (u *UserCacheRedis) DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache {
cache.AddKeys(keys...) cache.AddKeys(keys...)
return cache return cache
} }
func (u *UserCacheRedis) getOnlineStatusKey(userID string) string {
return olineStatusKey + userID
}
// GetUserStatus get user status
func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) {
var res []*user.OnlineStatus
for _, userID := range userIDs {
UserIDNum, err := strconv.Atoi(userID)
if err != nil {
return nil, err
}
var modKey = strconv.Itoa(UserIDNum % statusMod)
var onlineStatus user.OnlineStatus
key := olineStatusKey + modKey
result, err := u.rdb.HGet(ctx, key, userID).Result()
if err != nil {
if err == redis.Nil {
// key or field does not exist
res = append(res, &user.OnlineStatus{
UserID: userID,
Status: 0,
PlatformID: -1,
})
continue
} else {
return nil, err
}
}
err = json.Unmarshal([]byte(result), &onlineStatus)
if err != nil {
return nil, err
}
onlineStatus.UserID = userID
res = append(res, &onlineStatus)
}
return res, nil
}
// SetUserStatus Set the user status and save it in redis
func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error {
for _, status := range list {
var isNewKey int64
UserIDNum, err := strconv.Atoi(status.UserID)
if err != nil {
return err
}
var modKey = strconv.Itoa(UserIDNum % statusMod)
key := olineStatusKey + modKey
jsonData, err := json.Marshal(status)
if err != nil {
return err
}
isNewKey, err = u.rdb.Exists(ctx, key).Result()
if err != nil {
return err
}
_, err = u.rdb.HSet(ctx, key, status.UserID, string(jsonData)).Result()
if err != nil {
return err
}
if isNewKey > 0 {
u.rdb.Expire(ctx, key, userOlineStatusExpireTime)
}
}
return nil
}

View File

@ -39,6 +39,7 @@ type GroupDatabase interface {
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)
FindNotDismissedGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)
SearchGroup( SearchGroup(
ctx context.Context, ctx context.Context,
keyword string, keyword string,
@ -581,3 +582,7 @@ func (g *groupDatabase) CountRangeEverydayTotal(ctx context.Context, start time.
func (g *groupDatabase) FindGroupRequests(ctx context.Context, groupID string, userIDs []string) (int64, []*relationTb.GroupRequestModel, error) { func (g *groupDatabase) FindGroupRequests(ctx context.Context, groupID string, userIDs []string) (int64, []*relationTb.GroupRequestModel, error) {
return g.groupRequestDB.FindGroupRequests(ctx, groupID, userIDs) return g.groupRequestDB.FindGroupRequests(ctx, groupID, userIDs)
} }
func (g *groupDatabase) FindNotDismissedGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
return g.groupDB.FindNotDismissedGroup(ctx, groupIDs)
}

View File

@ -16,6 +16,9 @@ package controller
import ( import (
"context" "context"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/user"
"time" "time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@ -26,38 +29,49 @@ import (
) )
type UserDatabase interface { type UserDatabase interface {
// 获取指定用户的信息 如有userID未找到 也返回错误 // FindWithError Get the information of the specified user. If the userID is not found, it will also return an error
FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// 获取指定用户的信息 如有userID未找到 不返回错误 // Find Get the information of the specified user If the userID is not found, no error will be returned
Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// 插入多条 外部保证userID 不重复 且在db中不存在 // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db
Create(ctx context.Context, users []*relation.UserModel) (err error) Create(ctx context.Context, users []*relation.UserModel) (err error)
// 更新(非零值) 外部保证userID存在 // Update update (non-zero value) external guarantee userID exists
Update(ctx context.Context, user *relation.UserModel) (err error) Update(ctx context.Context, user *relation.UserModel) (err error)
// 更新(零值) 外部保证userID存在 // UpdateByMap update (zero value) external guarantee userID exists
UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error)
// 如果没找到,不返回错误 // Page If not found, no error is returned
Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation.UserModel, count int64, err error) Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation.UserModel, count int64, err error)
// 只要有一个存在就为true // IsExist true as long as one exists
IsExist(ctx context.Context, userIDs []string) (exist bool, err error) IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
// 获取所有用户ID // GetAllUserID Get all user IDs
GetAllUserID(ctx context.Context, pageNumber, showNumber int32) ([]string, error) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) ([]string, error)
// 函数内部先查询db中是否存在存在则什么都不做不存在则插入 // InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it
InitOnce(ctx context.Context, users []*relation.UserModel) (err error) InitOnce(ctx context.Context, users []*relation.UserModel) (err error)
// 获取用户总数 // CountTotal Get the total number of users
CountTotal(ctx context.Context, before *time.Time) (int64, error) CountTotal(ctx context.Context, before *time.Time) (int64, error)
// 获取范围内用户增量 // CountRangeEverydayTotal Get the user increment in the range
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
//SubscribeOrCancelUsersStatus Subscribe or unsubscribe a user's presence status
SubscribeOrCancelUsersStatus(ctx context.Context, userID string, userIDs []string, genre int32) error
// GetAllSubscribeList Get a list of all subscriptions
GetAllSubscribeList(ctx context.Context, userID string) ([]string, error)
// GetSubscribedList Get all subscribed lists
GetSubscribedList(ctx context.Context, userID string) ([]string, error)
// GetUserStatus Get the online status of the user
GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error)
// SetUserStatus Set the user status and store the user status in redis
SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error
} }
type userDatabase struct { type userDatabase struct {
userDB relation.UserModelInterface userDB relation.UserModelInterface
cache cache.UserCache cache cache.UserCache
tx tx.Tx tx tx.Tx
mongoDB unRelationTb.UserModelInterface
} }
func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.Tx) UserDatabase { func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.Tx, mongoDB unRelationTb.UserModelInterface) UserDatabase {
return &userDatabase{userDB: userDB, cache: cache, tx: tx} return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB}
} }
func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel) (err error) { func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel) (err error) {
@ -75,7 +89,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel
return nil return nil
} }
// 获取指定用户的信息 如有userID未找到 也返回错误. // FindWithError Get the information of the specified user and return an error if the userID is not found.
func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
users, err = u.cache.GetUsersInfo(ctx, userIDs) users, err = u.cache.GetUsersInfo(ctx, userIDs)
if err != nil { if err != nil {
@ -87,13 +101,13 @@ func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (use
return return
} }
// 获取指定用户的信息 如有userID未找到 不返回错误. // Find Get the information of the specified user. If the userID is not found, no error will be returned.
func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
users, err = u.cache.GetUsersInfo(ctx, userIDs) users, err = u.cache.GetUsersInfo(ctx, userIDs)
return return
} }
// 插入多条 外部保证userID 不重复 且在db中不存在. // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db.
func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
if err := u.tx.Transaction(func(tx any) error { if err := u.tx.Transaction(func(tx any) error {
err = u.userDB.Create(ctx, users) err = u.userDB.Create(ctx, users)
@ -111,7 +125,7 @@ func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel)
return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
} }
// 更新(非零值) 外部保证userID存在. // Update (non-zero value) externally guarantees that userID exists.
func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) { func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) {
if err := u.userDB.Update(ctx, user); err != nil { if err := u.userDB.Update(ctx, user); err != nil {
return err return err
@ -119,7 +133,7 @@ func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (er
return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
} }
// 更新(零值) 外部保证userID存在. // UpdateByMap update (zero value) externally guarantees that userID exists.
func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) { func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) {
if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil { if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil {
return err return err
@ -127,7 +141,7 @@ func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[
return u.cache.DelUsersInfo(userID).ExecDel(ctx) return u.cache.DelUsersInfo(userID).ExecDel(ctx)
} }
// 获取,如果没找到,不返回错误. // Page Gets, returns no error if not found.
func (u *userDatabase) Page( func (u *userDatabase) Page(
ctx context.Context, ctx context.Context,
pageNumber, showNumber int32, pageNumber, showNumber int32,
@ -135,7 +149,7 @@ func (u *userDatabase) Page(
return u.userDB.Page(ctx, pageNumber, showNumber) return u.userDB.Page(ctx, pageNumber, showNumber)
} }
// userIDs是否存在 只要有一个存在就为true. // IsExist Does userIDs exist? As long as there is one, it will be true.
func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist bool, err error) { func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist bool, err error) {
users, err := u.userDB.Find(ctx, userIDs) users, err := u.userDB.Find(ctx, userIDs)
if err != nil { if err != nil {
@ -147,18 +161,53 @@ func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist boo
return false, nil return false, nil
} }
// GetAllUserID Get all user IDs
func (u *userDatabase) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) { func (u *userDatabase) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) {
return u.userDB.GetAllUserID(ctx, pageNumber, showNumber) return u.userDB.GetAllUserID(ctx, pageNumber, showNumber)
} }
// CountTotal Get the total number of users
func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) { func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
return u.userDB.CountTotal(ctx, before) return u.userDB.CountTotal(ctx, before)
} }
func (u *userDatabase) CountRangeEverydayTotal( // CountRangeEverydayTotal Get the user increment in the range
ctx context.Context, func (u *userDatabase) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) {
start time.Time,
end time.Time,
) (map[string]int64, error) {
return u.userDB.CountRangeEverydayTotal(ctx, start, end) return u.userDB.CountRangeEverydayTotal(ctx, start, end)
} }
//SubscribeOrCancelUsersStatus Subscribe or unsubscribe a user's presence status
func (u *userDatabase) SubscribeOrCancelUsersStatus(ctx context.Context, userID string, userIDs []string, genre int32) error {
var err error
if genre == constant.SubscriberUser {
err = u.mongoDB.AddSubscriptionList(ctx, userID, userIDs)
} else if genre == constant.Unsubscribe {
err = u.mongoDB.UnsubscriptionList(ctx, userID, userIDs)
}
return err
}
// GetAllSubscribeList Get a list of all subscriptions.
func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) {
//TODO 获取所有订阅
return nil, nil
}
// GetSubscribedList Get all subscribed lists
func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) {
//TODO 获取所有被订阅
return nil, nil
}
// GetUserStatus get user status
func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) {
onlineStatusList, err := u.cache.GetUserStatus(ctx, userIDs)
return onlineStatusList, err
}
// SetUserStatus Set the user status and save it in redis
func (u *userDatabase) SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error {
return u.cache.SetUserStatus(ctx, list)
}

View File

@ -99,3 +99,7 @@ func (g *GroupGorm) CountRangeEverydayTotal(ctx context.Context, start time.Time
} }
return v, nil return v, nil
} }
func (g *GroupGorm) FindNotDismissedGroup(ctx context.Context, groupIDs []string) (groups []*relation.GroupModel, err error) {
return groups, utils.Wrap(g.DB.Where("group_id in (?) and status != ?", groupIDs, constant.GroupStatusDismissed).Find(&groups).Error, "")
}

View File

@ -51,6 +51,7 @@ type GroupModelInterface interface {
UpdateMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) UpdateMap(ctx context.Context, groupID string, args map[string]interface{}) (err error)
UpdateStatus(ctx context.Context, groupID string, status int32) (err error) UpdateStatus(ctx context.Context, groupID string, status int32) (err error)
Find(ctx context.Context, groupIDs []string) (groups []*GroupModel, err error) Find(ctx context.Context, groupIDs []string) (groups []*GroupModel, err error)
FindNotDismissedGroup(ctx context.Context, groupIDs []string) (groups []*GroupModel, err error)
Take(ctx context.Context, groupID string) (group *GroupModel, err error) Take(ctx context.Context, groupID string) (group *GroupModel, err error)
Search( Search(
ctx context.Context, ctx context.Context,

View File

@ -0,0 +1,42 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation
import "context"
// SubscribeUser collection constant.
const (
SubscribeUser = "subscribe_user"
)
// UserModel collection structure.
type UserModel struct {
UserID string `bson:"user_id" json:"userID"`
UserIDList []string `bson:"user_id_list" json:"userIDList"`
}
func (UserModel) TableName() string {
return SubscribeUser
}
// UserModelInterface Operation interface of user mongodb.
type UserModelInterface interface {
// AddSubscriptionList Subscriber's handling of thresholds.
AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error
// UnsubscriptionList Handling of unsubscribe.
UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error
}

View File

@ -0,0 +1,141 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/tools/utils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
)
// prefixes and suffixes.
const (
SubscriptionPrefix = "subscription_prefix"
SubscribedPrefix = "subscribed_prefix"
)
// MaximumSubscription Maximum number of subscriptions.
const (
MaximumSubscription = 3000
)
func NewUserMongoDriver(database *mongo.Database) unrelation.UserModelInterface {
return &UserMongoDriver{
userCollection: database.Collection(unrelation.SubscribeUser),
}
}
type UserMongoDriver struct {
userCollection *mongo.Collection
}
// AddSubscriptionList Subscriber's handling of thresholds.
func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error {
// Check the number of lists in the key.
filter := bson.M{SubscriptionPrefix + userID: bson.M{"$size": 1}}
result, err := u.userCollection.Find(context.Background(), filter)
if err != nil {
return err
}
var newUserIDList []string
for result.Next(context.Background()) {
err := result.Decode(&newUserIDList)
if err != nil {
log.Fatal(err)
}
}
// If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it.
if len(newUserIDList)+len(userIDList) > MaximumSubscription {
newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):]
_, err := u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscriptionPrefix + userID},
bson.M{"$set": bson.M{"user_id_list": newUserIDList}},
)
if err != nil {
return err
}
//for i := 1; i <= MaximumSubscription-len(userIDList); i++ {
// _, err := u.userCollection.UpdateOne(
// ctx,
// bson.M{"user_id": SubscriptionPrefix + userID},
// bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}},
// )
// if err != nil {
// return err
// }
//}
}
upsert := true
opts := &options.UpdateOptions{
Upsert: &upsert,
}
_, err = u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscriptionPrefix + userID},
bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}},
opts,
)
if err != nil {
return err
}
for _, user := range userIDList {
_, err = u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscribedPrefix + user},
bson.M{"$addToSet": bson.M{"user_id_list": userID}},
opts,
)
if err != nil {
return utils.Wrap(err, "transaction failed")
}
}
return nil
}
// UnsubscriptionList Handling of unsubscribe.
func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error {
_, err := u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscriptionPrefix + userID},
bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}},
)
if err != nil {
return err
}
err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList)
if err != nil {
return err
}
return nil
}
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error {
var newUserIDList []string
for _, value := range userIDList {
newUserIDList = append(newUserIDList, SubscribedPrefix+value)
}
_, err := u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": bson.M{"$in": newUserIDList}},
bson.M{"$pull": bson.M{"user_id_list": userID}},
)
return utils.Wrap(err, "")
}