From 6479d130366fa4ad17a670c76683995d321f13f0 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 14:26:42 -0400 Subject: [PATCH 1/4] Add remote_work close errors --- pkg/workceptor/remote_work.go | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 6ad1d283e..36f4cc3aa 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -24,8 +24,8 @@ import ( // remoteUnit implements the WorkUnit interface for the Receptor remote worker plugin. type remoteUnit struct { BaseWorkUnitForWorkUnit - topJC *utils.JobContext - logger *logger.ReceptorLogger + topJC *utils.JobContext + logger *logger.ReceptorLogger } // RemoteExtraData is the content of the ExtraData JSON field for a remote work unit. @@ -136,7 +136,11 @@ func (rw *remoteUnit) getConnectionAndRun(ctx context.Context, firstTimeSync boo go func() { conn, reader := rw.getConnection(ctx) if conn != nil { - _ = action(ctx, conn, reader) + err := action(ctx, conn, reader) + if err != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error running action function: %s", err) + } + } else { failure() } @@ -297,7 +301,10 @@ func (rw *remoteUnit) monitorRemoteStatus(mw *utils.JobContext, forRelease bool) _, err := conn.Write([]byte(fmt.Sprintf("work status %s\n", remoteUnitID))) if err != nil { rw.GetWorkceptor().nc.GetLogger().Debug("Write error sending to %s: %s\n", remoteUnitID, err) - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to remote work unit %s: %s", remoteUnitID, cerr) + } conn = nil continue @@ -305,7 +312,10 @@ func (rw *remoteUnit) monitorRemoteStatus(mw *utils.JobContext, forRelease bool) status, err := utils.ReadStringContext(mw, reader, '\n') if err != nil { rw.GetWorkceptor().nc.GetLogger().Debug("Read error reading from %s: %s\n", remoteNode, err) - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection from node %s: %s", remoteNode, cerr) + } conn = nil continue @@ -404,7 +414,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) { conn, reader := rw.getConnection(mw) defer func() { if conn != nil { - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr) + } } }() if conn == nil { @@ -464,7 +477,10 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) { if ok { cr.CancelRead() } - _ = conn.(interface{ CloseConnection() error }).CloseConnection() + cerr := conn.(interface{ CloseConnection() error }).CloseConnection() + if cerr != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr) + } return } @@ -657,9 +673,12 @@ func (rw *remoteUnit) cancelOrRelease(release bool, force bool) error { return nil } if release && force { - _ = rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error { + err := rw.connectAndRun(rw.GetWorkceptor().ctx, func(ctx context.Context, conn net.Conn, reader *bufio.Reader) error { return rw.cancelOrReleaseRemoteUnit(ctx, conn, reader, true) }) + if err != nil { + rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr) + } return rw.BaseWorkUnitForWorkUnit.Release(true) } From 2876d2e1d8704b8a2d2170694b5647291ffbf0ca Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 14:29:50 -0400 Subject: [PATCH 2/4] Typo --- pkg/workceptor/remote_work.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 36f4cc3aa..955fe8176 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -677,7 +677,7 @@ func (rw *remoteUnit) cancelOrRelease(release bool, force bool) error { return rw.cancelOrReleaseRemoteUnit(ctx, conn, reader, true) }) if err != nil { - rw.GetWorkceptor().nc.GetLogger().Error("Error closing connection to %s: %s", remoteUnitID, cerr) + rw.GetWorkceptor().nc.GetLogger().Error("Error with connect and run: %s", err) } return rw.BaseWorkUnitForWorkUnit.Release(true) From 404e691a262e6142559c87be5ac639cbd0a188a9 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 14:31:39 -0400 Subject: [PATCH 3/4] Formatting error --- pkg/workceptor/remote_work.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 955fe8176..85fac217b 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -24,8 +24,8 @@ import ( // remoteUnit implements the WorkUnit interface for the Receptor remote worker plugin. type remoteUnit struct { BaseWorkUnitForWorkUnit - topJC *utils.JobContext - logger *logger.ReceptorLogger + topJC *utils.JobContext + logger *logger.ReceptorLogger } // RemoteExtraData is the content of the ExtraData JSON field for a remote work unit. From 954bf996ee1fd5d6e21ea5b903b4c48fdd18a459 Mon Sep 17 00:00:00 2001 From: Thom Carlin Date: Thu, 24 Oct 2024 14:33:19 -0400 Subject: [PATCH 4/4] Try 2 to fix formatting error --- pkg/workceptor/remote_work.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 85fac217b..009afc831 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -140,7 +140,6 @@ func (rw *remoteUnit) getConnectionAndRun(ctx context.Context, firstTimeSync boo if err != nil { rw.GetWorkceptor().nc.GetLogger().Error("Error running action function: %s", err) } - } else { failure() }