Skip to content

Commit

Permalink
Merge branch 'devel' of ssh://ssh.github.com:443/ansible/receptor int…
Browse files Browse the repository at this point in the history
…o add_fsnotify_error_monitoring
  • Loading branch information
thom-at-redhat committed Oct 24, 2024
2 parents c3c3d40 + 52be401 commit e43291d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
32 changes: 25 additions & 7 deletions pkg/workceptor/remote_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (rw *remoteUnit) getConnection(ctx context.Context) (net.Conn, *bufio.Reade
if err == nil {
return conn, reader
}
rw.GetWorkceptor().nc.GetLogger().Debug("Connection to %s failed with error: %s",
rw.GetWorkceptor().nc.GetLogger().Info("Connection to %s failed with error: %s",
rw.Status().ExtraData.(*RemoteExtraData).RemoteNode, err)
errStr := err.Error()
if strings.Contains(errStr, "CRYPTO_ERROR") {
Expand Down Expand Up @@ -136,7 +136,10 @@ func (rw *remoteUnit) getConnectionAndRun(ctx context.Context, firstTimeSync boo
go func() {
conn, reader := rw.getConnection(ctx)
if conn != nil {
_ = action(ctx, conn, reader)
err := action(ctx, conn, reader)
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error running action function: %s", err)
}
} else {
failure()
}
Expand Down Expand Up @@ -297,15 +300,21 @@ func (rw *remoteUnit) monitorRemoteStatus(mw *utils.JobContext, forRelease bool)
_, err := conn.Write([]byte(fmt.Sprintf("work status %s\n", remoteUnitID)))
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Debug("Write error sending to %s: %s\n", remoteUnitID, err)
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to remote work unit %s: %s", remoteUnitID, cerr)
}
conn = nil

continue
}
status, err := utils.ReadStringContext(mw, reader, '\n')
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Debug("Read error reading from %s: %s\n", remoteNode, err)
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection from node %s: %s", remoteNode, cerr)
}
conn = nil

continue
Expand Down Expand Up @@ -404,7 +413,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) {
conn, reader := rw.getConnection(mw)
defer func() {
if conn != nil {
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr)
}
}
}()
if conn == nil {
Expand Down Expand Up @@ -464,7 +476,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) {
if ok {
cr.CancelRead()
}
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr)
}

return
}
Expand Down Expand Up @@ -657,9 +672,12 @@ func (rw *remoteUnit) cancelOrRelease(release bool, force bool) error {
return nil
}
if release && force {
_ = rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error {
err := rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error {
return rw.cancelOrReleaseRemoteUnit(ctx, conn, reader, true)
})
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error with connect and run: %s", err)
}

return rw.BaseWorkUnitForWorkUnit.Release(true)
}
Expand Down
21 changes: 17 additions & 4 deletions pkg/workceptor/workunitbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@ func (sfd *StatusFileData) Save(filename string) error {
}
err = sfd.saveToFile(file)
if err != nil {
_ = file.Close()
serr := file.Close()

if serr != nil {
MainInstance.nc.GetLogger().Error("Error closing %s: %s", filename, serr)
}

return err
}
Expand Down Expand Up @@ -262,7 +266,10 @@ func (sfd *StatusFileData) Load(filename string) error {
}
err = sfd.loadFromFile(file)
if err != nil {
_ = file.Close()
lerr := file.Close()
if lerr != nil {
MainInstance.nc.GetLogger().Error("Error closing %s: %s", filename, lerr)
}

return err
}
Expand Down Expand Up @@ -389,11 +396,17 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() {
err := bwu.watcher.Add(statusFile)
if err == nil {
defer func() {
_ = bwu.watcher.Close()
werr := bwu.watcher.Close()
if werr != nil {
bwu.w.nc.GetLogger().Error("Error closing %s: %s", statusFile, err)
}
}()
watcherEvents = bwu.watcher.EventChannel()
} else {
_ = bwu.watcher.Close()
werr := bwu.watcher.Close()
if werr != nil {
bwu.w.nc.GetLogger().Error("Error closing %s: %s", statusFile, err)
}
bwu.watcher = nil
}
}
Expand Down

0 comments on commit e43291d

Please sign in to comment.