This commit is contained in:
withchao 2025-02-08 15:52:34 +08:00
parent c48a082602
commit 0c20ec5a9c
2 changed files with 75 additions and 110 deletions

View File

@ -27,10 +27,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/standalone" "github.com/openimsdk/tools/discovery/standalone"
"github.com/openimsdk/tools/utils/datautil"
"github.com/spf13/viper" "github.com/spf13/viper"
"google.golang.org/grpc" "google.golang.org/grpc"
"gopkg.in/yaml.v3"
) )
func main() { func main() {
@ -45,17 +43,17 @@ func main() {
} }
} }
cmd := newCmds(configPath) cmd := newCmds(configPath)
putCmd1(cmd, false, auth.Start) putCmd(cmd, false, auth.Start)
putCmd1(cmd, false, conversation.Start) putCmd(cmd, false, conversation.Start)
putCmd1(cmd, false, relation.Start) putCmd(cmd, false, relation.Start)
putCmd1(cmd, false, group.Start) putCmd(cmd, false, group.Start)
putCmd1(cmd, false, msg.Start) putCmd(cmd, false, msg.Start)
putCmd1(cmd, false, third.Start) putCmd(cmd, false, third.Start)
putCmd1(cmd, false, user.Start) putCmd(cmd, false, user.Start)
putCmd1(cmd, false, push.Start) putCmd(cmd, false, push.Start)
putCmd1(cmd, true, msggateway.Start) putCmd(cmd, true, msggateway.Start)
putCmd1(cmd, true, msgtransfer.Start) putCmd(cmd, true, msgtransfer.Start)
putCmd1(cmd, true, api.Start) putCmd(cmd, true, api.Start)
ctx := context.Background() ctx := context.Background()
if err := cmd.run(ctx); err != nil { if err := cmd.run(ctx); err != nil {
fmt.Println(err) fmt.Println(err)
@ -64,28 +62,46 @@ func main() {
fmt.Println("exit") fmt.Println("exit")
} }
func getTypePath(typ reflect.Type) string {
return path.Join(typ.PkgPath(), typ.Name())
}
func newCmds(confPath string) *cmds { func newCmds(confPath string) *cmds {
return &cmds{confPath: confPath} return &cmds{confPath: confPath}
} }
type cmdName struct {
Name string
Func func(ctx context.Context) error
Block bool
}
type cmds struct { type cmds struct {
confPath string confPath string
cmds []cmdName cmds []cmdName
conf map[string][]byte config config.AllConfig
conf map[string]reflect.Value
} }
func (x *cmds) readConfig() error { func (x *cmds) getTypePath(typ reflect.Type) string {
skip := []string{ return path.Join(typ.PkgPath(), typ.Name())
config.DiscoveryConfigFilename, }
func (x *cmds) initDiscovery() {
x.config.Discovery.Enable = config.Standalone
vof := reflect.ValueOf(&x.config.Discovery.RpcService).Elem()
tof := reflect.TypeOf(&x.config.Discovery.RpcService).Elem()
num := tof.NumField()
for i := 0; i < num; i++ {
field := tof.Field(i)
if !field.IsExported() {
continue
}
if field.Type.Kind() != reflect.String {
continue
}
vof.Field(i).SetString(field.Name)
} }
if x.conf == nil { }
x.conf = make(map[string][]byte)
} func (x *cmds) initAllConfig() error {
vof := reflect.ValueOf(&config.AllConfig{}).Elem() x.conf = make(map[string]reflect.Value)
vof := reflect.ValueOf(&x.config).Elem()
num := vof.NumField() num := vof.NumField()
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
field := vof.Field(i) field := vof.Field(i)
@ -96,24 +112,29 @@ func (x *cmds) readConfig() error {
ptr = false ptr = false
} }
} }
itemConf := field.Addr().Interface() x.conf[x.getTypePath(field.Type())] = field
name := itemConf.(interface{ GetConfigFileName() string }).GetConfigFileName() val := field.Addr().Interface()
if datautil.Contain(name, skip...) { name := val.(interface{ GetConfigFileName() string }).GetConfigFileName()
x.conf[getTypePath(field.Type())] = nil confData, err := os.ReadFile(filepath.Join(x.confPath, name))
continue
}
data, err := os.ReadFile(filepath.Join(x.confPath, name))
if err != nil { if err != nil {
if os.IsNotExist(err) {
continue
}
return err
}
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewReader(confData)); err != nil {
return err
}
opt := func(conf *mapstructure.DecoderConfig) {
conf.TagName = config.StructTagName
}
if err := v.Unmarshal(val, opt); err != nil {
return err return err
} }
x.conf[getTypePath(field.Type())] = data
} }
val := config.Discovery{Enable: config.Standalone} x.initDiscovery()
var buf bytes.Buffer
if err := yaml.NewEncoder(&buf).Encode(&val); err != nil {
return err
}
x.conf[getTypePath(reflect.TypeOf(val))] = buf.Bytes()
return nil return nil
} }
@ -134,44 +155,23 @@ func (x *cmds) parseConf(conf any) error {
continue continue
} }
field := vof.Field(i) field := vof.Field(i)
pkt := getTypePath(field.Type()) pkt := x.getTypePath(field.Type())
confData, ok := x.conf[pkt] val, ok := x.conf[pkt]
if !ok { if !ok {
switch field.Interface().(type) { switch field.Interface().(type) {
case config.Index:
case config.Path: case config.Path:
field.SetString(x.confPath) field.SetString(x.confPath)
case config.AllConfig: case config.AllConfig:
var allConf config.AllConfig field.Set(reflect.ValueOf(x.config))
if err := x.parseConf(&allConf); err != nil {
return err
}
field.Set(reflect.ValueOf(allConf))
case *config.AllConfig: case *config.AllConfig:
var allConf config.AllConfig field.Set(reflect.ValueOf(&x.config))
if err := x.parseConf(&allConf); err != nil {
return err
}
field.Set(reflect.ValueOf(&allConf))
default: default:
return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name) return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name)
} }
continue continue
} }
if confData == nil { field.Set(val)
continue
}
val := field.Addr().Interface()
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewReader(confData)); err != nil {
return err
}
fn := func(conf *mapstructure.DecoderConfig) {
conf.TagName = config.StructTagName
}
if err := v.Unmarshal(val, fn); err != nil {
return err
}
} }
return nil return nil
} }
@ -181,19 +181,16 @@ func (x *cmds) add(name string, block bool, fn func(ctx context.Context) error)
} }
func (x *cmds) run(ctx context.Context) error { func (x *cmds) run(ctx context.Context) error {
if x.conf == nil {
if err := x.readConfig(); err != nil {
return err
}
}
if len(x.cmds) == 0 { if len(x.cmds) == 0 {
return fmt.Errorf("no command to run") return fmt.Errorf("no command to run")
} }
if err := x.initAllConfig(); err != nil {
return err
}
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
for i := range x.cmds { for i := range x.cmds {
cmd := x.cmds[i] cmd := x.cmds[i]
go func() { go func() {
//fmt.Println("start", cmd.Name)
if err := cmd.Func(ctx); err != nil { if err := cmd.Func(ctx); err != nil {
cancel(fmt.Errorf("server %s exit %w", cmd.Name, err)) cancel(fmt.Errorf("server %s exit %w", cmd.Name, err))
return return
@ -207,22 +204,12 @@ func (x *cmds) run(ctx context.Context) error {
return context.Cause(ctx) return context.Cause(ctx)
} }
type cmdName struct { func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
Name string
Func func(ctx context.Context) error
Block bool
}
func getFuncPacketName(fn any) string {
name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
if index := strings.Index(name, "."); index >= 0 { if index := strings.Index(name, "."); index >= 0 {
name = name[:index] name = name[:index]
} }
return name cmd.add(name, block, func(ctx context.Context) error {
}
func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
var conf C var conf C
if err := cmd.parseConf(&conf); err != nil { if err := cmd.parseConf(&conf); err != nil {
return err return err
@ -230,24 +217,3 @@ func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar())
}) })
} }
//
//func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) {
// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
// var conf C
// if err := cmd.parseConf(&conf); err != nil {
// return err
// }
// return fn(ctx, 0, &conf)
// })
//}
//
//func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) {
// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
// var conf C
// if err := cmd.parseConf(&conf); err != nil {
// return err
// }
// return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0)
// })
//}

View File

@ -129,12 +129,12 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se
//} //}
address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))
server := http.Server{Addr: address, Handler: router} httpServer := &http.Server{Addr: address, Handler: router}
log.CInfo(ctx, "API server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) log.CInfo(ctx, "API server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
go func() { go func() {
err = server.ListenAndServe() err = httpServer.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) { if err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", httpServer.Addr))
netDone <- struct{}{} netDone <- struct{}{}
} }
}() }()
@ -150,13 +150,12 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se
shutdown := func() error { shutdown := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
err := server.Shutdown(ctx) err := httpServer.Shutdown(ctx)
if err != nil { if err != nil {
return errs.WrapMsg(err, "shutdown err") return errs.WrapMsg(err, "shutdown err")
} }
return nil return nil
} }
disetcd.RegisterShutDown(shutdown)
select { select {
case <-sigs: case <-sigs:
program.SIGTERMExit() program.SIGTERMExit()