refactor: unified naming for module startup functions.

This commit is contained in:
Gordon 2024-03-19 16:41:09 +08:00
parent 1132bce252
commit 1b62cb1b22
24 changed files with 99 additions and 69 deletions

View File

@ -50,14 +50,14 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
func Start(config *config.GlobalConfig, port int, proPort int) error {
func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort int) error {
if port == 0 || proPort == 0 {
err := errors.New("port or proPort is empty")
wrappedErr := errs.WrapMsg(err, "validation error", "port", port, "proPort", proPort)
return wrappedErr
}
rdb, err := cache.NewRedis(&config.Redis)
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
@ -104,7 +104,8 @@ func Start(config *config.GlobalConfig, port int, proPort int) error {
}
server := http.Server{Addr: address, Handler: router}
log.CInfo(ctx, "api server starting", "address", address, "apiPort", port,
"prometheusPort", proPort)
go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {

View File

@ -30,8 +30,8 @@ import (
"google.golang.org/grpc"
)
func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(&config.Redis)
func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
@ -43,8 +43,8 @@ func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistr
return nil
}
func (s *Server) Start(conf *config.GlobalConfig) error {
return startrpc.Start(context.Background(),
func (s *Server) Start(ctx context.Context, conf *config.GlobalConfig) error {
return startrpc.Start(ctx,
s.rpcPort,
conf.RpcRegisterName.OpenImMessageGatewayName,
s.prometheusPort,
@ -123,7 +123,8 @@ func (s *Server) GetUsersOnlineStatus(
}
func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
panic("implement me")
//todo implement
return nil, nil
}
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,

View File

@ -15,15 +15,17 @@
package msggateway
import (
"fmt"
"context"
"github.com/OpenIMSDK/tools/log"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// Start run ws server.
func Start(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error {
fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ", config.Version)
func Start(ctx context.Context, conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error {
log.CInfo(ctx, "msg_gateway server starting", "rpcPort", rpcPort, "wsPort", wsPort,
"prometheusPort", prometheusPort)
longServer, err := NewWsServer(
conf,
WithPort(wsPort),
@ -39,7 +41,7 @@ func Start(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error
hubServer := NewServer(rpcPort, prometheusPort, longServer, conf)
netDone := make(chan error)
go func() {
err = hubServer.Start(conf)
err = hubServer.Start(ctx, conf)
netDone <- err
}()
return hubServer.LongConnServer.Run(netDone)

View File

@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/log"
"net/http"
"os"
"os/signal"
@ -52,13 +53,13 @@ type MsgTransfer struct {
cancel context.CancelFunc
}
func Start(config *config.GlobalConfig, prometheusPort int) error {
rdb, err := cache.NewRedis(&config.Redis)
func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, index int) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
mongo, err := unrelation.NewMongo(&config.Mongo)
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil {
return err
}
@ -88,7 +89,7 @@ func Start(config *config.GlobalConfig, prometheusPort int) error {
if err != nil {
return err
}
return msgTransfer.Start(prometheusPort, config)
return msgTransfer.Start(ctx, prometheusPort, config, index)
}
func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
@ -107,8 +108,9 @@ func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDat
}, nil
}
func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) error {
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
func (m *MsgTransfer) Start(ctx context.Context, prometheusPort int, config *config.GlobalConfig, index int) error {
log.CInfo(ctx, "msg_transfer server starting",
"prometheusPort", prometheusPort, "index", index)
if prometheusPort <= 0 {
return errs.WrapMsg(errors.New("invalid prometheus port"), "prometheusPort validation failed", "providedPort", prometheusPort)
}

View File

@ -34,8 +34,8 @@ type pushServer struct {
pusher *Pusher
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(&config.Redis)
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}

View File

@ -41,8 +41,8 @@ type authServer struct {
config *config.GlobalConfig
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(&config.Redis)
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}

View File

@ -47,12 +47,12 @@ type conversationServer struct {
conversationNotificationSender *notification.ConversationNotificationSender
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(&config.Redis)
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
mongo, err := unrelation.NewMongo(&config.Mongo)
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil {
return err
}

View File

@ -47,15 +47,15 @@ type friendServer struct {
config *config.GlobalConfig
}
func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
func Start(ctx context.Context, config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
// Initialize MongoDB
mongo, err := unrelation.NewMongo(&config.Mongo)
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil {
return err
}
// Initialize Redis
rdb, err := cache.NewRedis(&config.Redis)
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}

View File

@ -60,12 +60,12 @@ type groupServer struct {
config *config.GlobalConfig
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
mongo, err := unrelation.NewMongo(&config.Mongo)
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil {
return err
}
rdb, err := cache.NewRedis(&config.Redis)
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}

View File

@ -15,6 +15,7 @@
package msg
import (
"context"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/msg"
@ -62,12 +63,12 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
// return nil
//}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(&config.Redis)
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
mongo, err := unrelation.NewMongo(&config.Mongo)
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil {
return err
}

View File

@ -36,8 +36,12 @@ import (
"google.golang.org/grpc"
)
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
mongo, err := unrelation.NewMongo(&config.Mongo)
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil {
return err
}
@ -60,10 +64,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
apiURL += "/"
}
apiURL += "object/"
rdb, err := cache.NewRedis(&config.Redis)
if err != nil {
return err
}
// Select the oss method according to the profile policy
enable := config.Object.Enable
var o s3.Interface

View File

@ -59,12 +59,12 @@ func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGrou
panic("implement me")
}
func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(&config.Redis)
func Start(ctx context.Context, config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
mongo, err := unrelation.NewMongo(&config.Mongo)
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil {
return err
}

View File

@ -16,7 +16,7 @@ package tools
import (
"context"
"fmt"
"github.com/OpenIMSDK/tools/log"
"os"
"os/signal"
"syscall"
@ -29,8 +29,9 @@ import (
"github.com/robfig/cron/v3"
)
func StartTask(config *config.GlobalConfig) error {
fmt.Println("cron task start, config", config.ChatRecordsClearTime)
func StartTask(ctx context.Context, config *config.GlobalConfig) error {
log.CInfo(ctx, "cron task server starting", "chatRecordsClearTime", config.ChatRecordsClearTime, "msgDestructTime", config.MsgDestructTime)
msgTool, err := InitMsgTool(config)
if err != nil {
@ -39,20 +40,19 @@ func StartTask(config *config.GlobalConfig) error {
msgTool.convertTools()
rdb, err := cache.NewRedis(&config.Redis)
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil {
return err
}
// register cron tasks
var crontab = cron.New()
fmt.Printf("Start chatRecordsClearTime cron task, cron config: %s\n", config.ChatRecordsClearTime)
_, err = crontab.AddFunc(config.ChatRecordsClearTime, cronWrapFunc(config, rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
if err != nil {
return errs.Wrap(err)
}
fmt.Printf("Start msgDestruct cron task, cron config: %s\n", config.MsgDestructTime)
_, err = crontab.AddFunc(config.MsgDestructTime, cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil {
return errs.WrapMsg(err, "cron_conversations_destruct_msgs")
@ -66,10 +66,10 @@ func StartTask(config *config.GlobalConfig) error {
<-sigs
// stop crontab, Wait for the running task to exit.
ctx := crontab.Stop()
cronCtx := crontab.Stop()
select {
case <-ctx.Done():
case <-cronCtx.Done():
// graceful exit
case <-time.After(15 * time.Second):

View File

@ -68,7 +68,7 @@ func InitMsgTool(config *config.GlobalConfig) (*MsgTool, error) {
if err != nil {
return nil, err
}
mongo, err := unrelation.NewMongo(&config.Mongo)
mongo, err := unrelation.NewMongoDB(&config.Mongo)
if err != nil {
return nil, err
}

View File

@ -15,18 +15,22 @@
package cmd
import (
"context"
"github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/internal/api"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/spf13/cobra"
)
type ApiCmd struct {
*RootCmd
ctx context.Context
}
func NewApiCmd(name string) *ApiCmd {
ret := &ApiCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name)}
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.SetRootCmdPt(ret)
ret.addPreRun()
ret.addRunE()
@ -42,7 +46,7 @@ func (a *ApiCmd) addPreRun() {
func (a *ApiCmd) addRunE() {
a.Command.RunE = func(cmd *cobra.Command, args []string) error {
return api.Start(a.config, a.port, a.prometheusPort)
return api.Start(a.ctx, a.config, a.port, a.prometheusPort)
}
}

View File

@ -15,6 +15,7 @@
package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
@ -23,12 +24,14 @@ import (
type CronTaskCmd struct {
*RootCmd
initFunc func(config *config.GlobalConfig) error
initFunc func(ctx context.Context, config *config.GlobalConfig) error
ctx context.Context
}
func NewCronTaskCmd(name string) *CronTaskCmd {
ret := &CronTaskCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name, WithCronTaskLogName()),
initFunc: tools.StartTask}
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.addRunE()
ret.SetRootCmdPt(ret)
return ret
@ -36,7 +39,7 @@ func NewCronTaskCmd(name string) *CronTaskCmd {
func (c *CronTaskCmd) addRunE() {
c.Command.RunE = func(cmd *cobra.Command, args []string) error {
return c.initFunc(c.config)
return c.initFunc(c.ctx, c.config)
}
}

View File

@ -15,6 +15,8 @@
package cmd
import (
"context"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"log"
"github.com/OpenIMSDK/protocol/constant"
@ -25,10 +27,12 @@ import (
type MsgGatewayCmd struct {
*RootCmd
ctx context.Context
}
func NewMsgGatewayCmd(name string) *MsgGatewayCmd {
ret := &MsgGatewayCmd{NewRootCmd(genutil.GetProcessName(), name)}
ret := &MsgGatewayCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name)}
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.addRunE()
ret.SetRootCmdPt(ret)
return ret
@ -51,7 +55,7 @@ func (m *MsgGatewayCmd) getWsPortFlag(cmd *cobra.Command) int {
func (m *MsgGatewayCmd) addRunE() {
m.Command.RunE = func(cmd *cobra.Command, args []string) error {
return msggateway.Start(m.config, m.getPortFlag(cmd), m.getWsPortFlag(cmd), m.getPrometheusPortFlag(cmd))
return msggateway.Start(m.ctx, m.config, m.getPortFlag(cmd), m.getWsPortFlag(cmd), m.getPrometheusPortFlag(cmd))
}
}

View File

@ -15,7 +15,9 @@
package cmd
import (
"context"
"fmt"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
@ -25,10 +27,12 @@ import (
type MsgTransferCmd struct {
*RootCmd
ctx context.Context
}
func NewMsgTransferCmd(name string) *MsgTransferCmd {
ret := &MsgTransferCmd{NewRootCmd(genutil.GetProcessName(), name)}
ret := &MsgTransferCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name)}
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.addRunE()
ret.SetRootCmdPt(ret)
return ret
@ -36,7 +40,7 @@ func NewMsgTransferCmd(name string) *MsgTransferCmd {
func (m *MsgTransferCmd) addRunE() {
m.Command.RunE = func(cmd *cobra.Command, args []string) error {
return msgtransfer.Start(m.config, m.getPrometheusPortFlag(cmd))
return msgtransfer.Start(m.ctx, m.config, m.getPrometheusPortFlag(cmd), m.getTransferProgressFlagValue())
}
}

View File

@ -62,11 +62,14 @@ func WithLogName(logName string) func(*CmdOpts) {
func NewRootCmd(processName, name string, opts ...func(*CmdOpts)) *RootCmd {
rootCmd := &RootCmd{processName: processName, Name: name, config: config.NewGlobalConfig()}
cmd := cobra.Command{
Use: "Start openIM application",
Short: fmt.Sprintf(`Start %s `, name),
Long: fmt.Sprintf(`Start %s `, name),
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return rootCmd.persistentPreRun(cmd, opts...)
},
SilenceUsage: true,
SilenceErrors: true,
}
rootCmd.Command = cmd
rootCmd.addConfFlag()

View File

@ -28,7 +28,7 @@ import (
"google.golang.org/grpc"
)
type rpcInitFuc func(config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error
type rpcInitFuc func(ctx context.Context, config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error
type RpcCmd struct {
*RootCmd
@ -68,7 +68,7 @@ func (a *RpcCmd) Exec() error {
return a.Execute()
}
func (a *RpcCmd) StartSvr(name string, rpcFn func(config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error {
func (a *RpcCmd) StartSvr(name string, rpcFn func(ctx context.Context, config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error {
if a.GetPortFlag() == 0 {
return errs.Wrap(errors.New("port is required"))
}

View File

@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/log"
"os"
"strings"
"time"
@ -38,7 +39,7 @@ const (
)
// NewRedis Initialize redis connection.
func NewRedis(redisConf *config.Redis) (redis.UniversalClient, error) {
func NewRedis(ctx context.Context, redisConf *config.Redis) (redis.UniversalClient, error) {
if redisClient != nil {
return redisClient, nil
}
@ -80,6 +81,7 @@ func NewRedis(redisConf *config.Redis) (redis.UniversalClient, error) {
return nil, errs.WrapMsg(err, errMsg)
}
redisClient = rdb
log.CInfo(ctx, "redis connected successfully", "address", redisConf.Address, "username", redisConf.Username, "password", redisConf.Password, "clusterMode", redisConf.ClusterMode, "enablePipeline", redisConf.EnablePipeline)
return rdb, err
}

View File

@ -44,7 +44,7 @@ func Test_BatchInsertChat2DB(t *testing.T) {
conf.RetainChatRecords = 3650
conf.ChatRecordsClearTime = "0 2 * * 3"
mongo, err := unrelation.NewMongo(&conf.Mongo)
mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo)
if err != nil {
t.Fatal(err)
}
@ -156,7 +156,7 @@ func GetDB() *commonMsgDatabase {
conf.RetainChatRecords = 3650
conf.ChatRecordsClearTime = "0 2 * * 3"
mongo, err := unrelation.NewMongo(&conf.Mongo)
mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo)
if err != nil {
panic(err)
}

View File

@ -17,6 +17,7 @@ package unrelation
import (
"context"
"fmt"
"github.com/OpenIMSDK/tools/log"
"os"
"strings"
"time"
@ -40,8 +41,8 @@ type Mongo struct {
mongoConf *config.Mongo
}
// NewMongo Initialize MongoDB connection.
func NewMongo(mongoConf *config.Mongo) (*Mongo, error) {
// NewMongoDB Initialize MongoDB connection.
func NewMongoDB(ctx context.Context, mongoConf *config.Mongo) (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := buildMongoURI(mongoConf)
@ -50,13 +51,14 @@ func NewMongo(mongoConf *config.Mongo) (*Mongo, error) {
// Retry connecting to MongoDB
for i := 0; i <= maxRetry; i++ {
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
ctx, cancel := context.WithTimeout(ctx, mongoConnTimeout)
defer cancel()
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err == nil {
if err = mongoClient.Ping(ctx, nil); err != nil {
return nil, errs.WrapMsg(err, uri)
}
log.CInfo(ctx, "MongoDB connected", "uri", uri)
return &Mongo{db: mongoClient, mongoConf: mongoConf}, nil
}
if shouldRetry(err) {

View File

@ -52,7 +52,7 @@ func Start(
rpcRegisterName string,
prometheusPort int,
config *config2.GlobalConfig,
rpcFn func(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error,
rpcFn func(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption,
) error {
log.CInfo(ctx, "rpc server starting", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort,
@ -96,7 +96,7 @@ func Start(
once.Do(srv.GracefulStop)
}()
err = rpcFn(config, client, srv)
err = rpcFn(ctx, config, client, srv)
if err != nil {
return err
}