From 1b62cb1b22df20465c336c8613718ff59d8fe635 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Tue, 19 Mar 2024 16:41:09 +0800 Subject: [PATCH] refactor: unified naming for module startup functions. --- internal/api/route.go | 7 ++++--- internal/msggateway/hub_server.go | 11 ++++++----- internal/msggateway/init.go | 10 ++++++---- internal/msgtransfer/init.go | 14 ++++++++------ internal/push/push_rpc_server.go | 4 ++-- internal/rpc/auth/auth.go | 4 ++-- internal/rpc/conversation/conversaion.go | 6 +++--- internal/rpc/friend/friend.go | 6 +++--- internal/rpc/group/group.go | 6 +++--- internal/rpc/msg/server.go | 7 ++++--- internal/rpc/third/third.go | 13 +++++++------ internal/rpc/user/user.go | 6 +++--- internal/tools/cron_task.go | 16 ++++++++-------- internal/tools/msg.go | 2 +- pkg/common/cmd/api.go | 6 +++++- pkg/common/cmd/cron_task.go | 7 +++++-- pkg/common/cmd/msg_gateway.go | 8 ++++++-- pkg/common/cmd/msg_transfer.go | 8 ++++++-- pkg/common/cmd/root.go | 3 +++ pkg/common/cmd/rpc.go | 4 ++-- pkg/common/db/cache/init_redis.go | 4 +++- pkg/common/db/controller/msg_test.go | 4 ++-- pkg/common/db/unrelation/mongo.go | 8 +++++--- pkg/common/startrpc/start.go | 4 ++-- 24 files changed, 99 insertions(+), 69 deletions(-) diff --git a/internal/api/route.go b/internal/api/route.go index 25406cfc6..51c735f8d 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -50,14 +50,14 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func Start(config *config.GlobalConfig, port int, proPort int) error { +func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort int) error { if port == 0 || proPort == 0 { err := errors.New("port or proPort is empty") wrappedErr := errs.WrapMsg(err, "validation error", "port", port, "proPort", proPort) return wrappedErr } - rdb, err := cache.NewRedis(&config.Redis) + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } @@ -104,7 +104,8 @@ func Start(config *config.GlobalConfig, port int, proPort int) error { } server := http.Server{Addr: address, Handler: router} - + log.CInfo(ctx, "api server starting", "address", address, "apiPort", port, + "prometheusPort", proPort) go func() { err = server.ListenAndServe() if err != nil && err != http.ErrServerClosed { diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 97f63907c..f92682fb4 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -30,8 +30,8 @@ import ( "google.golang.org/grpc" ) -func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(&config.Redis) +func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } @@ -43,8 +43,8 @@ func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistr return nil } -func (s *Server) Start(conf *config.GlobalConfig) error { - return startrpc.Start(context.Background(), +func (s *Server) Start(ctx context.Context, conf *config.GlobalConfig) error { + return startrpc.Start(ctx, s.rpcPort, conf.RpcRegisterName.OpenImMessageGatewayName, s.prometheusPort, @@ -123,7 +123,8 @@ func (s *Server) GetUsersOnlineStatus( } func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) { - panic("implement me") + //todo implement + return nil, nil } func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 17281ecbf..0efeed708 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -15,15 +15,17 @@ package msggateway import ( - "fmt" + "context" + "github.com/OpenIMSDK/tools/log" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) // Start run ws server. -func Start(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error { - fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ", config.Version) +func Start(ctx context.Context, conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error { + log.CInfo(ctx, "msg_gateway server starting", "rpcPort", rpcPort, "wsPort", wsPort, + "prometheusPort", prometheusPort) longServer, err := NewWsServer( conf, WithPort(wsPort), @@ -39,7 +41,7 @@ func Start(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error hubServer := NewServer(rpcPort, prometheusPort, longServer, conf) netDone := make(chan error) go func() { - err = hubServer.Start(conf) + err = hubServer.Start(ctx, conf) netDone <- err }() return hubServer.LongConnServer.Run(netDone) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 15e2afabd..731ea418a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/tools/log" "net/http" "os" "os/signal" @@ -52,13 +53,13 @@ type MsgTransfer struct { cancel context.CancelFunc } -func Start(config *config.GlobalConfig, prometheusPort int) error { - rdb, err := cache.NewRedis(&config.Redis) +func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, index int) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } - mongo, err := unrelation.NewMongo(&config.Mongo) + mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) if err != nil { return err } @@ -88,7 +89,7 @@ func Start(config *config.GlobalConfig, prometheusPort int) error { if err != nil { return err } - return msgTransfer.Start(prometheusPort, config) + return msgTransfer.Start(ctx, prometheusPort, config, index) } func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) { @@ -107,8 +108,9 @@ func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDat }, nil } -func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) error { - fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) +func (m *MsgTransfer) Start(ctx context.Context, prometheusPort int, config *config.GlobalConfig, index int) error { + log.CInfo(ctx, "msg_transfer server starting", + "prometheusPort", prometheusPort, "index", index) if prometheusPort <= 0 { return errs.WrapMsg(errors.New("invalid prometheus port"), "prometheusPort validation failed", "providedPort", prometheusPort) } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 23a20f9d9..cafef3e06 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -34,8 +34,8 @@ type pushServer struct { pusher *Pusher } -func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(&config.Redis) +func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3be3485b4..713f31ed5 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -41,8 +41,8 @@ type authServer struct { config *config.GlobalConfig } -func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(&config.Redis) +func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 82ff859f0..45fbbde21 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -47,12 +47,12 @@ type conversationServer struct { conversationNotificationSender *notification.ConversationNotificationSender } -func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(&config.Redis) +func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } - mongo, err := unrelation.NewMongo(&config.Mongo) + mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) if err != nil { return err } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index acce77277..03fd0d6c4 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -47,15 +47,15 @@ type friendServer struct { config *config.GlobalConfig } -func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { // Initialize MongoDB - mongo, err := unrelation.NewMongo(&config.Mongo) + mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) if err != nil { return err } // Initialize Redis - rdb, err := cache.NewRedis(&config.Redis) + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index fc5cafd76..e3a83595d 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -60,12 +60,12 @@ type groupServer struct { config *config.GlobalConfig } -func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - mongo, err := unrelation.NewMongo(&config.Mongo) +func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) if err != nil { return err } - rdb, err := cache.NewRedis(&config.Redis) + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index ccca08e5f..64e7be1b5 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -15,6 +15,7 @@ package msg import ( + "context" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/protocol/msg" @@ -62,12 +63,12 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF // return nil //} -func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(&config.Redis) +func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } - mongo, err := unrelation.NewMongo(&config.Mongo) + mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) if err != nil { return err } diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index c57490710..d76920248 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -36,8 +36,12 @@ import ( "google.golang.org/grpc" ) -func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - mongo, err := unrelation.NewMongo(&config.Mongo) +func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) + if err != nil { + return err + } + mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) if err != nil { return err } @@ -60,10 +64,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg apiURL += "/" } apiURL += "object/" - rdb, err := cache.NewRedis(&config.Redis) - if err != nil { - return err - } + // Select the oss method according to the profile policy enable := config.Object.Enable var o s3.Interface diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 48fcb61bf..6a5e4f0e0 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -59,12 +59,12 @@ func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGrou panic("implement me") } -func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(&config.Redis) +func Start(ctx context.Context, config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } - mongo, err := unrelation.NewMongo(&config.Mongo) + mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) if err != nil { return err } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index a5af9be26..82a4c0344 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -16,7 +16,7 @@ package tools import ( "context" - "fmt" + "github.com/OpenIMSDK/tools/log" "os" "os/signal" "syscall" @@ -29,8 +29,9 @@ import ( "github.com/robfig/cron/v3" ) -func StartTask(config *config.GlobalConfig) error { - fmt.Println("cron task start, config", config.ChatRecordsClearTime) +func StartTask(ctx context.Context, config *config.GlobalConfig) error { + + log.CInfo(ctx, "cron task server starting", "chatRecordsClearTime", config.ChatRecordsClearTime, "msgDestructTime", config.MsgDestructTime) msgTool, err := InitMsgTool(config) if err != nil { @@ -39,20 +40,19 @@ func StartTask(config *config.GlobalConfig) error { msgTool.convertTools() - rdb, err := cache.NewRedis(&config.Redis) + rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err } // register cron tasks var crontab = cron.New() - fmt.Printf("Start chatRecordsClearTime cron task, cron config: %s\n", config.ChatRecordsClearTime) + _, err = crontab.AddFunc(config.ChatRecordsClearTime, cronWrapFunc(config, rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq)) if err != nil { return errs.Wrap(err) } - fmt.Printf("Start msgDestruct cron task, cron config: %s\n", config.MsgDestructTime) _, err = crontab.AddFunc(config.MsgDestructTime, cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs)) if err != nil { return errs.WrapMsg(err, "cron_conversations_destruct_msgs") @@ -66,10 +66,10 @@ func StartTask(config *config.GlobalConfig) error { <-sigs // stop crontab, Wait for the running task to exit. - ctx := crontab.Stop() + cronCtx := crontab.Stop() select { - case <-ctx.Done(): + case <-cronCtx.Done(): // graceful exit case <-time.After(15 * time.Second): diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 3dda2e441..b5adffbba 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -68,7 +68,7 @@ func InitMsgTool(config *config.GlobalConfig) (*MsgTool, error) { if err != nil { return nil, err } - mongo, err := unrelation.NewMongo(&config.Mongo) + mongo, err := unrelation.NewMongoDB(&config.Mongo) if err != nil { return nil, err } diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index b8c488b0e..70a92878b 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -15,18 +15,22 @@ package cmd import ( + "context" "github.com/OpenIMSDK/protocol/constant" "github.com/openimsdk/open-im-server/v3/internal/api" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "github.com/spf13/cobra" ) type ApiCmd struct { *RootCmd + ctx context.Context } func NewApiCmd(name string) *ApiCmd { ret := &ApiCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name)} + ret.ctx = context.WithValue(context.Background(), "version", config2.Version) ret.SetRootCmdPt(ret) ret.addPreRun() ret.addRunE() @@ -42,7 +46,7 @@ func (a *ApiCmd) addPreRun() { func (a *ApiCmd) addRunE() { a.Command.RunE = func(cmd *cobra.Command, args []string) error { - return api.Start(a.config, a.port, a.prometheusPort) + return api.Start(a.ctx, a.config, a.port, a.prometheusPort) } } diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index 11edd9070..ffe85ef4a 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -15,6 +15,7 @@ package cmd import ( + "context" "github.com/openimsdk/open-im-server/v3/internal/tools" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" @@ -23,12 +24,14 @@ import ( type CronTaskCmd struct { *RootCmd - initFunc func(config *config.GlobalConfig) error + initFunc func(ctx context.Context, config *config.GlobalConfig) error + ctx context.Context } func NewCronTaskCmd(name string) *CronTaskCmd { ret := &CronTaskCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name, WithCronTaskLogName()), initFunc: tools.StartTask} + ret.ctx = context.WithValue(context.Background(), "version", config.Version) ret.addRunE() ret.SetRootCmdPt(ret) return ret @@ -36,7 +39,7 @@ func NewCronTaskCmd(name string) *CronTaskCmd { func (c *CronTaskCmd) addRunE() { c.Command.RunE = func(cmd *cobra.Command, args []string) error { - return c.initFunc(c.config) + return c.initFunc(c.ctx, c.config) } } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index b2244a2d0..a766292b8 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -15,6 +15,8 @@ package cmd import ( + "context" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "log" "github.com/OpenIMSDK/protocol/constant" @@ -25,10 +27,12 @@ import ( type MsgGatewayCmd struct { *RootCmd + ctx context.Context } func NewMsgGatewayCmd(name string) *MsgGatewayCmd { - ret := &MsgGatewayCmd{NewRootCmd(genutil.GetProcessName(), name)} + ret := &MsgGatewayCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name)} + ret.ctx = context.WithValue(context.Background(), "version", config2.Version) ret.addRunE() ret.SetRootCmdPt(ret) return ret @@ -51,7 +55,7 @@ func (m *MsgGatewayCmd) getWsPortFlag(cmd *cobra.Command) int { func (m *MsgGatewayCmd) addRunE() { m.Command.RunE = func(cmd *cobra.Command, args []string) error { - return msggateway.Start(m.config, m.getPortFlag(cmd), m.getWsPortFlag(cmd), m.getPrometheusPortFlag(cmd)) + return msggateway.Start(m.ctx, m.config, m.getPortFlag(cmd), m.getWsPortFlag(cmd), m.getPrometheusPortFlag(cmd)) } } diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 386caf67a..b327fcad6 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -15,7 +15,9 @@ package cmd import ( + "context" "fmt" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/OpenIMSDK/protocol/constant" "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" @@ -25,10 +27,12 @@ import ( type MsgTransferCmd struct { *RootCmd + ctx context.Context } func NewMsgTransferCmd(name string) *MsgTransferCmd { - ret := &MsgTransferCmd{NewRootCmd(genutil.GetProcessName(), name)} + ret := &MsgTransferCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name)} + ret.ctx = context.WithValue(context.Background(), "version", config2.Version) ret.addRunE() ret.SetRootCmdPt(ret) return ret @@ -36,7 +40,7 @@ func NewMsgTransferCmd(name string) *MsgTransferCmd { func (m *MsgTransferCmd) addRunE() { m.Command.RunE = func(cmd *cobra.Command, args []string) error { - return msgtransfer.Start(m.config, m.getPrometheusPortFlag(cmd)) + return msgtransfer.Start(m.ctx, m.config, m.getPrometheusPortFlag(cmd), m.getTransferProgressFlagValue()) } } diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 603c0bfd0..54be8ef2f 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -62,11 +62,14 @@ func WithLogName(logName string) func(*CmdOpts) { func NewRootCmd(processName, name string, opts ...func(*CmdOpts)) *RootCmd { rootCmd := &RootCmd{processName: processName, Name: name, config: config.NewGlobalConfig()} cmd := cobra.Command{ + Use: "Start openIM application", Short: fmt.Sprintf(`Start %s `, name), Long: fmt.Sprintf(`Start %s `, name), PersistentPreRunE: func(cmd *cobra.Command, args []string) error { return rootCmd.persistentPreRun(cmd, opts...) }, + SilenceUsage: true, + SilenceErrors: true, } rootCmd.Command = cmd rootCmd.addConfFlag() diff --git a/pkg/common/cmd/rpc.go b/pkg/common/cmd/rpc.go index a05491c24..2e593053e 100644 --- a/pkg/common/cmd/rpc.go +++ b/pkg/common/cmd/rpc.go @@ -28,7 +28,7 @@ import ( "google.golang.org/grpc" ) -type rpcInitFuc func(config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error +type rpcInitFuc func(ctx context.Context, config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error type RpcCmd struct { *RootCmd @@ -68,7 +68,7 @@ func (a *RpcCmd) Exec() error { return a.Execute() } -func (a *RpcCmd) StartSvr(name string, rpcFn func(config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error { +func (a *RpcCmd) StartSvr(name string, rpcFn func(ctx context.Context, config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error { if a.GetPortFlag() == 0 { return errs.Wrap(errors.New("port is required")) } diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index 7e642c106..c58647549 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/tools/log" "os" "strings" "time" @@ -38,7 +39,7 @@ const ( ) // NewRedis Initialize redis connection. -func NewRedis(redisConf *config.Redis) (redis.UniversalClient, error) { +func NewRedis(ctx context.Context, redisConf *config.Redis) (redis.UniversalClient, error) { if redisClient != nil { return redisClient, nil } @@ -80,6 +81,7 @@ func NewRedis(redisConf *config.Redis) (redis.UniversalClient, error) { return nil, errs.WrapMsg(err, errMsg) } redisClient = rdb + log.CInfo(ctx, "redis connected successfully", "address", redisConf.Address, "username", redisConf.Username, "password", redisConf.Password, "clusterMode", redisConf.ClusterMode, "enablePipeline", redisConf.EnablePipeline) return rdb, err } diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 1d86dcfba..4e64211e0 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -44,7 +44,7 @@ func Test_BatchInsertChat2DB(t *testing.T) { conf.RetainChatRecords = 3650 conf.ChatRecordsClearTime = "0 2 * * 3" - mongo, err := unrelation.NewMongo(&conf.Mongo) + mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo) if err != nil { t.Fatal(err) } @@ -156,7 +156,7 @@ func GetDB() *commonMsgDatabase { conf.RetainChatRecords = 3650 conf.ChatRecordsClearTime = "0 2 * * 3" - mongo, err := unrelation.NewMongo(&conf.Mongo) + mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo) if err != nil { panic(err) } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 7549975fe..d0f013c91 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -17,6 +17,7 @@ package unrelation import ( "context" "fmt" + "github.com/OpenIMSDK/tools/log" "os" "strings" "time" @@ -40,8 +41,8 @@ type Mongo struct { mongoConf *config.Mongo } -// NewMongo Initialize MongoDB connection. -func NewMongo(mongoConf *config.Mongo) (*Mongo, error) { +// NewMongoDB Initialize MongoDB connection. +func NewMongoDB(ctx context.Context, mongoConf *config.Mongo) (*Mongo, error) { specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) uri := buildMongoURI(mongoConf) @@ -50,13 +51,14 @@ func NewMongo(mongoConf *config.Mongo) (*Mongo, error) { // Retry connecting to MongoDB for i := 0; i <= maxRetry; i++ { - ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout) + ctx, cancel := context.WithTimeout(ctx, mongoConnTimeout) defer cancel() mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri)) if err == nil { if err = mongoClient.Ping(ctx, nil); err != nil { return nil, errs.WrapMsg(err, uri) } + log.CInfo(ctx, "MongoDB connected", "uri", uri) return &Mongo{db: mongoClient, mongoConf: mongoConf}, nil } if shouldRetry(err) { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index aac88bab0..6e521c2aa 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -52,7 +52,7 @@ func Start( rpcRegisterName string, prometheusPort int, config *config2.GlobalConfig, - rpcFn func(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error, + rpcFn func(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption, ) error { log.CInfo(ctx, "rpc server starting", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort, @@ -96,7 +96,7 @@ func Start( once.Do(srv.GracefulStop) }() - err = rpcFn(config, client, srv) + err = rpcFn(ctx, config, client, srv) if err != nil { return err }