From 29da48a64398d65d738afcfc6bceeb1d587457c7 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Tue, 3 Sep 2024 18:13:22 -0700 Subject: [PATCH 1/2] Restore check for closed TCP connection in exporter process This is a partial reversal of d072ed8. It turns out that the check can actually be useful as it can detect that the collector ("server" side) has closed its side of the connection, and this can be used as a signal to close our side of the connection as well. This can happen when a process using our collector implementation is closed, but no TCP RST is received when sendign a data set (in particular, this can happen when running in K8s and using a virtual IP to connect to the collector). This check can detect the issue much faster than relying on a keep-alive timeout. Furthermore, a client of this library could end up blocking if the connection has not timed out yet and the send buffer is full. Signed-off-by: Antonin Bas --- pkg/exporter/process.go | 69 +++++++++++++++++++++++++++++++----- pkg/exporter/process_test.go | 24 +++++++++++++ 2 files changed, 85 insertions(+), 8 deletions(-) diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index 60cb48e3..674ff607 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -20,8 +20,10 @@ import ( "crypto/x509" "encoding/json" "fmt" + "io" "net" "sync" + "sync/atomic" "time" "github.com/pion/dtls/v2" @@ -31,6 +33,7 @@ import ( ) const startTemplateID uint16 = 255 +const defaultCheckConnInterval = 10 * time.Second const defaultJSONBufferLen = 5000 type templateValue struct { @@ -53,11 +56,12 @@ type ExportingProcess struct { seqNumber uint32 templateID uint16 templatesMap map[uint16]templateValue - templateRefCh chan struct{} templateMutex sync.Mutex sendJSONRecord bool jsonBufferLen int wg sync.WaitGroup + isClosed atomic.Bool + stopCh chan struct{} } type ExporterTLSClientConfig struct { @@ -153,9 +157,37 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { seqNumber: 0, templateID: startTemplateID, templatesMap: make(map[uint16]templateValue), - templateRefCh: make(chan struct{}), sendJSONRecord: input.SendJSONRecord, wg: sync.WaitGroup{}, + stopCh: make(chan struct{}), + } + + // Start a goroutine to check whether the collector has already closed the TCP connection. + if input.CollectorProtocol == "tcp" { + interval := input.CheckConnInterval + if interval == 0 { + interval = defaultCheckConnInterval + } + expProc.wg.Add(1) + go func() { + defer expProc.wg.Done() + ticker := time.NewTicker(interval) + oneByteForRead := make([]byte, 1) + defer ticker.Stop() + for { + select { + case <-expProc.stopCh: + return + case <-ticker.C: + isConnected := expProc.checkConnToCollector(oneByteForRead) + if !isConnected { + klog.Error("Connector has closed its side of the TCP connection, closing our side") + expProc.closeConnToCollector() + return + } + } + } + }() } // Template refresh logic is only for UDP transport. @@ -171,12 +203,14 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { defer ticker.Stop() for { select { - case <-expProc.templateRefCh: + case <-expProc.stopCh: return case <-ticker.C: err := expProc.sendRefreshedTemplates() if err != nil { - klog.Errorf("Error when sending refreshed templates: %v", err) + klog.Errorf("Error when sending refreshed templates, closing the connection to the collector: %v", err) + expProc.closeConnToCollector() + return } } } @@ -231,16 +265,35 @@ func (ep *ExportingProcess) GetMsgSizeLimit() int { } // CloseConnToCollector closes the connection to the collector. -// It should not be called twice. InitExportingProcess can be called again after calling -// CloseConnToCollector. +// It can safely be closed more than once, and subsequent calls will be no-ops. func (ep *ExportingProcess) CloseConnToCollector() { - close(ep.templateRefCh) + ep.closeConnToCollector() + ep.wg.Wait() +} + +// closeConnToCollector is the internal version of CloseConnToCollector. It closes all the resources +// but does not wait for the ep.wg counter to get to 0. Goroutines which need to terminate in order +// for ep.wg to be decremented can safely call closeConnToCollector. +func (ep *ExportingProcess) closeConnToCollector() { + if ep.isClosed.Swap(true) { + return + } + close(ep.stopCh) if err := ep.connToCollector.Close(); err != nil { // Just log the error that happened when closing the connection. Not returning error // as we do not expect library consumers to exit their programs with this error. klog.Errorf("Error when closing connection to collector: %v", err) } - ep.wg.Wait() +} + +// checkConnToCollector checks whether the connection from exporter is still open +// by trying to read from connection. Closed connection will return EOF from read. +func (ep *ExportingProcess) checkConnToCollector(oneByteForRead []byte) bool { + ep.connToCollector.SetReadDeadline(time.Now().Add(time.Millisecond)) + if _, err := ep.connToCollector.Read(oneByteForRead); err == io.EOF { + return false + } + return true } // NewTemplateID is called to get ID when creating new template record. diff --git a/pkg/exporter/process_test.go b/pkg/exporter/process_test.go index 26e41ed4..0cf6975d 100644 --- a/pkg/exporter/process_test.go +++ b/pkg/exporter/process_test.go @@ -744,3 +744,27 @@ func TestExportingProcess_GetMsgSizeLimit(t *testing.T) { t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String()) assert.Equal(t, entities.MaxSocketMsgSize, exporter.GetMsgSizeLimit()) } + +func TestExportingProcess_CheckConnToCollector(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Got error when creating a local server: %v", err) + } + input := ExporterInput{ + CollectorAddress: listener.Addr().String(), + CollectorProtocol: listener.Addr().Network(), + } + exporter, err := InitExportingProcess(input) + if err != nil { + t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err) + } + + defer listener.Close() + conn, _ := listener.Accept() + oneByte := make([]byte, 1) + isOpen := exporter.checkConnToCollector(oneByte) + assert.True(t, isOpen) + conn.Close() + isOpen = exporter.checkConnToCollector(oneByte) + assert.False(t, isOpen) +} From 0521c6ac105339f15b6a2ce66b77a3dc69590204 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Wed, 4 Sep 2024 10:57:23 -0700 Subject: [PATCH 2/2] Add unit test Signed-off-by: Antonin Bas --- pkg/exporter/process_test.go | 145 +++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 64 deletions(-) diff --git a/pkg/exporter/process_test.go b/pkg/exporter/process_test.go index 0cf6975d..66a245ed 100644 --- a/pkg/exporter/process_test.go +++ b/pkg/exporter/process_test.go @@ -17,6 +17,7 @@ package exporter import ( "crypto/tls" "crypto/x509" + "fmt" "io" "net" "testing" @@ -35,34 +36,54 @@ func init() { registry.LoadRegistry() } -func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { - // Create local server for testing - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("Got error when creating a local server: %v", err) - } - t.Log("Created local server on random available port for testing") - - buffCh := make(chan []byte) - // Create go routine for local server - // TODO: Move this in to different function with byte size as arg +func runTCPServer(t *testing.T, listener net.Listener, stopCh <-chan struct{}, buffCh chan<- []byte) { + defer listener.Close() go func() { - defer listener.Close() conn, err := listener.Accept() if err != nil { + t.Error(err) return } defer conn.Close() t.Log("Accept the connection from exporter") - buff := make([]byte, 32) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only template record part. Remove message header and set header. - buffCh <- buff[20:] - return + go func() { + defer close(buffCh) + buff := make([]byte, 512) + for { + bytes, err := conn.Read(buff) + if err != nil { + return + } + // Remove message header and set header. + buffCh <- buff[20:bytes] + } + }() + <-stopCh }() + <-stopCh +} + +func readWithTimeout[T any](ch <-chan T, timeout time.Duration) (T, error) { + select { + case x, ok := <-ch: + if !ok { + return *new(T), fmt.Errorf("channel was closed") + } + return x, nil + case <-time.After(timeout): + return *new(T), fmt.Errorf("timeout expired") + } +} + +func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) + // Create local server for testing + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) + t.Log("Created local server on random available port for testing") // Create exporter using local server info input := ExporterInput{ @@ -104,6 +125,9 @@ func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { // 32 is the size of the IPFIX message including all headers assert.Equal(t, 32, bytesSent) assert.Equal(t, uint32(0), exporter.seqNumber) + bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second) + assert.NoError(t, err) + assert.Len(t, bytesAtServer, 12) exporter.CloseConnToCollector() } @@ -195,35 +219,15 @@ func TestExportingProcess_SendingTemplateRecordToLocalUDPServer(t *testing.T) { } func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) // Create local server for testing listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("Got error when creating a local server: %v", err) - } + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) t.Log("Created local server on random available port for testing") - buffCh := make(chan []byte) - // Create go routine for local server - // TODO: Move this in to different function with byte size as arg - go func() { - defer listener.Close() - conn, err := listener.Accept() - if err != nil { - return - } - defer conn.Close() - t.Log("Accept the connection from exporter") - buff := make([]byte, 28) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only data record part. Remove message header and set header. - // TODO: Verify message header and set header through hardcoded byte values - buffCh <- buff[20:] - return - }() - // Create exporter using local server info input := ExporterInput{ CollectorAddress: listener.Addr().String(), @@ -279,7 +283,9 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { assert.NoError(t, err) // 28 is the size of the IPFIX message including all headers (20 bytes) assert.Equal(t, 28, bytesSent) - assert.Equal(t, dataRecBuff, <-buffCh) + bytesAtServer, err := readWithTimeout(buffCh, 1*time.Second) + assert.NoError(t, err) + assert.Equal(t, dataRecBuff, bytesAtServer) assert.Equal(t, uint32(1), exporter.seqNumber) // Create data set with multiple data records to test invalid message length @@ -535,7 +541,8 @@ func TestInitExportingProcessWithTLS(t *testing.T) { if err == nil { exporter.CloseConnToCollector() } - serverErr := <-serverErrCh + serverErr, err := readWithTimeout(serverErrCh, 1*time.Second) + require.NoError(t, err) if tc.expectedServerErr != "" { assert.ErrorContains(t, serverErr, tc.expectedServerErr) } else { @@ -566,24 +573,10 @@ func TestExportingProcessWithTLS(t *testing.T) { return } + stopCh := make(chan struct{}) + defer close(stopCh) buffCh := make(chan []byte) - go func() { - defer listener.Close() - conn, err := listener.Accept() - if err != nil { - return - } - defer conn.Close() - t.Log("Accept the connection from exporter") - buff := make([]byte, 32) - _, err = conn.Read(buff) - if err != nil { - t.Error(err) - } - // Compare only template record part. Remove message header and set header. - buffCh <- buff[20:] - return - }() + go runTCPServer(t, listener, stopCh, buffCh) // Create exporter using local server info input := ExporterInput{ @@ -768,3 +761,27 @@ func TestExportingProcess_CheckConnToCollector(t *testing.T) { isOpen = exporter.checkConnToCollector(oneByte) assert.False(t, isOpen) } + +func TestExportingProcess_CloseConnToCollectorTwice(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + buffCh := make(chan []byte) + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "Error when creating a local server") + go runTCPServer(t, listener, stopCh, buffCh) + + input := ExporterInput{ + CollectorAddress: listener.Addr().String(), + CollectorProtocol: listener.Addr().Network(), + ObservationDomainID: 1, + TempRefTimeout: 0, + } + exporter, err := InitExportingProcess(input) + if err != nil { + t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err) + } + t.Logf("Created exporter connecting to local server with address: %s", listener.Addr().String()) + + exporter.CloseConnToCollector() + exporter.CloseConnToCollector() +}