Skip to content

Commit

Permalink
Add remote_work close errors
Browse files Browse the repository at this point in the history
  • Loading branch information
thom-at-redhat committed Oct 24, 2024
1 parent 616f38b commit 6479d13
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions pkg/workceptor/remote_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
// remoteUnit implements the WorkUnit interface for the Receptor remote worker plugin.
type remoteUnit struct {
BaseWorkUnitForWorkUnit
topJC *utils.JobContext
logger *logger.ReceptorLogger
topJC *utils.JobContext
logger *logger.ReceptorLogger
}

// RemoteExtraData is the content of the ExtraData JSON field for a remote work unit.
Expand Down Expand Up @@ -136,7 +136,11 @@ func (rw *remoteUnit) getConnectionAndRun(ctx context.Context, firstTimeSync boo
go func() {
conn, reader := rw.getConnection(ctx)
if conn != nil {
_ = action(ctx, conn, reader)
err := action(ctx, conn, reader)
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error running action function: %s", err)
}

} else {
failure()
}
Expand Down Expand Up @@ -297,15 +301,21 @@ func (rw *remoteUnit) monitorRemoteStatus(mw *utils.JobContext, forRelease bool)
_, err := conn.Write([]byte(fmt.Sprintf("work status %s\n", remoteUnitID)))
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Debug("Write error sending to %s: %s\n", remoteUnitID, err)
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to remote work unit %s: %s", remoteUnitID, cerr)
}
conn = nil

continue
}
status, err := utils.ReadStringContext(mw, reader, '\n')
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Debug("Read error reading from %s: %s\n", remoteNode, err)
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection from node %s: %s", remoteNode, cerr)
}
conn = nil

continue
Expand Down Expand Up @@ -404,7 +414,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) {
conn, reader := rw.getConnection(mw)
defer func() {
if conn != nil {
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr)
}
}
}()
if conn == nil {
Expand Down Expand Up @@ -464,7 +477,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) {
if ok {
cr.CancelRead()
}
_ = conn.(interface{ CloseConnection() error }).CloseConnection()
cerr := conn.(interface{ CloseConnection() error }).CloseConnection()
if cerr != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr)
}

return
}
Expand Down Expand Up @@ -657,9 +673,12 @@ func (rw *remoteUnit) cancelOrRelease(release bool, force bool) error {
return nil
}
if release && force {
_ = rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error {
err := rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error {
return rw.cancelOrReleaseRemoteUnit(ctx, conn, reader, true)
})
if err != nil {
rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr)

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / go test coverage

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / go test coverage

undefined: cerr

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / go test coverage

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / go test coverage

undefined: cerr

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / lint-receptor

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / lint-receptor

undefined: cerr

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / lint-receptor

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / lint-receptor

undefined: cerr

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / lint-receptor

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / lint-receptor

undefined: cerr

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / lint-receptor

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / receptor (Go 1.20)

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / receptor (Go 1.20)

undefined: cerr

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / receptor (Go 1.21)

undefined: remoteUnitID

Check failure on line 680 in pkg/workceptor/remote_work.go

View workflow job for this annotation

GitHub Actions / receptor (Go 1.21)

undefined: cerr
}

return rw.BaseWorkUnitForWorkUnit.Release(true)
}
Expand Down

0 comments on commit 6479d13

Please sign in to comment.