mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	remove old version online subscription
This commit is contained in:
		
							parent
							
								
									ca1b1a2d72
								
							
						
					
					
						commit
						ddf8f0d5cd
					
				@ -3,7 +3,6 @@ package user
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
	"github.com/openimsdk/protocol/sdkws"
 | 
			
		||||
	pbuser "github.com/openimsdk/protocol/user"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -38,23 +37,6 @@ func (s *userServer) getUsersOnlineStatus(ctx context.Context, userIDs []string)
 | 
			
		||||
 | 
			
		||||
// SubscribeOrCancelUsersStatus Subscribe online or cancel online users.
 | 
			
		||||
func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) {
 | 
			
		||||
	if req.Genre == constant.SubscriberUser {
 | 
			
		||||
		err := s.db.SubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		var status []*pbuser.OnlineStatus
 | 
			
		||||
		status, err = s.getUsersOnlineStatus(ctx, req.UserIDs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		return &pbuser.SubscribeOrCancelUsersStatusResp{StatusList: status}, nil
 | 
			
		||||
	} else if req.Genre == constant.Unsubscribe {
 | 
			
		||||
		err := s.db.UnsubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -82,34 +64,12 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu
 | 
			
		||||
	if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	list, err := s.db.GetSubscribedList(ctx, req.UserID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, userID := range list {
 | 
			
		||||
		tips := &sdkws.UserStatusChangeTips{
 | 
			
		||||
			FromUserID: req.UserID,
 | 
			
		||||
			ToUserID:   userID,
 | 
			
		||||
			Status:     req.Status,
 | 
			
		||||
			PlatformID: req.PlatformID,
 | 
			
		||||
		}
 | 
			
		||||
		s.userNotificationSender.UserStatusChangeNotification(ctx, tips)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &pbuser.SetUserStatusResp{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetSubscribeUsersStatus Get the online status of subscribers.
 | 
			
		||||
func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
 | 
			
		||||
	userList, err := s.db.GetAllSubscribeList(ctx, req.UserID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	onlineStatusList, err := s.getUsersOnlineStatus(ctx, userList)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil
 | 
			
		||||
	return &pbuser.GetSubscribeUsersStatusResp{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
 | 
			
		||||
 | 
			
		||||
@ -93,8 +93,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions())
 | 
			
		||||
	userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
 | 
			
		||||
	database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx(), userMongoDB)
 | 
			
		||||
	database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx())
 | 
			
		||||
	friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
 | 
			
		||||
	groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
 | 
			
		||||
	msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
 | 
			
		||||
 | 
			
		||||
@ -62,14 +62,6 @@ type UserDatabase interface {
 | 
			
		||||
	CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
 | 
			
		||||
 | 
			
		||||
	SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
 | 
			
		||||
	// SubscribeUsersStatus Subscribe a user's presence status
 | 
			
		||||
	SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error
 | 
			
		||||
	// UnsubscribeUsersStatus unsubscribe a user's presence status
 | 
			
		||||
	UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error
 | 
			
		||||
	// GetAllSubscribeList Get a list of all subscriptions
 | 
			
		||||
	GetAllSubscribeList(ctx context.Context, userID string) ([]string, error)
 | 
			
		||||
	// GetSubscribedList Get all subscribed lists
 | 
			
		||||
	GetSubscribedList(ctx context.Context, userID string) ([]string, error)
 | 
			
		||||
 | 
			
		||||
	// CRUD user command
 | 
			
		||||
	AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
 | 
			
		||||
@ -83,11 +75,10 @@ type userDatabase struct {
 | 
			
		||||
	tx     tx.Tx
 | 
			
		||||
	userDB database.User
 | 
			
		||||
	cache  cache.UserCache
 | 
			
		||||
	mongoDB database.SubscribeUser
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx, mongoDB database.SubscribeUser) UserDatabase {
 | 
			
		||||
	return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB}
 | 
			
		||||
func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx) UserDatabase {
 | 
			
		||||
	return &userDatabase{userDB: userDB, cache: cache, tx: tx}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error {
 | 
			
		||||
@ -212,36 +203,6 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri
 | 
			
		||||
	return u.userDB.SortQuery(ctx, userIDName, asc)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscribeUsersStatus Subscribe or unsubscribe a user's presence status.
 | 
			
		||||
func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
 | 
			
		||||
	err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UnsubscribeUsersStatus unsubscribe a user's presence status.
 | 
			
		||||
func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
 | 
			
		||||
	err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAllSubscribeList Get a list of all subscriptions.
 | 
			
		||||
func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) {
 | 
			
		||||
	list, err := u.mongoDB.GetAllSubscribeList(ctx, userID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return list, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetSubscribedList Get all subscribed lists.
 | 
			
		||||
func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) {
 | 
			
		||||
	list, err := u.mongoDB.GetSubscribedList(ctx, userID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return list, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error {
 | 
			
		||||
	return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,189 +0,0 @@
 | 
			
		||||
// Copyright © 2023 OpenIM. All rights reserved.
 | 
			
		||||
//
 | 
			
		||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
package mgo
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/bson"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/mongo/options"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// prefixes and suffixes.
 | 
			
		||||
const (
 | 
			
		||||
	SubscriptionPrefix = "subscription_prefix"
 | 
			
		||||
	SubscribedPrefix   = "subscribed_prefix"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// MaximumSubscription Maximum number of subscriptions.
 | 
			
		||||
const (
 | 
			
		||||
	MaximumSubscription = 3000
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewUserMongoDriver(database *mongo.Database) database.SubscribeUser {
 | 
			
		||||
	return &UserMongoDriver{
 | 
			
		||||
		userCollection: database.Collection(model.SubscribeUserTableName),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type UserMongoDriver struct {
 | 
			
		||||
	userCollection *mongo.Collection
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddSubscriptionList Subscriber's handling of thresholds.
 | 
			
		||||
func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error {
 | 
			
		||||
	// Check the number of lists in the key.
 | 
			
		||||
	pipeline := mongo.Pipeline{
 | 
			
		||||
		{{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}},
 | 
			
		||||
		{{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}},
 | 
			
		||||
	}
 | 
			
		||||
	// perform aggregate operations
 | 
			
		||||
	cursor, err := u.userCollection.Aggregate(ctx, pipeline)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	defer cursor.Close(ctx)
 | 
			
		||||
	var cnt struct {
 | 
			
		||||
		Count int `bson:"count"`
 | 
			
		||||
	}
 | 
			
		||||
	// iterate over aggregated results
 | 
			
		||||
	for cursor.Next(ctx) {
 | 
			
		||||
		err = cursor.Decode(&cnt)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	var newUserIDList []string
 | 
			
		||||
	// If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it.
 | 
			
		||||
	if cnt.Count+len(userIDList) > MaximumSubscription {
 | 
			
		||||
		newUserIDList, err = u.GetAllSubscribeList(ctx, userID)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):]
 | 
			
		||||
		_, err = u.userCollection.UpdateOne(
 | 
			
		||||
			ctx,
 | 
			
		||||
			bson.M{"user_id": SubscriptionPrefix + userID},
 | 
			
		||||
			bson.M{"$set": bson.M{"user_id_list": newUserIDList}},
 | 
			
		||||
		)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		// Another way to subscribe to N before pop,Delete after testing
 | 
			
		||||
		/*for i := 1; i <= MaximumSubscription-len(userIDList); i++ {
 | 
			
		||||
			_, err := u.userCollection.UpdateOne(
 | 
			
		||||
				ctx,
 | 
			
		||||
				bson.M{"user_id": SubscriptionPrefix + userID},
 | 
			
		||||
				bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}},
 | 
			
		||||
			)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		}*/
 | 
			
		||||
	}
 | 
			
		||||
	upsert := true
 | 
			
		||||
	opts := &options.UpdateOptions{
 | 
			
		||||
		Upsert: &upsert,
 | 
			
		||||
	}
 | 
			
		||||
	_, err = u.userCollection.UpdateOne(
 | 
			
		||||
		ctx,
 | 
			
		||||
		bson.M{"user_id": SubscriptionPrefix + userID},
 | 
			
		||||
		bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}},
 | 
			
		||||
		opts,
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	for _, user := range userIDList {
 | 
			
		||||
		_, err = u.userCollection.UpdateOne(
 | 
			
		||||
			ctx,
 | 
			
		||||
			bson.M{"user_id": SubscribedPrefix + user},
 | 
			
		||||
			bson.M{"$addToSet": bson.M{"user_id_list": userID}},
 | 
			
		||||
			opts,
 | 
			
		||||
		)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return errs.WrapMsg(err, "transaction failed")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UnsubscriptionList Handling of unsubscribe.
 | 
			
		||||
func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error {
 | 
			
		||||
	_, err := u.userCollection.UpdateOne(
 | 
			
		||||
		ctx,
 | 
			
		||||
		bson.M{"user_id": SubscriptionPrefix + userID},
 | 
			
		||||
		bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}},
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
 | 
			
		||||
func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	for _, userIDTemp := range userIDList {
 | 
			
		||||
		_, err = u.userCollection.UpdateOne(
 | 
			
		||||
			ctx,
 | 
			
		||||
			bson.M{"user_id": SubscribedPrefix + userIDTemp},
 | 
			
		||||
			bson.M{"$pull": bson.M{"user_id_list": userID}},
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
	return errs.Wrap(err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAllSubscribeList Get all users subscribed by this user.
 | 
			
		||||
func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) {
 | 
			
		||||
	var user model.SubscribeUser
 | 
			
		||||
	cursor := u.userCollection.FindOne(
 | 
			
		||||
		ctx,
 | 
			
		||||
		bson.M{"user_id": SubscriptionPrefix + userID})
 | 
			
		||||
	err = cursor.Decode(&user)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if err == mongo.ErrNoDocuments {
 | 
			
		||||
			return []string{}, nil
 | 
			
		||||
		} else {
 | 
			
		||||
			return nil, errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return user.UserIDList, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetSubscribedList Get the user subscribed by those users.
 | 
			
		||||
func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) {
 | 
			
		||||
	var user model.SubscribeUser
 | 
			
		||||
	cursor := u.userCollection.FindOne(
 | 
			
		||||
		ctx,
 | 
			
		||||
		bson.M{"user_id": SubscribedPrefix + userID})
 | 
			
		||||
	err = cursor.Decode(&user)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if err == mongo.ErrNoDocuments {
 | 
			
		||||
			return []string{}, nil
 | 
			
		||||
		} else {
 | 
			
		||||
			return nil, errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return user.UserIDList, nil
 | 
			
		||||
}
 | 
			
		||||
@ -1,31 +0,0 @@
 | 
			
		||||
// Copyright © 2023 OpenIM. All rights reserved.
 | 
			
		||||
//
 | 
			
		||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
package database
 | 
			
		||||
 | 
			
		||||
import "context"
 | 
			
		||||
 | 
			
		||||
// SubscribeUser Operation interface of user mongodb.
 | 
			
		||||
type SubscribeUser interface {
 | 
			
		||||
	// AddSubscriptionList Subscriber's handling of thresholds.
 | 
			
		||||
	AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error
 | 
			
		||||
	// UnsubscriptionList Handling of unsubscribe.
 | 
			
		||||
	UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error
 | 
			
		||||
	// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
 | 
			
		||||
	RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error
 | 
			
		||||
	// GetAllSubscribeList Get all users subscribed by this user
 | 
			
		||||
	GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error)
 | 
			
		||||
	// GetSubscribedList Get the user subscribed by those users
 | 
			
		||||
	GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error)
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user