mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-05 20:11:14 +08:00
feat: group members, friends sorting version, client online subscription (#2427)
* fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * friend incr sync * friend incr sync * friend incr sync * friend incr sync * friend incr sync * mage * optimization version log * optimization version log * sync * sync * sync * group sync * sync option * sync option * refactor: replace `friend` package with `realtion`. * refactor: update lastest commit to relation. * sync option * sync option * sync option * sync * sync * go.mod * seq * update: go mod * refactor: change incremental to full * feat: get full friend user ids * feat: api and config * seq * group version * merge * seq * seq * seq * fix: sort by id avoid unstable sort friends. * group * group * group * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * user version * seq * seq * seq user * user online * implement minio expire delete. * user online * config * fix * fix * implement minio expire delete logic. * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * feat: implement scheduled delete outdated object in minio. * update gomake version * update gomake version * implement FindExpires pagination. * remove unnesseary incr. * fix uncorrect args call. * online push * online push * online push * resolving conflicts * resolving conflicts * test * api prommetrics * api prommetrics * api prommetrics * api prommetrics * api prommetrics * rpc prommetrics * rpc prommetrics * online status * online status * online status * online status * sub * conversation version incremental * merge seq * merge online * merge online * merge online * merge seq * GetOwnerConversation * fix: change incremental syncer router name. * rockscache batch get * rockscache seq batch get * fix: GetMsgDocModelByIndex bug * update go.mod * update go.mod * merge * feat: prometheus * feat: prometheus * group member sort * sub * sub * fix: seq conversion bug * fix: redis pipe exec * sort version * sort version * sort version * remove old version online subscription * remove old version online subscription * version log index --------- Co-authored-by: withchao <withchao@users.noreply.github.com> Co-authored-by: Monet Lee <monet_lee@163.com> Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> Co-authored-by: icey-yu <1186114839@qq.com>
This commit is contained in:
parent
01f62c8baf
commit
d945a07549
2
go.mod
2
go.mod
@ -12,7 +12,7 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.30
|
github.com/openimsdk/protocol v0.0.69-alpha.38
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.51
|
github.com/openimsdk/tools v0.0.49-alpha.51
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
|
4
go.sum
4
go.sum
@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.30 h1:OXzCIpDpIY/GI6h1SDYWN51OS9Xv/BcHaOwq8whPKqI=
|
github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.30/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
|
github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||||
|
@ -75,8 +75,8 @@ type Client struct {
|
|||||||
token string
|
token string
|
||||||
hbCtx context.Context
|
hbCtx context.Context
|
||||||
hbCancel context.CancelFunc
|
hbCancel context.CancelFunc
|
||||||
subLock sync.Mutex
|
subLock *sync.Mutex
|
||||||
subUserIDs map[string]struct{}
|
subUserIDs map[string]struct{} // client conn subscription list
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResetClient updates the client's state with new connection and context information.
|
// ResetClient updates the client's state with new connection and context information.
|
||||||
@ -94,6 +94,11 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
|
|||||||
c.closedErr = nil
|
c.closedErr = nil
|
||||||
c.token = ctx.GetToken()
|
c.token = ctx.GetToken()
|
||||||
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
|
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
|
||||||
|
c.subLock = new(sync.Mutex)
|
||||||
|
if c.subUserIDs != nil {
|
||||||
|
clear(c.subUserIDs)
|
||||||
|
}
|
||||||
|
c.subUserIDs = make(map[string]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) pingHandler(appData string) error {
|
func (c *Client) pingHandler(appData string) error {
|
||||||
@ -246,13 +251,11 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) close() {
|
func (c *Client) close() {
|
||||||
|
c.w.Lock()
|
||||||
|
defer c.w.Unlock()
|
||||||
if c.closed.Load() {
|
if c.closed.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.w.Lock()
|
|
||||||
defer c.w.Unlock()
|
|
||||||
|
|
||||||
c.closed.Store(true)
|
c.closed.Store(true)
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.hbCancel() // Close server-initiated heartbeat.
|
c.hbCancel() // Close server-initiated heartbeat.
|
||||||
@ -313,6 +316,14 @@ func (c *Client) KickOnlineMessage() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) PushUserOnlineStatus(data []byte) error {
|
||||||
|
resp := Resp{
|
||||||
|
ReqIdentifier: WsSubUserOnlineStatus,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
return c.writeBinaryMsg(resp)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) writeBinaryMsg(resp Resp) error {
|
func (c *Client) writeBinaryMsg(resp Resp) error {
|
||||||
if c.closed.Load() {
|
if c.closed.Load() {
|
||||||
return nil
|
return nil
|
||||||
|
@ -2,15 +2,11 @@ package msggateway
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"github.com/openimsdk/protocol/constant"
|
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"github.com/openimsdk/tools/utils/idutil"
|
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) {
|
func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) {
|
||||||
@ -45,33 +41,19 @@ func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, dat
|
|||||||
return proto.Marshal(&resp)
|
return proto.Marshal(&resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
type subClient struct {
|
|
||||||
clients map[string]*Client
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSubscription() *Subscription {
|
func newSubscription() *Subscription {
|
||||||
return &Subscription{
|
return &Subscription{
|
||||||
userIDs: make(map[string]*subClient),
|
userIDs: make(map[string]*subClient),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Subscription struct {
|
type subClient struct {
|
||||||
lock sync.RWMutex
|
clients map[string]*Client
|
||||||
userIDs map[string]*subClient
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscription) GetClient(userID string) []*Client {
|
type Subscription struct {
|
||||||
s.lock.RLock()
|
lock sync.RWMutex
|
||||||
defer s.lock.RUnlock()
|
userIDs map[string]*subClient // subscribe to the user's client connection
|
||||||
cs, ok := s.userIDs[userID]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
clients := make([]*Client, 0, len(cs.clients))
|
|
||||||
for _, client := range cs.clients {
|
|
||||||
clients = append(clients, client)
|
|
||||||
}
|
|
||||||
return clients
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscription) DelClient(client *Client) {
|
func (s *Subscription) DelClient(client *Client) {
|
||||||
@ -99,6 +81,20 @@ func (s *Subscription) DelClient(client *Client) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Subscription) GetClient(userID string) []*Client {
|
||||||
|
s.lock.RLock()
|
||||||
|
defer s.lock.RUnlock()
|
||||||
|
cs, ok := s.userIDs[userID]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
clients := make([]*Client, 0, len(cs.clients))
|
||||||
|
for _, client := range cs.clients {
|
||||||
|
clients = append(clients, client)
|
||||||
|
}
|
||||||
|
return clients
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
|
func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
|
||||||
if len(addUserIDs)+len(delUserIDs) == 0 {
|
if len(addUserIDs)+len(delUserIDs) == 0 {
|
||||||
return
|
return
|
||||||
@ -121,6 +117,7 @@ func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
client.subUserIDs[userID] = struct{}{}
|
client.subUserIDs[userID] = struct{}{}
|
||||||
|
add[userID] = struct{}{}
|
||||||
}
|
}
|
||||||
client.subLock.Unlock()
|
client.subLock.Unlock()
|
||||||
if len(del)+len(add) == 0 {
|
if len(del)+len(add) == 0 {
|
||||||
@ -154,28 +151,16 @@ func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, p
|
|||||||
if len(clients) == 0 {
|
if len(clients) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
msgContent, err := json.Marshal(platformIDs)
|
onlineStatus, err := proto.Marshal(&sdkws.SubUserOnlineStatusTips{
|
||||||
|
Subscribers: []*sdkws.SubUserOnlineStatusElem{{UserID: userID, OnlinePlatformIDs: platformIDs}},
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
|
log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
msgID := idutil.GetMsgIDByMD5(userID)
|
|
||||||
msg := &sdkws.MsgData{
|
|
||||||
SendID: userID,
|
|
||||||
ClientMsgID: msgID,
|
|
||||||
ServerMsgID: msgID,
|
|
||||||
SenderPlatformID: constant.AdminPlatformID,
|
|
||||||
SessionType: constant.NotificationChatType,
|
|
||||||
ContentType: constant.UserSubscribeOnlineStatusNotification,
|
|
||||||
Content: msgContent,
|
|
||||||
SendTime: now,
|
|
||||||
CreateTime: now,
|
|
||||||
}
|
|
||||||
for _, client := range clients {
|
for _, client := range clients {
|
||||||
msg.RecvID = client.UserID
|
if err := client.PushUserOnlineStatus(onlineStatus); err != nil {
|
||||||
if err := client.PushMessage(ctx, msg); err != nil {
|
log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "changePlatformID", platformIDs)
|
||||||
log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -358,9 +358,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
|
|||||||
prommetrics.OnlineUserGauge.Dec()
|
prommetrics.OnlineUserGauge.Dec()
|
||||||
}
|
}
|
||||||
ws.onlineUserConnNum.Add(-1)
|
ws.onlineUserConnNum.Add(-1)
|
||||||
client.subLock.Lock()
|
ws.subscription.DelClient(client)
|
||||||
clear(client.subUserIDs)
|
|
||||||
client.subLock.Unlock()
|
|
||||||
//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
|
//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
|
||||||
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
|
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
|
||||||
ws.onlineUserNum.Load(), "online user conn Num",
|
ws.onlineUserNum.Load(), "online user conn Num",
|
||||||
|
@ -16,6 +16,8 @@ package friend
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
|
||||||
|
|
||||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
|
||||||
@ -191,10 +193,37 @@ func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context
|
|||||||
f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
|
f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *FriendNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) {
|
||||||
|
versions := versionctx.GetVersionLog(ctx).Get()
|
||||||
|
for _, coll := range versions {
|
||||||
|
if coll.Name == collName && coll.Doc.DID == id {
|
||||||
|
*version = uint64(coll.Doc.Version)
|
||||||
|
*versionID = coll.Doc.ID.Hex()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FriendNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
|
||||||
|
versions := versionctx.GetVersionLog(ctx).Get()
|
||||||
|
for _, coll := range versions {
|
||||||
|
if coll.Name == collName && coll.Doc.DID == id {
|
||||||
|
*version = uint64(coll.Doc.Version)
|
||||||
|
*versionID = coll.Doc.ID.Hex()
|
||||||
|
for _, elem := range coll.Doc.Logs {
|
||||||
|
if elem.EID == relationtb.VersionSortChangeID {
|
||||||
|
*sortVersion = uint64(elem.Version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) {
|
func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) {
|
||||||
tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}}
|
tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}}
|
||||||
tips.FromToUserID.FromUserID = fromUserID
|
tips.FromToUserID.FromUserID = fromUserID
|
||||||
tips.FromToUserID.ToUserID = toUserID
|
tips.FromToUserID.ToUserID = toUserID
|
||||||
|
f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID, database.FriendVersionName, toUserID, &tips.FriendSortVersion)
|
||||||
f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
|
f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
|
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
|
"slices"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
|
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
@ -52,12 +53,27 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
|
|||||||
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
|
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
var sortVersion uint64
|
||||||
opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{
|
opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
VersionKey: req.UserID,
|
VersionKey: req.UserID,
|
||||||
VersionID: req.VersionID,
|
VersionID: req.VersionID,
|
||||||
VersionNumber: req.Version,
|
VersionNumber: req.Version,
|
||||||
Version: s.db.FindFriendIncrVersion,
|
Version: func(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) {
|
||||||
|
vl, err := s.db.FindFriendIncrVersion(ctx, ownerUserID, version, limit)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool {
|
||||||
|
if elem.EID == model.VersionSortChangeID {
|
||||||
|
vl.LogLen--
|
||||||
|
sortVersion = uint64(elem.Version)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
return vl, nil
|
||||||
|
},
|
||||||
CacheMaxVersion: s.db.FindMaxFriendVersionCache,
|
CacheMaxVersion: s.db.FindMaxFriendVersionCache,
|
||||||
Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) {
|
Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) {
|
||||||
return s.getFriend(ctx, req.UserID, ids)
|
return s.getFriend(ctx, req.UserID, ids)
|
||||||
@ -65,12 +81,13 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
|
|||||||
ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID },
|
ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID },
|
||||||
Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
|
Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
|
||||||
return &relation.GetIncrementalFriendsResp{
|
return &relation.GetIncrementalFriendsResp{
|
||||||
VersionID: version.ID.Hex(),
|
VersionID: version.ID.Hex(),
|
||||||
Version: uint64(version.Version),
|
Version: uint64(version.Version),
|
||||||
Full: full,
|
Full: full,
|
||||||
Delete: deleteIds,
|
Delete: deleteIds,
|
||||||
Insert: insertList,
|
Insert: insertList,
|
||||||
Update: updateList,
|
Update: updateList,
|
||||||
|
SortVersion: sortVersion,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -306,6 +306,21 @@ func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint6
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
|
||||||
|
versions := versionctx.GetVersionLog(ctx).Get()
|
||||||
|
for _, coll := range versions {
|
||||||
|
if coll.Name == collName && coll.Doc.DID == id {
|
||||||
|
*version = uint64(coll.Doc.Version)
|
||||||
|
*versionID = coll.Doc.ID.Hex()
|
||||||
|
for _, elem := range coll.Doc.Logs {
|
||||||
|
if elem.EID == model.VersionSortChangeID {
|
||||||
|
*sortVersion = uint64(elem.Version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
|
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
|
||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -707,7 +722,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con
|
|||||||
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
|
g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion)
|
||||||
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
|
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,9 +10,13 @@ import (
|
|||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
pbgroup "github.com/openimsdk/protocol/group"
|
pbgroup "github.com/openimsdk/protocol/group"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"slices"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) {
|
func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) {
|
||||||
vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
|
vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -63,7 +67,10 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
|
|||||||
if group.Status == constant.GroupStatusDismissed {
|
if group.Status == constant.GroupStatusDismissed {
|
||||||
return nil, servererrs.ErrDismissedAlready.Wrap()
|
return nil, servererrs.ErrDismissedAlready.Wrap()
|
||||||
}
|
}
|
||||||
var hasGroupUpdate bool
|
var (
|
||||||
|
hasGroupUpdate bool
|
||||||
|
sortVersion uint64
|
||||||
|
)
|
||||||
opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
|
opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
VersionKey: req.GroupID,
|
VersionKey: req.GroupID,
|
||||||
@ -74,14 +81,20 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool {
|
logs := make([]model.VersionLogElem, 0, len(vl.Logs))
|
||||||
if elem.EID == "" {
|
for i, log := range vl.Logs {
|
||||||
|
switch log.EID {
|
||||||
|
case model.VersionGroupChangeID:
|
||||||
vl.LogLen--
|
vl.LogLen--
|
||||||
hasGroupUpdate = true
|
hasGroupUpdate = true
|
||||||
return true
|
case model.VersionSortChangeID:
|
||||||
|
vl.LogLen--
|
||||||
|
sortVersion = uint64(log.Version)
|
||||||
|
default:
|
||||||
|
logs = append(logs, vl.Logs[i])
|
||||||
}
|
}
|
||||||
return false
|
}
|
||||||
})
|
vl.Logs = logs
|
||||||
if vl.LogLen > 0 {
|
if vl.LogLen > 0 {
|
||||||
hasGroupUpdate = true
|
hasGroupUpdate = true
|
||||||
}
|
}
|
||||||
@ -94,12 +107,13 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
|
|||||||
ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID },
|
ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID },
|
||||||
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
|
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
|
||||||
return &pbgroup.GetIncrementalGroupMemberResp{
|
return &pbgroup.GetIncrementalGroupMemberResp{
|
||||||
VersionID: version.ID.Hex(),
|
VersionID: version.ID.Hex(),
|
||||||
Version: uint64(version.Version),
|
Version: uint64(version.Version),
|
||||||
Full: full,
|
Full: full,
|
||||||
Delete: delIDs,
|
Delete: delIDs,
|
||||||
Insert: insertList,
|
Insert: insertList,
|
||||||
Update: updateList,
|
Update: updateList,
|
||||||
|
SortVersion: sortVersion,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package user
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
|
||||||
pbuser "github.com/openimsdk/protocol/user"
|
pbuser "github.com/openimsdk/protocol/user"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -38,23 +37,6 @@ func (s *userServer) getUsersOnlineStatus(ctx context.Context, userIDs []string)
|
|||||||
|
|
||||||
// SubscribeOrCancelUsersStatus Subscribe online or cancel online users.
|
// SubscribeOrCancelUsersStatus Subscribe online or cancel online users.
|
||||||
func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) {
|
func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) {
|
||||||
if req.Genre == constant.SubscriberUser {
|
|
||||||
err := s.db.SubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var status []*pbuser.OnlineStatus
|
|
||||||
status, err = s.getUsersOnlineStatus(ctx, req.UserIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &pbuser.SubscribeOrCancelUsersStatusResp{StatusList: status}, nil
|
|
||||||
} else if req.Genre == constant.Unsubscribe {
|
|
||||||
err := s.db.UnsubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil
|
return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,34 +64,12 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu
|
|||||||
if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil {
|
if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
list, err := s.db.GetSubscribedList(ctx, req.UserID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for _, userID := range list {
|
|
||||||
tips := &sdkws.UserStatusChangeTips{
|
|
||||||
FromUserID: req.UserID,
|
|
||||||
ToUserID: userID,
|
|
||||||
Status: req.Status,
|
|
||||||
PlatformID: req.PlatformID,
|
|
||||||
}
|
|
||||||
s.userNotificationSender.UserStatusChangeNotification(ctx, tips)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &pbuser.SetUserStatusResp{}, nil
|
return &pbuser.SetUserStatusResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetSubscribeUsersStatus Get the online status of subscribers.
|
// GetSubscribeUsersStatus Get the online status of subscribers.
|
||||||
func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
|
func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
|
||||||
userList, err := s.db.GetAllSubscribeList(ctx, req.UserID)
|
return &pbuser.GetSubscribeUsersStatusResp{}, nil
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
onlineStatusList, err := s.getUsersOnlineStatus(ctx, userList)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
|
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
|
||||||
|
@ -93,8 +93,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions())
|
userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions())
|
||||||
userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
|
database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx())
|
||||||
database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx(), userMongoDB)
|
|
||||||
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
|
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
|
||||||
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
|
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
|
||||||
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
|
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
|
||||||
|
@ -62,14 +62,6 @@ type UserDatabase interface {
|
|||||||
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
|
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
|
||||||
|
|
||||||
SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
|
SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
|
||||||
// SubscribeUsersStatus Subscribe a user's presence status
|
|
||||||
SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error
|
|
||||||
// UnsubscribeUsersStatus unsubscribe a user's presence status
|
|
||||||
UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error
|
|
||||||
// GetAllSubscribeList Get a list of all subscriptions
|
|
||||||
GetAllSubscribeList(ctx context.Context, userID string) ([]string, error)
|
|
||||||
// GetSubscribedList Get all subscribed lists
|
|
||||||
GetSubscribedList(ctx context.Context, userID string) ([]string, error)
|
|
||||||
|
|
||||||
// CRUD user command
|
// CRUD user command
|
||||||
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
|
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
|
||||||
@ -80,14 +72,13 @@ type UserDatabase interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type userDatabase struct {
|
type userDatabase struct {
|
||||||
tx tx.Tx
|
tx tx.Tx
|
||||||
userDB database.User
|
userDB database.User
|
||||||
cache cache.UserCache
|
cache cache.UserCache
|
||||||
mongoDB database.SubscribeUser
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx, mongoDB database.SubscribeUser) UserDatabase {
|
func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx) UserDatabase {
|
||||||
return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB}
|
return &userDatabase{userDB: userDB, cache: cache, tx: tx}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error {
|
func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error {
|
||||||
@ -212,36 +203,6 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri
|
|||||||
return u.userDB.SortQuery(ctx, userIDName, asc)
|
return u.userDB.SortQuery(ctx, userIDName, asc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeUsersStatus Subscribe or unsubscribe a user's presence status.
|
|
||||||
func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
|
|
||||||
err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnsubscribeUsersStatus unsubscribe a user's presence status.
|
|
||||||
func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
|
|
||||||
err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAllSubscribeList Get a list of all subscriptions.
|
|
||||||
func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) {
|
|
||||||
list, err := u.mongoDB.GetAllSubscribeList(ctx, userID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetSubscribedList Get all subscribed lists.
|
|
||||||
func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) {
|
|
||||||
list, err := u.mongoDB.GetSubscribedList(ctx, userID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error {
|
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error {
|
||||||
return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex)
|
return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex)
|
||||||
}
|
}
|
||||||
|
@ -109,7 +109,13 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU
|
|||||||
return mongoutil.IncrVersion(func() error {
|
return mongoutil.IncrVersion(func() error {
|
||||||
return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true)
|
return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true)
|
||||||
}, func() error {
|
}, func() error {
|
||||||
return f.owner.IncrVersion(ctx, ownerUserID, []string{friendUserID}, model.VersionStateUpdate)
|
var friendUserIDs []string
|
||||||
|
if f.IsUpdateIsPinned(args) {
|
||||||
|
friendUserIDs = []string{model.VersionSortChangeID, friendUserID}
|
||||||
|
} else {
|
||||||
|
friendUserIDs = []string{friendUserID}
|
||||||
|
}
|
||||||
|
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,7 +220,7 @@ func (f *FriendMgo) FindFriendUserIDs(ctx context.Context, ownerUserID string) (
|
|||||||
|
|
||||||
func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) error {
|
func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) error {
|
||||||
// Ensure there are IDs to update
|
// Ensure there are IDs to update
|
||||||
if len(friendUserIDs) == 0 {
|
if len(friendUserIDs) == 0 || len(val) == 0 {
|
||||||
return nil // Or return an error if you expect there to always be IDs
|
return nil // Or return an error if you expect there to always be IDs
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,7 +236,13 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien
|
|||||||
return mongoutil.IncrVersion(func() error {
|
return mongoutil.IncrVersion(func() error {
|
||||||
return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update))
|
return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update))
|
||||||
}, func() error {
|
}, func() error {
|
||||||
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate)
|
var userIDs []string
|
||||||
|
if f.IsUpdateIsPinned(val) {
|
||||||
|
userIDs = append([]string{model.VersionSortChangeID}, friendUserIDs...)
|
||||||
|
} else {
|
||||||
|
userIDs = friendUserIDs
|
||||||
|
}
|
||||||
|
return f.owner.IncrVersion(ctx, ownerUserID, userIDs, model.VersionStateUpdate)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -248,3 +260,11 @@ func (f *FriendMgo) FindFriendUserID(ctx context.Context, friendUserID string) (
|
|||||||
func (f *FriendMgo) IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error {
|
func (f *FriendMgo) IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error {
|
||||||
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, state)
|
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *FriendMgo) IsUpdateIsPinned(data map[string]any) bool {
|
||||||
|
if data == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
_, ok := data["is_pinned"]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
@ -59,7 +59,7 @@ type GroupMemberMgo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupMemberMgo) memberSort() any {
|
func (g *GroupMemberMgo) memberSort() any {
|
||||||
return bson.D{{"role_level", -1}, {"create_time", -1}}
|
return bson.D{{"role_level", -1}, {"create_time", 1}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) {
|
func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) {
|
||||||
@ -118,7 +118,7 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us
|
|||||||
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
|
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
|
||||||
bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
|
bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
|
||||||
}, func() error {
|
}, func() error {
|
||||||
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
|
return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, userID}, model.VersionStateUpdate)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
|
func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
|
||||||
@ -131,10 +131,9 @@ func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID strin
|
|||||||
bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
|
bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}, func() error {
|
}, func() error {
|
||||||
return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate)
|
return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, firstUserID, secondUserID}, model.VersionStateUpdate)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,7 +144,13 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri
|
|||||||
return mongoutil.IncrVersion(func() error {
|
return mongoutil.IncrVersion(func() error {
|
||||||
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true)
|
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true)
|
||||||
}, func() error {
|
}, func() error {
|
||||||
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
|
var userIDs []string
|
||||||
|
if g.IsUpdateRoleLevel(data) {
|
||||||
|
userIDs = []string{model.VersionSortChangeID, userID}
|
||||||
|
} else {
|
||||||
|
userIDs = []string{userID}
|
||||||
|
}
|
||||||
|
return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateUpdate)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,189 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package mgo
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
|
||||||
|
|
||||||
"github.com/openimsdk/tools/errs"
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
|
||||||
)
|
|
||||||
|
|
||||||
// prefixes and suffixes.
|
|
||||||
const (
|
|
||||||
SubscriptionPrefix = "subscription_prefix"
|
|
||||||
SubscribedPrefix = "subscribed_prefix"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MaximumSubscription Maximum number of subscriptions.
|
|
||||||
const (
|
|
||||||
MaximumSubscription = 3000
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewUserMongoDriver(database *mongo.Database) database.SubscribeUser {
|
|
||||||
return &UserMongoDriver{
|
|
||||||
userCollection: database.Collection(model.SubscribeUserTableName),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type UserMongoDriver struct {
|
|
||||||
userCollection *mongo.Collection
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddSubscriptionList Subscriber's handling of thresholds.
|
|
||||||
func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error {
|
|
||||||
// Check the number of lists in the key.
|
|
||||||
pipeline := mongo.Pipeline{
|
|
||||||
{{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}},
|
|
||||||
{{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}},
|
|
||||||
}
|
|
||||||
// perform aggregate operations
|
|
||||||
cursor, err := u.userCollection.Aggregate(ctx, pipeline)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
defer cursor.Close(ctx)
|
|
||||||
var cnt struct {
|
|
||||||
Count int `bson:"count"`
|
|
||||||
}
|
|
||||||
// iterate over aggregated results
|
|
||||||
for cursor.Next(ctx) {
|
|
||||||
err = cursor.Decode(&cnt)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var newUserIDList []string
|
|
||||||
// If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it.
|
|
||||||
if cnt.Count+len(userIDList) > MaximumSubscription {
|
|
||||||
newUserIDList, err = u.GetAllSubscribeList(ctx, userID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):]
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{"$set": bson.M{"user_id_list": newUserIDList}},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Another way to subscribe to N before pop,Delete after testing
|
|
||||||
/*for i := 1; i <= MaximumSubscription-len(userIDList); i++ {
|
|
||||||
_, err := u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
}
|
|
||||||
upsert := true
|
|
||||||
opts := &options.UpdateOptions{
|
|
||||||
Upsert: &upsert,
|
|
||||||
}
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}},
|
|
||||||
opts,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
for _, user := range userIDList {
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscribedPrefix + user},
|
|
||||||
bson.M{"$addToSet": bson.M{"user_id_list": userID}},
|
|
||||||
opts,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errs.WrapMsg(err, "transaction failed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnsubscriptionList Handling of unsubscribe.
|
|
||||||
func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error {
|
|
||||||
_, err := u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
|
|
||||||
func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error {
|
|
||||||
var err error
|
|
||||||
for _, userIDTemp := range userIDList {
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscribedPrefix + userIDTemp},
|
|
||||||
bson.M{"$pull": bson.M{"user_id_list": userID}},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAllSubscribeList Get all users subscribed by this user.
|
|
||||||
func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) {
|
|
||||||
var user model.SubscribeUser
|
|
||||||
cursor := u.userCollection.FindOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID})
|
|
||||||
err = cursor.Decode(&user)
|
|
||||||
if err != nil {
|
|
||||||
if err == mongo.ErrNoDocuments {
|
|
||||||
return []string{}, nil
|
|
||||||
} else {
|
|
||||||
return nil, errs.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return user.UserIDList, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetSubscribedList Get the user subscribed by those users.
|
|
||||||
func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) {
|
|
||||||
var user model.SubscribeUser
|
|
||||||
cursor := u.userCollection.FindOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscribedPrefix + userID})
|
|
||||||
err = cursor.Decode(&user)
|
|
||||||
if err != nil {
|
|
||||||
if err == mongo.ErrNoDocuments {
|
|
||||||
return []string{}, nil
|
|
||||||
} else {
|
|
||||||
return nil, errs.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return user.UserIDList, nil
|
|
||||||
}
|
|
@ -18,8 +18,8 @@ import (
|
|||||||
|
|
||||||
func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) {
|
func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) {
|
||||||
lm := &VersionLogMgo{coll: coll}
|
lm := &VersionLogMgo{coll: coll}
|
||||||
if lm.initIndex(context.Background()) != nil {
|
if err := lm.initIndex(context.Background()); err != nil {
|
||||||
return nil, errs.ErrInternalServer.WrapMsg("init index failed", "coll", coll.Name())
|
return nil, errs.WrapMsg(err, "init version log index failed", "coll", coll.Name())
|
||||||
}
|
}
|
||||||
return lm, nil
|
return lm, nil
|
||||||
}
|
}
|
||||||
@ -33,6 +33,7 @@ func (l *VersionLogMgo) initIndex(ctx context.Context) error {
|
|||||||
Keys: bson.M{
|
Keys: bson.M{
|
||||||
"d_id": 1,
|
"d_id": 1,
|
||||||
},
|
},
|
||||||
|
Options: options.Index().SetUnique(true),
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -152,8 +153,24 @@ func (l *VersionLogMgo) writeLogBatch2(ctx context.Context, dId string, eIds []s
|
|||||||
"$unset": "delete_e_ids",
|
"$unset": "delete_e_ids",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(bson.M{"logs": 0})
|
projection := bson.M{
|
||||||
return mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt)
|
"logs": 0,
|
||||||
|
}
|
||||||
|
opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(projection)
|
||||||
|
res, err := mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res.Logs = make([]model.VersionLogElem, 0, len(eIds))
|
||||||
|
for _, id := range eIds {
|
||||||
|
res.Logs = append(res.Logs, model.VersionLogElem{
|
||||||
|
EID: id,
|
||||||
|
State: state,
|
||||||
|
Version: res.Version,
|
||||||
|
LastUpdate: res.LastUpdate,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) {
|
func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) {
|
||||||
|
@ -9,12 +9,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Result[V any](val V, err error) V {
|
//func Result[V any](val V, err error) V {
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
panic(err)
|
// panic(err)
|
||||||
}
|
// }
|
||||||
return val
|
// return val
|
||||||
}
|
//}
|
||||||
|
|
||||||
func Check(err error) {
|
func Check(err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -30,7 +30,7 @@ func TestName(t *testing.T) {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
vl := tmp.(*VersionLogMgo)
|
vl := tmp.(*VersionLogMgo)
|
||||||
res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now())
|
res, err := vl.incrVersionResult(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
return
|
return
|
||||||
|
@ -1,31 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package database
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
// SubscribeUser Operation interface of user mongodb.
|
|
||||||
type SubscribeUser interface {
|
|
||||||
// AddSubscriptionList Subscriber's handling of thresholds.
|
|
||||||
AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error
|
|
||||||
// UnsubscriptionList Handling of unsubscribe.
|
|
||||||
UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error
|
|
||||||
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
|
|
||||||
RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error
|
|
||||||
// GetAllSubscribeList Get all users subscribed by this user
|
|
||||||
GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error)
|
|
||||||
// GetSubscribedList Get the user subscribed by those users
|
|
||||||
GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error)
|
|
||||||
}
|
|
@ -14,6 +14,11 @@ const (
|
|||||||
VersionStateUpdate
|
VersionStateUpdate
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
VersionGroupChangeID = ""
|
||||||
|
VersionSortChangeID = "____S_O_R_T_I_D____"
|
||||||
|
)
|
||||||
|
|
||||||
type VersionLogElem struct {
|
type VersionLogElem struct {
|
||||||
EID string `bson:"e_id"`
|
EID string `bson:"e_id"`
|
||||||
State int32 `bson:"state"`
|
State int32 `bson:"state"`
|
||||||
|
@ -12,7 +12,7 @@ func main() {
|
|||||||
config string
|
config string
|
||||||
second int
|
second int
|
||||||
)
|
)
|
||||||
flag.StringVar(&config, "c", "/Users/chao/Desktop/project/open-im-server/config", "config directory")
|
flag.StringVar(&config, "c", "", "config directory")
|
||||||
flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion")
|
flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
|
if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user