mirror of
https://github.com/gogf/gf.git
synced 2025-04-05 11:18:50 +08:00
feat(os/gfsnotify): add recursive watching for created subfolders and sub-files under folders that already watched (#3830)
This commit is contained in:
parent
38622f966f
commit
459c8ce84e
@ -99,7 +99,7 @@ func (p *Pool) File() (*File, error) {
|
|||||||
}
|
}
|
||||||
// It firstly checks using !p.init.Val() for performance purpose.
|
// It firstly checks using !p.init.Val() for performance purpose.
|
||||||
if !p.init.Val() && p.init.Cas(false, true) {
|
if !p.init.Val() && p.init.Cas(false, true) {
|
||||||
_, _ = gfsnotify.Add(f.path, func(event *gfsnotify.Event) {
|
var watchCallback = func(event *gfsnotify.Event) {
|
||||||
// If the file is removed or renamed, recreates the pool by increasing the pool id.
|
// If the file is removed or renamed, recreates the pool by increasing the pool id.
|
||||||
if event.IsRemove() || event.IsRename() {
|
if event.IsRemove() || event.IsRename() {
|
||||||
// It drops the old pool.
|
// It drops the old pool.
|
||||||
@ -110,7 +110,8 @@ func (p *Pool) File() (*File, error) {
|
|||||||
// Whenever the pool id changes, the pool will be recreated.
|
// Whenever the pool id changes, the pool will be recreated.
|
||||||
p.id.Add(1)
|
p.id.Add(1)
|
||||||
}
|
}
|
||||||
}, false)
|
}
|
||||||
|
_, _ = gfsnotify.Add(f.path, watchCallback, gfsnotify.WatchOption{NoRecursive: true})
|
||||||
}
|
}
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ type Callback struct {
|
|||||||
Path string // Bound file path (absolute).
|
Path string // Bound file path (absolute).
|
||||||
name string // Registered name for AddOnce.
|
name string // Registered name for AddOnce.
|
||||||
elem *glist.Element // Element in the callbacks of watcher.
|
elem *glist.Element // Element in the callbacks of watcher.
|
||||||
recursive bool // Is bound to path recursively or not.
|
recursive bool // Is bound to sub-path recursively or not.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Event is the event produced by underlying fsnotify.
|
// Event is the event produced by underlying fsnotify.
|
||||||
@ -53,6 +53,15 @@ type Event struct {
|
|||||||
Watcher *Watcher // Parent watcher.
|
Watcher *Watcher // Parent watcher.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WatchOption holds the option for watching.
|
||||||
|
type WatchOption struct {
|
||||||
|
// NoRecursive explicitly specifies no recursive watching.
|
||||||
|
// Recursive watching will also watch all its current and following created subfolders and sub-files.
|
||||||
|
//
|
||||||
|
// Note that the recursive watching is enabled in default.
|
||||||
|
NoRecursive bool
|
||||||
|
}
|
||||||
|
|
||||||
// Op is the bits union for file operations.
|
// Op is the bits union for file operations.
|
||||||
type Op uint32
|
type Op uint32
|
||||||
|
|
||||||
@ -75,13 +84,15 @@ const (
|
|||||||
var (
|
var (
|
||||||
mu sync.Mutex // Mutex for concurrent safety of defaultWatcher.
|
mu sync.Mutex // Mutex for concurrent safety of defaultWatcher.
|
||||||
defaultWatcher *Watcher // Default watcher.
|
defaultWatcher *Watcher // Default watcher.
|
||||||
callbackIdMap = gmap.NewIntAnyMap(true) // Id to callback mapping.
|
callbackIdMap = gmap.NewIntAnyMap(true) // Global callback id to callback function mapping.
|
||||||
callbackIdGenerator = gtype.NewInt() // Atomic id generator for callback.
|
callbackIdGenerator = gtype.NewInt() // Atomic id generator for callback.
|
||||||
)
|
)
|
||||||
|
|
||||||
// New creates and returns a new watcher.
|
// New creates and returns a new watcher.
|
||||||
// Note that the watcher number is limited by the file handle setting of the system.
|
// Note that the watcher number is limited by the file handle setting of the system.
|
||||||
// Eg: fs.inotify.max_user_instances system variable in linux systems.
|
// Example: fs.inotify.max_user_instances system variable in linux systems.
|
||||||
|
//
|
||||||
|
// In most case, you can use the default watcher for usage instead of creating one.
|
||||||
func New() (*Watcher, error) {
|
func New() (*Watcher, error) {
|
||||||
w := &Watcher{
|
w := &Watcher{
|
||||||
cache: gcache.New(),
|
cache: gcache.New(),
|
||||||
@ -102,26 +113,30 @@ func New() (*Watcher, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add monitors `path` using default watcher with callback function `callbackFunc`.
|
// Add monitors `path` using default watcher with callback function `callbackFunc`.
|
||||||
|
//
|
||||||
|
// The parameter `path` can be either a file or a directory path.
|
||||||
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
|
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
|
||||||
func Add(path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
|
func Add(path string, callbackFunc func(event *Event), option ...WatchOption) (callback *Callback, err error) {
|
||||||
w, err := getDefaultWatcher()
|
w, err := getDefaultWatcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return w.Add(path, callbackFunc, recursive...)
|
return w.Add(path, callbackFunc, option...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddOnce monitors `path` using default watcher with callback function `callbackFunc` only once using unique name `name`.
|
// AddOnce monitors `path` using default watcher with callback function `callbackFunc` only once using unique name `name`.
|
||||||
// If AddOnce is called multiple times with the same `name` parameter, `path` is only added to monitor once. It returns error
|
|
||||||
// if it's called twice with the same `name`.
|
|
||||||
//
|
//
|
||||||
|
// If AddOnce is called multiple times with the same `name` parameter, `path` is only added to monitor once.
|
||||||
|
// It returns error if it's called twice with the same `name`.
|
||||||
|
//
|
||||||
|
// The parameter `path` can be either a file or a directory path.
|
||||||
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
|
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
|
||||||
func AddOnce(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
|
func AddOnce(name, path string, callbackFunc func(event *Event), option ...WatchOption) (callback *Callback, err error) {
|
||||||
w, err := getDefaultWatcher()
|
w, err := getDefaultWatcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return w.AddOnce(name, path, callbackFunc, recursive...)
|
return w.AddOnce(name, path, callbackFunc, option...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove removes all monitoring callbacks of given `path` from watcher recursively.
|
// Remove removes all monitoring callbacks of given `path` from watcher recursively.
|
||||||
|
@ -106,14 +106,12 @@ func doFileScanDir(path string, pattern string, recursive ...bool) ([]string, er
|
|||||||
file, err = os.Open(path)
|
file, err = os.Open(path)
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = gerror.Wrapf(err, `os.Open failed for path "%s"`, path)
|
return nil, gerror.Wrapf(err, `os.Open failed for path "%s"`, path)
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
names, err := file.Readdirnames(-1)
|
names, err := file.Readdirnames(-1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = gerror.Wrapf(err, `read directory files failed for path "%s"`, path)
|
return nil, gerror.Wrapf(err, `read directory files failed for path "%s"`, path)
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
filePath := ""
|
filePath := ""
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
|
@ -16,10 +16,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Add monitors `path` with callback function `callbackFunc` to the watcher.
|
// Add monitors `path` with callback function `callbackFunc` to the watcher.
|
||||||
|
//
|
||||||
|
// The parameter `path` can be either a file or a directory path.
|
||||||
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
|
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
|
||||||
// which is true in default.
|
// which is true in default.
|
||||||
func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
|
func (w *Watcher) Add(
|
||||||
return w.AddOnce("", path, callbackFunc, recursive...)
|
path string, callbackFunc func(event *Event), option ...WatchOption,
|
||||||
|
) (callback *Callback, err error) {
|
||||||
|
return w.AddOnce("", path, callbackFunc, option...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddOnce monitors `path` with callback function `callbackFunc` only once using unique name
|
// AddOnce monitors `path` with callback function `callbackFunc` only once using unique name
|
||||||
@ -28,26 +32,40 @@ func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive ..
|
|||||||
//
|
//
|
||||||
// It returns error if it's called twice with the same `name`.
|
// It returns error if it's called twice with the same `name`.
|
||||||
//
|
//
|
||||||
|
// The parameter `path` can be either a file or a directory path.
|
||||||
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
|
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
|
||||||
// which is true in default.
|
// which is true in default.
|
||||||
func (w *Watcher) AddOnce(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
|
func (w *Watcher) AddOnce(
|
||||||
|
name, path string, callbackFunc func(event *Event), option ...WatchOption,
|
||||||
|
) (callback *Callback, err error) {
|
||||||
|
var watchOption = w.getWatchOption(option...)
|
||||||
w.nameSet.AddIfNotExistFuncLock(name, func() bool {
|
w.nameSet.AddIfNotExistFuncLock(name, func() bool {
|
||||||
// Firstly add the path to watcher.
|
// Firstly add the path to watcher.
|
||||||
callback, err = w.addWithCallbackFunc(name, path, callbackFunc, recursive...)
|
//
|
||||||
|
// A path can only be watched once; watching it more than once is a no-op and will
|
||||||
|
// not return an error.
|
||||||
|
callback, err = w.addWithCallbackFunc(
|
||||||
|
name, path, callbackFunc, option...,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it's recursive adding, it then adds all sub-folders to the monitor.
|
// If it's recursive adding, it then adds all sub-folders to the monitor.
|
||||||
// NOTE:
|
// NOTE:
|
||||||
// 1. It only recursively adds **folders** to the monitor, NOT files,
|
// 1. It only recursively adds **folders** to the monitor, NOT files,
|
||||||
// because if the folders are monitored and their sub-files are also monitored.
|
// because if the folders are monitored and their sub-files are also monitored.
|
||||||
// 2. It bounds no callbacks to the folders, because it will search the callbacks
|
// 2. It bounds no callbacks to the folders, because it will search the callbacks
|
||||||
// from its parent recursively if any event produced.
|
// from its parent recursively if any event produced.
|
||||||
if fileIsDir(path) && (len(recursive) == 0 || recursive[0]) {
|
if fileIsDir(path) && !watchOption.NoRecursive {
|
||||||
for _, subPath := range fileAllDirs(path) {
|
for _, subPath := range fileAllDirs(path) {
|
||||||
if fileIsDir(subPath) {
|
if fileIsDir(subPath) {
|
||||||
if err = w.watcher.Add(subPath); err != nil {
|
if watchAddErr := w.watcher.Add(subPath); watchAddErr != nil {
|
||||||
err = gerror.Wrapf(err, `add watch failed for path "%s"`, subPath)
|
err = gerror.Wrapf(
|
||||||
|
err,
|
||||||
|
`add watch failed for path "%s", err: %s`,
|
||||||
|
subPath, watchAddErr.Error(),
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
intlog.Printf(context.TODO(), "watcher adds monitor for: %s", subPath)
|
intlog.Printf(context.TODO(), "watcher adds monitor for: %s", subPath)
|
||||||
}
|
}
|
||||||
@ -62,14 +80,24 @@ func (w *Watcher) AddOnce(name, path string, callbackFunc func(event *Event), re
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Watcher) getWatchOption(option ...WatchOption) WatchOption {
|
||||||
|
if len(option) > 0 {
|
||||||
|
return option[0]
|
||||||
|
}
|
||||||
|
return WatchOption{}
|
||||||
|
}
|
||||||
|
|
||||||
// addWithCallbackFunc adds the path to underlying monitor, creates and returns a callback object.
|
// addWithCallbackFunc adds the path to underlying monitor, creates and returns a callback object.
|
||||||
// Very note that if it calls multiple times with the same `path`, the latest one will overwrite the previous one.
|
// Very note that if it calls multiple times with the same `path`, the latest one will overwrite the previous one.
|
||||||
func (w *Watcher) addWithCallbackFunc(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
|
func (w *Watcher) addWithCallbackFunc(
|
||||||
|
name, path string, callbackFunc func(event *Event), option ...WatchOption,
|
||||||
|
) (callback *Callback, err error) {
|
||||||
|
var watchOption = w.getWatchOption(option...)
|
||||||
// Check and convert the given path to absolute path.
|
// Check and convert the given path to absolute path.
|
||||||
if t := fileRealPath(path); t == "" {
|
if realPath := fileRealPath(path); realPath == "" {
|
||||||
return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `"%s" does not exist`, path)
|
return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `"%s" does not exist`, path)
|
||||||
} else {
|
} else {
|
||||||
path = t
|
path = realPath
|
||||||
}
|
}
|
||||||
// Create callback object.
|
// Create callback object.
|
||||||
callback = &Callback{
|
callback = &Callback{
|
||||||
@ -77,10 +105,7 @@ func (w *Watcher) addWithCallbackFunc(name, path string, callbackFunc func(event
|
|||||||
Func: callbackFunc,
|
Func: callbackFunc,
|
||||||
Path: path,
|
Path: path,
|
||||||
name: name,
|
name: name,
|
||||||
recursive: true,
|
recursive: !watchOption.NoRecursive,
|
||||||
}
|
|
||||||
if len(recursive) > 0 {
|
|
||||||
callback.recursive = recursive[0]
|
|
||||||
}
|
}
|
||||||
// Register the callback to watcher.
|
// Register the callback to watcher.
|
||||||
w.callbacks.LockFunc(func(m map[string]interface{}) {
|
w.callbacks.LockFunc(func(m map[string]interface{}) {
|
||||||
@ -113,74 +138,50 @@ func (w *Watcher) Close() {
|
|||||||
w.events.Close()
|
w.events.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove removes monitor and all callbacks associated with the `path` recursively.
|
// Remove removes watching and all callbacks associated with the `path` recursively.
|
||||||
|
// Note that, it's recursive in default if given `path` is a directory.
|
||||||
func (w *Watcher) Remove(path string) error {
|
func (w *Watcher) Remove(path string) error {
|
||||||
// Firstly remove the callbacks of the path.
|
var (
|
||||||
if value := w.callbacks.Remove(path); value != nil {
|
err error
|
||||||
list := value.(*glist.List)
|
subPaths []string
|
||||||
for {
|
removedPaths = make([]string, 0)
|
||||||
if item := list.PopFront(); item != nil {
|
)
|
||||||
callbackIdMap.Remove(item.(*Callback).Id)
|
removedPaths = append(removedPaths, path)
|
||||||
} else {
|
if fileIsDir(path) {
|
||||||
break
|
subPaths, err = fileScanDir(path, "*", true)
|
||||||
}
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
removedPaths = append(removedPaths, subPaths...)
|
||||||
}
|
}
|
||||||
// Secondly remove monitor of all sub-files which have no callbacks.
|
|
||||||
if subPaths, err := fileScanDir(path, "*", true); err == nil && len(subPaths) > 0 {
|
for _, removedPath := range removedPaths {
|
||||||
for _, subPath := range subPaths {
|
// remove the callbacks of the path.
|
||||||
if w.checkPathCanBeRemoved(subPath) {
|
if value := w.callbacks.Remove(removedPath); value != nil {
|
||||||
if internalErr := w.watcher.Remove(subPath); internalErr != nil {
|
list := value.(*glist.List)
|
||||||
intlog.Errorf(context.TODO(), `%+v`, internalErr)
|
for {
|
||||||
|
if item := list.PopFront(); item != nil {
|
||||||
|
callbackIdMap.Remove(item.(*Callback).Id)
|
||||||
|
} else {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
// remove the monitor of the path from underlying monitor.
|
||||||
// Lastly remove the monitor of the path from underlying monitor.
|
if watcherRemoveErr := w.watcher.Remove(removedPath); watcherRemoveErr != nil {
|
||||||
err := w.watcher.Remove(path)
|
err = gerror.Wrapf(
|
||||||
if err != nil {
|
err,
|
||||||
err = gerror.Wrapf(err, `remove watch failed for path "%s"`, path)
|
`remove watch failed for path "%s", err: %s`,
|
||||||
|
removedPath, watcherRemoveErr.Error(),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkPathCanBeRemoved checks whether the given path have no callbacks bound.
|
|
||||||
func (w *Watcher) checkPathCanBeRemoved(path string) bool {
|
|
||||||
// Firstly check the callbacks in the watcher directly.
|
|
||||||
if v := w.callbacks.Get(path); v != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
// Secondly check its parent whether has callbacks.
|
|
||||||
dirPath := fileDir(path)
|
|
||||||
if v := w.callbacks.Get(dirPath); v != nil {
|
|
||||||
for _, c := range v.(*glist.List).FrontAll() {
|
|
||||||
if c.(*Callback).recursive {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
// Recursively check its parent.
|
|
||||||
parentDirPath := ""
|
|
||||||
for {
|
|
||||||
parentDirPath = fileDir(dirPath)
|
|
||||||
if parentDirPath == dirPath {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if v := w.callbacks.Get(parentDirPath); v != nil {
|
|
||||||
for _, c := range v.(*glist.List).FrontAll() {
|
|
||||||
if c.(*Callback).recursive {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
dirPath = parentDirPath
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveCallback removes callback with given callback id from watcher.
|
// RemoveCallback removes callback with given callback id from watcher.
|
||||||
|
//
|
||||||
|
// Note that, it auto removes the path watching if there's no callback bound on it.
|
||||||
func (w *Watcher) RemoveCallback(callbackId int) {
|
func (w *Watcher) RemoveCallback(callbackId int) {
|
||||||
callback := (*Callback)(nil)
|
callback := (*Callback)(nil)
|
||||||
if r := callbackIdMap.Get(callbackId); r != nil {
|
if r := callbackIdMap.Get(callbackId); r != nil {
|
||||||
|
@ -8,7 +8,6 @@ package gfsnotify
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/gogf/gf/v2/errors/gcode"
|
"github.com/gogf/gf/v2/errors/gcode"
|
||||||
"github.com/gogf/gf/v2/errors/gerror"
|
"github.com/gogf/gf/v2/errors/gerror"
|
||||||
|
|
||||||
@ -20,33 +19,36 @@ import (
|
|||||||
func (w *Watcher) watchLoop() {
|
func (w *Watcher) watchLoop() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// Close event.
|
// close event.
|
||||||
case <-w.closeChan:
|
case <-w.closeChan:
|
||||||
return
|
return
|
||||||
|
|
||||||
// Event listening.
|
// event listening.
|
||||||
case ev, ok := <-w.watcher.Events:
|
case ev, ok := <-w.watcher.Events:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Filter the repeated event in custom duration.
|
// filter the repeated event in custom duration.
|
||||||
|
var cacheFunc = func(ctx context.Context) (value interface{}, err error) {
|
||||||
|
w.events.Push(&Event{
|
||||||
|
event: ev,
|
||||||
|
Path: ev.Name,
|
||||||
|
Op: Op(ev.Op),
|
||||||
|
Watcher: w,
|
||||||
|
})
|
||||||
|
return struct{}{}, nil
|
||||||
|
}
|
||||||
_, err := w.cache.SetIfNotExist(
|
_, err := w.cache.SetIfNotExist(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
ev.String(),
|
ev.String(),
|
||||||
func(ctx context.Context) (value interface{}, err error) {
|
cacheFunc,
|
||||||
w.events.Push(&Event{
|
repeatEventFilterDuration,
|
||||||
event: ev,
|
|
||||||
Path: ev.Name,
|
|
||||||
Op: Op(ev.Op),
|
|
||||||
Watcher: w,
|
|
||||||
})
|
|
||||||
return struct{}{}, nil
|
|
||||||
}, repeatEventFilterDuration,
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
intlog.Errorf(context.TODO(), `%+v`, err)
|
intlog.Errorf(context.TODO(), `%+v`, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// error occurs in underlying watcher.
|
||||||
case err := <-w.watcher.Errors:
|
case err := <-w.watcher.Errors:
|
||||||
intlog.Errorf(context.TODO(), `%+v`, err)
|
intlog.Errorf(context.TODO(), `%+v`, err)
|
||||||
}
|
}
|
||||||
@ -55,26 +57,39 @@ func (w *Watcher) watchLoop() {
|
|||||||
|
|
||||||
// eventLoop is the core event handler.
|
// eventLoop is the core event handler.
|
||||||
func (w *Watcher) eventLoop() {
|
func (w *Watcher) eventLoop() {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
ctx = context.TODO()
|
||||||
|
)
|
||||||
for {
|
for {
|
||||||
if v := w.events.Pop(); v != nil {
|
if v := w.events.Pop(); v != nil {
|
||||||
event := v.(*Event)
|
event := v.(*Event)
|
||||||
// If there's no any callback of this path, it removes it from monitor.
|
// If there's no any callback of this path, it removes it from monitor,
|
||||||
callbacks := w.getCallbacks(event.Path)
|
// as a path watching without callback is meaningless.
|
||||||
|
callbacks := w.getCallbacksForPath(event.Path)
|
||||||
if len(callbacks) == 0 {
|
if len(callbacks) == 0 {
|
||||||
_ = w.watcher.Remove(event.Path)
|
_ = w.watcher.Remove(event.Path)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case event.IsRemove():
|
case event.IsRemove():
|
||||||
// It should check again the existence of the path.
|
// It should check again the existence of the path.
|
||||||
// It adds it back to the monitor if it still exists.
|
// It adds it back to the monitor if it still exists.
|
||||||
if fileExists(event.Path) {
|
if fileExists(event.Path) {
|
||||||
// It adds the path back to monitor.
|
// A watch will be automatically removed if the watched path is deleted or
|
||||||
|
// renamed.
|
||||||
|
//
|
||||||
|
// It here adds the path back to monitor.
|
||||||
// We need no worry about the repeat adding.
|
// We need no worry about the repeat adding.
|
||||||
if err := w.watcher.Add(event.Path); err != nil {
|
if err = w.watcher.Add(event.Path); err != nil {
|
||||||
intlog.Errorf(context.TODO(), `%+v`, err)
|
intlog.Errorf(ctx, `%+v`, err)
|
||||||
} else {
|
} else {
|
||||||
intlog.Printf(context.TODO(), "fake remove event, watcher re-adds monitor for: %s", event.Path)
|
intlog.Printf(
|
||||||
|
ctx,
|
||||||
|
"fake remove event, watcher re-adds monitor for: %s",
|
||||||
|
event.Path,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
// Change the event to RENAME, which means it renames itself to its origin name.
|
// Change the event to RENAME, which means it renames itself to its origin name.
|
||||||
event.Op = RENAME
|
event.Op = RENAME
|
||||||
@ -85,60 +100,49 @@ func (w *Watcher) eventLoop() {
|
|||||||
// It adds it back to the monitor if it still exists.
|
// It adds it back to the monitor if it still exists.
|
||||||
// Especially Some editors might do RENAME and then CHMOD when it's editing file.
|
// Especially Some editors might do RENAME and then CHMOD when it's editing file.
|
||||||
if fileExists(event.Path) {
|
if fileExists(event.Path) {
|
||||||
// It might lost the monitoring for the path, so we add the path back to monitor.
|
// A watch will be automatically removed if the watched path is deleted or
|
||||||
|
// renamed.
|
||||||
|
//
|
||||||
|
// It might lose the monitoring for the path, so we add the path back to monitor.
|
||||||
// We need no worry about the repeat adding.
|
// We need no worry about the repeat adding.
|
||||||
if err := w.watcher.Add(event.Path); err != nil {
|
if err = w.watcher.Add(event.Path); err != nil {
|
||||||
intlog.Errorf(context.TODO(), `%+v`, err)
|
intlog.Errorf(ctx, `%+v`, err)
|
||||||
} else {
|
} else {
|
||||||
intlog.Printf(context.TODO(), "fake rename event, watcher re-adds monitor for: %s", event.Path)
|
intlog.Printf(
|
||||||
|
ctx,
|
||||||
|
"fake rename event, watcher re-adds monitor for: %s",
|
||||||
|
event.Path,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
// Change the event to CHMOD.
|
// Change the event to CHMOD.
|
||||||
event.Op = CHMOD
|
event.Op = CHMOD
|
||||||
}
|
}
|
||||||
|
|
||||||
case event.IsCreate():
|
case event.IsCreate():
|
||||||
// =========================================
|
// =================================================================================
|
||||||
// Note that it here just adds the path to monitor without any callback registering,
|
// Note that it here just adds the path to monitor without any callback registering,
|
||||||
// because its parent already has the callbacks.
|
// because its parent already has the callbacks.
|
||||||
// =========================================
|
// =================================================================================
|
||||||
if fileIsDir(event.Path) {
|
if w.checkRecursiveWatchingInCreatingEvent(event.Path) {
|
||||||
// If it's a folder, it then does adding recursively to monitor.
|
// It handles only folders, watching folders also watching its sub files.
|
||||||
for _, subPath := range fileAllDirs(event.Path) {
|
for _, subPath := range fileAllDirs(event.Path) {
|
||||||
if fileIsDir(subPath) {
|
if fileIsDir(subPath) {
|
||||||
if err := w.watcher.Add(subPath); err != nil {
|
if err = w.watcher.Add(subPath); err != nil {
|
||||||
intlog.Errorf(context.TODO(), `%+v`, err)
|
intlog.Errorf(ctx, `%+v`, err)
|
||||||
} else {
|
} else {
|
||||||
intlog.Printf(context.TODO(), "folder creation event, watcher adds monitor for: %s", subPath)
|
intlog.Printf(
|
||||||
|
ctx,
|
||||||
|
"folder creation event, watcher adds monitor for: %s",
|
||||||
|
subPath,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// If it's a file, it directly adds it to monitor.
|
|
||||||
if err := w.watcher.Add(event.Path); err != nil {
|
|
||||||
intlog.Errorf(context.TODO(), `%+v`, err)
|
|
||||||
} else {
|
|
||||||
intlog.Printf(context.TODO(), "file creation event, watcher adds monitor for: %s", event.Path)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Calling the callbacks in order.
|
// Calling the callbacks in multiple goroutines.
|
||||||
for _, callback := range callbacks {
|
for _, callback := range callbacks {
|
||||||
go func(callback *Callback) {
|
go w.doCallback(event, callback)
|
||||||
defer func() {
|
|
||||||
if err := recover(); err != nil {
|
|
||||||
switch err {
|
|
||||||
case callbackExitEventPanicStr:
|
|
||||||
w.RemoveCallback(callback.Id)
|
|
||||||
default:
|
|
||||||
if e, ok := err.(error); ok {
|
|
||||||
panic(gerror.WrapCode(gcode.CodeInternalPanic, e))
|
|
||||||
}
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
callback.Func(event)
|
|
||||||
}(callback)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
@ -146,35 +150,85 @@ func (w *Watcher) eventLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getCallbacks searches and returns all callbacks with given `path`.
|
// checkRecursiveWatchingInCreatingEvent checks and returns whether recursive adding given `path` to watcher
|
||||||
|
// in creating event.
|
||||||
|
func (w *Watcher) checkRecursiveWatchingInCreatingEvent(path string) bool {
|
||||||
|
if !fileIsDir(path) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
parentDirPath string
|
||||||
|
dirPath = path
|
||||||
|
)
|
||||||
|
for {
|
||||||
|
parentDirPath = fileDir(dirPath)
|
||||||
|
if parentDirPath == dirPath {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if callbackItem := w.callbacks.Get(parentDirPath); callbackItem != nil {
|
||||||
|
for _, node := range callbackItem.(*glist.List).FrontAll() {
|
||||||
|
callback := node.(*Callback)
|
||||||
|
if callback.recursive {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dirPath = parentDirPath
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Watcher) doCallback(event *Event, callback *Callback) {
|
||||||
|
defer func() {
|
||||||
|
if exception := recover(); exception != nil {
|
||||||
|
switch exception {
|
||||||
|
case callbackExitEventPanicStr:
|
||||||
|
w.RemoveCallback(callback.Id)
|
||||||
|
default:
|
||||||
|
if e, ok := exception.(error); ok {
|
||||||
|
panic(gerror.WrapCode(gcode.CodeInternalPanic, e))
|
||||||
|
}
|
||||||
|
panic(exception)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
callback.Func(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getCallbacksForPath searches and returns all callbacks with given `path`.
|
||||||
|
//
|
||||||
// It also searches its parents for callbacks if they're recursive.
|
// It also searches its parents for callbacks if they're recursive.
|
||||||
func (w *Watcher) getCallbacks(path string) (callbacks []*Callback) {
|
func (w *Watcher) getCallbacksForPath(path string) (callbacks []*Callback) {
|
||||||
// Firstly add the callbacks of itself.
|
// Firstly add the callbacks of itself.
|
||||||
if v := w.callbacks.Get(path); v != nil {
|
if item := w.callbacks.Get(path); item != nil {
|
||||||
for _, v := range v.(*glist.List).FrontAll() {
|
for _, node := range item.(*glist.List).FrontAll() {
|
||||||
callback := v.(*Callback)
|
callback := node.(*Callback)
|
||||||
callbacks = append(callbacks, callback)
|
callbacks = append(callbacks, callback)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// ============================================================================================================
|
||||||
// Secondly searches its direct parent for callbacks.
|
// Secondly searches its direct parent for callbacks.
|
||||||
// It is special handling here, which is the different between `recursive` and `not recursive` logic
|
//
|
||||||
|
// Note that it is SPECIAL handling here, which is the different between `recursive` and `not recursive` logic
|
||||||
// for direct parent folder of `path` that events are from.
|
// for direct parent folder of `path` that events are from.
|
||||||
|
// ============================================================================================================
|
||||||
dirPath := fileDir(path)
|
dirPath := fileDir(path)
|
||||||
if v := w.callbacks.Get(dirPath); v != nil {
|
if item := w.callbacks.Get(dirPath); item != nil {
|
||||||
for _, v := range v.(*glist.List).FrontAll() {
|
for _, node := range item.(*glist.List).FrontAll() {
|
||||||
callback := v.(*Callback)
|
callback := node.(*Callback)
|
||||||
callbacks = append(callbacks, callback)
|
callbacks = append(callbacks, callback)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lastly searches all the parents of directory of `path` recursively for callbacks.
|
// Lastly searches all the parents of directory of `path` recursively for callbacks.
|
||||||
for {
|
for {
|
||||||
parentDirPath := fileDir(dirPath)
|
parentDirPath := fileDir(dirPath)
|
||||||
if parentDirPath == dirPath {
|
if parentDirPath == dirPath {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if v := w.callbacks.Get(parentDirPath); v != nil {
|
if item := w.callbacks.Get(parentDirPath); item != nil {
|
||||||
for _, v := range v.(*glist.List).FrontAll() {
|
for _, node := range item.(*glist.List).FrontAll() {
|
||||||
callback := v.(*Callback)
|
callback := node.(*Callback)
|
||||||
if callback.recursive {
|
if callback.recursive {
|
||||||
callbacks = append(callbacks, callback)
|
callbacks = append(callbacks, callback)
|
||||||
}
|
}
|
||||||
|
@ -193,6 +193,39 @@ func TestWatcher_Callback2(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatcher_WatchFolderWithRecursively(t *testing.T) {
|
||||||
|
gtest.C(t, func(t *gtest.T) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
array = garray.New(true)
|
||||||
|
dirPath = gfile.Temp(gtime.TimestampNanoStr())
|
||||||
|
)
|
||||||
|
err = gfile.Mkdir(dirPath)
|
||||||
|
t.AssertNil(err)
|
||||||
|
defer gfile.Remove(dirPath)
|
||||||
|
|
||||||
|
_, err = gfsnotify.Add(dirPath, func(event *gfsnotify.Event) {
|
||||||
|
//fmt.Println(event.String())
|
||||||
|
array.Append(1)
|
||||||
|
})
|
||||||
|
t.AssertNil(err)
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
t.Assert(array.Len(), 0)
|
||||||
|
|
||||||
|
subDirPath := gfile.Join(dirPath, gtime.TimestampNanoStr())
|
||||||
|
err = gfile.Mkdir(subDirPath)
|
||||||
|
t.AssertNil(err)
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
t.Assert(array.Len(), 1)
|
||||||
|
|
||||||
|
f, err := gfile.Create(gfile.Join(subDirPath, gtime.TimestampNanoStr()))
|
||||||
|
t.AssertNil(err)
|
||||||
|
t.AssertNil(f.Close())
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
t.Assert(array.Len(), 2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatcher_WatchFolderWithoutRecursively(t *testing.T) {
|
func TestWatcher_WatchFolderWithoutRecursively(t *testing.T) {
|
||||||
gtest.C(t, func(t *gtest.T) {
|
gtest.C(t, func(t *gtest.T) {
|
||||||
var (
|
var (
|
||||||
@ -206,7 +239,7 @@ func TestWatcher_WatchFolderWithoutRecursively(t *testing.T) {
|
|||||||
_, err = gfsnotify.Add(dirPath, func(event *gfsnotify.Event) {
|
_, err = gfsnotify.Add(dirPath, func(event *gfsnotify.Event) {
|
||||||
// fmt.Println(event.String())
|
// fmt.Println(event.String())
|
||||||
array.Append(1)
|
array.Append(1)
|
||||||
}, false)
|
}, gfsnotify.WatchOption{NoRecursive: true})
|
||||||
t.AssertNil(err)
|
t.AssertNil(err)
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(time.Millisecond * 100)
|
||||||
t.Assert(array.Len(), 0)
|
t.Assert(array.Len(), 0)
|
||||||
|
@ -83,7 +83,7 @@ func (sp *SPath) addToCache(filePath, rootPath string) {
|
|||||||
// When the files under the directory are updated, the cache will be updated meanwhile.
|
// When the files under the directory are updated, the cache will be updated meanwhile.
|
||||||
// Note that since the listener is added recursively, if you delete a directory, the files (including the directory)
|
// Note that since the listener is added recursively, if you delete a directory, the files (including the directory)
|
||||||
// under the directory will also generate delete events, which means it will generate N+1 events in total
|
// under the directory will also generate delete events, which means it will generate N+1 events in total
|
||||||
// if a directory deleted and there're N files under it.
|
// if a directory deleted and there are N files under it.
|
||||||
func (sp *SPath) addMonitorByPath(path string) {
|
func (sp *SPath) addMonitorByPath(path string) {
|
||||||
if sp.cache == nil {
|
if sp.cache == nil {
|
||||||
return
|
return
|
||||||
@ -102,7 +102,7 @@ func (sp *SPath) addMonitorByPath(path string) {
|
|||||||
case event.IsCreate():
|
case event.IsCreate():
|
||||||
sp.addToCache(event.Path, path)
|
sp.addToCache(event.Path, path)
|
||||||
}
|
}
|
||||||
}, true)
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeMonitorByPath removes gfsnotify monitoring of `path` recursively.
|
// removeMonitorByPath removes gfsnotify monitoring of `path` recursively.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user