From 6479d130366fa4ad17a670c76683995d321f13f0 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 14:26:42 -0400 Subject: [PATCH] Add remote_work close errors --- pkg/workceptor/remote_work.go | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 6ad1d283e..36f4cc3aa 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -24,8 +24,8 @@ import ( // remoteUnit implements the WorkUnit interface for the Receptor remote worker plugin. type remoteUnit struct { BaseWorkUnitForWorkUnit - topJC *utils.JobContext - logger *logger.ReceptorLogger + topJC *utils.JobContext + logger *logger.ReceptorLogger } // RemoteExtraData is the content of the ExtraData JSON field for a remote work unit. @@ -136,7 +136,11 @@ func (rw *remoteUnit) getConnectionAndRun(ctx context.Context, firstTimeSync boo go func() { conn, reader := rw.getConnection(ctx) if conn != nil { - _ = action(ctx, conn, reader) + err := action(ctx, conn, reader) + if err != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error running action function: %s", err) + } + } else { failure() } @@ -297,7 +301,10 @@ func (rw *remoteUnit) monitorRemoteStatus(mw *utils.JobContext, forRelease bool) _, err := conn.Write([]byte(fmt.Sprintf("work status %s\n", remoteUnitID))) if err != nil { rw.GetWorkceptor().nc.GetLogger().Debug("Write error sending to %s: %s\n", remoteUnitID, err) - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to remote work unit %s: %s", remoteUnitID, cerr) + } conn = nil continue @@ -305,7 +312,10 @@ func (rw *remoteUnit) monitorRemoteStatus(mw *utils.JobContext, forRelease bool) status, err := utils.ReadStringContext(mw, reader, '\n') if err != nil { rw.GetWorkceptor().nc.GetLogger().Debug("Read error reading from %s: %s\n", remoteNode, err) - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection from node %s: %s", remoteNode, cerr) + } conn = nil continue @@ -404,7 +414,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) { conn, reader := rw.getConnection(mw) defer func() { if conn != nil { - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr) + } } }() if conn == nil { @@ -464,7 +477,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) { if ok { cr.CancelRead() } - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr) + } return } @@ -657,9 +673,12 @@ func (rw *remoteUnit) cancelOrRelease(release bool, force bool) error { return nil } if release && force { - _ = rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error { + err := rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error { return rw.cancelOrReleaseRemoteUnit(ctx, conn, reader, true) }) + if err != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr) + } return rw.BaseWorkUnitForWorkUnit.Release(true) }