From abeed1dcb8131f45c2f7492e3a64cf3f9d346a10 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 10:58:33 -0400 Subject: [PATCH 01/10] Add fsnotify error monitoring --- pkg/workceptor/workunitbase.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go index 47aa21c34..36d156354 100644 --- a/pkg/workceptor/workunitbase.go +++ b/pkg/workceptor/workunitbase.go @@ -436,6 +436,11 @@ loop: bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err) } } + case err, ok := <-watcher.Errors: + if !ok { + return + } + bwu.w.nc.GetLogger()Error("fsnotify Error reading %s: %s", statusFile, err) } complete := IsComplete(bwu.Status().State) if complete { From 7db5eb6090cef2e24a1f51ef4dc3cfbaa1c5a68b Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 16:57:01 -0400 Subject: [PATCH 02/10] Add fsnotify error monitoring --- pkg/workceptor/workunitbase.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go index 36d156354..a8af7817e 100644 --- a/pkg/workceptor/workunitbase.go +++ b/pkg/workceptor/workunitbase.go @@ -37,6 +37,7 @@ const ( type WatcherWrapper interface { Add(name string) error Close() error + ErrorChannel() chan error EventChannel() chan fsnotify.Event } @@ -52,6 +53,10 @@ func (rw *RealWatcher) Close() error { return rw.watcher.Close() } +func (rw *RealWatcher) ErrorChannel() chan error { + return rw.watcher.Errors +} + func (rw *RealWatcher) EventChannel() chan fsnotify.Event { return rw.watcher.Events } @@ -126,6 +131,7 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs if err == nil { bwu.watcher = &RealWatcher{watcher: watcher} } else { + bwu.w.nc.GetLogger.Info("fsnotify.NewWatcher returned %s", err) bwu.watcher = nil } } @@ -392,6 +398,9 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() { var watcherEvents chan fsnotify.Event watcherEvents = make(chan fsnotify.Event) + var watcherErrors chan error + watcherErrors = make(chan error) + if bwu.watcher != nil { err := bwu.watcher.Add(statusFile) if err == nil { @@ -402,6 +411,7 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() { } }() watcherEvents = bwu.watcher.EventChannel() + watcherErrors = bwu.watcher.ErrorChannel() } else { werr := bwu.watcher.Close() if werr != nil { @@ -412,6 +422,7 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() { } fi, err := bwu.fs.Stat(statusFile) if err != nil { + bwu.w.nc.GetLogger().Error("Error retrieving stat for %s: %s", statusFile, err) fi = nil } @@ -436,11 +447,11 @@ loop: bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err) } } - case err, ok := <-watcher.Errors: - if !ok { - return - } - bwu.w.nc.GetLogger()Error("fsnotify Error reading %s: %s", statusFile, err) + case err, ok := <-watcherErrors: + if !ok { + return + } + bwu.w.nc.GetLogger().Error("fsnotify Error reading %s: %s", statusFile, err) } complete := IsComplete(bwu.Status().State) if complete { From a22a937eb1635541598f2486c2b53f559c75e623 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 17:00:09 -0400 Subject: [PATCH 03/10] Fix typo --- pkg/workceptor/workunitbase.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go index a8af7817e..c3dda6ea2 100644 --- a/pkg/workceptor/workunitbase.go +++ b/pkg/workceptor/workunitbase.go @@ -131,7 +131,7 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs if err == nil { bwu.watcher = &RealWatcher{watcher: watcher} } else { - bwu.w.nc.GetLogger.Info("fsnotify.NewWatcher returned %s", err) + bwu.w.nc.GetLogger().Info("fsnotify.NewWatcher returned %s", err) bwu.watcher = nil } } From a773d1ce2a40b3abcbf142f084faa9c532e86f03 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 17:07:56 -0400 Subject: [PATCH 04/10] update WatcherWrapper --- pkg/workceptor/mock_workceptor/workunitbase.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/workceptor/mock_workceptor/workunitbase.go b/pkg/workceptor/mock_workceptor/workunitbase.go index 5468ee85f..815a6d88d 100644 --- a/pkg/workceptor/mock_workceptor/workunitbase.go +++ b/pkg/workceptor/mock_workceptor/workunitbase.go @@ -68,6 +68,20 @@ func (mr *MockWatcherWrapperMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockWatcherWrapper)(nil).Close)) } +// ErrorChannel mocks base method. +func (m *MockWatcherWrapper) ErrorChannel() chan error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ErrorChannel") + ret0, _ := ret[0].(chan error) + return ret0 +} + +// ErrorChannel indicates an expected call of ErrorChannel. +func (mr *MockWatcherWrapperMockRecorder) ErrorChannel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ErrorChannel", reflect.TypeOf((*MockWatcherWrapper)(nil).ErrorChannel)) +} + // EventChannel mocks base method. func (m *MockWatcherWrapper) EventChannel() chan fsnotify.Event { m.ctrl.T.Helper() From 57a406d586e53535cd9fb1d5c53e51fc689bddba Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Fri, 25 Oct 2024 07:54:58 -0400 Subject: [PATCH 05/10] Fix preexisting test issue --- pkg/workceptor/command.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/workceptor/command.go b/pkg/workceptor/command.go index 8d51de73c..efa6fcd6d 100644 --- a/pkg/workceptor/command.go +++ b/pkg/workceptor/command.go @@ -432,9 +432,9 @@ func (cfg commandRunnerCfg) Run() error { err := commandRunner(cfg.Command, cfg.Params, cfg.UnitDir) if err != nil { statusFilename := path.Join(cfg.UnitDir, "status") - err = (&StatusFileData{}).UpdateBasicStatus(statusFilename, WorkStateFailed, err.Error(), stdoutSize(cfg.UnitDir)) - if err != nil { - MainInstance.nc.GetLogger().Error("Error updating status file %s: %s", statusFilename, err) + err2 := (&StatusFileData{}).UpdateBasicStatus(statusFilename, WorkStateFailed, err.Error(), stdoutSize(cfg.UnitDir)) + if err2 != nil { + MainInstance.nc.GetLogger().Error("Error updating status file %s: %s", statusFilename, err2) } MainInstance.nc.GetLogger().Error("Command runner exited with error: %s\n", err) os.Exit(-1) From 325f2baa5efbd72f4e9cd4805b3fdcf011727194 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Mon, 28 Oct 2024 08:01:16 -0400 Subject: [PATCH 06/10] Typo --- pkg/workceptor/workunitbase.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/workceptor/workunitbase.go b/pkg/workceptor/workunitbase.go index c3dda6ea2..f79ef2c36 100644 --- a/pkg/workceptor/workunitbase.go +++ b/pkg/workceptor/workunitbase.go @@ -407,7 +407,7 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() { defer func() { werr := bwu.watcher.Close() if werr != nil { - bwu.w.nc.GetLogger().Error("Error closing %s: %s", statusFile, err) + bwu.w.nc.GetLogger().Error("Error in defer closing %s: %s", statusFile, err) } }() watcherEvents = bwu.watcher.EventChannel() @@ -435,7 +435,7 @@ loop: if event.Op&fsnotify.Write == fsnotify.Write { err = bwu.Load() if err != nil { - bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err) + bwu.w.nc.GetLogger().Error("Watcher Events Error reading %s: %s", statusFile, err) } } case <-time.After(time.Second): @@ -444,7 +444,7 @@ loop: fi = newFi err = bwu.Load() if err != nil { - bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err) + bwu.w.nc.GetLogger().Error("Work unit load Error reading %s: %s", statusFile, err) } } case err, ok := <-watcherErrors: From 4171c1f1622257b2dfb356b59e765beda65b2f5a Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Mon, 28 Oct 2024 08:11:26 -0400 Subject: [PATCH 07/10] Revert command.go --- pkg/workceptor/command.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/workceptor/command.go b/pkg/workceptor/command.go index efa6fcd6d..8d51de73c 100644 --- a/pkg/workceptor/command.go +++ b/pkg/workceptor/command.go @@ -432,9 +432,9 @@ func (cfg commandRunnerCfg) Run() error { err := commandRunner(cfg.Command, cfg.Params, cfg.UnitDir) if err != nil { statusFilename := path.Join(cfg.UnitDir, "status") - err2 := (&StatusFileData{}).UpdateBasicStatus(statusFilename, WorkStateFailed, err.Error(), stdoutSize(cfg.UnitDir)) - if err2 != nil { - MainInstance.nc.GetLogger().Error("Error updating status file %s: %s", statusFilename, err2) + err = (&StatusFileData{}).UpdateBasicStatus(statusFilename, WorkStateFailed, err.Error(), stdoutSize(cfg.UnitDir)) + if err != nil { + MainInstance.nc.GetLogger().Error("Error updating status file %s: %s", statusFilename, err) } MainInstance.nc.GetLogger().Error("Command runner exited with error: %s\n", err) os.Exit(-1) From eef503f31bd72bb21ff0ca0d775f7ebaa1917975 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Mon, 28 Oct 2024 12:55:03 -0400 Subject: [PATCH 08/10] Add expect for errorchannel --- pkg/workceptor/workunitbase_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/workceptor/workunitbase_test.go b/pkg/workceptor/workunitbase_test.go index 19ff3bae9..05d9e1a8e 100644 --- a/pkg/workceptor/workunitbase_test.go +++ b/pkg/workceptor/workunitbase_test.go @@ -364,6 +364,7 @@ func TestMonitorLocalStatus(t *testing.T) { if tc.fsNotifyEvent != nil { eventCh := make(chan fsnotify.Event, 1) mockWatcher.EXPECT().EventChannel().Return(eventCh).AnyTimes() + mockWatcher.EXPECT().ErrorChannel().Return(eventCh).AnyTimes() go func() { eventCh <- *tc.fsNotifyEvent }() } From 72948b2a64968d70aa84eed53974b0c1c5839143 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Mon, 28 Oct 2024 13:53:22 -0400 Subject: [PATCH 09/10] Fix errorch event --- pkg/workceptor/workunitbase_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/workceptor/workunitbase_test.go b/pkg/workceptor/workunitbase_test.go index 05d9e1a8e..19ff3bae9 100644 --- a/pkg/workceptor/workunitbase_test.go +++ b/pkg/workceptor/workunitbase_test.go @@ -364,7 +364,6 @@ func TestMonitorLocalStatus(t *testing.T) { if tc.fsNotifyEvent != nil { eventCh := make(chan fsnotify.Event, 1) mockWatcher.EXPECT().EventChannel().Return(eventCh).AnyTimes() - mockWatcher.EXPECT().ErrorChannel().Return(eventCh).AnyTimes() go func() { eventCh <- *tc.fsNotifyEvent }() } From eee4e8cfd2c35f9c97db16c97f52ae4a8917b9a9 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Mon, 28 Oct 2024 14:06:56 -0400 Subject: [PATCH 10/10] Correct errorCh --- pkg/workceptor/workunitbase_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/workceptor/workunitbase_test.go b/pkg/workceptor/workunitbase_test.go index 19ff3bae9..b21c7dfb7 100644 --- a/pkg/workceptor/workunitbase_test.go +++ b/pkg/workceptor/workunitbase_test.go @@ -365,6 +365,10 @@ func TestMonitorLocalStatus(t *testing.T) { eventCh := make(chan fsnotify.Event, 1) mockWatcher.EXPECT().EventChannel().Return(eventCh).AnyTimes() go func() { eventCh <- *tc.fsNotifyEvent }() + + errorCh := make(chan error, 1) + mockWatcher.EXPECT().ErrorChannel().Return(errorCh).AnyTimes() + // go func() { errorCh <- nil }() } go bwu.MonitorLocalStatus()