diff --git a/build/charts/flow-aggregator/README.md b/build/charts/flow-aggregator/README.md index 3678788e34b..cd040447e28 100644 --- a/build/charts/flow-aggregator/README.md +++ b/build/charts/flow-aggregator/README.md @@ -50,6 +50,7 @@ Kubernetes: `>= 1.19.0-0` | image | object | `{"pullPolicy":"IfNotPresent","repository":"antrea/flow-aggregator","tag":""}` | Container image used by Flow Aggregator. | | inactiveFlowRecordTimeout | string | `"90s"` | Provide the inactive flow record timeout as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". | | logVerbosity | int | `0` | Log verbosity switch for Flow Aggregator. | +| mode | string | `"Aggregate"` | Mode in which to run the flow aggregator. Must be one of "Aggregate" (flow records received from source and destination are aggregated and sent as one flow record) or "Proxy" (flow records are enhanced with some additional information, then sent directly without buffering or aggregation). | | recordContents.podLabels | bool | `false` | Determine whether source and destination Pod labels will be included in the flow records. | | s3Uploader.awsCredentials | object | `{"aws_access_key_id":"changeme","aws_secret_access_key":"changeme","aws_session_token":""}` | Credentials to authenticate to AWS. They will be stored in a Secret and injected into the Pod as environment variables. | | s3Uploader.bucketName | string | `""` | BucketName is the name of the S3 bucket to which flow records will be uploaded. It is required. | diff --git a/build/charts/flow-aggregator/conf/flow-aggregator.conf b/build/charts/flow-aggregator/conf/flow-aggregator.conf index e6bfef7701e..8f23c85a90d 100644 --- a/build/charts/flow-aggregator/conf/flow-aggregator.conf +++ b/build/charts/flow-aggregator/conf/flow-aggregator.conf @@ -1,3 +1,8 @@ +# Mode in which to run the flow aggregator. Must be one of "Aggregate" (flow records received +# from source and destination are aggregated and sent as one flow record) or "Proxy" (flow records +# are enhanced with some additional information, then sent directly without buffering or aggregation). +mode: {{ .Values.mode }} + # Provide the active flow record timeout as a duration string. This determines # how often the flow aggregator exports the active flow records to the flow # collector. Thus, for flows with a continuous stream of packets, a flow record diff --git a/build/charts/flow-aggregator/values.yaml b/build/charts/flow-aggregator/values.yaml index a2dc7ac4843..5b92a546894 100644 --- a/build/charts/flow-aggregator/values.yaml +++ b/build/charts/flow-aggregator/values.yaml @@ -4,6 +4,10 @@ image: pullPolicy: "IfNotPresent" tag: "" +# -- Mode in which to run the flow aggregator. Must be one of "Aggregate" (flow records received +# from source and destination are aggregated and sent as one flow record) or "Proxy" (flow records +# are enhanced with some additional information, then sent directly without buffering or aggregation). +mode: "Aggregate" # -- Provide the active flow record timeout as a duration string. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". activeFlowRecordTimeout: 60s diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index 1f3e7c9c647..be812f9af86 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -154,6 +154,11 @@ subjects: apiVersion: v1 data: flow-aggregator.conf: | + # Mode in which to run the flow aggregator. Must be one of "Aggregate" (flow records received + # from source and destination are aggregated and sent as one flow record) or "Proxy" (flow records + # are enhanced with some additional information, then sent directly without buffering or aggregation). + mode: Aggregate + # Provide the active flow record timeout as a duration string. This determines # how often the flow aggregator exports the active flow records to the flow # collector. Thus, for flows with a continuous stream of packets, a flow record diff --git a/pkg/config/flowaggregator/config.go b/pkg/config/flowaggregator/config.go index a6e166fc500..113d01abc35 100644 --- a/pkg/config/flowaggregator/config.go +++ b/pkg/config/flowaggregator/config.go @@ -22,7 +22,15 @@ const ( AggregatorTransportProtocolUDP AggregatorTransportProtocol = "UDP" ) +type AggregatorMode string + +const ( + AggregatorModeAggregate AggregatorMode = "Aggregate" + AggregatorModeProxy AggregatorMode = "Proxy" +) + type FlowAggregatorConfig struct { + Mode AggregatorMode `yaml:"mode,omitempty"` // Provide the active flow record timeout as a duration string. This determines // how often the flow aggregator exports the active flow records to the flow // collector. Thus, for flows with a continuous stream of packets, a flow record diff --git a/pkg/config/flowaggregator/default.go b/pkg/config/flowaggregator/default.go index 9e452ecec6d..9a9478577a1 100644 --- a/pkg/config/flowaggregator/default.go +++ b/pkg/config/flowaggregator/default.go @@ -48,6 +48,9 @@ const ( ) func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) { + if flowAggregatorConf.Mode == "" { + flowAggregatorConf.Mode = AggregatorModeAggregate + } if flowAggregatorConf.ActiveFlowRecordTimeout == "" { flowAggregatorConf.ActiveFlowRecordTimeout = DefaultActiveFlowRecordTimeout } diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index 91d2a17a087..c9edbe473b7 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -45,6 +45,7 @@ type IPFIXExporter struct { externalFlowCollectorProto string exportingProcess ipfix.IPFIXExportingProcess sendJSONRecord bool + aggregatorMode flowaggregatorconfig.AggregatorMode observationDomainID uint32 templateRefreshTimeout time.Duration templateIDv4 uint16 @@ -88,6 +89,7 @@ func NewIPFIXExporter( externalFlowCollectorAddr: opt.ExternalFlowCollectorAddr, externalFlowCollectorProto: opt.ExternalFlowCollectorProto, sendJSONRecord: sendJSONRecord, + aggregatorMode: opt.AggregatorMode, observationDomainID: observationDomainID, templateRefreshTimeout: opt.TemplateRefreshTimeout, registry: registry, @@ -274,66 +276,85 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { } elements = append(elements, ie) } - // The order of source and destination stats elements needs to match the order specified in - // addFieldsForStatsAggregation method in go-ipfix aggregation process. - for i := range infoelements.StatsElementList { - // Add Antrea source stats fields - ieName := infoelements.AntreaSourceStatsElementList[i] - ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, err + if e.aggregatorMode == flowaggregatorconfig.AggregatorModeAggregate { + // The order of source and destination stats elements needs to match the order specified in + // addFieldsForStatsAggregation method in go-ipfix aggregation process. + for i := range infoelements.StatsElementList { + // Add Antrea source stats fields + ieName := infoelements.AntreaSourceStatsElementList[i] + ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + // Add Antrea destination stats fields + ieName = infoelements.AntreaDestinationStatsElementList[i] + ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) } - elements = append(elements, ie) - // Add Antrea destination stats fields - ieName = infoelements.AntreaDestinationStatsElementList[i] - ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, err + for _, ie := range infoelements.AntreaFlowEndSecondsElementList { + ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + } + for i := range infoelements.AntreaThroughputElementList { + // Add common throughput fields + ieName := infoelements.AntreaThroughputElementList[i] + ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + // Add source node specific throughput fields + ieName = infoelements.AntreaSourceThroughputElementList[i] + ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + // Add destination node specific throughput fields + ieName = infoelements.AntreaDestinationThroughputElementList[i] + ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) } - elements = append(elements, ie) } - for _, ie := range infoelements.AntreaFlowEndSecondsElementList { + for _, ie := range infoelements.AntreaLabelsElementList { ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) if err != nil { return 0, err } elements = append(elements, ie) } - for i := range infoelements.AntreaThroughputElementList { - // Add common throughput fields - ieName := infoelements.AntreaThroughputElementList[i] - ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, err - } - elements = append(elements, ie) - // Add source node specific throughput fields - ieName = infoelements.AntreaSourceThroughputElementList[i] - ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + if e.aggregatorMode == flowaggregatorconfig.AggregatorModeProxy { + ie, err := e.createInfoElementForTemplateSet("originalObservationDomainId", ipfixregistry.IANAEnterpriseID) if err != nil { return 0, err } elements = append(elements, ie) - // Add destination node specific throughput fields - ieName = infoelements.AntreaDestinationThroughputElementList[i] - ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + ie, err = e.createInfoElementForTemplateSet("originalExporterIPv4Address", ipfixregistry.IANAEnterpriseID) if err != nil { return 0, err } elements = append(elements, ie) - } - for _, ie := range infoelements.AntreaLabelsElementList { - ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) + ie, err = e.createInfoElementForTemplateSet("originalExporterIPv6Address", ipfixregistry.IANAEnterpriseID) if err != nil { return 0, err } elements = append(elements, ie) } - ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, err - } - elements = append(elements, ie) e.set.ResetSet() if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil { return 0, err diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index e7c374efcb4..e9749807573 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -67,6 +67,7 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { templateIDv6: testTemplateIDv6, registry: mockIPFIXRegistry, set: mockTempSet, + aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } elemList := createElementList(isIPv6, mockIPFIXRegistry) @@ -118,13 +119,14 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { RecordFormat: "IPFIX", }, } - ipfixExporter := IPFIXExporter{ + ipfixExporter := &IPFIXExporter{ config: config.FlowCollector, externalFlowCollectorAddr: "", externalFlowCollectorProto: "", templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, set: mockSet, + aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } testTemplateID := testTemplateIDv4 @@ -182,12 +184,13 @@ func TestIPFIXExporter_AddRecord(t *testing.T) { initIPFIXExportingProcess = initIPFIXExportingProcessSaved }() - ipfixExporter := IPFIXExporter{ + ipfixExporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, set: mockSet, + aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } testTemplateID := testTemplateIDv4 @@ -216,9 +219,10 @@ func TestIPFIXExporter_initIPFIXExportingProcess_Error(t *testing.T) { initIPFIXExportingProcess = initIPFIXExportingProcessSaved }() - ipfixExporter := IPFIXExporter{ + ipfixExporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", + aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, } assert.Error(t, ipfixExporter.AddRecord(mockRecord, false)) @@ -231,13 +235,14 @@ func TestIPFIXExporter_sendRecord_Error(t *testing.T) { mockSet := ipfixentitiestesting.NewMockSet(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) - ipfixExporter := IPFIXExporter{ + ipfixExporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", exportingProcess: mockIPFIXExpProc, templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, set: mockSet, + aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } testTemplateID := testTemplateIDv4 @@ -308,7 +313,9 @@ func TestInitExportingProcess(t *testing.T) { t.Run("tcp success", func(t *testing.T) { ctrl := gomock.NewController(t) mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - opt := &options.Options{} + opt := &options.Options{ + AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, + } opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{} flowaggregatorconfig.SetConfigDefaults(opt.Config) listener, err := net.Listen("tcp", "127.0.0.1:0") @@ -326,7 +333,9 @@ func TestInitExportingProcess(t *testing.T) { t.Run("udp success", func(t *testing.T) { ctrl := gomock.NewController(t) mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - opt := &options.Options{} + opt := &options.Options{ + AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, + } opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{} flowaggregatorconfig.SetConfigDefaults(opt.Config) udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") @@ -346,7 +355,9 @@ func TestInitExportingProcess(t *testing.T) { t.Run("tcp failure", func(t *testing.T) { ctrl := gomock.NewController(t) mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - opt := &options.Options{} + opt := &options.Options{ + AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, + } opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{} flowaggregatorconfig.SetConfigDefaults(opt.Config) // dialing this address is guaranteed to fail (we use 0 as the port number) diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index de6cda63350..fd7fa4bf70a 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/json" "fmt" + "net" "os" "path/filepath" "sync" @@ -101,6 +102,7 @@ var ( ) type flowAggregator struct { + aggregatorMode flowaggregatorconfig.AggregatorMode clusterUUID uuid.UUID aggregatorTransportProtocol flowaggregatorconfig.AggregatorTransportProtocol collectingProcess ipfix.IPFIXCollectingProcess @@ -124,6 +126,7 @@ type flowAggregator struct { s3Exporter exporter.Interface logExporter exporter.Interface logTickerDuration time.Duration + preprocessorOutCh chan *ipfixentities.Message } func NewFlowAggregator( @@ -160,6 +163,7 @@ func NewFlowAggregator( } fa := &flowAggregator{ + aggregatorMode: opt.AggregatorMode, clusterUUID: clusterUUID, aggregatorTransportProtocol: opt.AggregatorTransportProtocol, activeFlowRecordTimeout: opt.ActiveFlowRecordTimeout, @@ -175,19 +179,19 @@ func NewFlowAggregator( configData: data, APIServer: opt.Config.APIServer, logTickerDuration: time.Minute, + // We support buffering a small amount of messages. + preprocessorOutCh: make(chan *ipfixentities.Message, 16), } if err := fa.InitCollectingProcess(); err != nil { return nil, fmt.Errorf("error when creating collecting process: %w", err) } - // Use a buffered channel which ideally should be large enough to accommodate all the records - // included in a given IPFIX message. It would be unusual to have more than 128 records in - // an IPFIX message. - recordCh := make(chan ipfixentities.Record, 128) - if err := fa.InitPreprocessor(recordCh); err != nil { + if err := fa.InitPreprocessor(); err != nil { return nil, fmt.Errorf("error when creating preprocessor: %w", err) } - if err := fa.InitAggregationProcess(recordCh); err != nil { - return nil, fmt.Errorf("error when creating aggregation process: %w", err) + if opt.AggregatorMode == flowaggregatorconfig.AggregatorModeAggregate { + if err := fa.InitAggregationProcess(); err != nil { + return nil, fmt.Errorf("error when creating aggregation process: %w", err) + } } if opt.Config.ClickHouse.Enable { var err error @@ -263,10 +267,16 @@ func (fa *flowAggregator) InitCollectingProcess() error { IsEncrypted: false, } } - cpInput.NumExtraElements = len(infoelements.AntreaSourceStatsElementList) + len(infoelements.AntreaDestinationStatsElementList) + len(infoelements.AntreaLabelsElementList) + - len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList) + cpInput.NumExtraElements = len(infoelements.AntreaLabelsElementList) // clusterId cpInput.NumExtraElements += 1 + if fa.aggregatorMode == flowaggregatorconfig.AggregatorModeAggregate { + cpInput.NumExtraElements += len(infoelements.AntreaSourceStatsElementList) + len(infoelements.AntreaDestinationStatsElementList) + + len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList) + } else { + // originalObservationDomainId, originalExporterIPv4Address, originalExporterIPv6Address + cpInput.NumExtraElements += 3 + } // Tell the collector to accept IEs which are not part of the IPFIX registry (hardcoded in // the go-ipfix library). The preprocessor will take care of removing these elements. cpInput.DecodingMode = collector.DecodingModeLenientKeepUnknown @@ -275,7 +285,7 @@ func (fa *flowAggregator) InitCollectingProcess() error { return err } -func (fa *flowAggregator) InitPreprocessor(recordCh chan<- ipfixentities.Record) error { +func (fa *flowAggregator) InitPreprocessor() error { getInfoElementFromRegistry := func(ieName string, enterpriseID uint32) (*ipfixentities.InfoElement, error) { ie, err := fa.registry.GetInfoElement(ieName, enterpriseID) if err != nil { @@ -325,14 +335,14 @@ func (fa *flowAggregator) InitPreprocessor(recordCh chan<- ipfixentities.Record) if err != nil { return err } - fa.preprocessor, err = newPreprocessor(infoElementsIPv4, infoElementsIPv6, fa.collectingProcess.GetMsgChan(), recordCh) + fa.preprocessor, err = newPreprocessor(infoElementsIPv4, infoElementsIPv6, fa.collectingProcess.GetMsgChan(), fa.preprocessorOutCh) return err } -func (fa *flowAggregator) InitAggregationProcess(recordCh <-chan ipfixentities.Record) error { +func (fa *flowAggregator) InitAggregationProcess() error { var err error apInput := ipfixintermediate.AggregationInput{ - RecordChan: recordCh, + MessageChan: fa.preprocessorOutCh, WorkerNum: aggregationWorkerNum, CorrelateFields: correlateFields, ActiveExpiryTimeout: fa.activeFlowRecordTimeout, @@ -360,13 +370,15 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}) { defer ipfixProcessesWg.Done() fa.preprocessor.Run(stopCh) }() - ipfixProcessesWg.Add(1) - go func() { - // Same comment as above. - defer ipfixProcessesWg.Done() - // blocking function, will return when fa.aggregationProcess.Stop() is called - fa.aggregationProcess.Start() - }() + if fa.aggregationProcess != nil { + ipfixProcessesWg.Add(1) + go func() { + // Same comment as above. + defer ipfixProcessesWg.Done() + // blocking function, will return when fa.aggregationProcess.Stop() is called + fa.aggregationProcess.Start() + }() + } if fa.ipfixExporter != nil { fa.ipfixExporter.Start() @@ -426,10 +438,6 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}) { // function, hence preventing any concurrency issue as the exporter.Interface // implementations are not safe for concurrent access. func (fa *flowAggregator) flowExportLoop(stopCh <-chan struct{}) { - expireTimer := time.NewTimer(fa.activeFlowRecordTimeout) - defer expireTimer.Stop() - logTicker := time.NewTicker(fa.logTickerDuration) - defer logTicker.Stop() defer func() { // We stop the exporters from flowExportLoop and not from Run, // to avoid any possible race condition. @@ -446,6 +454,130 @@ func (fa *flowAggregator) flowExportLoop(stopCh <-chan struct{}) { fa.logExporter.Stop() } }() + if fa.aggregatorMode == flowaggregatorconfig.AggregatorModeAggregate { + fa.flowExportLoopAggregate(stopCh) + } else if fa.aggregatorMode == flowaggregatorconfig.AggregatorModeProxy { + fa.flowExportLoopProxy(stopCh) + } +} + +func (fa *flowAggregator) proxyRecord(record ipfixentities.Record, obsDomainID uint32, exporterAddress string) error { + getAddress := func(record ipfixentities.Record, name string) string { + element, _, exist := record.GetInfoElementWithValue(name) + if !exist { + return "" + } + return element.GetIPAddressValue().String() + } + + getFlowType := func(record ipfixentities.Record) uint8 { + element, _, exist := record.GetInfoElementWithValue("flowType") + if !exist { + klog.ErrorS(nil, "Missing flowType") + return 0 + } + return element.GetUnsigned8Value() + } + + sourceIPv4Address := getAddress(record, "sourceIPv4Address") + sourceIPv6Address := getAddress(record, "sourceIPv6Address") + destinationIPv4Address := getAddress(record, "destinationIPv4Address") + destinationIPv6Address := getAddress(record, "destinationIPv6Address") + var isIPv6 bool + var sourceAddress, destinationAddress string + switch { + case sourceIPv4Address != "" && sourceIPv6Address == "" && destinationIPv4Address != "" && destinationIPv6Address == "": + isIPv6 = false + sourceAddress = sourceIPv4Address + destinationAddress = destinationIPv4Address + case sourceIPv4Address == "" && sourceIPv6Address != "" && destinationIPv4Address == "" && destinationIPv6Address != "": + isIPv6 = true + sourceAddress = sourceIPv6Address + destinationAddress = destinationIPv6Address + default: + // All other cases are invalid. + return fmt.Errorf("invalid format for record: source and destination must be present and IPv4 / IPv6 fields are mutually-exclusive") + } + startTime, err := fa.getRecordStartTime(record) + if err != nil { + return fmt.Errorf("cannot find record start time: %w", err) + } + if getFlowType(record) == ipfixregistry.FlowTypeInterNode { + // This is the only case where K8s metadata could be missing + fa.fillK8sMetadata(sourceAddress, destinationAddress, record, startTime) + } + fa.fillPodLabels(sourceAddress, destinationAddress, record, startTime) + if err := fa.fillClusterID(record); err != nil { + klog.ErrorS(err, "Failed to add clusterId") + } + if err := fa.addOriginalObservationDomainID(record, obsDomainID); err != nil { + klog.ErrorS(err, "Failed to add originalObservationDomainId") + } + originalExporterAddress := net.ParseIP(exporterAddress) + if err := fa.addOriginalExporterIPv4Address(record, originalExporterAddress); err != nil { + klog.ErrorS(err, "Failed to add originalExporterIPv4Address") + } + if err := fa.addOriginalExporterIPv6Address(record, originalExporterAddress); err != nil { + klog.ErrorS(err, "Failed to add originalExporterIPv6Address") + } + return fa.sendRecord(record, isIPv6) +} + +func (fa *flowAggregator) flowExportLoopProxy(stopCh <-chan struct{}) { + logTicker := time.NewTicker(fa.logTickerDuration) + defer logTicker.Stop() + msgCh := fa.collectingProcess.GetMsgChan() + + proxyRecords := func(msg *ipfixentities.Message) { + set := msg.GetSet() + if set.GetSetType() != ipfixentities.Data { // only process data records + return + } + + records := set.GetRecords() + for _, record := range records { + if err := fa.proxyRecord(record, msg.GetObsDomainID(), msg.GetExportAddress()); err != nil { + klog.ErrorS(err, "Failed to proxy record") + } + } + } + + updateCh := fa.updateCh + for { + select { + case <-stopCh: + return + case msg, ok := <-msgCh: + if !ok { + msgCh = nil + break + } + proxyRecords(msg) + case <-logTicker.C: + // Add visibility of processing stats of Flow Aggregator + klog.V(4).InfoS("Total number of records received", "count", fa.collectingProcess.GetNumRecordsReceived()) + klog.V(4).InfoS("Total number of records exported by each active exporter", "count", fa.numRecordsExported) + klog.V(4).InfoS("Number of exporters connected with Flow Aggregator", "count", fa.collectingProcess.GetNumConnToCollector()) + case opt, ok := <-updateCh: + if !ok { + // set the channel to nil and essentially disable this select case. + // we could also just return straightaway as this should only happen + // when stopCh is closed, but maybe it's better to keep stopCh as + // the only signal for stopping the event loop. + updateCh = nil + break + } + fa.updateFlowAggregator(opt) + } + } +} + +func (fa *flowAggregator) flowExportLoopAggregate(stopCh <-chan struct{}) { + expireTimer := time.NewTimer(fa.activeFlowRecordTimeout) + defer expireTimer.Stop() + logTicker := time.NewTicker(fa.logTickerDuration) + defer logTicker.Stop() + updateCh := fa.updateCh for { select { @@ -454,7 +586,7 @@ func (fa *flowAggregator) flowExportLoop(stopCh <-chan struct{}) { case <-expireTimer.C: // Pop the flow record item from expire priority queue in the Aggregation // Process and send the flow records. - if err := fa.aggregationProcess.ForAllExpiredFlowRecordsDo(fa.sendFlowKeyRecord); err != nil { + if err := fa.aggregationProcess.ForAllExpiredFlowRecordsDo(fa.sendAggregatedRecord); err != nil { klog.ErrorS(err, "Error when sending expired flow records") expireTimer.Reset(fa.activeFlowRecordTimeout) continue @@ -481,58 +613,65 @@ func (fa *flowAggregator) flowExportLoop(stopCh <-chan struct{}) { } } -func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, record *ipfixintermediate.AggregationFlowRecord) error { - isRecordIPv4 := fa.aggregationProcess.IsAggregatedRecordIPv4(*record) - startTime, err := fa.getRecordStartTime(record.Record) - if err != nil { - return fmt.Errorf("cannot find record start time: %v", err) - } - if !fa.aggregationProcess.AreCorrelatedFieldsFilled(*record) { - fa.fillK8sMetadata(key, record.Record, *startTime) - fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true) - } - // Even if fa.includePodLabels is false, we still need to add an empty IE to match the template. - if !fa.aggregationProcess.AreExternalFieldsFilled(*record) { - fa.fillPodLabels(key, record.Record, *startTime) - if err := fa.fillClusterID(record.Record); err != nil { - klog.ErrorS(err, "Failed to add clusterId") - } - fa.aggregationProcess.SetExternalFieldsFilled(record, true) - } +func (fa *flowAggregator) sendRecord(record ipfixentities.Record, isRecordIPv6 bool) error { if fa.ipfixExporter != nil { - if err := fa.ipfixExporter.AddRecord(record.Record, !isRecordIPv4); err != nil { + if err := fa.ipfixExporter.AddRecord(record, isRecordIPv6); err != nil { return err } } if fa.clickHouseExporter != nil { - if err := fa.clickHouseExporter.AddRecord(record.Record, !isRecordIPv4); err != nil { + if err := fa.clickHouseExporter.AddRecord(record, isRecordIPv6); err != nil { return err } } if fa.s3Exporter != nil { - if err := fa.s3Exporter.AddRecord(record.Record, !isRecordIPv4); err != nil { + if err := fa.s3Exporter.AddRecord(record, isRecordIPv6); err != nil { return err } } if fa.logExporter != nil { - if err := fa.logExporter.AddRecord(record.Record, !isRecordIPv4); err != nil { + if err := fa.logExporter.AddRecord(record, isRecordIPv6); err != nil { return err } } + fa.numRecordsExported = fa.numRecordsExported + 1 + return nil +} + +func (fa *flowAggregator) sendAggregatedRecord(key ipfixintermediate.FlowKey, record *ipfixintermediate.AggregationFlowRecord) error { + isRecordIPv4 := fa.aggregationProcess.IsAggregatedRecordIPv4(*record) + startTime, err := fa.getRecordStartTime(record.Record) + if err != nil { + return fmt.Errorf("cannot find record start time: %v", err) + } + if !fa.aggregationProcess.AreCorrelatedFieldsFilled(*record) { + fa.fillK8sMetadata(key.SourceAddress, key.DestinationAddress, record.Record, startTime) + fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true) + } + // Even if fa.includePodLabels is false, we still need to add an empty IE to match the template. + if !fa.aggregationProcess.AreExternalFieldsFilled(*record) { + fa.fillPodLabels(key.SourceAddress, key.DestinationAddress, record.Record, startTime) + if err := fa.fillClusterID(record.Record); err != nil { + klog.ErrorS(err, "Failed to add clusterId") + } + fa.aggregationProcess.SetExternalFieldsFilled(record, true) + } + if err := fa.sendRecord(record.Record, !isRecordIPv4); err != nil { + return err + } if err := fa.aggregationProcess.ResetStatAndThroughputElementsInRecord(record.Record); err != nil { return err } - fa.numRecordsExported = fa.numRecordsExported + 1 return nil } // fillK8sMetadata fills Pod name, Pod namespace and Node name for inter-Node flows // that have incomplete info due to deny network policy. -func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record ipfixentities.Record, startTime time.Time) { +func (fa *flowAggregator) fillK8sMetadata(sourceAddress, destinationAddress string, record ipfixentities.Record, startTime time.Time) { // fill source Pod info when sourcePodName is empty if sourcePodName, _, exist := record.GetInfoElementWithValue("sourcePodName"); exist { if sourcePodName.GetStringValue() == "" { - pod, exist := fa.podStore.GetPodByIPAndTime(key.SourceAddress, startTime) + pod, exist := fa.podStore.GetPodByIPAndTime(sourceAddress, startTime) if exist { sourcePodName.SetStringValue(pod.Name) if sourcePodNamespace, _, exist := record.GetInfoElementWithValue("sourcePodNamespace"); exist { @@ -542,14 +681,14 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record sourceNodeName.SetStringValue(pod.Spec.NodeName) } } else { - klog.ErrorS(nil, "Cannot find Pod information", "sourceAddress", key.SourceAddress, "flowStartTime", startTime) + klog.ErrorS(nil, "Cannot find Pod information", "sourceAddress", sourceAddress, "flowStartTime", startTime) } } } // fill destination Pod info when destinationPodName is empty if destinationPodName, _, exist := record.GetInfoElementWithValue("destinationPodName"); exist { if destinationPodName.GetStringValue() == "" { - pod, exist := fa.podStore.GetPodByIPAndTime(key.DestinationAddress, startTime) + pod, exist := fa.podStore.GetPodByIPAndTime(destinationAddress, startTime) if exist { destinationPodName.SetStringValue(pod.Name) if destinationPodNamespace, _, exist := record.GetInfoElementWithValue("destinationPodNamespace"); exist { @@ -559,19 +698,19 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record destinationNodeName.SetStringValue(pod.Spec.NodeName) } } else { - klog.ErrorS(nil, "Cannot find Pod information", "destinationAddress", key.DestinationAddress, "flowStartTime", startTime) + klog.ErrorS(nil, "Cannot find Pod information", "destinationAddress", destinationAddress, "flowStartTime", startTime) } } } } -func (fa *flowAggregator) getRecordStartTime(record ipfixentities.Record) (*time.Time, error) { +func (fa *flowAggregator) getRecordStartTime(record ipfixentities.Record) (time.Time, error) { flowStartSeconds, _, exist := record.GetInfoElementWithValue("flowStartSeconds") if !exist { - return nil, fmt.Errorf("flowStartSeconds filed is empty") + return time.Time{}, fmt.Errorf("flowStartSeconds filed is empty") } startTime := time.Unix(int64(flowStartSeconds.GetUnsigned32Value()), 0) - return &startTime, nil + return startTime, nil } func (fa *flowAggregator) fetchPodLabels(ip string, startTime time.Time) string { @@ -623,11 +762,11 @@ func (fa *flowAggregator) fillPodLabelsForSide(ip string, record ipfixentities.R return nil } -func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ipfixentities.Record, startTime time.Time) { - if err := fa.fillPodLabelsForSide(key.SourceAddress, record, startTime, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil { +func (fa *flowAggregator) fillPodLabels(sourceAddress, destinationAddress string, record ipfixentities.Record, startTime time.Time) { + if err := fa.fillPodLabelsForSide(sourceAddress, record, startTime, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil { klog.ErrorS(err, "Error when filling Pod labels", "side", "source") } - if err := fa.fillPodLabelsForSide(key.DestinationAddress, record, startTime, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil { + if err := fa.fillPodLabelsForSide(destinationAddress, record, startTime, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil { klog.ErrorS(err, "Error when filling Pod labels", "side", "destination") } } @@ -643,15 +782,64 @@ func (fa *flowAggregator) fillClusterID(record ipfixentities.Record) error { return nil } +func (fa *flowAggregator) addOriginalObservationDomainID(record ipfixentities.Record, obsDomainID uint32) error { + ie, err := fa.registry.GetInfoElement("originalObservationDomainId", ipfixregistry.IANAEnterpriseID) + if err != nil { + return fmt.Errorf("error when getting originalObservationDomainId InfoElement: %w", err) + } + if err := record.AddInfoElement(ipfixentities.NewUnsigned32InfoElement(ie, obsDomainID)); err != nil { + return fmt.Errorf("error when adding originalObservationDomainId InfoElement with value: %w", err) + } + return nil +} + +func (fa *flowAggregator) addOriginalExporterIPv4Address(record ipfixentities.Record, address net.IP) error { + if address.To4() == nil { + address = net.IPv4zero + } + ie, err := fa.registry.GetInfoElement("originalExporterIPv4Address", ipfixregistry.IANAEnterpriseID) + if err != nil { + return fmt.Errorf("error when getting originalExporterIPv4Address InfoElement: %w", err) + } + if err := record.AddInfoElement(ipfixentities.NewIPAddressInfoElement(ie, address)); err != nil { + return fmt.Errorf("error when adding originalExporterIPv4Address InfoElement with value: %w", err) + } + return nil +} + +func (fa *flowAggregator) addOriginalExporterIPv6Address(record ipfixentities.Record, address net.IP) error { + if address.To4() != nil { + address = net.IPv6zero + } + ie, err := fa.registry.GetInfoElement("originalExporterIPv6Address", ipfixregistry.IANAEnterpriseID) + if err != nil { + return fmt.Errorf("error when getting originalExporterIPv6Address InfoElement: %w", err) + } + if err := record.AddInfoElement(ipfixentities.NewIPAddressInfoElement(ie, address)); err != nil { + return fmt.Errorf("error when adding originalExporterIPv6Address InfoElement with value: %w", err) + } + return nil +} + func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []map[string]interface{} { - return fa.aggregationProcess.GetRecords(flowKey) + if fa.aggregationProcess != nil { + return fa.aggregationProcess.GetRecords(flowKey) + } + return nil +} + +func (fa *flowAggregator) getNumFlows() int64 { + if fa.aggregationProcess != nil { + return fa.aggregationProcess.GetNumFlows() + } + return 0 } func (fa *flowAggregator) GetRecordMetrics() querier.Metrics { return querier.Metrics{ NumRecordsExported: fa.numRecordsExported, NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(), - NumFlows: fa.aggregationProcess.GetNumFlows(), + NumFlows: fa.getNumFlows(), NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(), WithClickHouseExporter: fa.clickHouseExporter != nil, WithS3Exporter: fa.s3Exporter != nil, diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index c63ccb5f175..40b6a859ac7 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -16,6 +16,7 @@ package flowaggregator import ( "bytes" + "net" "os" "path/filepath" "sync" @@ -58,7 +59,7 @@ func init() { ipfixregistry.LoadRegistry() } -func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { +func TestFlowAggregator_sendAggregatedRecord(t *testing.T) { ipv4Key := ipfixintermediate.FlowKey{ SourceAddress: "10.0.0.1", DestinationAddress: "10.0.0.2", @@ -195,12 +196,173 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockAggregationProcess.EXPECT().SetExternalFieldsFilled(flowRecord, true) mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*flowRecord).Return(!tc.isIPv6) - err := fa.sendFlowKeyRecord(tc.flowKey, flowRecord) + err := fa.sendAggregatedRecord(tc.flowKey, flowRecord) assert.NoError(t, err, "Error when sending flow key record, key: %v, record: %v", tc.flowKey, flowRecord) }) } } +func TestFlowAggregator_proxyRecord(t *testing.T) { + podA := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podA", + }, + } + podB := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podB", + }, + } + + const sourceAddressIPv4 = "10.0.0.1" + const destinationAddressIPv4 = "10.0.0.2" + const sourceAddressIPv6 = "2001:0:3238:dfe1:63::fefb" + const destinationAddressIPv6 = "2001:0:3238:dfe1:63::fefc" + const nodeAddressIPv4 = "192.168.77.100" + const nodeAddressIPv6 = "fd3b:fcf5:3e92:d732::100" + + testcases := []struct { + name string + isIPv6 bool + includePodLabels bool + }{ + { + "IPv4_with_pod_labels", + false, + true, + }, + { + "IPv6_with_pod_labels", + true, + true, + }, + { + "IPv4_without_pod_labels", + false, + false, + }, + { + "IPv6_without_pod_labels", + true, + false, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockPodStore := podstoretest.NewMockInterface(ctrl) + mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) + mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) + mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) + + clusterUUID := uuid.New() + newFlowAggregator := func(includePodLabels bool) *flowAggregator { + return &flowAggregator{ + aggregatorMode: flowaggregatorconfig.AggregatorModeProxy, + clusterUUID: clusterUUID, + aggregatorTransportProtocol: "tcp", + activeFlowRecordTimeout: testActiveTimeout, + inactiveFlowRecordTimeout: testInactiveTimeout, + ipfixExporter: mockIPFIXExporter, + registry: mockIPFIXRegistry, + flowAggregatorAddress: "", + includePodLabels: includePodLabels, + podStore: mockPodStore, + } + } + + fa := newFlowAggregator(tc.includePodLabels) + mockIPFIXExporter.EXPECT().AddRecord(mockRecord, tc.isIPv6) + + startTime := time.Now().Truncate(time.Second) + + flowTypeIE := ipfixentities.NewUnsigned8InfoElement(ipfixentities.NewInfoElement("flowType", 0, ipfixentities.Unsigned8, ipfixregistry.AntreaEnterpriseID, 0), ipfixregistry.FlowTypeInterNode) + mockRecord.EXPECT().GetInfoElementWithValue("flowType").Return(flowTypeIE, 0, true) + + var sourceAddress, destinationAddress string + var sourceIPv4Address, sourceIPv6Address, destinationIPv4Address, destinationIPv6Address string + if tc.isIPv6 { + sourceAddress = sourceAddressIPv6 + destinationAddress = destinationAddressIPv6 + sourceIPv6Address = sourceAddressIPv6 + destinationIPv6Address = destinationAddressIPv6 + } else { + sourceAddress = sourceAddressIPv4 + destinationAddress = destinationAddressIPv4 + sourceIPv4Address = sourceAddressIPv4 + destinationIPv4Address = destinationAddressIPv4 + } + sourceIPv4AddressIE := ipfixentities.NewIPAddressInfoElement(ipfixentities.NewInfoElement("sourceIPv4Address", 0, ipfixentities.Ipv4Address, ipfixregistry.IANAEnterpriseID, 0), net.ParseIP(sourceIPv4Address)) + mockRecord.EXPECT().GetInfoElementWithValue("sourceIPv4Address").Return(sourceIPv4AddressIE, 0, !tc.isIPv6) + destinationIPv4AddressIE := ipfixentities.NewIPAddressInfoElement(ipfixentities.NewInfoElement("destinationIPv4Address", 0, ipfixentities.Ipv4Address, ipfixregistry.IANAEnterpriseID, 0), net.ParseIP(destinationIPv4Address)) + mockRecord.EXPECT().GetInfoElementWithValue("destinationIPv4Address").Return(destinationIPv4AddressIE, 0, !tc.isIPv6) + sourceIPv6AddressIE := ipfixentities.NewIPAddressInfoElement(ipfixentities.NewInfoElement("sourceIPv6Address", 0, ipfixentities.Ipv6Address, ipfixregistry.IANAEnterpriseID, 0), net.ParseIP(sourceIPv6Address)) + mockRecord.EXPECT().GetInfoElementWithValue("sourceIPv6Address").Return(sourceIPv6AddressIE, 0, tc.isIPv6) + destinationIPv6AddressIE := ipfixentities.NewIPAddressInfoElement(ipfixentities.NewInfoElement("destinationIPv6Address", 0, ipfixentities.Ipv6Address, ipfixregistry.IANAEnterpriseID, 0), net.ParseIP(destinationIPv6Address)) + mockRecord.EXPECT().GetInfoElementWithValue("destinationIPv6Address").Return(destinationIPv6AddressIE, 0, tc.isIPv6) + + flowStartSecondsIE := ipfixentities.NewDateTimeSecondsInfoElement(ipfixentities.NewInfoElement("flowStartSeconds", 150, 14, ipfixregistry.IANAEnterpriseID, 4), uint32(startTime.Unix())) + mockRecord.EXPECT().GetInfoElementWithValue("flowStartSeconds").Return(flowStartSecondsIE, 0, true) + sourcePodNameIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "podA") + mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameIE, 0, true).MinTimes(1) + destinationPodNameIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodName", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "podB") + mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destinationPodNameIE, 0, true).MinTimes(1) + podLabels := "" + if tc.includePodLabels { + podLabels = "{}" + sourcePodNamespaceIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("sourcePodNamespace", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "default") + mockRecord.EXPECT().GetInfoElementWithValue("sourcePodNamespace").Return(sourcePodNamespaceIE, 0, true) + destinationPodNamespaceIE := ipfixentities.NewStringInfoElement(ipfixentities.NewInfoElement("destinationPodNamespace", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), "default") + mockRecord.EXPECT().GetInfoElementWithValue("destinationPodNamespace").Return(destinationPodNamespaceIE, 0, true) + mockPodStore.EXPECT().GetPodByIPAndTime(sourceAddress, startTime).Return(podA, true) + mockPodStore.EXPECT().GetPodByIPAndTime(destinationAddress, startTime).Return(podB, true) + } + sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) + mockIPFIXRegistry.EXPECT().GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID).Return(sourcePodLabelsElement, nil) + sourcePodLabelsIE := ipfixentities.NewStringInfoElement(sourcePodLabelsElement, podLabels) + mockRecord.EXPECT().AddInfoElement(sourcePodLabelsIE).Return(nil) + destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) + mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil) + destinationPodLabelsIE := ipfixentities.NewStringInfoElement(destinationPodLabelsElement, podLabels) + mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil) + clusterIDElement := ipfixentities.NewInfoElement("clusterId", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) + mockIPFIXRegistry.EXPECT().GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID).Return(clusterIDElement, nil) + clusterIDIE := ipfixentities.NewStringInfoElement(clusterIDElement, clusterUUID.String()) + mockRecord.EXPECT().AddInfoElement(clusterIDIE).Return(nil) + + const obsDomainID = 123 + originalObservationDomainIE := ipfixentities.NewInfoElement("originalObservationDomainId", 0, 0, ipfixregistry.IANAEnterpriseID, 4) + mockIPFIXRegistry.EXPECT().GetInfoElement("originalObservationDomainId", ipfixregistry.IANAEnterpriseID).Return(originalObservationDomainIE, nil) + mockRecord.EXPECT().AddInfoElement(ipfixentities.NewUnsigned32InfoElement(originalObservationDomainIE, obsDomainID)) + + var exporterAddress string + var exporterAddressIPv4, exporterAddressIPv6 net.IP + if tc.isIPv6 { + exporterAddressIPv4 = net.IPv4zero + exporterAddressIPv6 = net.ParseIP(nodeAddressIPv6) + exporterAddress = nodeAddressIPv6 + } else { + exporterAddressIPv4 = net.ParseIP(nodeAddressIPv4) + exporterAddressIPv6 = net.IPv6zero + exporterAddress = nodeAddressIPv4 + } + + originalExporterIPv6AddressIE := ipfixentities.NewInfoElement("originalExporterIPv6Address", 0, 0, ipfixregistry.IANAEnterpriseID, 16) + mockIPFIXRegistry.EXPECT().GetInfoElement("originalExporterIPv6Address", ipfixregistry.IANAEnterpriseID).Return(originalExporterIPv6AddressIE, nil) + mockRecord.EXPECT().AddInfoElement(ipfixentities.NewIPAddressInfoElement(originalExporterIPv6AddressIE, exporterAddressIPv6)) + originalExporterIPv4AddressIE := ipfixentities.NewInfoElement("originalExporterIPv4Address", 0, 0, ipfixregistry.IANAEnterpriseID, 4) + mockIPFIXRegistry.EXPECT().GetInfoElement("originalExporterIPv4Address", ipfixregistry.IANAEnterpriseID).Return(originalExporterIPv4AddressIE, nil) + mockRecord.EXPECT().AddInfoElement(ipfixentities.NewIPAddressInfoElement(originalExporterIPv4AddressIE, exporterAddressIPv4)) + + err := fa.proxyRecord(mockRecord, obsDomainID, exporterAddress) + assert.NoError(t, err, "Error when proxying flow record") + }) + } +} + func TestFlowAggregator_watchConfiguration(t *testing.T) { opt := options.Options{ Config: &flowaggregatorconfig.FlowAggregatorConfig{ @@ -544,6 +706,7 @@ func TestFlowAggregator_Run(t *testing.T) { } flowAggregator := &flowAggregator{ + aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, // must be large enough to avoid a call to ForAllExpiredFlowRecordsDo activeFlowRecordTimeout: 1 * time.Hour, logTickerDuration: 1 * time.Hour, @@ -860,11 +1023,11 @@ func TestFlowAggregator_InitAggregationProcess(t *testing.T) { inactiveFlowRecordTimeout: testInactiveTimeout, aggregatorTransportProtocol: flowaggregatorconfig.AggregatorTransportProtocolTCP, registry: ipfix.NewIPFIXRegistry(), + preprocessorOutCh: make(chan *ipfixentities.Message), } require.NoError(t, fa.InitCollectingProcess()) - recordCh := make(chan ipfixentities.Record) - require.NoError(t, fa.InitPreprocessor(recordCh)) - require.NoError(t, fa.InitAggregationProcess(recordCh)) + require.NoError(t, fa.InitPreprocessor()) + require.NoError(t, fa.InitAggregationProcess()) } func TestFlowAggregator_fillK8sMetadata(t *testing.T) { @@ -916,10 +1079,8 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - ipv4Key := ipfixintermediate.FlowKey{ - SourceAddress: "192.168.1.2", - DestinationAddress: "192.168.1.3", - } + sourceAdress := "192.168.1.2" + destinationAddress := "192.168.1.3" fa := &flowAggregator{ podStore: mockPodStore, @@ -934,7 +1095,7 @@ func TestFlowAggregator_fillK8sMetadata(t *testing.T) { mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.2", gomock.Any()).Return(srcPod, true) mockPodStore.EXPECT().GetPodByIPAndTime("192.168.1.3", gomock.Any()).Return(dstPod, true) - fa.fillK8sMetadata(ipv4Key, mockRecord, time.Now()) + fa.fillK8sMetadata(sourceAdress, destinationAddress, mockRecord, time.Now()) } func TestNewFlowAggregator(t *testing.T) { diff --git a/pkg/flowaggregator/options/options.go b/pkg/flowaggregator/options/options.go index ef7fa0abfe4..7d5b70e0306 100644 --- a/pkg/flowaggregator/options/options.go +++ b/pkg/flowaggregator/options/options.go @@ -27,6 +27,8 @@ import ( type Options struct { // The configuration object Config *flowaggregatorconfig.FlowAggregatorConfig + // Mode is the mode in which to run the flow aggregator (with aggregation or just as a proxy) + AggregatorMode flowaggregatorconfig.AggregatorMode // Expiration timeout for active flow records in the flow aggregator ActiveFlowRecordTimeout time.Duration // Expiration timeout for inactive flow records in the flow aggregator @@ -51,7 +53,7 @@ func LoadConfig(configBytes []byte) (*Options, error) { return nil, fmt.Errorf("failed to unmarshal FlowAggregator config from ConfigMap: %v", err) } flowaggregatorconfig.SetConfigDefaults(opt.Config) - // validate all the required options. + // Validate all the required options. if opt.Config.FlowCollector.Enable && opt.Config.FlowCollector.Address == "" { return nil, fmt.Errorf("external flow collector enabled without providing address") } @@ -59,9 +61,18 @@ func LoadConfig(configBytes []byte) (*Options, error) { return nil, fmt.Errorf("s3Uploader enabled without specifying bucket name") } if !opt.Config.FlowCollector.Enable && !opt.Config.ClickHouse.Enable && !opt.Config.S3Uploader.Enable && !opt.Config.FlowLogger.Enable { - return nil, fmt.Errorf("external flow collector or ClickHouse or S3Uploader should be configured") + return nil, fmt.Errorf("at least one collector / sink should be configured") } // Validate common parameters + if opt.Config.Mode != flowaggregatorconfig.AggregatorModeAggregate && opt.Config.Mode != flowaggregatorconfig.AggregatorModeProxy { + return nil, fmt.Errorf("unsupported FlowAggregator mode %s", opt.Config.Mode) + } + opt.AggregatorMode = opt.Config.Mode + if opt.AggregatorMode == flowaggregatorconfig.AggregatorModeProxy { + if opt.Config.ClickHouse.Enable || opt.Config.S3Uploader.Enable || opt.Config.FlowLogger.Enable { + return nil, fmt.Errorf("only flow collector is supported in Proxy mode") + } + } var err error opt.ActiveFlowRecordTimeout, err = time.ParseDuration(opt.Config.ActiveFlowRecordTimeout) if err != nil { diff --git a/pkg/flowaggregator/preprocessor.go b/pkg/flowaggregator/preprocessor.go index aee9c31b3a8..0b1c386f434 100644 --- a/pkg/flowaggregator/preprocessor.go +++ b/pkg/flowaggregator/preprocessor.go @@ -23,17 +23,17 @@ import ( ) // preprocessor is in charge of processing messages received from the IPFIX collector, prior to -// handling records over to the aggregation process. At the moment, its only task is to ensure that -// all records have the expected fields. If a record has extra fields, they will be discarded. If -// some fields are missing, they will be "appended" to the record with a "zero" value. For example, -// we will use 0 for integral types, "" for strings, 0.0.0.0 for IPv4 address, etc. Note that we are -// able to keep the implementation simple by assuming that a record either has missing fields or -// extra fields (not a combination of both), and that such fields are always at the tail of the -// field list. This assumption is based on implementation knowledge of the FlowExporter and the -// FlowAggregator. +// proxying them to another collector of handing them over to the aggregation process. At the +// moment, its only task is to ensure that all records have the expected fields. If a record has +// extra fields, they will be discarded. If some fields are missing, they will be "appended" to the +// record with a "zero" value. For example, we will use 0 for integral types, "" for strings, +// 0.0.0.0 for IPv4 address, etc. Note that we are able to keep the implementation simple by +// assuming that a record either has missing fields or extra fields (not a combination of both), and +// that such fields are always at the tail of the field list. This assumption is based on +// implementation knowledge of the FlowExporter and the FlowAggregator. type preprocessor struct { inCh <-chan *entities.Message - outCh chan<- entities.Record + outCh chan<- *entities.Message expectedElementsV4 int expectedElementsV6 int @@ -100,7 +100,7 @@ func makeDefaultElementsWithValue(infoElements []*entities.InfoElement) ([]entit return elementsWithValue, nil } -func newPreprocessor(infoElementsV4, infoElementsV6 []*entities.InfoElement, inCh <-chan *entities.Message, outCh chan<- entities.Record) (*preprocessor, error) { +func newPreprocessor(infoElementsV4, infoElementsV6 []*entities.InfoElement, inCh <-chan *entities.Message, outCh chan<- *entities.Message) (*preprocessor, error) { defaultElementsWithValueV4, err := makeDefaultElementsWithValue(infoElementsV4) if err != nil { return nil, fmt.Errorf("error when generating default values for IPv4 Information Elements expected from exporter: %w", err) @@ -144,24 +144,39 @@ func (p *preprocessor) processMsg(msg *entities.Message) { return } records := set.GetRecords() + if len(records) == 0 { + return + } + // All the records in the data set must match a given template, so we only need to look at + // the first one to decide how to proceed. + firstRecord := records[0] + elementList := firstRecord.GetOrderedElementList() + numElements := len(elementList) + isIPv4 := isRecordIPv4(firstRecord) + expectedElements := p.expectedElementsV4 + if !isIPv4 { + expectedElements = p.expectedElementsV6 + } + // Fast path: everything matches so we can just forward the message as is. + if numElements == expectedElements { + p.outCh <- msg + return + } + newSet := entities.NewSet(true) + // Set templateID to 0: the set records will not match the template any more. + if err := newSet.PrepareSet(entities.Data, 0); err != nil { + klog.ErrorS(err, "Failed to prepare modified set") + return + } for _, record := range records { elementList := record.GetOrderedElementList() - numElements := len(elementList) - isIPv4 := isRecordIPv4(record) - expectedElements := p.expectedElementsV4 - if !isIPv4 { - expectedElements = p.expectedElementsV6 - } - if numElements == expectedElements { - p.outCh <- record - } else if numElements > expectedElements { + if numElements > expectedElements { if klog.V(5).Enabled() { klog.InfoS("Record received from exporter includes unexpected elements, truncating", "expectedElements", expectedElements, "receivedElements", numElements) } // Creating a new Record seems like the best option here. By using - // NewDataRecordFromElements, we should minimize the number of allocations - // required. - p.outCh <- entities.NewDataRecordFromElements(0, elementList[:expectedElements], true) + // AddRecordV2, we should minimize the number of allocations required. + newSet.AddRecordV2(elementList[:expectedElements], 0) } else { if klog.V(5).Enabled() { klog.InfoS("Record received from exporter is missing information elements, adding fields with zero values", "expectedElements", expectedElements, "receivedElements", numElements) @@ -171,8 +186,11 @@ func (p *preprocessor) processMsg(msg *entities.Message) { } else { elementList = append(elementList, p.defaultElementsWithValueV6[numElements:]...) } - p.outCh <- entities.NewDataRecordFromElements(0, elementList, true) + newSet.AddRecordV2(elementList, 0) } } - + // This will overwrite the existing set with the new one. + // Note that the message length will no longer be correct, but this should not matter. + msg.AddSet(newSet) + p.outCh <- msg } diff --git a/pkg/flowaggregator/preprocessor_test.go b/pkg/flowaggregator/preprocessor_test.go index 100cc92883d..95482a92c68 100644 --- a/pkg/flowaggregator/preprocessor_test.go +++ b/pkg/flowaggregator/preprocessor_test.go @@ -81,14 +81,19 @@ func TestPreprocessorProcessMsg(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - // Buffered channel with capacity 1 to hold the output record generated by processMsg. - outCh := make(chan ipfixentities.Record, 1) + // Buffered channel with capacity 1 to hold the output message generated by processMsg. + outCh := make(chan *ipfixentities.Message, 1) p, err := newPreprocessor(iesIPv4[:tc.templateElementsCount], iesIPv6[:tc.templateElementsCount], nil, outCh) require.NoError(t, err) p.processMsg(tc.msg) var r ipfixentities.Record select { - case r = <-outCh: + case m := <-outCh: + set := m.GetSet() + require.NotNil(t, set) + records := set.GetRecords() + require.Len(t, records, 1) + r = records[0] default: } if tc.expectedIEsWithValue == nil { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index fa4f50c469b..135a5218c68 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -40,6 +40,7 @@ import ( "antrea.io/antrea/pkg/antctl" "antrea.io/antrea/pkg/antctl/runtime" secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" + flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/flowaggregator/apis" "antrea.io/antrea/test/e2e/utils" @@ -166,6 +167,10 @@ var ( // We use a global variable for this to avoid having to pass it down to all helper functions. // It will be initialized the first time setupFlowAggregatorTest is called. antreaClusterUUID = "" + + // In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command + // We set the timeout to 9s (5.5s plus one more aggregatorActiveFlowRecordTimeout) to make the ToExternalFlows test more stable + getCollectorOutputDefaultTimeout = exporterActiveFlowExportTimeout + 2*aggregatorActiveFlowRecordTimeout ) type testFlow struct { @@ -273,7 +278,7 @@ func TestFlowAggregator(t *testing.T) { data, v4Enabled, v6Enabled := setupFlowAggregatorTest(t, flowVisibilityTestOptions{ databaseURL: defaultCHDatabaseURL, }) - if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil { + if err := getAndCheckFlowAggregatorMetrics(t, data, true); err != nil { t.Fatalf("Error when checking metrics of Flow Aggregator: %v", err) } @@ -303,6 +308,66 @@ func TestFlowAggregator(t *testing.T) { } +func TestFlowAggregatorProxyMode(t *testing.T) { + skipIfNotFlowVisibilityTest(t) + skipIfHasWindowsNodes(t) + + var err error + data, v4Enabled, v6Enabled := setupFlowAggregatorTest(t, flowVisibilityTestOptions{ + mode: flowaggregatorconfig.AggregatorModeProxy, + }) + require.NoError(t, getAndCheckFlowAggregatorMetrics(t, data, false), "Error when checking metrics of Flow Aggregator") + + k8sUtils, err = NewKubernetesUtils(data) + require.NoError(t, err, "Error when creating Kubernetes utils client") + + podAIPs, podBIPs, _, _, _, err = createPerftestPods(data) + require.NoError(t, err, "Error when creating perftest Pods") + + if v4Enabled { + t.Run("IPv4", func(t *testing.T) { testHelperProxyMode(t, data, false) }) + } + + if v6Enabled { + t.Run("IPv6", func(t *testing.T) { testHelperProxyMode(t, data, true) }) + } + +} + +func testHelperProxyMode(t *testing.T, data *TestData, isIPv6 bool) { + label := "Proxy-IntraNodeFlows" + addLabelToTestPods(t, data, label, []string{"perftest-a", "perftest-b"}) + + var srcIP, dstIP string + var cmd []string + if !isIPv6 { + srcIP = podAIPs.IPv4.String() + dstIP = podBIPs.IPv4.String() + cmd = []string{"iperf3", "-c", dstIP, "-t", "5"} + } else { + srcIP = podAIPs.IPv6.String() + dstIP = podBIPs.IPv6.String() + cmd = []string{"iperf3", "-6", "-c", dstIP, "-t", "5"} + } + stdout, _, err := data.RunCommandFromPod(data.testNamespace, "perftest-a", "iperf", cmd) + require.NoError(t, err, "Error when running iperf3 client") + _, srcPort, _ := getBandwidthAndPorts(stdout) + + // should be larger than exporterActiveFlowExportTimeout (+ safety margin). + const timeout = 10 * time.Second + records := getCollectorOutput(t, srcIP, dstIP, srcPort, false /* isDstService */, true /* lookForFlowEnd */, isIPv6, data, label /* labelFilter */, timeout) + require.NotEmpty(t, records) + record := records[len(records)-1] + assert.Contains(t, record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace") + assert.Contains(t, record, fmt.Sprintf("destinationPodNamespace: %s", data.testNamespace), "Record does not have correct destinationPodNamespace") + assert.Contains(t, record, fmt.Sprintf("sourcePodName: %s", "perftest-a"), "Record does not have correct sourcePodName") + assert.Contains(t, record, fmt.Sprintf("destinationPodName: %s", "perftest-b"), "Record does not have correct destinationPodName") + assert.Contains(t, record, fmt.Sprintf("clusterId: %s", antreaClusterUUID), "Record does not have the correct clusterId") + assert.Contains(t, record, "originalObservationDomainId", "Record does not have originalObservationDomainId") + assert.Contains(t, record, "originalExporterIPv4Address", "Record does not have originalExporterIPv4Address") + assert.Contains(t, record, "originalExporterIPv6Address", "Record does not have originalExporterIPv6Address") +} + func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool, labelFilter string) { np1, np2 := deployK8sNetworkPolicies(t, data, "perftest-a", "perftest-b") defer func() { @@ -1015,7 +1080,7 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri } func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { - records := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter) + records := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter, getCollectorOutputDefaultTimeout) // Checking only data records as data records cannot be decoded without template // record. assert.GreaterOrEqualf(t, len(records), expectedNumDataRecords, "IPFIX collector should receive expected number of flow records, filtered records: %v", records) @@ -1164,7 +1229,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, toolboxContainerName, strings.Fields(cmd)) require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr) - records := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter) + records := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter, getCollectorOutputDefaultTimeout) for _, record := range records { checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) @@ -1213,8 +1278,8 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 } func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, labelFilter string) { - records1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, labelFilter) - records2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, labelFilter) + records1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, labelFilter, getCollectorOutputDefaultTimeout) + records2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, labelFilter, getCollectorOutputDefaultTimeout) records := append(records1, records2...) src_flow1, dst_flow1 := matchSrcAndDstAddress(testFlow1.srcIP, testFlow1.dstIP, false, isIPv6) src_flow2, dst_flow2 := matchSrcAndDstAddress(testFlow2.srcIP, testFlow2.dstIP, false, isIPv6) @@ -1419,7 +1484,7 @@ func checkL7FlowExporterDataClickHouse(t *testing.T, record *ClickHouseFullRow, assert.NotEmpty(t, record.HttpVals, "Record does not have httpVals") } -func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 { +func getUint64FieldFromRecord(t require.TestingT, record string, field string) uint64 { if strings.Contains(record, "TEMPLATE SET") { return 0 } @@ -1439,11 +1504,9 @@ func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 // received all the expected records for a given flow with source IP, destination IP // and source port. We send source port to ignore the control flows during the // iperf test. -func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, lookForFlowEnd bool, isIPv6 bool, data *TestData, labelFilter string) []string { +func getCollectorOutput(t require.TestingT, srcIP, dstIP, srcPort string, isDstService bool, lookForFlowEnd bool, isIPv6 bool, data *TestData, labelFilter string, timeout time.Duration) []string { var allRecords, records []string - // In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command - // We set the timeout to 9s (5.5s plus one more aggregatorActiveFlowRecordTimeout) to make the ToExternalFlows test more stable - err := wait.PollUntilContextTimeout(context.Background(), 500*time.Millisecond, exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout*2, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(context.Background(), 500*time.Millisecond, timeout, true, func(ctx context.Context) (bool, error) { var rc int var err error var cmd string @@ -1484,8 +1547,8 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService if err == context.DeadlineExceeded { const numRecordsToPrint = 20 fmt.Printf("Last %d records received by IPFIX collector:\n", numRecordsToPrint) - for i := 0; i < len(records) && i < numRecordsToPrint; i++ { - fmt.Println(records[i]) + for i := 0; i < len(allRecords) && i < numRecordsToPrint; i++ { + fmt.Println(allRecords[i]) } } require.NoErrorf(t, err, "IPFIX collector did not receive the expected records, source IP: %s, dest IP: %s, source port: %s, total records count: %d, filtered records count: %d", srcIP, dstIP, srcPort, len(allRecords), len(records)) @@ -1873,7 +1936,7 @@ func createToExternalTestServer(t *testing.T, data *TestData) *PodIPs { return serverIPs } -func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error { +func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData, withClickHouseExporter bool) error { flowAggPod, err := data.getFlowAggregator() if err != nil { return fmt.Errorf("error when getting flow-aggregator Pod: %w", err) @@ -1890,7 +1953,7 @@ func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error { if err := json.Unmarshal([]byte(stdout), metrics); err != nil { return false, fmt.Errorf("error when decoding recordmetrics: %w", err) } - if metrics.NumConnToCollector != int64(clusterInfo.numNodes) || !metrics.WithClickHouseExporter || !metrics.WithIPFIXExporter || metrics.NumRecordsExported == 0 { + if metrics.NumConnToCollector != int64(clusterInfo.numNodes) || (withClickHouseExporter != metrics.WithClickHouseExporter) || !metrics.WithIPFIXExporter || metrics.NumRecordsExported == 0 { t.Logf("Metrics are not correct. Current metrics: NumConnToCollector=%d, ClickHouseExporter=%v, IPFIXExporter=%v, NumRecordsExported=%d", metrics.NumConnToCollector, metrics.WithClickHouseExporter, metrics.WithIPFIXExporter, metrics.NumRecordsExported) return false, nil } @@ -1931,7 +1994,7 @@ func testL7FlowExporterController(t *testing.T, data *TestData, isIPv6 bool) { cmd := []string{"curl", getHTTPURLFromIPPort(testFlow1.dstIP, serverPodPort)} stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "l7flowexporter", cmd) require.NoErrorf(t, err, "Error when running curl command, stdout: %s, stderr: %s", stdout, stderr) - records := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, true, isIPv6, data, "") + records := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, true, isIPv6, data, "", getCollectorOutputDefaultTimeout) for _, record := range records { assert := assert.New(t) assert.Contains(record, testFlow1.srcPodName, "Record with srcIP does not have Pod name: %s", testFlow1.srcPodName) diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 61456335450..d18f65c482a 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -233,6 +233,7 @@ type TestOptions struct { } type flowVisibilityTestOptions struct { + mode flowaggregatorconfig.AggregatorMode databaseURL string secureConnection bool } @@ -1020,7 +1021,6 @@ func (data *TestData) deleteClickHouseOperator() error { // deployFlowAggregator deploys the Flow Aggregator with ipfix collector and clickHouse address. func (data *TestData) deployFlowAggregator(ipfixCollector string, o flowVisibilityTestOptions) error { - flowAggYaml := flowAggregatorYML if testOptions.enableCoverage { flowAggYaml = flowAggregatorCovYML @@ -1072,6 +1072,10 @@ func (data *TestData) deployFlowAggregator(ipfixCollector string, o flowVisibili } func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollectorAddr string, o flowVisibilityTestOptions) error { + if o.mode == flowaggregatorconfig.AggregatorModeProxy && o.databaseURL != "" { + return fmt.Errorf("cannot use Proxy mode with ClickHouse") + } + configMap, err := data.GetFlowAggregatorConfigMap() if err != nil { return err @@ -1082,21 +1086,29 @@ func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollectorAddr string, o return fmt.Errorf("failed to unmarshal FlowAggregator config from ConfigMap: %v", err) } + flowAggregatorConf.Mode = o.mode flowAggregatorConf.FlowCollector = flowaggregatorconfig.FlowCollectorConfig{ Enable: true, Address: ipfixCollectorAddr, } - flowAggregatorConf.ClickHouse = flowaggregatorconfig.ClickHouseConfig{ - Enable: true, - CommitInterval: aggregatorClickHouseCommitInterval.String(), + if o.databaseURL != "" { + flowAggregatorConf.ClickHouse = flowaggregatorconfig.ClickHouseConfig{ + Enable: true, + CommitInterval: aggregatorClickHouseCommitInterval.String(), + DatabaseURL: o.databaseURL, + TLS: flowaggregatorconfig.TLSConfig{ + CACert: o.secureConnection, + }, + } + + } else { + flowAggregatorConf.ClickHouse = flowaggregatorconfig.ClickHouseConfig{ + Enable: false, + } } flowAggregatorConf.ActiveFlowRecordTimeout = aggregatorActiveFlowRecordTimeout.String() flowAggregatorConf.InactiveFlowRecordTimeout = aggregatorInactiveFlowRecordTimeout.String() flowAggregatorConf.RecordContents.PodLabels = true - flowAggregatorConf.ClickHouse.DatabaseURL = o.databaseURL - if o.secureConnection { - flowAggregatorConf.ClickHouse.TLS.CACert = true - } b, err := yaml.Marshal(&flowAggregatorConf) if err != nil {