From 04bac6ddb7ebe31a56b09b2a64001af3dc448445 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 11 Feb 2025 16:31:53 +0800 Subject: [PATCH] 1 --- cmd/main.go | 27 ++++++- internal/api/init.go | 31 ++++---- internal/msggateway/init.go | 6 +- internal/msgtransfer/init.go | 93 ++--------------------- internal/push/onlinepusher.go | 5 +- internal/push/push.go | 7 +- internal/rpc/auth/auth.go | 5 +- internal/rpc/conversation/conversation.go | 15 ++-- internal/rpc/group/group.go | 13 +--- internal/rpc/msg/server.go | 15 ++-- internal/rpc/relation/friend.go | 17 ++--- internal/rpc/third/third.go | 9 ++- internal/rpc/user/user.go | 20 ++--- internal/tools/cron_task.go | 25 +++--- pkg/common/cmd/api.go | 7 +- pkg/common/cmd/cron_task.go | 20 ++++- pkg/common/config/config.go | 1 + pkg/common/config/constant.go | 2 +- pkg/common/config/global.go | 11 +++ pkg/common/discovery/discoveryregister.go | 5 +- pkg/common/startrpc/start.go | 29 ++++--- pkg/dbbuild/builder.go | 25 ++++++ pkg/dbbuild/microservices.go | 26 +++++++ pkg/dbbuild/standalone.go | 76 ++++++++++++++++++ pkg/mqbuild/builder.go | 4 +- pkg/rpccache/online.go | 16 ++-- 26 files changed, 303 insertions(+), 207 deletions(-) create mode 100644 pkg/common/config/global.go create mode 100644 pkg/dbbuild/builder.go create mode 100644 pkg/dbbuild/microservices.go create mode 100644 pkg/dbbuild/standalone.go diff --git a/cmd/main.go b/cmd/main.go index e0e73888d..fc61816e1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "encoding/json" "flag" "fmt" "net" @@ -29,16 +30,23 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/rpc/relation" "github.com/openimsdk/open-im-server/v3/internal/rpc/third" "github.com/openimsdk/open-im-server/v3/internal/rpc/user" + "github.com/openimsdk/open-im-server/v3/internal/tools" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/standalone" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/network" "github.com/spf13/viper" "google.golang.org/grpc" ) +func init() { + config.SetStandalone() + prommetrics.RegistryAll() +} + func main() { var configPath string flag.StringVar(&configPath, "c", "", "config path") @@ -62,6 +70,7 @@ func main() { putCmd(cmd, true, msggateway.Start) putCmd(cmd, true, msgtransfer.Start) putCmd(cmd, true, api.Start) + putCmd(cmd, true, tools.Start) ctx := context.Background() if err := cmd.run(ctx); err != nil { fmt.Println(err) @@ -91,7 +100,7 @@ func (x *cmds) getTypePath(typ reflect.Type) string { } func (x *cmds) initDiscovery() { - x.config.Discovery.Enable = config.Standalone + x.config.Discovery.Enable = "standalone" vof := reflect.ValueOf(&x.config.Discovery.RpcService).Elem() tof := reflect.TypeOf(&x.config.Discovery.RpcService).Elem() num := tof.NumField() @@ -143,6 +152,9 @@ func (x *cmds) initAllConfig() error { } } x.initDiscovery() + x.config.Redis.Disable = true + x.config.LocalCache = config.LocalCache{} + config.InitNotification(&x.config.Notification) return nil } @@ -209,13 +221,23 @@ func (x *cmds) run(ctx context.Context) error { return err } } + ip, err := network.GetLocalIP() + if err != nil { + return err + } listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return fmt.Errorf("prometheus listen %d error %w", port, err) } defer listener.Close() - prommetrics.RegistryAll() log.ZDebug(ctx, "prometheus start", "addr", listener.Addr()) + target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port)) + if err != nil { + return err + } + if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { + return err + } go func() { err := prommetrics.Start(listener) if err == nil { @@ -278,7 +300,6 @@ func (x *cmds) run(ctx context.Context) error { wg.Wait() close(done) }() - select { case <-time.After(time.Second * 20): log.ZError(ctx, "server exit timeout", nil) diff --git a/internal/api/init.go b/internal/api/init.go index 7bb8cc1ac..69ade8aaf 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -17,17 +17,15 @@ package api import ( "context" "errors" - "fmt" "net" "net/http" - "os" - "os/signal" "strconv" - "syscall" "time" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/network" @@ -60,7 +58,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g Handler: router, Addr: net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)), } - log.CInfo(ctx, "api server is init", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", httpServer.Addr, "apiPort", apiPort) go func() { defer close(done) <-ctx.Done() @@ -69,6 +66,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g log.ZWarn(ctx, "api server shutdown err", err) } }() + log.CInfo(ctx, "api server is init", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", httpServer.Addr, "apiPort", apiPort) err := httpServer.ListenAndServe() if err == nil { err = errors.New("api done") @@ -76,19 +74,18 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g cancel(err) }() - //if config.Discovery.Enable == conf.ETCD { - // cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) - // cm.Watch(ctx) - //} - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM) - select { - case val := <-sigs: - log.ZDebug(ctx, "recv exit", "signal", val.String()) - cancel(fmt.Errorf("signal %s", val.String())) - case <-ctx.Done(): + if config.Discovery.Enable == conf.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) + cm.Watch(ctx) } + //sigs := make(chan os.Signal, 1) + //signal.Notify(sigs, syscall.SIGTERM) + //select { + //case val := <-sigs: + // log.ZDebug(ctx, "recv exit", "signal", val.String()) + // cancel(fmt.Errorf("signal %s", val.String())) + //case <-ctx.Done(): + //} exitCause := context.Cause(ctx) log.ZWarn(ctx, "api server exit", exitCause) timer := time.NewTimer(time.Second * 15) diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 6c6960220..8772693cc 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -19,8 +19,8 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/runtimeenv" @@ -48,10 +48,12 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc return err } - rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build()) + dbb := dbbuild.NewBuilder(nil, &conf.RedisConfig) + rdb, err := dbb.Redis(ctx) if err != nil { return err } + longServer := NewWsServer( conf, WithPort(wsPort), diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 3ca6c6b46..d6c8a0797 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -16,34 +16,22 @@ package msgtransfer import ( "context" - "errors" - "fmt" - "net" - "net/http" "os" "os/signal" - "strconv" "syscall" disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/mq" - "github.com/openimsdk/tools/utils/jsonutil" - "github.com/openimsdk/tools/utils/network" - - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/runtimeenv" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" "google.golang.org/grpc" @@ -74,25 +62,19 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) + builder := mqbuild.NewBuilder(&config.KafkaConfig) log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", config.Index) - - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) + mgocli, err := dbb.Mongo(ctx) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + rdb, err := dbb.Redis(ctx) if err != nil { return err } - //client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil) - //if err != nil { - // return err - //} - //client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), - // grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) if config.Discovery.Enable == conf.ETCD { cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ @@ -193,67 +175,6 @@ func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, clie return err } - registerIP, err := network.GetRpcRegisterIP("") - if err != nil { - return err - } - - getAutoPort := func() (net.Listener, int, error) { - registerAddr := net.JoinHostPort(registerIP, "0") - listener, err := net.Listen("tcp", registerAddr) - if err != nil { - return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) - } - _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - port, _ := strconv.Atoi(portStr) - return listener, port, nil - } - - if config.Discovery.Enable != conf.Standalone && config.MsgTransfer.Prometheus.Enable { - if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD { - return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() - } - var ( - listener net.Listener - prometheusPort int - ) - - if config.MsgTransfer.Prometheus.AutoSetPorts { - listener, prometheusPort, err = getAutoPort() - if err != nil { - return err - } - - etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } - } else { - prometheusPort, err = datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) - if err != nil { - return err - } - listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) - if err != nil { - return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) - } - } - - go func() { - defer func() { - if r := recover(); r != nil { - log.ZPanic(m.ctx, "MsgTransfer Start Panic", errs.ErrPanic(r)) - } - }() - if err := prommetrics.TransferInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { - netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort) - netDone <- struct{}{} - } - }() - } - // todo sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index e55765706..b9d371c55 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -41,14 +41,15 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * } func NewOnlinePusher(disCov discovery.Conn, config *Config) (OnlinePusher, error) { + if conf.Standalone() { + return NewDefaultAllNode(disCov, config), nil + } if runtimeenv.RuntimeEnvironment() == conf.KUBERNETES { return NewDefaultAllNode(disCov, config), nil } switch config.Discovery.Enable { case conf.ETCD: return NewDefaultAllNode(disCov, config), nil - case conf.Standalone: - return NewDefaultAllNode(disCov, config), nil default: return nil, errs.New(fmt.Sprintf("unsupported discovery type %s", config.Discovery.Enable)) } diff --git a/internal/push/push.go b/internal/push/push.go index 5c2e34ab4..8a1882b62 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -9,9 +9,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" pbpush "github.com/openimsdk/protocol/push" - "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" @@ -46,7 +46,8 @@ func (p pushServer) DelUserPushToken(ctx context.Context, } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + dbb := dbbuild.NewBuilder(nil, &config.RedisConfig) + rdb, err := dbb.Redis(ctx) if err != nil { return err } @@ -55,7 +56,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } - builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) + builder := mqbuild.NewBuilder(&config.KafkaConfig) offlinePushProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToOfflinePushTopic) if err != nil { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3287d9112..2bf86cb10 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -18,11 +18,11 @@ import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" - "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" @@ -56,7 +56,8 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + dbb := dbbuild.NewBuilder(nil, &config.RedisConfig) + rdb, err := dbb.Redis(ctx) if err != nil { return err } diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 359c28303..2ee4d6e1c 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -19,24 +19,22 @@ import ( "sort" "time" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/tools/db/redisutil" - - "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -67,11 +65,12 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) + mgocli, err := dbb.Mongo(ctx) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + rdb, err := dbb.Redis(ctx) if err != nil { return err } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 39555d47f..8ad4a949f 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -43,8 +44,6 @@ import ( pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/wrapperspb" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -78,11 +77,12 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) + mgocli, err := dbb.Mongo(ctx) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + rdb, err := dbb.Redis(ctx) if err != nil { return err } @@ -98,11 +98,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } - - //userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) - //msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) - //conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return err diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 1025e9f77..d0b228156 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -17,23 +17,21 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/db/redisutil" - - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "google.golang.org/grpc" ) @@ -79,16 +77,17 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) + builder := mqbuild.NewBuilder(&config.KafkaConfig) redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic) if err != nil { return err } - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) + mgocli, err := dbb.Mongo(ctx) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + rdb, err := dbb.Redis(ctx) if err != nil { return err } diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index aaf767508..a4908d924 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -17,26 +17,24 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/tools/mq/memamq" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - "github.com/openimsdk/tools/db/redisutil" - - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" @@ -68,11 +66,12 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) + mgocli, err := dbb.Mongo(ctx) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + rdb, err := dbb.Redis(ctx) if err != nil { return err } diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 38da8c3cc..377e694c2 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -30,8 +31,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/protocol/third" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/s3" "github.com/openimsdk/tools/s3/cos" @@ -62,14 +61,16 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) + mgocli, err := dbb.Mongo(ctx) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + rdb, err := dbb.Redis(ctx) if err != nil { return err } + logdb, err := mgo.NewLogMongo(mgocli.GetDB()) if err != nil { return err diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 656f0d168..bde9a3f8f 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -23,27 +23,25 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/internal/rpc/relation" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/group" friendpb "github.com/openimsdk/protocol/relation" - "github.com/openimsdk/tools/db/redisutil" - - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" pbuser "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -77,14 +75,16 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig) + mgocli, err := dbb.Mongo(ctx) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + rdb, err := dbb.Redis(ctx) if err != nil { return err } + users := make([]*tablerelation.User, 0) for _, v := range config.Share.IMAdminUserID { diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 628e378d0..7fb7b4abc 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -4,41 +4,34 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mw" - "github.com/openimsdk/tools/utils/runtimeenv" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/runtimeenv" "github.com/robfig/cron/v3" + "google.golang.org/grpc" ) -type CronTaskConfig struct { +type Config struct { CronTask config.CronTask Share config.Share Discovery config.Discovery } -func Start(ctx context.Context, conf *CronTaskConfig) error { +// func Start(ctx context.Context, conf *CronTaskConfig) error { +func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error { log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) if conf.CronTask.RetainChatRecords < 1 { - return errs.New("msg destruct time must be greater than 1").Wrap() + return nil } - client, err := kdisc.NewDiscoveryRegister(&conf.Discovery, nil) - if err != nil { - return errs.WrapMsg(err, "failed to register discovery service") - } - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0]) msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg) @@ -91,7 +84,7 @@ func Start(ctx context.Context, conf *CronTaskConfig) error { type cronServer struct { ctx context.Context - config *CronTaskConfig + config *Config cron *cron.Cron msgClient msg.MsgClient conversationClient pbconversation.ConversationClient diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 8ee8bfc76..484467798 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -74,12 +74,15 @@ func (a *ApiCmd) Exec() error { func (a *ApiCmd) runE() error { a.apiConfig.Index = config.Index(a.Index()) - var prometheus config.Prometheus + prometheus := config.Prometheus{ + Enable: a.apiConfig.API.Prometheus.Enable, + Ports: a.apiConfig.API.Prometheus.Ports, + } return startrpc.Start( a.ctx, &a.apiConfig.Discovery, &prometheus, a.apiConfig.API.Api.ListenIP, "", - false, + a.apiConfig.API.Prometheus.AutoSetPorts, nil, int(a.apiConfig.Index), a.apiConfig.Discovery.RpcService.MessageGateway, &a.apiConfig.Notification, diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index e7eb0ce18..eb8b1489e 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/tools" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" @@ -28,11 +29,11 @@ type CronTaskCmd struct { *RootCmd ctx context.Context configMap map[string]any - cronTaskConfig *tools.CronTaskConfig + cronTaskConfig *tools.Config } func NewCronTaskCmd() *CronTaskCmd { - var cronTaskConfig tools.CronTaskConfig + var cronTaskConfig tools.Config ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} ret.configMap = map[string]any{ config.OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, @@ -52,5 +53,18 @@ func (a *CronTaskCmd) Exec() error { } func (a *CronTaskCmd) runE() error { - return tools.Start(a.ctx, a.cronTaskConfig) + var prometheus config.Prometheus + return startrpc.Start( + a.ctx, &a.cronTaskConfig.Discovery, + &prometheus, + "", "", + false, + nil, 0, + "", + nil, + a.cronTaskConfig, + []string{}, + []string{}, + tools.Start, + ) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index f74fe2440..2cc6979b2 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -323,6 +323,7 @@ type RPC struct { } type Redis struct { + Disable bool `yaml:"disable"` Address []string `yaml:"address"` Username string `yaml:"username"` Password string `yaml:"password"` diff --git a/pkg/common/config/constant.go b/pkg/common/config/constant.go index 066fcd5c0..fa3f0ca05 100644 --- a/pkg/common/config/constant.go +++ b/pkg/common/config/constant.go @@ -23,7 +23,7 @@ const ( DeploymentType = "DEPLOYMENT_TYPE" KUBERNETES = runtimeenv.Kubernetes ETCD = "etcd" - Standalone = "standalone" + //Standalone = "standalone" ) const ( diff --git a/pkg/common/config/global.go b/pkg/common/config/global.go new file mode 100644 index 000000000..19f74b0a9 --- /dev/null +++ b/pkg/common/config/global.go @@ -0,0 +1,11 @@ +package config + +var standalone bool + +func SetStandalone() { + standalone = true +} + +func Standalone() bool { + return standalone +} diff --git a/pkg/common/discovery/discoveryregister.go b/pkg/common/discovery/discoveryregister.go index 56160b9a9..dc100be5c 100644 --- a/pkg/common/discovery/discoveryregister.go +++ b/pkg/common/discovery/discoveryregister.go @@ -31,6 +31,9 @@ import ( // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { + if config.Standalone() { + return standalone.GetSvcDiscoveryRegistry(), nil + } if runtimeenv.RuntimeEnvironment() == config.KUBERNETES { return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, grpc.WithDefaultCallOptions( @@ -40,8 +43,6 @@ func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (dis } switch discovery.Enable { - case config.Standalone: - return standalone.GetSvcDiscoveryRegistry(), nil case config.ETCD: return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 328fbf3c5..f08e4b2a8 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,6 +16,7 @@ package startrpc import ( "context" + "errors" "fmt" "net" "os" @@ -26,6 +27,7 @@ import ( conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" "google.golang.org/grpc/status" @@ -39,7 +41,11 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, +func init() { + prommetrics.RegistryAll() +} + +func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, @@ -49,10 +55,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf conf.InitNotification(notification) } - if discovery.Enable == conf.Standalone { - return nil - } - options = append(options, mw.GrpcServer()) registerIP, err := network.GetRpcRegisterIP(registerIP) @@ -83,7 +85,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) - client, err := kdisc.NewDiscoveryRegister(discovery, watchServiceNames) + client, err := kdisc.NewDiscoveryRegister(disc, watchServiceNames) if err != nil { return err } @@ -116,15 +118,20 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf if err != nil { return err } - if err := client.Register(ctx, "prometheus_"+rpcRegisterName, registerIP, prometheusPort); err != nil { + log.ZDebug(ctx, "prometheus start", "addr", prometheusListener.Addr(), "rpcRegisterName", rpcRegisterName) + target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)) + if err != nil { return err } - - cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery) + if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil { + if !errors.Is(err, discovery.ErrNotSupported) { + return err + } + } go func() { - err := prommetrics.RpcInit(cs, prometheusListener) + err := prommetrics.Start(prometheusListener) if err == nil { - err = fmt.Errorf("serve end") + err = fmt.Errorf("listener done") } cancel(fmt.Errorf("prommetrics %s %w", rpcRegisterName, err)) }() diff --git a/pkg/dbbuild/builder.go b/pkg/dbbuild/builder.go new file mode 100644 index 000000000..7712b8117 --- /dev/null +++ b/pkg/dbbuild/builder.go @@ -0,0 +1,25 @@ +package dbbuild + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/redis/go-redis/v9" +) + +type Builder interface { + Mongo(ctx context.Context) (*mongoutil.Client, error) + Redis(ctx context.Context) (redis.UniversalClient, error) +} + +func NewBuilder(mongoConf *config.Mongo, redisConf *config.Redis) Builder { + if config.Standalone() { + globalStandalone.setConfig(mongoConf, redisConf) + return globalStandalone + } + return µservices{ + mongo: mongoConf, + redis: redisConf, + } +} diff --git a/pkg/dbbuild/microservices.go b/pkg/dbbuild/microservices.go new file mode 100644 index 000000000..f96760b1f --- /dev/null +++ b/pkg/dbbuild/microservices.go @@ -0,0 +1,26 @@ +package dbbuild + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/redis/go-redis/v9" +) + +type microservices struct { + mongo *config.Mongo + redis *config.Redis +} + +func (x *microservices) Mongo(ctx context.Context) (*mongoutil.Client, error) { + return mongoutil.NewMongoDB(ctx, x.mongo.Build()) +} + +func (x *microservices) Redis(ctx context.Context) (redis.UniversalClient, error) { + if x.redis.Disable { + return nil, nil + } + return redisutil.NewRedisClient(ctx, x.redis.Build()) +} diff --git a/pkg/dbbuild/standalone.go b/pkg/dbbuild/standalone.go new file mode 100644 index 000000000..37bd377de --- /dev/null +++ b/pkg/dbbuild/standalone.go @@ -0,0 +1,76 @@ +package dbbuild + +import ( + "context" + "sync" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/redis/go-redis/v9" +) + +const ( + standaloneMongo = "mongo" + standaloneRedis = "redis" +) + +var globalStandalone = &standalone{} + +type standaloneConn[C any] struct { + Conn C + Err error +} + +func (x *standaloneConn[C]) result() (C, error) { + return x.Conn, x.Err +} + +type standalone struct { + lock sync.Mutex + mongo *config.Mongo + redis *config.Redis + conn map[string]any +} + +func (x *standalone) setConfig(mongoConf *config.Mongo, redisConf *config.Redis) { + x.lock.Lock() + defer x.lock.Unlock() + x.mongo = mongoConf + x.redis = redisConf +} + +func (x *standalone) Mongo(ctx context.Context) (*mongoutil.Client, error) { + x.lock.Lock() + defer x.lock.Unlock() + if x.conn == nil { + x.conn = make(map[string]any) + } + v, ok := x.conn[standaloneMongo] + if !ok { + var val standaloneConn[*mongoutil.Client] + val.Conn, val.Err = mongoutil.NewMongoDB(ctx, x.mongo.Build()) + v = &val + x.conn[standaloneMongo] = v + } + return v.(*standaloneConn[*mongoutil.Client]).result() +} + +func (x *standalone) Redis(ctx context.Context) (redis.UniversalClient, error) { + x.lock.Lock() + defer x.lock.Unlock() + if x.redis.Disable { + return nil, nil + } + if x.conn == nil { + x.conn = make(map[string]any) + } + v, ok := x.conn[standaloneRedis] + if !ok { + var val standaloneConn[redis.UniversalClient] + val.Conn, val.Err = redisutil.NewRedisClient(ctx, x.redis.Build()) + v = &val + x.conn[standaloneRedis] = v + } + return v.(*standaloneConn[redis.UniversalClient]).result() +} diff --git a/pkg/mqbuild/builder.go b/pkg/mqbuild/builder.go index 6c369f23a..938159372 100644 --- a/pkg/mqbuild/builder.go +++ b/pkg/mqbuild/builder.go @@ -15,8 +15,8 @@ type Builder interface { GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error) } -func NewBuilder(discovery *config.Discovery, kafka *config.Kafka) Builder { - if discovery.Enable == config.Standalone { +func NewBuilder(kafka *config.Kafka) Builder { + if config.Standalone() { return standaloneBuilder{} } return &kafkaBuilder{ diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index b5308bbe8..959b054a2 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -3,15 +3,16 @@ package rpccache import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/user" "math/rand" "strconv" "sync" "sync/atomic" "time" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/user" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" @@ -51,10 +52,11 @@ func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis. x.CurrentPhase.Store(DoSubscribeOver) x.Cond.Broadcast() } - - go func() { - x.doSubscribe(ctx, rdb, fn) - }() + if rdb != nil { + go func() { + x.doSubscribe(ctx, rdb, fn) + }() + } return x, nil }