mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-01 00:42:13 +08:00 
			
		
		
		
	* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * monolithic * fix: DeleteDoc crash * fix: DeleteDoc crash * fix: monolithic * fix: monolithic * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: mq * fix: mq * fix: user msg timestamp * fix: mq * 1 * 1 * 1 * 1 * 1 * 1 * 1 * seq read config * seq read config * 1 * 1 * fix: the source message of the reference is withdrawn, and the referenced message is deleted * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1 * 1
		
			
				
	
	
		
			184 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			184 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package mgo
 | |
| 
 | |
import (
	"context"
	"regexp"
	"strconv"
	"time"

	"github.com/google/uuid"
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
	"github.com/openimsdk/tools/db/mongoutil"
	"github.com/openimsdk/tools/errs"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
 | |
| 
 | |
| func NewCacheMgo(db *mongo.Database) (*CacheMgo, error) {
 | |
| 	coll := db.Collection(database.CacheName)
 | |
| 	_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
 | |
| 		{
 | |
| 			Keys: bson.D{
 | |
| 				{Key: "key", Value: 1},
 | |
| 			},
 | |
| 			Options: options.Index().SetUnique(true),
 | |
| 		},
 | |
| 		{
 | |
| 			Keys: bson.D{
 | |
| 				{Key: "expire_at", Value: 1},
 | |
| 			},
 | |
| 			Options: options.Index().SetExpireAfterSeconds(0),
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, errs.Wrap(err)
 | |
| 	}
 | |
| 	return &CacheMgo{coll: coll}, nil
 | |
| }
 | |
| 
 | |
| type CacheMgo struct {
 | |
| 	coll *mongo.Collection
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) findToMap(res []model.Cache, now time.Time) map[string]string {
 | |
| 	kv := make(map[string]string)
 | |
| 	for _, re := range res {
 | |
| 		if re.ExpireAt != nil && re.ExpireAt.Before(now) {
 | |
| 			continue
 | |
| 		}
 | |
| 		kv[re.Key] = re.Value
 | |
| 	}
 | |
| 	return kv
 | |
| 
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) Get(ctx context.Context, key []string) (map[string]string, error) {
 | |
| 	if len(key) == 0 {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 	now := time.Now()
 | |
| 	res, err := mongoutil.Find[model.Cache](ctx, x.coll, bson.M{
 | |
| 		"key": bson.M{"$in": key},
 | |
| 		"$or": []bson.M{
 | |
| 			{"expire_at": bson.M{"$gt": now}},
 | |
| 			{"expire_at": nil},
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return x.findToMap(res, now), nil
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) Prefix(ctx context.Context, prefix string) (map[string]string, error) {
 | |
| 	now := time.Now()
 | |
| 	res, err := mongoutil.Find[model.Cache](ctx, x.coll, bson.M{
 | |
| 		"key": bson.M{"$regex": "^" + prefix},
 | |
| 		"$or": []bson.M{
 | |
| 			{"expire_at": bson.M{"$gt": now}},
 | |
| 			{"expire_at": nil},
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return x.findToMap(res, now), nil
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) Set(ctx context.Context, key string, value string, expireAt time.Duration) error {
 | |
| 	cv := &model.Cache{
 | |
| 		Key:   key,
 | |
| 		Value: value,
 | |
| 	}
 | |
| 	if expireAt > 0 {
 | |
| 		now := time.Now().Add(expireAt)
 | |
| 		cv.ExpireAt = &now
 | |
| 	}
 | |
| 	opt := options.Update().SetUpsert(true)
 | |
| 	return mongoutil.UpdateOne(ctx, x.coll, bson.M{"key": key}, bson.M{"$set": cv}, false, opt)
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) Incr(ctx context.Context, key string, value int) (int, error) {
 | |
| 	pipeline := mongo.Pipeline{
 | |
| 		{
 | |
| 			{"$set", bson.M{
 | |
| 				"value": bson.M{
 | |
| 					"$toString": bson.M{
 | |
| 						"$add": bson.A{
 | |
| 							bson.M{"$toInt": "$value"},
 | |
| 							value,
 | |
| 						},
 | |
| 					},
 | |
| 				},
 | |
| 			}},
 | |
| 		},
 | |
| 	}
 | |
| 	opt := options.FindOneAndUpdate().SetReturnDocument(options.After)
 | |
| 	res, err := mongoutil.FindOneAndUpdate[model.Cache](ctx, x.coll, bson.M{"key": key}, pipeline, opt)
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	return strconv.Atoi(res.Value)
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) Del(ctx context.Context, key []string) error {
 | |
| 	if len(key) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	_, err := x.coll.DeleteMany(ctx, bson.M{"key": bson.M{"$in": key}})
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) lockKey(key string) string {
 | |
| 	return "LOCK_" + key
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) Lock(ctx context.Context, key string, duration time.Duration) (string, error) {
 | |
| 	tmp, err := uuid.NewUUID()
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	if duration <= 0 || duration > time.Minute*10 {
 | |
| 		duration = time.Minute * 10
 | |
| 	}
 | |
| 	cv := &model.Cache{
 | |
| 		Key:      x.lockKey(key),
 | |
| 		Value:    tmp.String(),
 | |
| 		ExpireAt: nil,
 | |
| 	}
 | |
| 	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
 | |
| 	defer cancel()
 | |
| 	wait := func() error {
 | |
| 		timeout := time.NewTimer(time.Millisecond * 100)
 | |
| 		defer timeout.Stop()
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return ctx.Err()
 | |
| 		case <-timeout.C:
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	for {
 | |
| 		if err := mongoutil.DeleteOne(ctx, x.coll, bson.M{"key": key, "expire_at": bson.M{"$lt": time.Now()}}); err != nil {
 | |
| 			return "", err
 | |
| 		}
 | |
| 		expireAt := time.Now().Add(duration)
 | |
| 		cv.ExpireAt = &expireAt
 | |
| 		if err := mongoutil.InsertMany[*model.Cache](ctx, x.coll, []*model.Cache{cv}); err != nil {
 | |
| 			if mongo.IsDuplicateKeyError(err) {
 | |
| 				if err := wait(); err != nil {
 | |
| 					return "", err
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return "", err
 | |
| 		}
 | |
| 		return cv.Value, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (x *CacheMgo) Unlock(ctx context.Context, key string, value string) error {
 | |
| 	return mongoutil.DeleteOne(ctx, x.coll, bson.M{"key": x.lockKey(key), "value": value})
 | |
| }
 |