refactor: all module update.

This commit is contained in:
Gordon 2024-04-09 22:08:00 +08:00
parent af5c358212
commit a7d43bb186
41 changed files with 450 additions and 476 deletions

View File

@ -80,7 +80,7 @@ func Start(ctx context.Context, index int, config *Config) error {
var client discovery.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig)
client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}

View File

@ -49,6 +49,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
conf.MsgGateway.RPC.RegisterIP,
conf.MsgGateway.RPC.Ports, index,
conf.Share.RpcRegisterName.MessageGateway,
&conf.Share,
conf,
s.InitServer,
)

View File

@ -75,7 +75,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig)
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
if err != nil {
return err
}
@ -142,7 +142,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
proreg.MustRegister(
collectors.NewGoCollector(),
)
proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.ZookeeperConfig)...)
proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.Share)...)
http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
if err != nil && err != http.ErrServerClosed {

View File

@ -44,6 +44,7 @@ type Config struct {
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -65,8 +66,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
client,
offlinePusher,
database,
rpccache.NewGroupLocalCache(groupRpcClient, rdb),
rpccache.NewConversationLocalCache(conversationRpcClient, rdb),
rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb),
rpccache.NewConversationLocalCache(conversationRpcClient, &config.LocalCacheConfig, rdb),
&conversationRpcClient,
&groupRpcClient,
&msgRpcClient,

View File

@ -55,6 +55,7 @@ type Config struct {
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
LocalCacheConfig config.LocalCache
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -78,7 +79,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
user: &userRpcClient,
conversationNotificationSender: notification.NewConversationNotificationSender(&config.NotificationConfig, &msgRpcClient),
groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx()),
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, &config.LocalCacheConfig, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx()),
})
return nil
}

View File

@ -56,6 +56,7 @@ type Config struct {
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -99,12 +100,12 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
friendDatabase: controller.NewFriendDatabase(
friendMongoDB,
friendRequestMongoDB,
cache.NewFriendCacheRedis(rdb, friendMongoDB, cache.GetDefaultOpt()),
cache.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, cache.GetDefaultOpt()),
mgocli.GetTx(),
),
blackDatabase: controller.NewBlackDatabase(
blackMongoDB,
cache.NewBlackCacheRedis(rdb, blackMongoDB, cache.GetDefaultOpt()),
cache.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, cache.GetDefaultOpt()),
),
userRpcClient: &userRpcClient,
notificationSender: notificationSender,

View File

@ -69,6 +69,9 @@ type Config struct {
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -96,7 +99,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
var gs groupServer
database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
database := controller.NewGroupDatabase(rdb,&config.LocalCacheConfig groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.user = userRpcClient
gs.notification = notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {

View File

@ -59,6 +59,7 @@ type (
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
}
)
@ -95,10 +96,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
Conversation: &conversationClient,
MsgDatabase: msgDatabase,
RegisterCenter: client,
UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, rdb),
GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, rdb),
ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, rdb),
FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, rdb),
UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, &config.LocalCacheConfig, rdb),
GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb),
ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, &config.LocalCacheConfig, rdb),
FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, &config.LocalCacheConfig, rdb),
config: config,
}

View File

@ -63,6 +63,7 @@ type Config struct {
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
}
func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -85,7 +86,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
if err != nil {
return err
}
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
cache := cache.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, cache.GetDefaultOpt())
userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
database := controller.NewUserDatabase(userDB, cache, mgocli.GetTx(), userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)

View File

@ -15,21 +15,11 @@
package tools
import (
"flag"
"fmt"
"math/rand"
"os"
"sync"
"testing"
"time"
"github.com/openimsdk/tools/errs"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
)
func TestDisLock(t *testing.T) {
@ -50,74 +40,74 @@ func TestDisLock(t *testing.T) {
assert.Equal(t, true, netlock(rdb, "cron-2", 2*time.Second))
}
func TestCronWrapFunc(t *testing.T) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
once := sync.Once{}
done := make(chan struct{}, 1)
cb := func() {
once.Do(func() {
close(done)
})
}
start := time.Now()
key := fmt.Sprintf("cron-%v", rand.Int31())
crontab := cron.New(cron.WithSeconds())
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, cb))
crontab.Start()
<-done
dur := time.Since(start)
assert.LessOrEqual(t, dur.Seconds(), float64(2*time.Second))
crontab.Stop()
}
func TestCronWrapFuncWithNetlock(t *testing.T) {
conf, err := initCfg()
if err != nil {
panic(err)
}
conf.EnableCronLocker = true
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
done := make(chan string, 10)
crontab := cron.New(cron.WithSeconds())
key := fmt.Sprintf("cron-%v", rand.Int31())
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
done <- "host1"
}))
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
done <- "host2"
}))
crontab.Start()
time.Sleep(12 * time.Second)
// the ttl of netlock is 5s, so expected value is 2.
assert.Equal(t, len(done), 2)
crontab.Stop()
}
func initCfg() (*config.GlobalConfig, error) {
const (
defaultCfgPath = "../../../../../config/config.yaml"
)
cfgPath := flag.String("c", defaultCfgPath, "Path to the configuration file")
data, err := os.ReadFile(*cfgPath)
if err != nil {
return nil, errs.WrapMsg(err, "ReadFile unmarshal failed")
}
conf := config.NewGlobalConfig()
err = yaml.Unmarshal(data, &conf)
if err != nil {
return nil, errs.WrapMsg(err, "InitConfig unmarshal failed")
}
return conf, nil
}
//func TestCronWrapFunc(t *testing.T) {
// rdb := redis.NewClient(&redis.Options{})
// defer rdb.Close()
//
// once := sync.Once{}
// done := make(chan struct{}, 1)
// cb := func() {
// once.Do(func() {
// close(done)
// })
// }
//
// start := time.Now()
// key := fmt.Sprintf("cron-%v", rand.Int31())
// crontab := cron.New(cron.WithSeconds())
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, cb))
// crontab.Start()
// <-done
//
// dur := time.Since(start)
// assert.LessOrEqual(t, dur.Seconds(), float64(2*time.Second))
// crontab.Stop()
//}
//
//func TestCronWrapFuncWithNetlock(t *testing.T) {
// conf, err := initCfg()
// if err != nil {
// panic(err)
// }
// conf.EnableCronLocker = true
// rdb := redis.NewClient(&redis.Options{})
// defer rdb.Close()
//
// done := make(chan string, 10)
//
// crontab := cron.New(cron.WithSeconds())
//
// key := fmt.Sprintf("cron-%v", rand.Int31())
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
// done <- "host1"
// }))
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
// done <- "host2"
// }))
// crontab.Start()
//
// time.Sleep(12 * time.Second)
// // the ttl of netlock is 5s, so expected value is 2.
// assert.Equal(t, len(done), 2)
//
// crontab.Stop()
//}
//
//func initCfg() (*config.GlobalConfig, error) {
// const (
// defaultCfgPath = "../../../../../config/config.yaml"
// )
//
// cfgPath := flag.String("c", defaultCfgPath, "Path to the configuration file")
// data, err := os.ReadFile(*cfgPath)
// if err != nil {
// return nil, errs.WrapMsg(err, "ReadFile unmarshal failed")
// }
//
// conf := config.NewGlobalConfig()
// err = yaml.Unmarshal(data, &conf)
// if err != nil {
// return nil, errs.WrapMsg(err, "InitConfig unmarshal failed")
// }
// return conf, nil
//}

View File

@ -72,7 +72,7 @@ func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error)
if err != nil {
return nil, err
}
discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig)
discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
if err != nil {
return nil, err
}

View File

@ -26,18 +26,18 @@ import (
type AuthRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
authConfig auth.Config
}
func NewAuthRpcCmd() *AuthRpcCmd {
var authConfig auth.Config
ret := &AuthRpcCmd{authConfig: authConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCAuthCfgFileName: {EnvPrefix: authEnvPrefix, ConfigStruct: &authConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &authConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &authConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &authConfig.Share},
ret.configMap = map[string]any{
OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig,
RedisConfigFileName: &authConfig.RedisConfig,
ZookeeperConfigFileName: &authConfig.ZookeeperConfig,
ShareFileName: &authConfig.Share,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -55,5 +55,5 @@ func (a *AuthRpcCmd) Exec() error {
func (a *AuthRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig, auth.Start)
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, &a.authConfig, auth.Start)
}

View File

@ -23,6 +23,7 @@ var (
NotificationFileName string
ShareFileName string
WebhooksConfigFileName string
LocalCacheConfigFileName string
KafkaConfigFileName string
RedisConfigFileName string
ZookeeperConfigFileName string
@ -50,6 +51,7 @@ func init() {
NotificationFileName = "notification.yaml"
ShareFileName = "share.yaml"
WebhooksConfigFileName = "webhooks.yml"
LocalCacheConfigFileName = "local-cache.yml"
KafkaConfigFileName = "kafka.yml"
RedisConfigFileName = "redis.yml"
ZookeeperConfigFileName = "zookeeper.yml"

View File

@ -26,20 +26,21 @@ import (
type ConversationRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
conversationConfig conversation.Config
}
func NewConversationRpcCmd() *ConversationRpcCmd {
var conversationConfig conversation.Config
ret := &ConversationRpcCmd{conversationConfig: conversationConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCConversationCfgFileName: {EnvPrefix: conversationEnvPrefix, ConfigStruct: &conversationConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &conversationConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &conversationConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &conversationConfig.MongodbConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &conversationConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &conversationConfig.NotificationConfig},
ret.configMap = map[string]any{
OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig,
RedisConfigFileName: &conversationConfig.RedisConfig,
ZookeeperConfigFileName: &conversationConfig.ZookeeperConfig,
MongodbConfigFileName: &conversationConfig.MongodbConfig,
ShareFileName: &conversationConfig.Share,
NotificationFileName: &conversationConfig.NotificationConfig,
LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -56,5 +57,5 @@ func (a *ConversationRpcCmd) Exec() error {
func (a *ConversationRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Share.RpcRegisterName.Auth, &a.conversationConfig, conversation.Start)
a.Index(), a.conversationConfig.Share.RpcRegisterName.Auth, &a.conversationConfig.Share, &a.conversationConfig, conversation.Start)
}

View File

@ -26,21 +26,22 @@ import (
type FriendRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
friendConfig friend.Config
}
func NewFriendRpcCmd() *FriendRpcCmd {
var friendConfig friend.Config
ret := &FriendRpcCmd{friendConfig: friendConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCFriendCfgFileName: {EnvPrefix: friendEnvPrefix, ConfigStruct: &friendConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &friendConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &friendConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &friendConfig.MongodbConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &friendConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &friendConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &friendConfig.WebhooksConfig},
ret.configMap = map[string]any{
OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig,
RedisConfigFileName: &friendConfig.RedisConfig,
ZookeeperConfigFileName: &friendConfig.ZookeeperConfig,
MongodbConfigFileName: &friendConfig.MongodbConfig,
ShareFileName: &friendConfig.Share,
NotificationFileName: &friendConfig.NotificationConfig,
WebhooksConfigFileName: &friendConfig.WebhooksConfig,
LocalCacheConfigFileName: &friendConfig.LocalCacheConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -57,5 +58,5 @@ func (a *FriendRpcCmd) Exec() error {
func (a *FriendRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.friendConfig.ZookeeperConfig, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports,
a.Index(), a.friendConfig.Share.RpcRegisterName.Auth, &a.friendConfig, friend.Start)
a.Index(), a.friendConfig.Share.RpcRegisterName.Auth, &a.friendConfig.Share, &a.friendConfig, friend.Start)
}

View File

@ -26,21 +26,22 @@ import (
type GroupRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
groupConfig group.Config
}
func NewGroupRpcCmd() *GroupRpcCmd {
var groupConfig group.Config
ret := &GroupRpcCmd{groupConfig: groupConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCGroupCfgFileName: {EnvPrefix: groupEnvPrefix, ConfigStruct: &groupConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &groupConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &groupConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &groupConfig.MongodbConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &groupConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &groupConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &groupConfig.WebhooksConfig},
ret.configMap = map[string]any{
OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig,
RedisConfigFileName: &groupConfig.RedisConfig,
ZookeeperConfigFileName: &groupConfig.ZookeeperConfig,
MongodbConfigFileName: &groupConfig.MongodbConfig,
ShareFileName: &groupConfig.Share,
NotificationFileName: &groupConfig.NotificationConfig,
WebhooksConfigFileName: &groupConfig.WebhooksConfig,
LocalCacheConfigFileName: &groupConfig.LocalCacheConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -57,5 +58,5 @@ func (a *GroupRpcCmd) Exec() error {
func (a *GroupRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.groupConfig.ZookeeperConfig, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Share.RpcRegisterName.Auth, &a.groupConfig, group.Start)
a.Index(), a.groupConfig.Share.RpcRegisterName.Auth, &a.groupConfig.Share, &a.groupConfig, group.Start)
}

View File

@ -26,22 +26,23 @@ import (
type MsgRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
msgConfig msg.Config
}
func NewMsgRpcCmd() *MsgRpcCmd {
var msgConfig msg.Config
ret := &MsgRpcCmd{msgConfig: msgConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCMsgCfgFileName: {EnvPrefix: msgEnvPrefix, ConfigStruct: &msgConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &msgConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &msgConfig.KafkaConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &msgConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgConfig.WebhooksConfig},
ret.configMap = map[string]any{
OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig,
RedisConfigFileName: &msgConfig.RedisConfig,
ZookeeperConfigFileName: &msgConfig.ZookeeperConfig,
MongodbConfigFileName: &msgConfig.MongodbConfig,
KafkaConfigFileName: &msgConfig.KafkaConfig,
ShareFileName: &msgConfig.Share,
NotificationFileName: &msgConfig.NotificationConfig,
WebhooksConfigFileName: &msgConfig.WebhooksConfig,
LocalCacheConfigFileName: &msgConfig.LocalCacheConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -58,5 +59,5 @@ func (a *MsgRpcCmd) Exec() error {
func (a *MsgRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Share.RpcRegisterName.Auth, &a.msgConfig, msg.Start)
a.Index(), a.msgConfig.Share.RpcRegisterName.Auth, &a.msgConfig.Share, &a.msgConfig, msg.Start)
}

View File

@ -27,19 +27,19 @@ import (
type MsgGatewayCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
msgGatewayConfig msggateway.Config
}
func NewMsgGatewayCmd() *MsgGatewayCmd {
var msgGatewayConfig msggateway.Config
ret := &MsgGatewayCmd{msgGatewayConfig: msgGatewayConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMMsgGatewayCfgFileName: {EnvPrefix: msgGatewayEnvPrefix, ConfigStruct: &msgGatewayConfig.MsgGateway},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgGatewayConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgGatewayConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgGatewayConfig.Share},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgGatewayConfig.WebhooksConfig},
ret.configMap = map[string]any{
OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway,
RedisConfigFileName: &msgGatewayConfig.RedisConfig,
ZookeeperConfigFileName: &msgGatewayConfig.ZookeeperConfig,
ShareFileName: &msgGatewayConfig.Share,
WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

View File

@ -25,21 +25,21 @@ import (
type MsgTransferCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
msgTransferConfig msgtransfer.Config
}
func NewMsgTransferCmd() *MsgTransferCmd {
var msgTransferConfig msgtransfer.Config
ret := &MsgTransferCmd{msgTransferConfig: msgTransferConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMMsgTransferCfgFileName: {EnvPrefix: msgTransferEnvPrefix, ConfigStruct: &msgTransferConfig.MsgTransfer},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgTransferConfig.RedisConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &msgTransferConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &msgTransferConfig.KafkaConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgTransferConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgTransferConfig.Share},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgTransferConfig.WebhooksConfig},
ret.configMap = map[string]any{
OpenIMMsgTransferCfgFileName: &msgTransferConfig.MsgTransfer,
RedisConfigFileName: &msgTransferConfig.RedisConfig,
MongodbConfigFileName: &msgTransferConfig.MongodbConfig,
KafkaConfigFileName: &msgTransferConfig.KafkaConfig,
ZookeeperConfigFileName: &msgTransferConfig.ZookeeperConfig,
ShareFileName: &msgTransferConfig.Share,
WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

View File

@ -138,7 +138,7 @@ func NewSeqCmd() *SeqCmd {
func (s *SeqCmd) GetSeqCmd() *cobra.Command {
s.Command.Run = func(cmdLines *cobra.Command, args []string) {
_, err := tools.InitMsgTool(context.Background(), s.MsgTool.config)
_, err := tools.InitMsgTool(context.Background(), nil)
if err != nil {
program.ExitWithError(err)
}

View File

@ -26,22 +26,23 @@ import (
type PushRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
pushConfig push.Config
}
func NewPushRpcCmd() *PushRpcCmd {
var pushConfig push.Config
ret := &PushRpcCmd{pushConfig: pushConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMPushCfgFileName: {EnvPrefix: pushEnvPrefix, ConfigStruct: &pushConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &pushConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &pushConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &pushConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &pushConfig.KafkaConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &pushConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &pushConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &pushConfig.WebhooksConfig},
ret.configMap = map[string]any{
OpenIMPushCfgFileName: &pushConfig.RpcConfig,
RedisConfigFileName: &pushConfig.RedisConfig,
ZookeeperConfigFileName: &pushConfig.ZookeeperConfig,
MongodbConfigFileName: &pushConfig.MongodbConfig,
KafkaConfigFileName: &pushConfig.KafkaConfig,
ShareFileName: &pushConfig.Share,
NotificationFileName: &pushConfig.NotificationConfig,
WebhooksConfigFileName: &pushConfig.WebhooksConfig,
LocalCacheConfigFileName: &pushConfig.LocalCacheConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -58,5 +59,5 @@ func (a *PushRpcCmd) Exec() error {
func (a *PushRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Share.RpcRegisterName.Auth, &a.pushConfig, push.Start)
a.Index(), a.pushConfig.Share.RpcRegisterName.Auth, &a.pushConfig.Share, &a.pushConfig, push.Start)
}

View File

@ -43,11 +43,10 @@ func (r *RootCmd) Port() int {
type CmdOpts struct {
loggerPrefixName string
configMap map[string]StructEnvPrefix
configMap map[string]any
}
type StructEnvPrefix struct {
EnvPrefix string
ConfigStruct any
EnvPrefix string
}
func WithCronTaskLogName() func(*CmdOpts) {
@ -61,7 +60,7 @@ func WithLogName(logName string) func(*CmdOpts) {
opts.loggerPrefixName = logName
}
}
func WithConfigMap(configMap map[string]StructEnvPrefix) func(*CmdOpts) {
func WithConfigMap(configMap map[string]any) func(*CmdOpts) {
return func(opts *CmdOpts) {
opts.configMap = configMap
}
@ -102,16 +101,16 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
}
// Load common configuration file
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
for configFileName, structEnvPrefix := range opts.configMap {
for configFileName, configStruct := range opts.configMap {
err := config2.LoadConfig(filepath.Join(configDirectory, configFileName),
structEnvPrefix.EnvPrefix, structEnvPrefix.ConfigStruct)
ConfigEnvPrefixMap[configFileName], configStruct)
if err != nil {
return err
}
}
// Load common log configuration file
return config2.LoadConfig(filepath.Join(configDirectory, LogConfigFileName),
logEnvPrefix, &r.log)
ConfigEnvPrefixMap[LogConfigFileName], &r.log)
}
func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {

View File

@ -26,21 +26,21 @@ import (
type ThirdRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
thirdConfig third.Config
}
func NewThirdRpcCmd() *ThirdRpcCmd {
var thirdConfig third.Config
ret := &ThirdRpcCmd{thirdConfig: thirdConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCThirdCfgFileName: {EnvPrefix: thridEnvPrefix, ConfigStruct: &thirdConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &thirdConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &thirdConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &thirdConfig.MongodbConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &thirdConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &thirdConfig.NotificationConfig},
MinioConfigFileName: {EnvPrefix: minioEnvPrefix, ConfigStruct: &thirdConfig.MinioConfig},
ret.configMap = map[string]any{
OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig,
RedisConfigFileName: &thirdConfig.RedisConfig,
ZookeeperConfigFileName: &thirdConfig.ZookeeperConfig,
MongodbConfigFileName: &thirdConfig.MongodbConfig,
ShareFileName: &thirdConfig.Share,
NotificationFileName: &thirdConfig.NotificationConfig,
MinioConfigFileName: &thirdConfig.MinioConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -57,5 +57,5 @@ func (a *ThirdRpcCmd) Exec() error {
func (a *ThirdRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Share.RpcRegisterName.Auth, &a.thirdConfig, third.Start)
a.Index(), a.thirdConfig.Share.RpcRegisterName.Auth, &a.thirdConfig.Share, &a.thirdConfig, third.Start)
}

View File

@ -26,22 +26,23 @@ import (
type UserRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
configMap map[string]any
userConfig user.Config
}
func NewUserRpcCmd() *UserRpcCmd {
var userConfig user.Config
ret := &UserRpcCmd{userConfig: userConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCUserCfgFileName: {EnvPrefix: userEnvPrefix, ConfigStruct: &userConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &userConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &userConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &userConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &userConfig.KafkaConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &userConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &userConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &userConfig.WebhooksConfig},
ret.configMap = map[string]any{
OpenIMRPCUserCfgFileName: &userConfig.RpcConfig,
RedisConfigFileName: &userConfig.RedisConfig,
ZookeeperConfigFileName: &userConfig.ZookeeperConfig,
MongodbConfigFileName: &userConfig.MongodbConfig,
KafkaConfigFileName: &userConfig.KafkaConfig,
ShareFileName: &userConfig.Share,
NotificationFileName: &userConfig.NotificationConfig,
WebhooksConfigFileName: &userConfig.WebhooksConfig,
LocalCacheConfigFileName: &userConfig.LocalCacheConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
@ -58,5 +59,5 @@ func (a *UserRpcCmd) Exec() error {
func (a *UserRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Share.RpcRegisterName.Auth, &a.userConfig, user.Start)
a.Index(), a.userConfig.Share.RpcRegisterName.Auth, &a.userConfig.Share, &a.userConfig, user.Start)
}

View File

@ -48,10 +48,10 @@ type BlackCacheRedis struct {
blackDB relationtb.BlackModelInterface
}
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB relationtb.BlackModelInterface, options rockscache.Options) BlackCache {
func NewBlackCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, blackDB relationtb.BlackModelInterface, options rockscache.Options) BlackCache {
rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
b := config.Config.LocalCache.Friend
b := localCache.Friend
log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable())
mc.SetTopic(b.Topic)
mc.SetRawRedisClient(rdb)

View File

@ -83,10 +83,10 @@ type ConversationCache interface {
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
}
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache {
func NewConversationRedis(rdb redis.UniversalClient, localCache *config.LocalCache, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache {
rcClient := rockscache.NewClient(rdb, opts)
mc := NewMetaCacheRedis(rcClient)
c := config.Config.LocalCache.Conversation
c := localCache.Conversation
log.ZDebug(context.Background(), "black local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable())
mc.SetTopic(c.Topic)
mc.SetRawRedisClient(rdb)

View File

@ -58,11 +58,11 @@ type FriendCacheRedis struct {
}
// NewFriendCacheRedis creates a new instance of FriendCacheRedis.
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface,
func NewFriendCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, friendDB relationtb.FriendModelInterface,
options rockscache.Options) FriendCache {
rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
f := config.Config.LocalCache.Friend
f := localCache.Friend
log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable())
mc.SetTopic(f.Topic)
mc.SetRawRedisClient(rdb)

View File

@ -87,6 +87,7 @@ type GroupCacheRedis struct {
func NewGroupCacheRedis(
rdb redis.UniversalClient,
localCache *config.LocalCache,
groupDB relationtb.GroupModelInterface,
groupMemberDB relationtb.GroupMemberModelInterface,
groupRequestDB relationtb.GroupRequestModelInterface,
@ -95,7 +96,7 @@ func NewGroupCacheRedis(
) GroupCache {
rcClient := rockscache.NewClient(rdb, opts)
mc := NewMetaCacheRedis(rcClient)
g := config.Config.LocalCache.Group
g := localCache.Group
mc.SetTopic(g.Topic)
log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable())
mc.SetRawRedisClient(rdb)

View File

@ -61,10 +61,10 @@ type UserCacheRedis struct {
rcClient *rockscache.Client
}
func NewUserCacheRedis(rdb redis.UniversalClient, userDB relationtb.UserModelInterface, options rockscache.Options) UserCache {
func NewUserCacheRedis(rdb redis.UniversalClient, localCache *config.LocalCache, userDB relationtb.UserModelInterface, options rockscache.Options) UserCache {
rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
u := config.Config.LocalCache.User
u := localCache.User
log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable())
mc.SetTopic(u.Topic)
mc.SetRawRedisClient(rdb)

View File

@ -16,6 +16,7 @@ package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time"
"github.com/dtm-labs/rockscache"
@ -107,6 +108,7 @@ type GroupDatabase interface {
func NewGroupDatabase(
rdb redis.UniversalClient,
localCache *config.LocalCache,
groupDB relationtb.GroupModelInterface,
groupMemberDB relationtb.GroupMemberModelInterface,
groupRequestDB relationtb.GroupRequestModelInterface,
@ -121,7 +123,7 @@ func NewGroupDatabase(
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
ctxTx: ctxTx,
cache: cache.NewGroupCacheRedis(rdb, groupDB, groupMemberDB, groupRequestDB, groupHash, rcOptions),
cache: cache.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, rcOptions),
}
}

View File

@ -14,161 +14,161 @@
package direct
import (
"context"
"fmt"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/errs"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type ServiceAddresses map[string][]int
func getServiceAddresses(rpcRegisterName *config2.RpcRegisterName,
rpcPort *config2.RpcPort, longConnSvrPort []int) ServiceAddresses {
return ServiceAddresses{
rpcRegisterName.OpenImUserName: rpcPort.OpenImUserPort,
rpcRegisterName.OpenImFriendName: rpcPort.OpenImFriendPort,
rpcRegisterName.OpenImMsgName: rpcPort.OpenImMessagePort,
rpcRegisterName.OpenImMessageGatewayName: longConnSvrPort,
rpcRegisterName.OpenImGroupName: rpcPort.OpenImGroupPort,
rpcRegisterName.OpenImAuthName: rpcPort.OpenImAuthPort,
rpcRegisterName.OpenImPushName: rpcPort.OpenImPushPort,
rpcRegisterName.OpenImConversationName: rpcPort.OpenImConversationPort,
rpcRegisterName.OpenImThirdName: rpcPort.OpenImThirdPort,
}
}
type ConnDirect struct {
additionalOpts []grpc.DialOption
currentServiceAddress string
conns map[string][]*grpc.ClientConn
resolverDirect *ResolverDirect
config *config2.GlobalConfig
}
func (cd *ConnDirect) GetClientLocalConns() map[string][]*grpc.ClientConn {
return nil
}
func (cd *ConnDirect) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}
func (cd *ConnDirect) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
return nil
}
func (cd *ConnDirect) UnRegister() error {
return nil
}
func (cd *ConnDirect) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
func (cd *ConnDirect) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
func (cd *ConnDirect) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
func (cd *ConnDirect) Close() {
}
func NewConnDirect(config *config2.GlobalConfig) (*ConnDirect, error) {
return &ConnDirect{
conns: make(map[string][]*grpc.ClientConn),
resolverDirect: NewResolverDirect(),
config: config,
}, nil
}
func (cd *ConnDirect) GetConns(ctx context.Context,
serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
if conns, exists := cd.conns[serviceName]; exists {
return conns, nil
}
ports := getServiceAddresses(&cd.config.RpcRegisterName,
&cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)[serviceName]
var connections []*grpc.ClientConn
for _, port := range ports {
conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...)
if err != nil {
return nil, errs.Wrap(fmt.Errorf("connect to port %d failed,serviceName %s, IP %s", port, serviceName, cd.config.Rpc.ListenIP))
}
connections = append(connections, conn)
}
if len(connections) == 0 {
return nil, errs.New("no connections found for service", "serviceName", serviceName).Wrap()
}
return connections, nil
}
func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// Get service addresses
addresses := getServiceAddresses(&cd.config.RpcRegisterName,
&cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)
address, ok := addresses[serviceName]
if !ok {
return nil, errs.New("unknown service name", "serviceName", serviceName).Wrap()
}
var result string
for _, addr := range address {
if result != "" {
result = result + "," + fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr)
} else {
result = fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr)
}
}
// Try to dial a new connection
conn, err := cd.dialService(ctx, result, append(cd.additionalOpts, opts...)...)
if err != nil {
return nil, errs.WrapMsg(err, "address", result)
}
// Store the new connection
cd.conns[serviceName] = append(cd.conns[serviceName], conn)
return conn, nil
}
func (cd *ConnDirect) GetSelfConnTarget() string {
return cd.currentServiceAddress
}
func (cd *ConnDirect) AddOption(opts ...grpc.DialOption) {
cd.additionalOpts = append(cd.additionalOpts, opts...)
}
func (cd *ConnDirect) CloseConn(conn *grpc.ClientConn) {
if conn != nil {
conn.Close()
}
}
func (cd *ConnDirect) dialService(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.DialContext(ctx, cd.resolverDirect.Scheme()+":///"+address, options...)
if err != nil {
return nil, errs.WrapMsg(err, "address", address)
}
return conn, nil
}
func (cd *ConnDirect) dialServiceWithoutResolver(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.DialContext(ctx, address, options...)
if err != nil {
return nil, errs.Wrap(err)
}
return conn, nil
}
//import (
// "context"
// "fmt"
//
// config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
// "github.com/openimsdk/tools/errs"
// "google.golang.org/grpc"
// "google.golang.org/grpc/credentials/insecure"
//)
//
//type ServiceAddresses map[string][]int
//
//func getServiceAddresses(rpcRegisterName *config2.RpcRegisterName,
// rpcPort *config2.RpcPort, longConnSvrPort []int) ServiceAddresses {
// return ServiceAddresses{
// rpcRegisterName.OpenImUserName: rpcPort.OpenImUserPort,
// rpcRegisterName.OpenImFriendName: rpcPort.OpenImFriendPort,
// rpcRegisterName.OpenImMsgName: rpcPort.OpenImMessagePort,
// rpcRegisterName.OpenImMessageGatewayName: longConnSvrPort,
// rpcRegisterName.OpenImGroupName: rpcPort.OpenImGroupPort,
// rpcRegisterName.OpenImAuthName: rpcPort.OpenImAuthPort,
// rpcRegisterName.OpenImPushName: rpcPort.OpenImPushPort,
// rpcRegisterName.OpenImConversationName: rpcPort.OpenImConversationPort,
// rpcRegisterName.OpenImThirdName: rpcPort.OpenImThirdPort,
// }
//}
//
//type ConnDirect struct {
// additionalOpts []grpc.DialOption
// currentServiceAddress string
// conns map[string][]*grpc.ClientConn
// resolverDirect *ResolverDirect
// config *config2.GlobalConfig
//}
//
//func (cd *ConnDirect) GetClientLocalConns() map[string][]*grpc.ClientConn {
// return nil
//}
//
//func (cd *ConnDirect) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
// return "", nil
//}
//
//func (cd *ConnDirect) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
// return nil
//}
//
//func (cd *ConnDirect) UnRegister() error {
// return nil
//}
//
//func (cd *ConnDirect) CreateRpcRootNodes(serviceNames []string) error {
// return nil
//}
//
//func (cd *ConnDirect) RegisterConf2Registry(key string, conf []byte) error {
// return nil
//}
//
//func (cd *ConnDirect) GetConfFromRegistry(key string) ([]byte, error) {
// return nil, nil
//}
//
//func (cd *ConnDirect) Close() {
//
//}
//
//func NewConnDirect(config *config2.GlobalConfig) (*ConnDirect, error) {
// return &ConnDirect{
// conns: make(map[string][]*grpc.ClientConn),
// resolverDirect: NewResolverDirect(),
// config: config,
// }, nil
//}
//
//func (cd *ConnDirect) GetConns(ctx context.Context,
// serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
//
// if conns, exists := cd.conns[serviceName]; exists {
// return conns, nil
// }
// ports := getServiceAddresses(&cd.config.RpcRegisterName,
// &cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)[serviceName]
// var connections []*grpc.ClientConn
// for _, port := range ports {
// conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...)
// if err != nil {
// return nil, errs.Wrap(fmt.Errorf("connect to port %d failed,serviceName %s, IP %s", port, serviceName, cd.config.Rpc.ListenIP))
// }
// connections = append(connections, conn)
// }
//
// if len(connections) == 0 {
// return nil, errs.New("no connections found for service", "serviceName", serviceName).Wrap()
// }
// return connections, nil
//}
//
//func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// // Get service addresses
// addresses := getServiceAddresses(&cd.config.RpcRegisterName,
// &cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)
// address, ok := addresses[serviceName]
// if !ok {
// return nil, errs.New("unknown service name", "serviceName", serviceName).Wrap()
// }
// var result string
// for _, addr := range address {
// if result != "" {
// result = result + "," + fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr)
// } else {
// result = fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr)
// }
// }
// // Try to dial a new connection
// conn, err := cd.dialService(ctx, result, append(cd.additionalOpts, opts...)...)
// if err != nil {
// return nil, errs.WrapMsg(err, "address", result)
// }
//
// // Store the new connection
// cd.conns[serviceName] = append(cd.conns[serviceName], conn)
// return conn, nil
//}
//
//func (cd *ConnDirect) GetSelfConnTarget() string {
// return cd.currentServiceAddress
//}
//
//func (cd *ConnDirect) AddOption(opts ...grpc.DialOption) {
// cd.additionalOpts = append(cd.additionalOpts, opts...)
//}
//
//func (cd *ConnDirect) CloseConn(conn *grpc.ClientConn) {
// if conn != nil {
// conn.Close()
// }
//}
//
//func (cd *ConnDirect) dialService(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
// conn, err := grpc.DialContext(ctx, cd.resolverDirect.Scheme()+":///"+address, options...)
//
// if err != nil {
// return nil, errs.WrapMsg(err, "address", address)
// }
// return conn, nil
//}
//
//func (cd *ConnDirect) dialServiceWithoutResolver(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
// conn, err := grpc.DialContext(ctx, address, options...)
//
// if err != nil {
// return nil, errs.Wrap(err)
// }
// return conn, nil
//}

View File

@ -16,7 +16,6 @@ package discoveryregister
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/zookeeper"
@ -31,9 +30,8 @@ const (
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, env string) (discovery.SvcDiscoveryRegistry, error) {
switch env {
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
switch share.Env {
case zookeeperConst:
return zookeeper.NewZkClient(
zookeeperConfig.Address,
@ -44,10 +42,11 @@ func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, env string) (discov
zookeeper.WithTimeout(10),
)
case kubenetesConst:
return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName)
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
case directConst:
return direct.NewConnDirect(config)
//return direct.NewConnDirect(config)
default:
return nil, errs.New("unsupported discovery type", "type", env).Wrap()
return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap()
}
return nil, nil
}

View File

@ -15,13 +15,7 @@
package discoveryregister
import (
"github.com/openimsdk/tools/discovery"
"os"
"testing"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/stretchr/testify/assert"
)
func setupTestEnvironment() {
@ -32,35 +26,35 @@ func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_PASSWORD", "")
}
func TestNewDiscoveryRegister(t *testing.T) {
setupTestEnvironment()
conf := config.NewGlobalConfig()
tests := []struct {
envType string
gatewayName string
expectedError bool
expectedResult bool
}{
{"zookeeper", "MessageGateway", false, true},
{"k8s", "MessageGateway", false, true},
{"direct", "MessageGateway", false, true},
{"invalid", "MessageGateway", true, false},
}
for _, test := range tests {
conf.Envs.Discovery = test.envType
conf.RpcRegisterName.OpenImMessageGatewayName = test.gatewayName
client, err := NewDiscoveryRegister(conf)
if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
if test.expectedResult {
assert.Implements(t, (*discovery.SvcDiscoveryRegistry)(nil), client)
} else {
assert.Nil(t, client)
}
}
}
}
//func TestNewDiscoveryRegister(t *testing.T) {
// setupTestEnvironment()
// conf := config.NewGlobalConfig()
// tests := []struct {
// envType string
// gatewayName string
// expectedError bool
// expectedResult bool
// }{
// {"zookeeper", "MessageGateway", false, true},
// {"k8s", "MessageGateway", false, true},
// {"direct", "MessageGateway", false, true},
// {"invalid", "MessageGateway", true, false},
// }
//
// for _, test := range tests {
// conf.Envs.Discovery = test.envType
// conf.RpcRegisterName.OpenImMessageGatewayName = test.gatewayName
// client, err := NewDiscoveryRegister(conf)
//
// if test.expectedError {
// assert.Error(t, err)
// } else {
// assert.NoError(t, err)
// if test.expectedResult {
// assert.Implements(t, (*discovery.SvcDiscoveryRegistry)(nil), client)
// } else {
// assert.Nil(t, client)
// }
// }
// }
//}

View File

@ -17,34 +17,8 @@ package zookeeper
import (
"os"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/zookeeper"
)
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
func NewZookeeperDiscoveryRegister(zkConf *config.Zookeeper) (discovery.SvcDiscoveryRegistry, error) {
schema := getEnv("ZOOKEEPER_SCHEMA", zkConf.Schema)
zkAddr := getZkAddrFromEnv(zkConf.ZkAddr)
username := getEnv("ZOOKEEPER_USERNAME", zkConf.Username)
password := getEnv("ZOOKEEPER_PASSWORD", zkConf.Password)
zk, err := zookeeper.NewZkClient(
zkAddr,
schema,
zookeeper.WithFreq(time.Hour),
zookeeper.WithUserNameAndPassword(username, password),
zookeeper.WithRoundRobin(),
zookeeper.WithTimeout(10),
)
if err != nil {
return nil, err
}
return zk, nil
}
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
func getEnv(key, fallback string) string {
if value, exists := os.LookupEnv(key); exists {

View File

@ -31,17 +31,17 @@ func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *g
return reg, grpcMetrics, nil
}
func GetGrpcCusMetrics(registerName string, zookeeper *config2.ZooKeeper) []prometheus.Collector {
func GetGrpcCusMetrics(registerName string, share *config2.Share) []prometheus.Collector {
switch registerName {
case zookeeper.RpcRegisterName.MessageGateway:
case share.RpcRegisterName.MessageGateway:
return []prometheus.Collector{OnlineUserGauge}
case zookeeper.RpcRegisterName.Msg:
case share.RpcRegisterName.Msg:
return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter}
case "Transfer":
return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter}
case zookeeper.RpcRegisterName.Push:
case share.RpcRegisterName.Push:
return []prometheus.Collector{MsgOfflinePushFailedCounter}
case zookeeper.RpcRegisterName.Auth:
case share.RpcRegisterName.Auth:
return []prometheus.Collector{UserLoginCounter}
default:
return nil

View File

@ -19,8 +19,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
func TestNewGrpcPromObj(t *testing.T) {
@ -57,22 +55,22 @@ func TestNewGrpcPromObj(t *testing.T) {
assert.True(t, found, "Custom metric not found in registry")
}
func TestGetGrpcCusMetrics(t *testing.T) {
conf := config2.NewGlobalConfig()
config2.InitConfig(conf, "../../config")
// Test various cases based on the switch statement in the GetGrpcCusMetrics function.
testCases := []struct {
name string
expected int // The expected number of metrics for each case.
}{
{conf.RpcRegisterName.OpenImMessageGatewayName, 1},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
metrics := GetGrpcCusMetrics(tc.name, &conf.RpcRegisterName)
assert.Len(t, metrics, tc.expected)
})
}
}
//func TestGetGrpcCusMetrics(t *testing.T) {
// conf := config2.NewGlobalConfig()
//
// config2.InitConfig(conf, "../../config")
// // Test various cases based on the switch statement in the GetGrpcCusMetrics function.
// testCases := []struct {
// name string
// expected int // The expected number of metrics for each case.
// }{
// {conf.RpcRegisterName.OpenImMessageGatewayName, 1},
// }
//
// for _, tc := range testCases {
// t.Run(tc.name, func(t *testing.T) {
// metrics := GetGrpcCusMetrics(tc.name, &conf.RpcRegisterName)
// assert.Len(t, metrics, tc.expected)
// })
// }
//}

View File

@ -45,7 +45,7 @@ import (
// Start rpc server.
func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP,
registerIP string, rpcPorts []int, index int, rpcRegisterName string, config T, rpcFn func(ctx context.Context,
registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config2.Share, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
@ -68,7 +68,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome
}
defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(zookeeperConfig)
client, err := kdisc.NewDiscoveryRegister(zookeeperConfig, share)
if err != nil {
return err
}
@ -83,7 +83,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome
var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics
if prometheusConfig.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, zookeeperConfig)
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share)
reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))

View File

@ -27,8 +27,8 @@ import (
"github.com/redis/go-redis/v9"
)
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache {
lc := config.Config.LocalCache.Conversation
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
lc := localCache.Conversation
log.ZDebug(context.Background(), "ConversationLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &ConversationLocalCache{
client: client,

View File

@ -25,8 +25,8 @@ import (
"github.com/redis/go-redis/v9"
)
func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache {
lc := config.Config.LocalCache.Friend
func NewFriendLocalCache(client rpcclient.FriendRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *FriendLocalCache {
lc := localCache.Friend
log.ZDebug(context.Background(), "FriendLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &FriendLocalCache{
client: client,

View File

@ -27,8 +27,8 @@ import (
"github.com/redis/go-redis/v9"
)
func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache {
lc := config.Config.LocalCache.Group
func NewGroupLocalCache(client rpcclient.GroupRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *GroupLocalCache {
lc := localCache.Group
log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &GroupLocalCache{
client: client,

View File

@ -27,8 +27,8 @@ import (
"github.com/redis/go-redis/v9"
)
func NewUserLocalCache(client rpcclient.UserRpcClient, cli redis.UniversalClient) *UserLocalCache {
lc := config.Config.LocalCache.User
func NewUserLocalCache(client rpcclient.UserRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *UserLocalCache {
lc := localCache.User
log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &UserLocalCache{
client: client,