From 23df83cb0b68b94f57404e0577c059c0c6cd2f45 Mon Sep 17 00:00:00 2001 From: John Guo Date: Wed, 22 May 2024 21:14:43 +0800 Subject: [PATCH] database/gdb: fix deadlock when orm operations perform in cache function from gcache (#3585) --- .github/workflows/golangci-lint.yml | 4 +- .golangci.yml | 13 +- contrib/config/consul/consul.go | 31 ++- contrib/config/kubecm/kubecm.go | 20 +- contrib/config/polaris/polaris.go | 22 +- contrib/rpc/grpcx/grpcx_grpc_server.go | 22 +- .../internal/tracing/tracing_interceptor.go | 11 +- database/gdb/gdb.go | 17 +- database/gdb/gdb_core.go | 20 +- database/gdb/gdb_core_utility.go | 60 ++--- database/gdb/gdb_driver_wrapper_db.go | 32 ++- database/gdb/gdb_func.go | 24 ++ database/gdb/gdb_model_cache.go | 15 +- database/gdb/gdb_model_soft_time.go | 56 +++-- net/ghttp/ghttp_server.go | 66 ++--- os/gfsnotify/gfsnotify.go | 4 +- os/gfsnotify/gfsnotify_watcher_loop.go | 236 +++++++++--------- os/grpool/grpool_pool.go | 36 +-- os/gtimer/gtimer_entry.go | 36 +-- os/gtimer/gtimer_timer_loop.go | 50 ++-- 20 files changed, 418 insertions(+), 357 deletions(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index e0024a1c8..ade760534 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -47,8 +47,8 @@ jobs: with: go-version: ${{ matrix.go-version }} - name: golangci-lint - uses: golangci/golangci-lint-action@v4 + uses: golangci/golangci-lint-action@v6 with: # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. - version: v1.56.2 + version: v1.58.2 args: --timeout 3m0s diff --git a/.golangci.yml b/.golangci.yml index 3c370df08..11c072949 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -38,7 +38,7 @@ linters: # Custom enable linters we want to use. enable: - errcheck # Errcheck is a program for checking for unchecked errors in go programs. - - errchkjson # Checks types passed to the json encoding functions. Reports unsupported types and optionally reports occasions, where the check for the returned error can be omitted. + - errchkjson # Checks types passed to the JSON encoding functions. Reports unsupported types and optionally reports occasions, where the check for the returned error can be omitted. - funlen # Tool for detection of long functions - goconst # Finds repeated strings that could be replaced by a constant - gocritic # Provides diagnostics that check for bugs, performance and style issues. @@ -172,10 +172,6 @@ linters-settings: # https://golangci-lint.run/usage/linters/#gosimple gosimple: - # Select the Go version to target. - # Default: 1.13 - # Deprecated: use the global `run.go` instead. - go: "1.15" # Sxxxx checks in https://staticcheck.io/docs/configuration/options/#checks # Default: ["*"] checks: [ @@ -184,9 +180,6 @@ linters-settings: # https://golangci-lint.run/usage/linters/#govet govet: - # Report about shadowed variables. - # Default: false - check-shadowing: true # Settings per analyzer. settings: # Analyzer name, run `go tool vet help` to see all analyzers. @@ -259,10 +252,6 @@ linters-settings: # https://golangci-lint.run/usage/linters/#staticcheck staticcheck: - # Select the Go version to target. - # Default: "1.13" - # Deprecated: use the global `run.go` instead. - go: "1.20" # SAxxxx checks in https://staticcheck.io/docs/configuration/options/#checks # Default: ["*"] checks: [ "all","-SA1019","-SA4015","-SA1029","-SA1016","-SA9003","-SA4006","-SA6003" ] diff --git a/contrib/config/consul/consul.go b/contrib/config/consul/consul.go index 3a8fe8ab3..afb1cf29e 100644 --- a/contrib/config/consul/consul.go +++ b/contrib/config/consul/consul.go @@ -11,13 +11,14 @@ import ( "context" "fmt" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcfg" "github.com/gogf/gf/v2/os/glog" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/api/watch" ) // Config is the configuration object for consul client. @@ -156,22 +157,28 @@ func (c *Client) addWatcher() (err error) { return } - if err := c.doUpdate(v.Value); err != nil { - c.config.Logger.Errorf(context.Background(), + if err = c.doUpdate(v.Value); err != nil { + c.config.Logger.Errorf( + context.Background(), "watch config from consul path %+v update failed: %s", - c.config.Path, err) + c.config.Path, err, + ) } } plan.Datacenter = c.config.ConsulConfig.Datacenter plan.Token = c.config.ConsulConfig.Token - go func() { - if err := plan.Run(c.config.ConsulConfig.Address); err != nil { - c.config.Logger.Errorf(context.Background(), - "watch config from consul path %+v plan start failed: %s", - c.config.Path, err) - } - }() + go c.startAsynchronousWatch(plan) return nil } + +func (c *Client) startAsynchronousWatch(plan *watch.Plan) { + if err := plan.Run(c.config.ConsulConfig.Address); err != nil { + c.config.Logger.Errorf( + context.Background(), + "watch config from consul path %+v plan start failed: %s", + c.config.Path, err, + ) + } +} diff --git a/contrib/config/kubecm/kubecm.go b/contrib/config/kubecm/kubecm.go index dd4ed0f98..a110193cf 100644 --- a/contrib/config/kubecm/kubecm.go +++ b/contrib/config/kubecm/kubecm.go @@ -164,14 +164,16 @@ func (c *Client) doWatch(ctx context.Context, namespace string) (err error) { c.config.ConfigMap, namespace, ) } - go func() { - for { - event := <-watchHandler.ResultChan() - switch event.Type { - case watch.Modified: - _ = c.doUpdate(ctx, namespace) - } - } - }() + go c.startAsynchronousWatch(ctx, namespace, watchHandler) return nil } + +func (c *Client) startAsynchronousWatch(ctx context.Context, namespace string, watchHandler watch.Interface) { + for { + event := <-watchHandler.ResultChan() + switch event.Type { + case watch.Modified: + _ = c.doUpdate(ctx, namespace) + } + } +} diff --git a/contrib/config/polaris/polaris.go b/contrib/config/polaris/polaris.go index 8f4881418..c14a8ff85 100644 --- a/contrib/config/polaris/polaris.go +++ b/contrib/config/polaris/polaris.go @@ -158,14 +158,18 @@ func (c *Client) doWatch(ctx context.Context) (err error) { if !c.config.Watch { return nil } - var changeChan = c.client.AddChangeListenerWithChannel() - go func() { - for { - select { - case <-changeChan: - _ = c.doUpdate(ctx) - } - } - }() + go c.startAsynchronousWatch( + ctx, + c.client.AddChangeListenerWithChannel(), + ) return nil } + +func (c *Client) startAsynchronousWatch(ctx context.Context, changeChan <-chan model.ConfigFileChangeEvent) { + for { + select { + case <-changeChan: + _ = c.doUpdate(ctx) + } + } +} diff --git a/contrib/rpc/grpcx/grpcx_grpc_server.go b/contrib/rpc/grpcx/grpcx_grpc_server.go index a44f5f118..7d14d30d9 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_server.go +++ b/contrib/rpc/grpcx/grpcx_grpc_server.go @@ -108,11 +108,7 @@ func (s *GrpcServer) Run() { } // Start listening. - go func() { - if err = s.Server.Serve(s.listener); err != nil { - s.Logger().Fatalf(ctx, `%+v`, err) - } - }() + go s.doServeAsynchronously(ctx) // Service register. s.doServiceRegister() @@ -124,6 +120,12 @@ func (s *GrpcServer) Run() { s.doSignalListen() } +func (s *GrpcServer) doServeAsynchronously(ctx context.Context) { + if err := s.Server.Serve(s.listener); err != nil { + s.Logger().Fatalf(ctx, `%+v`, err) + } +} + // doSignalListen does signal listening and handling for gracefully shutdown. func (s *GrpcServer) doSignalListen() { var ctx = context.Background() @@ -204,10 +206,12 @@ func (s *GrpcServer) doServiceDeregister() { // Start starts the server in no-blocking way. func (s *GrpcServer) Start() { s.waitGroup.Add(1) - go func() { - defer s.waitGroup.Done() - s.Run() - }() + go s.doStartAsynchronously() +} + +func (s *GrpcServer) doStartAsynchronously() { + defer s.waitGroup.Done() + s.Run() } // Wait works with Start, which blocks current goroutine until the server stops. diff --git a/contrib/rpc/grpcx/internal/tracing/tracing_interceptor.go b/contrib/rpc/grpcx/internal/tracing/tracing_interceptor.go index d7e149d87..e84c21c92 100644 --- a/contrib/rpc/grpcx/internal/tracing/tracing_interceptor.go +++ b/contrib/rpc/grpcx/internal/tracing/tracing_interceptor.go @@ -139,16 +139,15 @@ const ( ) func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { - events := make(chan streamEvent) - eventsDone := make(chan struct{}) - finished := make(chan error) - + var ( + events = make(chan streamEvent) + eventsDone = make(chan struct{}) + finished = make(chan error) + ) go func() { defer close(eventsDone) - // Both streams have to be closed state := byte(0) - for event := range events { switch event.Type { case closeEvent: diff --git a/database/gdb/gdb.go b/database/gdb/gdb.go index 20fd8b507..8bffa2636 100644 --- a/database/gdb/gdb.go +++ b/database/gdb/gdb.go @@ -268,6 +268,7 @@ type Core struct { logger glog.ILogger // Logger for logging functionality. config *ConfigNode // Current config node. dynamicConfig dynamicConfig // Dynamic configurations, which can be changed in runtime. + innerMemCache *gcache.Cache } type dynamicConfig struct { @@ -525,9 +526,6 @@ var ( // allDryRun sets dry-run feature for all database connections. // It is commonly used for command options for convenience. allDryRun = false - - // tableFieldsMap caches the table information retrieved from database. - tableFieldsMap = gmap.NewStrAnyMap(true) ) func init() { @@ -587,12 +585,13 @@ func newDBByConfigNode(node *ConfigNode, group string) (db DB, err error) { node = parseConfigNodeLink(node) } c := &Core{ - group: group, - debug: gtype.NewBool(), - cache: gcache.New(), - links: gmap.New(true), - logger: glog.New(), - config: node, + group: group, + debug: gtype.NewBool(), + cache: gcache.New(), + links: gmap.New(true), + logger: glog.New(), + config: node, + innerMemCache: gcache.New(), dynamicConfig: dynamicConfig{ MaxIdleConnCount: node.MaxIdleConnCount, MaxOpenConnCount: node.MaxOpenConnCount, diff --git a/database/gdb/gdb_core.go b/database/gdb/gdb_core.go index 80ffef5cd..a3896c30b 100644 --- a/database/gdb/gdb_core.go +++ b/database/gdb/gdb_core.go @@ -22,6 +22,7 @@ import ( "github.com/gogf/gf/v2/internal/intlog" "github.com/gogf/gf/v2/internal/reflection" "github.com/gogf/gf/v2/internal/utils" + "github.com/gogf/gf/v2/os/gcache" "github.com/gogf/gf/v2/text/gregex" "github.com/gogf/gf/v2/text/gstr" "github.com/gogf/gf/v2/util/gconv" @@ -737,20 +738,27 @@ func (c *Core) HasTable(name string) (bool, error) { return false, nil } +func (c *Core) GetInnerMemCache() *gcache.Cache { + return c.innerMemCache +} + // GetTablesWithCache retrieves and returns the table names of current database with cache. func (c *Core) GetTablesWithCache() ([]string, error) { var ( - ctx = c.db.GetCtx() - cacheKey = fmt.Sprintf(`Tables: %s`, c.db.GetGroup()) + ctx = c.db.GetCtx() + cacheKey = fmt.Sprintf(`Tables:%s`, c.db.GetGroup()) + cacheDuration = gcache.DurationNoExpire + innerMemCache = c.GetInnerMemCache() ) - result, err := c.GetCache().GetOrSetFuncLock( - ctx, cacheKey, func(ctx context.Context) (interface{}, error) { + result, err := innerMemCache.GetOrSetFuncLock( + ctx, cacheKey, + func(ctx context.Context) (interface{}, error) { tableList, err := c.db.Tables(ctx) if err != nil { - return false, err + return nil, err } return tableList, nil - }, 0, + }, cacheDuration, ) if err != nil { return nil, err diff --git a/database/gdb/gdb_core_utility.go b/database/gdb/gdb_core_utility.go index 71f0b48ea..4872f13a1 100644 --- a/database/gdb/gdb_core_utility.go +++ b/database/gdb/gdb_core_utility.go @@ -11,12 +11,10 @@ import ( "context" "fmt" - "github.com/gogf/gf/v2/crypto/gmd5" "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/text/gregex" "github.com/gogf/gf/v2/text/gstr" - "github.com/gogf/gf/v2/util/gconv" "github.com/gogf/gf/v2/util/gutil" ) @@ -144,22 +142,40 @@ func (c *Core) TableFields(ctx context.Context, table string, schema ...string) // ClearTableFields removes certain cached table fields of current configuration group. func (c *Core) ClearTableFields(ctx context.Context, table string, schema ...string) (err error) { - tableFieldsMap.Remove(fmt.Sprintf( - `%s%s@%s#%s`, - cachePrefixTableFields, + tableFieldsCacheKey := genTableFieldsCacheKey( c.db.GetGroup(), gutil.GetOrDefaultStr(c.db.GetSchema(), schema...), table, - )) + ) + _, err = c.innerMemCache.Remove(ctx, tableFieldsCacheKey) return } // ClearTableFieldsAll removes all cached table fields of current configuration group. func (c *Core) ClearTableFieldsAll(ctx context.Context) (err error) { var ( - keys = tableFieldsMap.Keys() - cachePrefix = fmt.Sprintf(`%s%s`, cachePrefixTableFields, c.db.GetGroup()) - removedKeys = make([]string, 0) + keys, _ = c.innerMemCache.KeyStrings(ctx) + cachePrefix = cachePrefixTableFields + removedKeys = make([]any, 0) + ) + for _, key := range keys { + if gstr.HasPrefix(key, cachePrefix) { + removedKeys = append(removedKeys, key) + } + } + + if len(removedKeys) > 0 { + err = c.innerMemCache.Removes(ctx, removedKeys) + } + return +} + +// ClearCache removes cached sql result of certain table. +func (c *Core) ClearCache(ctx context.Context, table string) (err error) { + var ( + keys, _ = c.db.GetCache().KeyStrings(ctx) + cachePrefix = fmt.Sprintf(`%s%s@`, cachePrefixSelectCache, table) + removedKeys = make([]any, 0) ) for _, key := range keys { if gstr.HasPrefix(key, cachePrefix) { @@ -167,32 +183,20 @@ func (c *Core) ClearTableFieldsAll(ctx context.Context) (err error) { } } if len(removedKeys) > 0 { - tableFieldsMap.Removes(removedKeys) + err = c.db.GetCache().Removes(ctx, removedKeys) } return } -// ClearCache removes cached sql result of certain table. -func (c *Core) ClearCache(ctx context.Context, table string) (err error) { - return c.db.GetCache().Clear(ctx) -} - // ClearCacheAll removes all cached sql result from cache func (c *Core) ClearCacheAll(ctx context.Context) (err error) { - return c.db.GetCache().Clear(ctx) -} - -func (c *Core) makeSelectCacheKey(name, schema, table, sql string, args ...interface{}) string { - if name == "" { - name = fmt.Sprintf( - `%s@%s#%s:%s`, - c.db.GetGroup(), - schema, - table, - gmd5.MustEncryptString(sql+", @PARAMS:"+gconv.String(args)), - ) + if err = c.db.GetCache().Clear(ctx); err != nil { + return err } - return fmt.Sprintf(`%s%s`, cachePrefixSelectCache, name) + if err = c.GetInnerMemCache().Clear(ctx); err != nil { + return err + } + return } // HasField determine whether the field exists in the table. diff --git a/database/gdb/gdb_driver_wrapper_db.go b/database/gdb/gdb_driver_wrapper_db.go index ac0d7bffd..4a6670627 100644 --- a/database/gdb/gdb_driver_wrapper_db.go +++ b/database/gdb/gdb_driver_wrapper_db.go @@ -11,10 +11,12 @@ import ( "database/sql" "fmt" + "github.com/gogf/gf/v2/container/gvar" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/internal/intlog" + "github.com/gogf/gf/v2/os/gcache" "github.com/gogf/gf/v2/text/gstr" "github.com/gogf/gf/v2/util/gutil" ) @@ -68,25 +70,29 @@ func (d *DriverWrapperDB) TableFields( ) } var ( + innerMemCache = d.GetCore().GetInnerMemCache() // prefix:group@schema#table - cacheKey = fmt.Sprintf( - `%s%s@%s#%s`, - cachePrefixTableFields, + cacheKey = genTableFieldsCacheKey( d.GetGroup(), gutil.GetOrDefaultStr(d.GetSchema(), schema...), table, ) - value = tableFieldsMap.GetOrSetFuncLock(cacheKey, func() interface{} { - ctx = context.WithValue(ctx, ctxKeyInternalProducedSQL, struct{}{}) - fields, err = d.DB.TableFields(ctx, table, schema...) - if err != nil { - return nil - } - return fields - }) + cacheFunc = func(ctx context.Context) (interface{}, error) { + return d.DB.TableFields( + context.WithValue(ctx, ctxKeyInternalProducedSQL, struct{}{}), + table, schema..., + ) + } + value *gvar.Var ) - if value != nil { - fields = value.(map[string]*TableField) + value, err = innerMemCache.GetOrSetFuncLock( + ctx, cacheKey, cacheFunc, gcache.DurationNoExpire, + ) + if err != nil { + return + } + if !value.IsNil() { + fields = value.Val().(map[string]*TableField) } return } diff --git a/database/gdb/gdb_func.go b/database/gdb/gdb_func.go index 00a00462a..c065003d0 100644 --- a/database/gdb/gdb_func.go +++ b/database/gdb/gdb_func.go @@ -16,6 +16,7 @@ import ( "time" "github.com/gogf/gf/v2/container/garray" + "github.com/gogf/gf/v2/encoding/ghash" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/internal/empty" "github.com/gogf/gf/v2/internal/intlog" @@ -944,3 +945,26 @@ func FormatMultiLineSqlToSingle(sql string) (string, error) { } return sql, nil } + +func genTableFieldsCacheKey(group, schema, table string) string { + return fmt.Sprintf( + `%s%s@%s#%s`, + cachePrefixTableFields, + group, + schema, + table, + ) +} + +func genSelectCacheKey(table, group, schema, name, sql string, args ...interface{}) string { + if name == "" { + name = fmt.Sprintf( + `%s@%s#%s:%d`, + table, + group, + schema, + ghash.BKDR64([]byte(sql+", @PARAMS:"+gconv.String(args))), + ) + } + return fmt.Sprintf(`%s%s`, cachePrefixSelectCache, name) +} diff --git a/database/gdb/gdb_model_cache.go b/database/gdb/gdb_model_cache.go index 2bdab1c8e..c1b6ca820 100644 --- a/database/gdb/gdb_model_cache.go +++ b/database/gdb/gdb_model_cache.go @@ -142,10 +142,17 @@ func (m *Model) saveSelectResultToCache( } func (m *Model) makeSelectCacheKey(sql string, args ...interface{}) string { - return m.db.GetCore().makeSelectCacheKey( - m.cacheOption.Name, - m.db.GetSchema(), - m.db.GetCore().guessPrimaryTableName(m.tables), + var ( + table = m.db.GetCore().guessPrimaryTableName(m.tables) + group = m.db.GetGroup() + schema = m.db.GetSchema() + customName = m.cacheOption.Name + ) + return genSelectCacheKey( + table, + group, + schema, + customName, sql, args..., ) diff --git a/database/gdb/gdb_model_soft_time.go b/database/gdb/gdb_model_soft_time.go index ff972e633..c2817d12a 100644 --- a/database/gdb/gdb_model_soft_time.go +++ b/database/gdb/gdb_model_soft_time.go @@ -190,43 +190,51 @@ func (m *softTimeMaintainer) getSoftFieldNameAndType( schema string, table string, checkFiledNames []string, ) (fieldName string, fieldType LocalType) { var ( - cacheKey = fmt.Sprintf(`getSoftFieldNameAndType:%s#%s#%s`, schema, table, strings.Join(checkFiledNames, "_")) + innerMemCache = m.db.GetCore().GetInnerMemCache() + cacheKey = fmt.Sprintf( + `getSoftFieldNameAndType:%s#%s#%s`, + schema, table, strings.Join(checkFiledNames, "_"), + ) cacheDuration = gcache.DurationNoExpire cacheFunc = func(ctx context.Context) (value interface{}, err error) { // Ignore the error from TableFields. - fieldsMap, _ := m.TableFields(table, schema) - if len(fieldsMap) > 0 { - for _, checkFiledName := range checkFiledNames { - fieldName, _ = gutil.MapPossibleItemByKey( - gconv.Map(fieldsMap), checkFiledName, + fieldsMap, err := m.TableFields(table, schema) + if err != nil { + return nil, err + } + if len(fieldsMap) == 0 { + return nil, nil + } + for _, checkFiledName := range checkFiledNames { + fieldName, _ = gutil.MapPossibleItemByKey( + gconv.Map(fieldsMap), checkFiledName, + ) + if fieldName != "" { + fieldType, _ = m.db.CheckLocalTypeForField( + ctx, fieldsMap[fieldName].Type, nil, ) - if fieldName != "" { - fieldType, _ = m.db.CheckLocalTypeForField( - ctx, fieldsMap[fieldName].Type, nil, - ) - var cacheItem = getSoftFieldNameAndTypeCacheItem{ - FieldName: fieldName, - FieldType: fieldType, - } - return cacheItem, nil + var cacheItem = getSoftFieldNameAndTypeCacheItem{ + FieldName: fieldName, + FieldType: fieldType, } + return cacheItem, nil } } return } ) - result, err := gcache.GetOrSetFunc(ctx, cacheKey, cacheFunc, cacheDuration) + result, err := innerMemCache.GetOrSetFunc( + ctx, cacheKey, cacheFunc, cacheDuration, + ) if err != nil { - intlog.Error(ctx, err) + return } - if result != nil { - var cacheItem getSoftFieldNameAndTypeCacheItem - if err = result.Scan(&cacheItem); err != nil { - return "", "" - } - fieldName = cacheItem.FieldName - fieldType = cacheItem.FieldType + if result == nil { + return } + cacheItem := result.Val().(getSoftFieldNameAndTypeCacheItem) + fieldName = cacheItem.FieldName + fieldType = cacheItem.FieldType return } diff --git a/net/ghttp/ghttp_server.go b/net/ghttp/ghttp_server.go index 1da5ce4a3..75944a489 100644 --- a/net/ghttp/ghttp_server.go +++ b/net/ghttp/ghttp_server.go @@ -569,43 +569,45 @@ func (s *Server) startServer(fdMap listenerFdMap) { } // Start listening asynchronously. serverRunning.Add(1) - var wg = sync.WaitGroup{} - for _, v := range s.servers { + var wg = &sync.WaitGroup{} + for _, gs := range s.servers { wg.Add(1) - go func(server *gracefulServer) { - s.serverCount.Add(1) - var err error - // Create listener. - if server.isHttps { - err = server.CreateListenerTLS( - s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig, - ) - } else { - err = server.CreateListener() - } - if err != nil { - s.Logger().Fatalf(ctx, `%+v`, err) - } - wg.Done() - // Start listening and serving in blocking way. - err = server.Serve(ctx) - // The process exits if the server is closed with none closing error. - if err != nil && !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { - s.Logger().Fatalf(ctx, `%+v`, err) - } - // If all the underlying servers' shutdown, the process exits. - if s.serverCount.Add(-1) < 1 { - s.closeChan <- struct{}{} - if serverRunning.Add(-1) < 1 { - serverMapping.Remove(s.instance) - allShutdownChan <- struct{}{} - } - } - }(v) + go s.startGracefulServer(ctx, wg, gs) } wg.Wait() } +func (s *Server) startGracefulServer(ctx context.Context, wg *sync.WaitGroup, server *gracefulServer) { + s.serverCount.Add(1) + var err error + // Create listener. + if server.isHttps { + err = server.CreateListenerTLS( + s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig, + ) + } else { + err = server.CreateListener() + } + if err != nil { + s.Logger().Fatalf(ctx, `%+v`, err) + } + wg.Done() + // Start listening and serving in blocking way. + err = server.Serve(ctx) + // The process exits if the server is closed with none closing error. + if err != nil && !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { + s.Logger().Fatalf(ctx, `%+v`, err) + } + // If all the underlying servers' shutdown, the process exits. + if s.serverCount.Add(-1) < 1 { + s.closeChan <- struct{}{} + if serverRunning.Add(-1) < 1 { + serverMapping.Remove(s.instance) + allShutdownChan <- struct{}{} + } + } +} + // Status retrieves and returns the server status. func (s *Server) Status() ServerStatus { if serverRunning.Val() == 0 { diff --git a/os/gfsnotify/gfsnotify.go b/os/gfsnotify/gfsnotify.go index be58e2c37..75a76edf5 100644 --- a/os/gfsnotify/gfsnotify.go +++ b/os/gfsnotify/gfsnotify.go @@ -96,8 +96,8 @@ func New() (*Watcher, error) { intlog.Printf(context.TODO(), "New watcher failed: %v", err) return nil, err } - w.watchLoop() - w.eventLoop() + go w.watchLoop() + go w.eventLoop() return w, nil } diff --git a/os/gfsnotify/gfsnotify_watcher_loop.go b/os/gfsnotify/gfsnotify_watcher_loop.go index 4ebca3ad8..fd1d37f63 100644 --- a/os/gfsnotify/gfsnotify_watcher_loop.go +++ b/os/gfsnotify/gfsnotify_watcher_loop.go @@ -18,136 +18,132 @@ import ( // watchLoop starts the loop for event listening from underlying inotify monitor. func (w *Watcher) watchLoop() { - go func() { - for { - select { - // Close event. - case <-w.closeChan: + for { + select { + // Close event. + case <-w.closeChan: + return + + // Event listening. + case ev, ok := <-w.watcher.Events: + if !ok { return - - // Event listening. - case ev, ok := <-w.watcher.Events: - if !ok { - return - } - // Filter the repeated event in custom duration. - _, err := w.cache.SetIfNotExist( - context.Background(), - ev.String(), - func(ctx context.Context) (value interface{}, err error) { - w.events.Push(&Event{ - event: ev, - Path: ev.Name, - Op: Op(ev.Op), - Watcher: w, - }) - return struct{}{}, nil - }, repeatEventFilterDuration, - ) - if err != nil { - intlog.Errorf(context.TODO(), `%+v`, err) - } - - case err := <-w.watcher.Errors: + } + // Filter the repeated event in custom duration. + _, err := w.cache.SetIfNotExist( + context.Background(), + ev.String(), + func(ctx context.Context) (value interface{}, err error) { + w.events.Push(&Event{ + event: ev, + Path: ev.Name, + Op: Op(ev.Op), + Watcher: w, + }) + return struct{}{}, nil + }, repeatEventFilterDuration, + ) + if err != nil { intlog.Errorf(context.TODO(), `%+v`, err) } + + case err := <-w.watcher.Errors: + intlog.Errorf(context.TODO(), `%+v`, err) } - }() + } } // eventLoop is the core event handler. func (w *Watcher) eventLoop() { - go func() { - for { - if v := w.events.Pop(); v != nil { - event := v.(*Event) - // If there's no any callback of this path, it removes it from monitor. - callbacks := w.getCallbacks(event.Path) - if len(callbacks) == 0 { - _ = w.watcher.Remove(event.Path) - continue - } - switch { - case event.IsRemove(): - // It should check again the existence of the path. - // It adds it back to the monitor if it still exists. - if fileExists(event.Path) { - // It adds the path back to monitor. - // We need no worry about the repeat adding. - if err := w.watcher.Add(event.Path); err != nil { - intlog.Errorf(context.TODO(), `%+v`, err) - } else { - intlog.Printf(context.TODO(), "fake remove event, watcher re-adds monitor for: %s", event.Path) - } - // Change the event to RENAME, which means it renames itself to its origin name. - event.Op = RENAME - } - - case event.IsRename(): - // It should check again the existence of the path. - // It adds it back to the monitor if it still exists. - // Especially Some editors might do RENAME and then CHMOD when it's editing file. - if fileExists(event.Path) { - // It might lost the monitoring for the path, so we add the path back to monitor. - // We need no worry about the repeat adding. - if err := w.watcher.Add(event.Path); err != nil { - intlog.Errorf(context.TODO(), `%+v`, err) - } else { - intlog.Printf(context.TODO(), "fake rename event, watcher re-adds monitor for: %s", event.Path) - } - // Change the event to CHMOD. - event.Op = CHMOD - } - - case event.IsCreate(): - // ========================================= - // Note that it here just adds the path to monitor without any callback registering, - // because its parent already has the callbacks. - // ========================================= - if fileIsDir(event.Path) { - // If it's a folder, it then does adding recursively to monitor. - for _, subPath := range fileAllDirs(event.Path) { - if fileIsDir(subPath) { - if err := w.watcher.Add(subPath); err != nil { - intlog.Errorf(context.TODO(), `%+v`, err) - } else { - intlog.Printf(context.TODO(), "folder creation event, watcher adds monitor for: %s", subPath) - } - } - } - } else { - // If it's a file, it directly adds it to monitor. - if err := w.watcher.Add(event.Path); err != nil { - intlog.Errorf(context.TODO(), `%+v`, err) - } else { - intlog.Printf(context.TODO(), "file creation event, watcher adds monitor for: %s", event.Path) - } - } - } - // Calling the callbacks in order. - for _, callback := range callbacks { - go func(callback *Callback) { - defer func() { - if err := recover(); err != nil { - switch err { - case callbackExitEventPanicStr: - w.RemoveCallback(callback.Id) - default: - if e, ok := err.(error); ok { - panic(gerror.WrapCode(gcode.CodeInternalPanic, e)) - } - panic(err) - } - } - }() - callback.Func(event) - }(callback) - } - } else { - break + for { + if v := w.events.Pop(); v != nil { + event := v.(*Event) + // If there's no any callback of this path, it removes it from monitor. + callbacks := w.getCallbacks(event.Path) + if len(callbacks) == 0 { + _ = w.watcher.Remove(event.Path) + continue } + switch { + case event.IsRemove(): + // It should check again the existence of the path. + // It adds it back to the monitor if it still exists. + if fileExists(event.Path) { + // It adds the path back to monitor. + // We need no worry about the repeat adding. + if err := w.watcher.Add(event.Path); err != nil { + intlog.Errorf(context.TODO(), `%+v`, err) + } else { + intlog.Printf(context.TODO(), "fake remove event, watcher re-adds monitor for: %s", event.Path) + } + // Change the event to RENAME, which means it renames itself to its origin name. + event.Op = RENAME + } + + case event.IsRename(): + // It should check again the existence of the path. + // It adds it back to the monitor if it still exists. + // Especially Some editors might do RENAME and then CHMOD when it's editing file. + if fileExists(event.Path) { + // It might lost the monitoring for the path, so we add the path back to monitor. + // We need no worry about the repeat adding. + if err := w.watcher.Add(event.Path); err != nil { + intlog.Errorf(context.TODO(), `%+v`, err) + } else { + intlog.Printf(context.TODO(), "fake rename event, watcher re-adds monitor for: %s", event.Path) + } + // Change the event to CHMOD. + event.Op = CHMOD + } + + case event.IsCreate(): + // ========================================= + // Note that it here just adds the path to monitor without any callback registering, + // because its parent already has the callbacks. + // ========================================= + if fileIsDir(event.Path) { + // If it's a folder, it then does adding recursively to monitor. + for _, subPath := range fileAllDirs(event.Path) { + if fileIsDir(subPath) { + if err := w.watcher.Add(subPath); err != nil { + intlog.Errorf(context.TODO(), `%+v`, err) + } else { + intlog.Printf(context.TODO(), "folder creation event, watcher adds monitor for: %s", subPath) + } + } + } + } else { + // If it's a file, it directly adds it to monitor. + if err := w.watcher.Add(event.Path); err != nil { + intlog.Errorf(context.TODO(), `%+v`, err) + } else { + intlog.Printf(context.TODO(), "file creation event, watcher adds monitor for: %s", event.Path) + } + } + } + // Calling the callbacks in order. + for _, callback := range callbacks { + go func(callback *Callback) { + defer func() { + if err := recover(); err != nil { + switch err { + case callbackExitEventPanicStr: + w.RemoveCallback(callback.Id) + default: + if e, ok := err.(error); ok { + panic(gerror.WrapCode(gcode.CodeInternalPanic, e)) + } + panic(err) + } + } + }() + callback.Func(event) + }(callback) + } + } else { + break } - }() + } } // getCallbacks searches and returns all callbacks with given `path`. diff --git a/os/grpool/grpool_pool.go b/os/grpool/grpool_pool.go index 707e37e13..a2fc9b14b 100644 --- a/os/grpool/grpool_pool.go +++ b/os/grpool/grpool_pool.go @@ -99,21 +99,23 @@ func (p *Pool) checkAndForkNewGoroutineWorker() { } // Create job function in goroutine. - go func() { - defer p.count.Add(-1) - - var ( - listItem interface{} - poolItem *localPoolItem - ) - // Harding working, one by one, job never empty, worker never die. - for !p.closed.Val() { - listItem = p.list.PopBack() - if listItem == nil { - return - } - poolItem = listItem.(*localPoolItem) - poolItem.Func(poolItem.Ctx) - } - }() + go p.asynchronousWorker() +} + +func (p *Pool) asynchronousWorker() { + defer p.count.Add(-1) + + var ( + listItem interface{} + poolItem *localPoolItem + ) + // Harding working, one by one, job never empty, worker never die. + for !p.closed.Val() { + listItem = p.list.PopBack() + if listItem == nil { + return + } + poolItem = listItem.(*localPoolItem) + poolItem.Func(poolItem.Ctx) + } } diff --git a/os/gtimer/gtimer_entry.go b/os/gtimer/gtimer_entry.go index 0e3d1ac59..fd7668898 100644 --- a/os/gtimer/gtimer_entry.go +++ b/os/gtimer/gtimer_entry.go @@ -8,6 +8,7 @@ package gtimer import ( "context" + "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/container/gtype" @@ -45,26 +46,29 @@ func (entry *Entry) Run() { return } } - go func() { - defer func() { - if exception := recover(); exception != nil { - if exception != panicExit { - if v, ok := exception.(error); ok && gerror.HasStack(v) { - panic(v) - } else { - panic(gerror.NewCodef(gcode.CodeInternalPanic, "exception recovered: %+v", exception)) - } + go entry.callJobFunc() +} + +// callJobFunc executes the job function in entry. +func (entry *Entry) callJobFunc() { + defer func() { + if exception := recover(); exception != nil { + if exception != panicExit { + if v, ok := exception.(error); ok && gerror.HasStack(v) { + panic(v) } else { - entry.Close() - return + panic(gerror.NewCodef(gcode.CodeInternalPanic, "exception recovered: %+v", exception)) } + } else { + entry.Close() + return } - if entry.Status() == StatusRunning { - entry.SetStatus(StatusReady) - } - }() - entry.job(entry.ctx) + } + if entry.Status() == StatusRunning { + entry.SetStatus(StatusReady) + } }() + entry.job(entry.ctx) } // doCheckAndRunByTicks checks the if job can run in given timer ticks, diff --git a/os/gtimer/gtimer_timer_loop.go b/os/gtimer/gtimer_timer_loop.go index ae94bd311..7fbdf04af 100644 --- a/os/gtimer/gtimer_timer_loop.go +++ b/os/gtimer/gtimer_timer_loop.go @@ -10,40 +10,36 @@ import "time" // loop starts the ticker using a standalone goroutine. func (t *Timer) loop() { - go func() { - var ( - currentTimerTicks int64 - timerIntervalTicker = time.NewTicker(t.options.Interval) - ) - defer timerIntervalTicker.Stop() - for { - select { - case <-timerIntervalTicker.C: - // Check the timer status. - switch t.status.Val() { - case StatusRunning: - // Timer proceeding. - if currentTimerTicks = t.ticks.Add(1); currentTimerTicks >= t.queue.NextPriority() { - t.proceed(currentTimerTicks) - } - - case StatusStopped: - // Do nothing. - - case StatusClosed: - // Timer exits. - return + var ( + currentTimerTicks int64 + timerIntervalTicker = time.NewTicker(t.options.Interval) + ) + defer timerIntervalTicker.Stop() + for { + select { + case <-timerIntervalTicker.C: + // Check the timer status. + switch t.status.Val() { + case StatusRunning: + // Timer proceeding. + if currentTimerTicks = t.ticks.Add(1); currentTimerTicks >= t.queue.NextPriority() { + t.proceed(currentTimerTicks) } + + case StatusStopped: + // Do nothing. + + case StatusClosed: + // Timer exits. + return } } - }() + } } // proceed function proceeds the timer job checking and running logic. func (t *Timer) proceed(currentTimerTicks int64) { - var ( - value interface{} - ) + var value interface{} for { value = t.queue.Pop() if value == nil {