open-im-server/pkg/common/db/cache/init_redis.go

110 lines
3.3 KiB
Go

// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"context"
"errors"
"fmt"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"os"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mw/specialerror"
)
var (
// Singleton pattern.
redisClient redis.UniversalClient
)
const (
maxRetry = 10 // number of retries
)
// NewRedis Initialize redis connection.
func NewRedis(ctx context.Context, redisConf *config.Redis) (redis.UniversalClient, error) {
if redisClient != nil {
return redisClient, nil
}
// Read configuration from environment variables
overrideConfigFromEnv(redisConf)
if len(redisConf.Address) == 0 {
return nil, errs.Wrap(errors.New("redis address is empty"))
}
specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
var rdb redis.UniversalClient
if len(redisConf.Address) > 1 || redisConf.ClusterMode {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: redisConf.Address,
Username: redisConf.Username,
Password: redisConf.Password, // no password set
PoolSize: 50,
MaxRetries: maxRetry,
})
} else {
rdb = redis.NewClient(&redis.Options{
Addr: redisConf.Address[0],
Username: redisConf.Username,
Password: redisConf.Password,
DB: 0, // use default DB
PoolSize: 100, // connection pool size
MaxRetries: maxRetry,
})
}
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err = rdb.Ping(ctx).Err()
if err != nil {
errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", redisConf.Address, redisConf.Username,
redisConf.Password, redisConf.ClusterMode, redisConf.EnablePipeline)
return nil, errs.WrapMsg(err, errMsg)
}
redisClient = rdb
log.CInfo(ctx, "redis connected successfully", "address", redisConf.Address, "username", redisConf.Username, "password", redisConf.Password, "clusterMode", redisConf.ClusterMode, "enablePipeline", redisConf.EnablePipeline)
return rdb, err
}
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv(redis *config.Redis) {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
if envPort := os.Getenv("REDIS_PORT"); envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + envPort
}
redis.Address = addresses
} else {
redis.Address = strings.Split(envAddr, ",")
}
}
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
redis.Username = envUser
}
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
redis.Password = envPass
}
}