From 0c20ec5a9c4c1f8acdd9e16cbd5cf176e3028066 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Sat, 8 Feb 2025 15:52:34 +0800 Subject: [PATCH] 1 --- cmd/main.go | 176 +++++++++++++++++-------------------------- internal/api/init.go | 9 +-- 2 files changed, 75 insertions(+), 110 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index a2afb11fa..dd225fa13 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -27,10 +27,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/standalone" - "github.com/openimsdk/tools/utils/datautil" "github.com/spf13/viper" "google.golang.org/grpc" - "gopkg.in/yaml.v3" ) func main() { @@ -45,17 +43,17 @@ func main() { } } cmd := newCmds(configPath) - putCmd1(cmd, false, auth.Start) - putCmd1(cmd, false, conversation.Start) - putCmd1(cmd, false, relation.Start) - putCmd1(cmd, false, group.Start) - putCmd1(cmd, false, msg.Start) - putCmd1(cmd, false, third.Start) - putCmd1(cmd, false, user.Start) - putCmd1(cmd, false, push.Start) - putCmd1(cmd, true, msggateway.Start) - putCmd1(cmd, true, msgtransfer.Start) - putCmd1(cmd, true, api.Start) + putCmd(cmd, false, auth.Start) + putCmd(cmd, false, conversation.Start) + putCmd(cmd, false, relation.Start) + putCmd(cmd, false, group.Start) + putCmd(cmd, false, msg.Start) + putCmd(cmd, false, third.Start) + putCmd(cmd, false, user.Start) + putCmd(cmd, false, push.Start) + putCmd(cmd, true, msggateway.Start) + putCmd(cmd, true, msgtransfer.Start) + putCmd(cmd, true, api.Start) ctx := context.Background() if err := cmd.run(ctx); err != nil { fmt.Println(err) @@ -64,28 +62,46 @@ func main() { fmt.Println("exit") } -func getTypePath(typ reflect.Type) string { - return path.Join(typ.PkgPath(), typ.Name()) -} - func newCmds(confPath string) *cmds { return &cmds{confPath: confPath} } +type cmdName struct { + Name string + Func func(ctx context.Context) error + Block bool +} type cmds struct { confPath string cmds []cmdName - conf map[string][]byte + config config.AllConfig + conf map[string]reflect.Value } -func (x *cmds) readConfig() error { - skip := []string{ - config.DiscoveryConfigFilename, +func (x *cmds) getTypePath(typ reflect.Type) string { + return path.Join(typ.PkgPath(), typ.Name()) +} + +func (x *cmds) initDiscovery() { + x.config.Discovery.Enable = config.Standalone + vof := reflect.ValueOf(&x.config.Discovery.RpcService).Elem() + tof := reflect.TypeOf(&x.config.Discovery.RpcService).Elem() + num := tof.NumField() + for i := 0; i < num; i++ { + field := tof.Field(i) + if !field.IsExported() { + continue + } + if field.Type.Kind() != reflect.String { + continue + } + vof.Field(i).SetString(field.Name) } - if x.conf == nil { - x.conf = make(map[string][]byte) - } - vof := reflect.ValueOf(&config.AllConfig{}).Elem() +} + +func (x *cmds) initAllConfig() error { + x.conf = make(map[string]reflect.Value) + vof := reflect.ValueOf(&x.config).Elem() num := vof.NumField() for i := 0; i < num; i++ { field := vof.Field(i) @@ -96,24 +112,29 @@ func (x *cmds) readConfig() error { ptr = false } } - itemConf := field.Addr().Interface() - name := itemConf.(interface{ GetConfigFileName() string }).GetConfigFileName() - if datautil.Contain(name, skip...) { - x.conf[getTypePath(field.Type())] = nil - continue - } - data, err := os.ReadFile(filepath.Join(x.confPath, name)) + x.conf[x.getTypePath(field.Type())] = field + val := field.Addr().Interface() + name := val.(interface{ GetConfigFileName() string }).GetConfigFileName() + confData, err := os.ReadFile(filepath.Join(x.confPath, name)) if err != nil { + if os.IsNotExist(err) { + continue + } + return err + } + v := viper.New() + v.SetConfigType("yaml") + if err := v.ReadConfig(bytes.NewReader(confData)); err != nil { + return err + } + opt := func(conf *mapstructure.DecoderConfig) { + conf.TagName = config.StructTagName + } + if err := v.Unmarshal(val, opt); err != nil { return err } - x.conf[getTypePath(field.Type())] = data } - val := config.Discovery{Enable: config.Standalone} - var buf bytes.Buffer - if err := yaml.NewEncoder(&buf).Encode(&val); err != nil { - return err - } - x.conf[getTypePath(reflect.TypeOf(val))] = buf.Bytes() + x.initDiscovery() return nil } @@ -134,44 +155,23 @@ func (x *cmds) parseConf(conf any) error { continue } field := vof.Field(i) - pkt := getTypePath(field.Type()) - confData, ok := x.conf[pkt] + pkt := x.getTypePath(field.Type()) + val, ok := x.conf[pkt] if !ok { switch field.Interface().(type) { + case config.Index: case config.Path: field.SetString(x.confPath) case config.AllConfig: - var allConf config.AllConfig - if err := x.parseConf(&allConf); err != nil { - return err - } - field.Set(reflect.ValueOf(allConf)) + field.Set(reflect.ValueOf(x.config)) case *config.AllConfig: - var allConf config.AllConfig - if err := x.parseConf(&allConf); err != nil { - return err - } - field.Set(reflect.ValueOf(&allConf)) + field.Set(reflect.ValueOf(&x.config)) default: return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name) } continue } - if confData == nil { - continue - } - val := field.Addr().Interface() - v := viper.New() - v.SetConfigType("yaml") - if err := v.ReadConfig(bytes.NewReader(confData)); err != nil { - return err - } - fn := func(conf *mapstructure.DecoderConfig) { - conf.TagName = config.StructTagName - } - if err := v.Unmarshal(val, fn); err != nil { - return err - } + field.Set(val) } return nil } @@ -181,19 +181,16 @@ func (x *cmds) add(name string, block bool, fn func(ctx context.Context) error) } func (x *cmds) run(ctx context.Context) error { - if x.conf == nil { - if err := x.readConfig(); err != nil { - return err - } - } if len(x.cmds) == 0 { return fmt.Errorf("no command to run") } + if err := x.initAllConfig(); err != nil { + return err + } ctx, cancel := context.WithCancelCause(ctx) for i := range x.cmds { cmd := x.cmds[i] go func() { - //fmt.Println("start", cmd.Name) if err := cmd.Func(ctx); err != nil { cancel(fmt.Errorf("server %s exit %w", cmd.Name, err)) return @@ -207,22 +204,12 @@ func (x *cmds) run(ctx context.Context) error { return context.Cause(ctx) } -type cmdName struct { - Name string - Func func(ctx context.Context) error - Block bool -} - -func getFuncPacketName(fn any) string { +func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) if index := strings.Index(name, "."); index >= 0 { name = name[:index] } - return name -} - -func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { - cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { + cmd.add(name, block, func(ctx context.Context) error { var conf C if err := cmd.parseConf(&conf); err != nil { return err @@ -230,24 +217,3 @@ func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config * return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) }) } - -// -//func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) { -// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { -// var conf C -// if err := cmd.parseConf(&conf); err != nil { -// return err -// } -// return fn(ctx, 0, &conf) -// }) -//} -// -//func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) { -// cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error { -// var conf C -// if err := cmd.parseConf(&conf); err != nil { -// return err -// } -// return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0) -// }) -//} diff --git a/internal/api/init.go b/internal/api/init.go index fcf99a618..3fecc119d 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -129,12 +129,12 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se //} address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) - server := http.Server{Addr: address, Handler: router} + httpServer := &http.Server{Addr: address, Handler: router} log.CInfo(ctx, "API server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) go func() { - err = server.ListenAndServe() + err = httpServer.ListenAndServe() if err != nil && !errors.Is(err, http.ErrServerClosed) { - netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) + netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", httpServer.Addr)) netDone <- struct{}{} } }() @@ -150,13 +150,12 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, _ grpc.Se shutdown := func() error { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - err := server.Shutdown(ctx) + err := httpServer.Shutdown(ctx) if err != nil { return errs.WrapMsg(err, "shutdown err") } return nil } - disetcd.RegisterShutDown(shutdown) select { case <-sigs: program.SIGTERMExit()