diff --git a/go.mod b/go.mod index a799f2c08..0637492eb 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.30 + github.com/openimsdk/protocol v0.0.69-alpha.38 github.com/openimsdk/tools v0.0.49-alpha.51 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index d9f948874..92a783c06 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.30 h1:OXzCIpDpIY/GI6h1SDYWN51OS9Xv/BcHaOwq8whPKqI= -github.com/openimsdk/protocol v0.0.69-alpha.30/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24= +github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY= github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 2d898ff45..889e5c456 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -75,8 +75,8 @@ type Client struct { token string hbCtx context.Context hbCancel context.CancelFunc - subLock sync.Mutex - subUserIDs map[string]struct{} + subLock *sync.Mutex + subUserIDs map[string]struct{} // client conn subscription list } // ResetClient updates the client's state with new connection and context information. @@ -94,6 +94,11 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closedErr = nil c.token = ctx.GetToken() c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) + c.subLock = new(sync.Mutex) + if c.subUserIDs != nil { + clear(c.subUserIDs) + } + c.subUserIDs = make(map[string]struct{}) } func (c *Client) pingHandler(appData string) error { @@ -246,13 +251,11 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte, } func (c *Client) close() { + c.w.Lock() + defer c.w.Unlock() if c.closed.Load() { return } - - c.w.Lock() - defer c.w.Unlock() - c.closed.Store(true) c.conn.Close() c.hbCancel() // Close server-initiated heartbeat. @@ -313,6 +316,14 @@ func (c *Client) KickOnlineMessage() error { return err } +func (c *Client) PushUserOnlineStatus(data []byte) error { + resp := Resp{ + ReqIdentifier: WsSubUserOnlineStatus, + Data: data, + } + return c.writeBinaryMsg(resp) +} + func (c *Client) writeBinaryMsg(resp Resp) error { if c.closed.Load() { return nil diff --git a/internal/msggateway/subscription.go b/internal/msggateway/subscription.go index 9460f5dbf..9bb41e0df 100644 --- a/internal/msggateway/subscription.go +++ b/internal/msggateway/subscription.go @@ -2,15 +2,11 @@ package msggateway import ( "context" - "encoding/json" - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/idutil" "google.golang.org/protobuf/proto" "sync" - "time" ) func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) { @@ -45,33 +41,19 @@ func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, dat return proto.Marshal(&resp) } -type subClient struct { - clients map[string]*Client -} - func newSubscription() *Subscription { return &Subscription{ userIDs: make(map[string]*subClient), } } -type Subscription struct { - lock sync.RWMutex - userIDs map[string]*subClient +type subClient struct { + clients map[string]*Client } -func (s *Subscription) GetClient(userID string) []*Client { - s.lock.RLock() - defer s.lock.RUnlock() - cs, ok := s.userIDs[userID] - if !ok { - return nil - } - clients := make([]*Client, 0, len(cs.clients)) - for _, client := range cs.clients { - clients = append(clients, client) - } - return clients +type Subscription struct { + lock sync.RWMutex + userIDs map[string]*subClient // subscribe to the user's client connection } func (s *Subscription) DelClient(client *Client) { @@ -99,6 +81,20 @@ func (s *Subscription) DelClient(client *Client) { } } +func (s *Subscription) GetClient(userID string) []*Client { + s.lock.RLock() + defer s.lock.RUnlock() + cs, ok := s.userIDs[userID] + if !ok { + return nil + } + clients := make([]*Client, 0, len(cs.clients)) + for _, client := range cs.clients { + clients = append(clients, client) + } + return clients +} + func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { if len(addUserIDs)+len(delUserIDs) == 0 { return @@ -121,6 +117,7 @@ func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { continue } client.subUserIDs[userID] = struct{}{} + add[userID] = struct{}{} } client.subLock.Unlock() if len(del)+len(add) == 0 { @@ -154,28 +151,16 @@ func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, p if len(clients) == 0 { return } - msgContent, err := json.Marshal(platformIDs) + onlineStatus, err := proto.Marshal(&sdkws.SubUserOnlineStatusTips{ + Subscribers: []*sdkws.SubUserOnlineStatusElem{{UserID: userID, OnlinePlatformIDs: platformIDs}}, + }) if err != nil { log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err) return } - now := time.Now().UnixMilli() - msgID := idutil.GetMsgIDByMD5(userID) - msg := &sdkws.MsgData{ - SendID: userID, - ClientMsgID: msgID, - ServerMsgID: msgID, - SenderPlatformID: constant.AdminPlatformID, - SessionType: constant.NotificationChatType, - ContentType: constant.UserSubscribeOnlineStatusNotification, - Content: msgContent, - SendTime: now, - CreateTime: now, - } for _, client := range clients { - msg.RecvID = client.UserID - if err := client.PushMessage(ctx, msg); err != nil { - log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent) + if err := client.PushUserOnlineStatus(onlineStatus); err != nil { + log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "changePlatformID", platformIDs) } } } diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index e903084a9..537b8c5f0 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -358,9 +358,7 @@ func (ws *WsServer) unregisterClient(client *Client) { prommetrics.OnlineUserGauge.Dec() } ws.onlineUserConnNum.Add(-1) - client.subLock.Lock() - clear(client.subUserIDs) - client.subLock.Unlock() + ws.subscription.DelClient(client) //ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num", diff --git a/internal/rpc/friend/notification.go b/internal/rpc/friend/notification.go index ddee025bb..5fb34577f 100644 --- a/internal/rpc/friend/notification.go +++ b/internal/rpc/friend/notification.go @@ -16,6 +16,8 @@ package friend import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -191,10 +193,37 @@ func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips) } +func (f *FriendNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) { + versions := versionctx.GetVersionLog(ctx).Get() + for _, coll := range versions { + if coll.Name == collName && coll.Doc.DID == id { + *version = uint64(coll.Doc.Version) + *versionID = coll.Doc.ID.Hex() + return + } + } +} + +func (f *FriendNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) { + versions := versionctx.GetVersionLog(ctx).Get() + for _, coll := range versions { + if coll.Name == collName && coll.Doc.DID == id { + *version = uint64(coll.Doc.Version) + *versionID = coll.Doc.ID.Hex() + for _, elem := range coll.Doc.Logs { + if elem.EID == relationtb.VersionSortChangeID { + *sortVersion = uint64(elem.Version) + } + } + } + } +} + func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) { tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}} tips.FromToUserID.FromUserID = fromUserID tips.FromToUserID.ToUserID = toUserID + f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID, database.FriendVersionName, toUserID, &tips.FriendSortVersion) f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips) } diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go index 684894609..eee9f2afd 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/friend/sync.go @@ -4,6 +4,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" "github.com/openimsdk/protocol/sdkws" + "slices" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -52,12 +53,27 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation. if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + var sortVersion uint64 opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{ - Ctx: ctx, - VersionKey: req.UserID, - VersionID: req.VersionID, - VersionNumber: req.Version, - Version: s.db.FindFriendIncrVersion, + Ctx: ctx, + VersionKey: req.UserID, + VersionID: req.VersionID, + VersionNumber: req.Version, + Version: func(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) { + vl, err := s.db.FindFriendIncrVersion(ctx, ownerUserID, version, limit) + if err != nil { + return nil, err + } + vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool { + if elem.EID == model.VersionSortChangeID { + vl.LogLen-- + sortVersion = uint64(elem.Version) + return true + } + return false + }) + return vl, nil + }, CacheMaxVersion: s.db.FindMaxFriendVersionCache, Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) { return s.getFriend(ctx, req.UserID, ids) @@ -65,12 +81,13 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation. ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID }, Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp { return &relation.GetIncrementalFriendsResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: full, - Delete: deleteIds, - Insert: insertList, - Update: updateList, + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: deleteIds, + Insert: insertList, + Update: updateList, + SortVersion: sortVersion, } }, } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index a8824962d..a7398795f 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -306,6 +306,21 @@ func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint6 } } +func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) { + versions := versionctx.GetVersionLog(ctx).Get() + for _, coll := range versions { + if coll.Name == collName && coll.Doc.DID == id { + *version = uint64(coll.Doc.Version) + *versionID = coll.Doc.ID.Hex() + for _, elem := range coll.Doc.Logs { + if elem.EID == model.VersionSortChangeID { + *sortVersion = uint64(elem.Version) + } + } + } + } +} + func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { var err error defer func() { @@ -707,7 +722,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } - g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) + g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips) } diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 75d060c0e..f89a98ee8 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -10,9 +10,13 @@ import ( "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" - "slices" ) +func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) { + //TODO implement me + panic("implement me") +} + func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID) if err != nil { @@ -63,7 +67,10 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if group.Status == constant.GroupStatusDismissed { return nil, servererrs.ErrDismissedAlready.Wrap() } - var hasGroupUpdate bool + var ( + hasGroupUpdate bool + sortVersion uint64 + ) opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ Ctx: ctx, VersionKey: req.GroupID, @@ -74,14 +81,20 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if err != nil { return nil, err } - vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool { - if elem.EID == "" { + logs := make([]model.VersionLogElem, 0, len(vl.Logs)) + for i, log := range vl.Logs { + switch log.EID { + case model.VersionGroupChangeID: vl.LogLen-- hasGroupUpdate = true - return true + case model.VersionSortChangeID: + vl.LogLen-- + sortVersion = uint64(log.Version) + default: + logs = append(logs, vl.Logs[i]) } - return false - }) + } + vl.Logs = logs if vl.LogLen > 0 { hasGroupUpdate = true } @@ -94,12 +107,13 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID }, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { return &pbgroup.GetIncrementalGroupMemberResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: full, - Delete: delIDs, - Insert: insertList, - Update: updateList, + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: delIDs, + Insert: insertList, + Update: updateList, + SortVersion: sortVersion, } }, } diff --git a/internal/rpc/user/online.go b/internal/rpc/user/online.go index e853ceae2..99b272006 100644 --- a/internal/rpc/user/online.go +++ b/internal/rpc/user/online.go @@ -3,7 +3,6 @@ package user import ( "context" "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/sdkws" pbuser "github.com/openimsdk/protocol/user" ) @@ -38,23 +37,6 @@ func (s *userServer) getUsersOnlineStatus(ctx context.Context, userIDs []string) // SubscribeOrCancelUsersStatus Subscribe online or cancel online users. func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) { - if req.Genre == constant.SubscriberUser { - err := s.db.SubscribeUsersStatus(ctx, req.UserID, req.UserIDs) - if err != nil { - return nil, err - } - var status []*pbuser.OnlineStatus - status, err = s.getUsersOnlineStatus(ctx, req.UserIDs) - if err != nil { - return nil, err - } - return &pbuser.SubscribeOrCancelUsersStatusResp{StatusList: status}, nil - } else if req.Genre == constant.Unsubscribe { - err := s.db.UnsubscribeUsersStatus(ctx, req.UserID, req.UserIDs) - if err != nil { - return nil, err - } - } return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil } @@ -82,34 +64,12 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil { return nil, err } - list, err := s.db.GetSubscribedList(ctx, req.UserID) - if err != nil { - return nil, err - } - for _, userID := range list { - tips := &sdkws.UserStatusChangeTips{ - FromUserID: req.UserID, - ToUserID: userID, - Status: req.Status, - PlatformID: req.PlatformID, - } - s.userNotificationSender.UserStatusChangeNotification(ctx, tips) - } - return &pbuser.SetUserStatusResp{}, nil } // GetSubscribeUsersStatus Get the online status of subscribers. func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) { - userList, err := s.db.GetAllSubscribeList(ctx, req.UserID) - if err != nil { - return nil, err - } - onlineStatusList, err := s.getUsersOnlineStatus(ctx, userList) - if err != nil { - return nil, err - } - return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil + return &pbuser.GetSubscribeUsersStatusResp{}, nil } func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 0b96077ec..779d9b0c4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -93,8 +93,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi return err } userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions()) - userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) - database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx(), userMongoDB) + database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx()) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go index 59559537b..5ce8104e7 100644 --- a/pkg/common/storage/controller/user.go +++ b/pkg/common/storage/controller/user.go @@ -62,14 +62,6 @@ type UserDatabase interface { CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error) - // SubscribeUsersStatus Subscribe a user's presence status - SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error - // UnsubscribeUsersStatus unsubscribe a user's presence status - UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error - // GetAllSubscribeList Get a list of all subscriptions - GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) - // GetSubscribedList Get all subscribed lists - GetSubscribedList(ctx context.Context, userID string) ([]string, error) // CRUD user command AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error @@ -80,14 +72,13 @@ type UserDatabase interface { } type userDatabase struct { - tx tx.Tx - userDB database.User - cache cache.UserCache - mongoDB database.SubscribeUser + tx tx.Tx + userDB database.User + cache cache.UserCache } -func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx, mongoDB database.SubscribeUser) UserDatabase { - return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB} +func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx) UserDatabase { + return &userDatabase{userDB: userDB, cache: cache, tx: tx} } func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error { @@ -212,36 +203,6 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri return u.userDB.SortQuery(ctx, userIDName, asc) } -// SubscribeUsersStatus Subscribe or unsubscribe a user's presence status. -func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { - err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs) - return err -} - -// UnsubscribeUsersStatus unsubscribe a user's presence status. -func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { - err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs) - return err -} - -// GetAllSubscribeList Get a list of all subscriptions. -func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) { - list, err := u.mongoDB.GetAllSubscribeList(ctx, userID) - if err != nil { - return nil, err - } - return list, nil -} - -// GetSubscribedList Get all subscribed lists. -func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) { - list, err := u.mongoDB.GetSubscribedList(ctx, userID) - if err != nil { - return nil, err - } - return list, nil -} - func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error { return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex) } diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index 7f456fbda..76c82bac2 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -109,7 +109,13 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true) }, func() error { - return f.owner.IncrVersion(ctx, ownerUserID, []string{friendUserID}, model.VersionStateUpdate) + var friendUserIDs []string + if f.IsUpdateIsPinned(args) { + friendUserIDs = []string{model.VersionSortChangeID, friendUserID} + } else { + friendUserIDs = []string{friendUserID} + } + return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate) }) } @@ -214,7 +220,7 @@ func (f *FriendMgo) FindFriendUserIDs(ctx context.Context, ownerUserID string) ( func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) error { // Ensure there are IDs to update - if len(friendUserIDs) == 0 { + if len(friendUserIDs) == 0 || len(val) == 0 { return nil // Or return an error if you expect there to always be IDs } @@ -230,7 +236,13 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien return mongoutil.IncrVersion(func() error { return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update)) }, func() error { - return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate) + var userIDs []string + if f.IsUpdateIsPinned(val) { + userIDs = append([]string{model.VersionSortChangeID}, friendUserIDs...) + } else { + userIDs = friendUserIDs + } + return f.owner.IncrVersion(ctx, ownerUserID, userIDs, model.VersionStateUpdate) }) } @@ -248,3 +260,11 @@ func (f *FriendMgo) FindFriendUserID(ctx context.Context, friendUserID string) ( func (f *FriendMgo) IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error { return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, state) } + +func (f *FriendMgo) IsUpdateIsPinned(data map[string]any) bool { + if data == nil { + return false + } + _, ok := data["is_pinned"] + return ok +} diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index f89822d3c..42b3dd72b 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -59,7 +59,7 @@ type GroupMemberMgo struct { } func (g *GroupMemberMgo) memberSort() any { - return bson.D{{"role_level", -1}, {"create_time", -1}} + return bson.D{{"role_level", -1}, {"create_time", 1}} } func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) { @@ -118,7 +118,7 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": bson.M{"role_level": roleLevel}}, true) }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) + return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, userID}, model.VersionStateUpdate) }) } func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error { @@ -131,10 +131,9 @@ func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID strin bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil { return err } - return nil }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate) + return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, firstUserID, secondUserID}, model.VersionStateUpdate) }) } @@ -145,7 +144,13 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true) }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) + var userIDs []string + if g.IsUpdateRoleLevel(data) { + userIDs = []string{model.VersionSortChangeID, userID} + } else { + userIDs = []string{userID} + } + return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateUpdate) }) } diff --git a/pkg/common/storage/database/mgo/subscribe.go b/pkg/common/storage/database/mgo/subscribe.go deleted file mode 100644 index 5b7d9786b..000000000 --- a/pkg/common/storage/database/mgo/subscribe.go +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mgo - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - - "github.com/openimsdk/tools/errs" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -// prefixes and suffixes. -const ( - SubscriptionPrefix = "subscription_prefix" - SubscribedPrefix = "subscribed_prefix" -) - -// MaximumSubscription Maximum number of subscriptions. -const ( - MaximumSubscription = 3000 -) - -func NewUserMongoDriver(database *mongo.Database) database.SubscribeUser { - return &UserMongoDriver{ - userCollection: database.Collection(model.SubscribeUserTableName), - } -} - -type UserMongoDriver struct { - userCollection *mongo.Collection -} - -// AddSubscriptionList Subscriber's handling of thresholds. -func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error { - // Check the number of lists in the key. - pipeline := mongo.Pipeline{ - {{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}}, - {{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}}, - } - // perform aggregate operations - cursor, err := u.userCollection.Aggregate(ctx, pipeline) - if err != nil { - return errs.Wrap(err) - } - defer cursor.Close(ctx) - var cnt struct { - Count int `bson:"count"` - } - // iterate over aggregated results - for cursor.Next(ctx) { - err = cursor.Decode(&cnt) - if err != nil { - return errs.Wrap(err) - } - } - var newUserIDList []string - // If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it. - if cnt.Count+len(userIDList) > MaximumSubscription { - newUserIDList, err = u.GetAllSubscribeList(ctx, userID) - if err != nil { - return err - } - newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):] - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{"$set": bson.M{"user_id_list": newUserIDList}}, - ) - if err != nil { - return err - } - // Another way to subscribe to N before pop,Delete after testing - /*for i := 1; i <= MaximumSubscription-len(userIDList); i++ { - _, err := u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}}, - ) - if err != nil { - return err - } - }*/ - } - upsert := true - opts := &options.UpdateOptions{ - Upsert: &upsert, - } - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}}, - opts, - ) - if err != nil { - return errs.Wrap(err) - } - for _, user := range userIDList { - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscribedPrefix + user}, - bson.M{"$addToSet": bson.M{"user_id_list": userID}}, - opts, - ) - if err != nil { - return errs.WrapMsg(err, "transaction failed") - } - } - return nil -} - -// UnsubscriptionList Handling of unsubscribe. -func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error { - _, err := u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}}, - ) - if err != nil { - return errs.Wrap(err) - } - err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList) - if err != nil { - return errs.Wrap(err) - } - return nil -} - -// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list. -func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error { - var err error - for _, userIDTemp := range userIDList { - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscribedPrefix + userIDTemp}, - bson.M{"$pull": bson.M{"user_id_list": userID}}, - ) - } - return errs.Wrap(err) -} - -// GetAllSubscribeList Get all users subscribed by this user. -func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) { - var user model.SubscribeUser - cursor := u.userCollection.FindOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}) - err = cursor.Decode(&user) - if err != nil { - if err == mongo.ErrNoDocuments { - return []string{}, nil - } else { - return nil, errs.Wrap(err) - } - } - return user.UserIDList, nil -} - -// GetSubscribedList Get the user subscribed by those users. -func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) { - var user model.SubscribeUser - cursor := u.userCollection.FindOne( - ctx, - bson.M{"user_id": SubscribedPrefix + userID}) - err = cursor.Decode(&user) - if err != nil { - if err == mongo.ErrNoDocuments { - return []string{}, nil - } else { - return nil, errs.Wrap(err) - } - } - return user.UserIDList, nil -} diff --git a/pkg/common/storage/database/mgo/version_log.go b/pkg/common/storage/database/mgo/version_log.go index 8836742f0..3b449007b 100644 --- a/pkg/common/storage/database/mgo/version_log.go +++ b/pkg/common/storage/database/mgo/version_log.go @@ -18,8 +18,8 @@ import ( func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) { lm := &VersionLogMgo{coll: coll} - if lm.initIndex(context.Background()) != nil { - return nil, errs.ErrInternalServer.WrapMsg("init index failed", "coll", coll.Name()) + if err := lm.initIndex(context.Background()); err != nil { + return nil, errs.WrapMsg(err, "init version log index failed", "coll", coll.Name()) } return lm, nil } @@ -33,6 +33,7 @@ func (l *VersionLogMgo) initIndex(ctx context.Context) error { Keys: bson.M{ "d_id": 1, }, + Options: options.Index().SetUnique(true), }) return err } @@ -152,8 +153,24 @@ func (l *VersionLogMgo) writeLogBatch2(ctx context.Context, dId string, eIds []s "$unset": "delete_e_ids", }, } - opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(bson.M{"logs": 0}) - return mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt) + projection := bson.M{ + "logs": 0, + } + opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(projection) + res, err := mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt) + if err != nil { + return nil, err + } + res.Logs = make([]model.VersionLogElem, 0, len(eIds)) + for _, id := range eIds { + res.Logs = append(res.Logs, model.VersionLogElem{ + EID: id, + State: state, + Version: res.Version, + LastUpdate: res.LastUpdate, + }) + } + return res, nil } func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) { diff --git a/pkg/common/storage/database/mgo/version_test.go b/pkg/common/storage/database/mgo/version_test.go index 236c61a2c..4576e45bc 100644 --- a/pkg/common/storage/database/mgo/version_test.go +++ b/pkg/common/storage/database/mgo/version_test.go @@ -9,12 +9,12 @@ import ( "time" ) -func Result[V any](val V, err error) V { - if err != nil { - panic(err) - } - return val -} +//func Result[V any](val V, err error) V { +// if err != nil { +// panic(err) +// } +// return val +//} func Check(err error) { if err != nil { @@ -30,7 +30,7 @@ func TestName(t *testing.T) { panic(err) } vl := tmp.(*VersionLogMgo) - res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now()) + res, err := vl.incrVersionResult(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert) if err != nil { t.Log(err) return diff --git a/pkg/common/storage/database/subscribe.go b/pkg/common/storage/database/subscribe.go deleted file mode 100644 index 5905ecd07..000000000 --- a/pkg/common/storage/database/subscribe.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package database - -import "context" - -// SubscribeUser Operation interface of user mongodb. -type SubscribeUser interface { - // AddSubscriptionList Subscriber's handling of thresholds. - AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error - // UnsubscriptionList Handling of unsubscribe. - UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error - // RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list. - RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error - // GetAllSubscribeList Get all users subscribed by this user - GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error) - // GetSubscribedList Get the user subscribed by those users - GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error) -} diff --git a/pkg/common/storage/model/version_log.go b/pkg/common/storage/model/version_log.go index 11a40ef24..6ed8d30f2 100644 --- a/pkg/common/storage/model/version_log.go +++ b/pkg/common/storage/model/version_log.go @@ -14,6 +14,11 @@ const ( VersionStateUpdate ) +const ( + VersionGroupChangeID = "" + VersionSortChangeID = "____S_O_R_T_I_D____" +) + type VersionLogElem struct { EID string `bson:"e_id"` State int32 `bson:"state"` diff --git a/tools/seq/main.go b/tools/seq/main.go index 399e6d934..16da9f156 100644 --- a/tools/seq/main.go +++ b/tools/seq/main.go @@ -12,7 +12,7 @@ func main() { config string second int ) - flag.StringVar(&config, "c", "/Users/chao/Desktop/project/open-im-server/config", "config directory") + flag.StringVar(&config, "c", "", "config directory") flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion") flag.Parse() if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {