mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-31 08:29:33 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			154 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			154 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package unrelation
 | |
| 
 | |
import (
	"context"
	"errors"
	"fmt"
	"os"
	"strings"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"

	"github.com/OpenIMSDK/tools/errs"
	"github.com/OpenIMSDK/tools/mw/specialerror"
	"github.com/OpenIMSDK/tools/utils"

	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
)
 | |
| 
 | |
// Connection settings used by NewMongo.
| const (
 | |
| 	maxRetry         = 10 // number of retries; NewMongo loops while i <= maxRetry, so up to maxRetry+1 attempts are made
 | |
| 	mongoConnTimeout = 10 * time.Second // timeout of the context handed to each mongo.Connect attempt
 | |
| )
 | |
| 
 | |
// Mongo holds the MongoDB driver client created by NewMongo.
| type Mongo struct {
 | |
| 	db *mongo.Client // driver client; returned as-is by GetClient
 | |
| }
 | |
| 
 | |
| // NewMongo Initialize MongoDB connection.
 | |
| func NewMongo() (*Mongo, error) {
 | |
| 	specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
 | |
| 	uri := buildMongoURI()
 | |
| 	fmt.Println("mongo:", uri)
 | |
| 
 | |
| 	var mongoClient *mongo.Client
 | |
| 	var err error
 | |
| 
 | |
| 	// Retry connecting to MongoDB
 | |
| 	for i := 0; i <= maxRetry; i++ {
 | |
| 		ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
 | |
| 		defer cancel()
 | |
| 		mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
 | |
| 		if err == nil {
 | |
| 			return &Mongo{db: mongoClient}, nil
 | |
| 		}
 | |
| 		if shouldRetry(err) {
 | |
| 			fmt.Printf("Failed to connect to MongoDB, retrying: %s\n", err)
 | |
| 			time.Sleep(time.Second) // exponential backoff could be implemented here
 | |
| 			continue
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return nil, err
 | |
| }
 | |
| 
 | |
| func buildMongoURI() string {
 | |
| 	uri := os.Getenv("MONGO_URI")
 | |
| 	if uri != "" {
 | |
| 		return uri
 | |
| 	}
 | |
| 
 | |
| 	username := os.Getenv("MONGO_USERNAME")
 | |
| 	password := os.Getenv("MONGO_PASSWORD")
 | |
| 	address := os.Getenv("MONGO_ADDRESS")
 | |
| 	port := os.Getenv("MONGO_PORT")
 | |
| 	database := os.Getenv("MONGO_DATABASE")
 | |
| 	maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
 | |
| 
 | |
| 	if username == "" {
 | |
| 		username = config.Config.Mongo.Username
 | |
| 	}
 | |
| 	if password == "" {
 | |
| 		password = config.Config.Mongo.Password
 | |
| 	}
 | |
| 	if address == "" {
 | |
| 		address = strings.Join(config.Config.Mongo.Address, ",")
 | |
| 	} else if port != "" {
 | |
| 		address = fmt.Sprintf("%s:%s", address, port)
 | |
| 	}
 | |
| 	if database == "" {
 | |
| 		database = config.Config.Mongo.Database
 | |
| 	}
 | |
| 	if maxPoolSize == "" {
 | |
| 		maxPoolSize = fmt.Sprint(config.Config.Mongo.MaxPoolSize)
 | |
| 	}
 | |
| 
 | |
| 	uriFormat := "mongodb://%s/%s?maxPoolSize=%s&authSource=admin"
 | |
| 	if username != "" && password != "" {
 | |
| 		uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s&authSource=admin"
 | |
| 		return fmt.Sprintf(uriFormat, username, password, address, database, maxPoolSize)
 | |
| 	}
 | |
| 	return fmt.Sprintf(uriFormat, address, database, maxPoolSize)
 | |
| }
 | |
| 
 | |
| func shouldRetry(err error) bool {
 | |
| 	if cmdErr, ok := err.(mongo.CommandError); ok {
 | |
| 		return cmdErr.Code != 13 && cmdErr.Code != 18
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // GetClient returns the *mongo.Client stored in m.db, without copying or checking it.
 | |
| func (m *Mongo) GetClient() *mongo.Client {
 | |
| 	return m.db
 | |
| }
 | |
| 
 | |
| // GetDatabase returns a handle to the database named by config.Config.Mongo.Database.
 | |
// NOTE(review): the MONGO_DATABASE environment override read by buildMongoURI
// is not consulted here — confirm that this difference is intended.
| func (m *Mongo) GetDatabase() *mongo.Database {
 | |
| 	return m.db.Database(config.Config.Mongo.Database)
 | |
| }
 | |
| 
 | |
| // CreateMsgIndex creates a unique, ascending index on "doc_id" in the unrelation.Msg collection.
 | |
| func (m *Mongo) CreateMsgIndex() error {
 | |
| 	return m.createMongoIndex(unrelation.Msg, true, "doc_id")
 | |
| }
 | |
| 
 | |
| // createMongoIndex creates an index in a MongoDB collection.
 | |
| func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
 | |
| 	db := m.GetDatabase().Collection(collection)
 | |
| 	opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
 | |
| 	indexView := db.Indexes()
 | |
| 
 | |
| 	keysDoc := buildIndexKeys(keys)
 | |
| 
 | |
| 	index := mongo.IndexModel{
 | |
| 		Keys: keysDoc,
 | |
| 	}
 | |
| 	if isUnique {
 | |
| 		index.Options = options.Index().SetUnique(true)
 | |
| 	}
 | |
| 
 | |
| 	_, err := indexView.CreateOne(context.Background(), index, opts)
 | |
| 	if err != nil {
 | |
| 		return utils.Wrap(err, "CreateIndex")
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // buildIndexKeys builds the BSON document for index keys.
 | |
| func buildIndexKeys(keys []string) bson.D {
 | |
| 	keysDoc := bson.D{}
 | |
| 	for _, key := range keys {
 | |
| 		direction := 1 // default direction is ascending
 | |
| 		if strings.HasPrefix(key, "-") {
 | |
| 			direction = -1 // descending order for prefixed with "-"
 | |
| 			key = strings.TrimLeft(key, "-")
 | |
| 		}
 | |
| 		keysDoc = append(keysDoc, bson.E{Key: key, Value: direction})
 | |
| 	}
 | |
| 	return keysDoc
 | |
| }
 |