Skip to content

Commit

Permalink
Add Proxy mode for FlowAggregator
Browse files Browse the repository at this point in the history
In Proxy mode, connection data is not buffered in the FlowAggregator and
records from source and destination Nodes are not correlated. Instead,
records are proxied directly to the destination collector. In Proxy
mode, only the IPFIX exporter is supported.

A simple e2e test is added to test this new mode.

Fixes antrea-io#6773

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas committed Jan 14, 2025
1 parent 5ee28ec commit 8166339
Show file tree
Hide file tree
Showing 14 changed files with 683 additions and 168 deletions.
5 changes: 5 additions & 0 deletions build/charts/flow-aggregator/conf/flow-aggregator.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Mode in which to run the flow aggregator. Must be one of "Aggregate" (flow records received
# from source and destination are aggregated and sent as one flow record) or "Proxy" (flow records
# are enhanced with some additional information, then sent directly without buffering or aggregation).
mode: {{ .Values.mode }}

# Provide the active flow record timeout as a duration string. This determines
# how often the flow aggregator exports the active flow records to the flow
# collector. Thus, for flows with a continuous stream of packets, a flow record
Expand Down
4 changes: 4 additions & 0 deletions build/charts/flow-aggregator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ image:
pullPolicy: "IfNotPresent"
tag: ""

# -- Mode in which to run the flow aggregator. Must be one of "Aggregate" (flow records received
# from source and destination are aggregated and sent as one flow record) or "Proxy" (flow records
# are enhanced with some additional information, then sent directly without buffering or aggregation).
mode: "Aggregate"
# -- Provide the active flow record timeout as a duration string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
activeFlowRecordTimeout: 60s
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ subjects:
apiVersion: v1
data:
flow-aggregator.conf: |
# Mode in which to run the flow aggregator. Must be one of "Aggregate" (flow records received
# from source and destination are aggregated and sent as one flow record) or "Proxy" (flow records
# are enhanced with some additional information, then sent directly without buffering or aggregation).
mode: Aggregate
# Provide the active flow record timeout as a duration string. This determines
# how often the flow aggregator exports the active flow records to the flow
# collector. Thus, for flows with a continuous stream of packets, a flow record
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/flowaggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ const (
AggregatorTransportProtocolUDP AggregatorTransportProtocol = "UDP"
)

type AggregatorMode string

const (
AggregatorModeAggregate AggregatorMode = "Aggregate"
AggregatorModeProxy AggregatorMode = "Proxy"
)

type FlowAggregatorConfig struct {
Mode AggregatorMode `yaml:"mode,omitempty"`
// Provide the active flow record timeout as a duration string. This determines
// how often the flow aggregator exports the active flow records to the flow
// collector. Thus, for flows with a continuous stream of packets, a flow record
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/flowaggregator/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
)

func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
if flowAggregatorConf.Mode == "" {
flowAggregatorConf.Mode = AggregatorModeAggregate
}
if flowAggregatorConf.ActiveFlowRecordTimeout == "" {
flowAggregatorConf.ActiveFlowRecordTimeout = DefaultActiveFlowRecordTimeout
}
Expand Down
97 changes: 59 additions & 38 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type IPFIXExporter struct {
externalFlowCollectorProto string
exportingProcess ipfix.IPFIXExportingProcess
sendJSONRecord bool
aggregatorMode flowaggregatorconfig.AggregatorMode
observationDomainID uint32
templateRefreshTimeout time.Duration
templateIDv4 uint16
Expand Down Expand Up @@ -88,6 +89,7 @@ func NewIPFIXExporter(
externalFlowCollectorAddr: opt.ExternalFlowCollectorAddr,
externalFlowCollectorProto: opt.ExternalFlowCollectorProto,
sendJSONRecord: sendJSONRecord,
aggregatorMode: opt.AggregatorMode,
observationDomainID: observationDomainID,
templateRefreshTimeout: opt.TemplateRefreshTimeout,
registry: registry,
Expand Down Expand Up @@ -274,66 +276,85 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) {
}
elements = append(elements, ie)
}
// The order of source and destination stats elements needs to match the order specified in
// addFieldsForStatsAggregation method in go-ipfix aggregation process.
for i := range infoelements.StatsElementList {
// Add Antrea source stats fields
ieName := infoelements.AntreaSourceStatsElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
if e.aggregatorMode == flowaggregatorconfig.AggregatorModeAggregate {
// The order of source and destination stats elements needs to match the order specified in
// addFieldsForStatsAggregation method in go-ipfix aggregation process.
for i := range infoelements.StatsElementList {
// Add Antrea source stats fields
ieName := infoelements.AntreaSourceStatsElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add Antrea destination stats fields
ieName = infoelements.AntreaDestinationStatsElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
elements = append(elements, ie)
// Add Antrea destination stats fields
ieName = infoelements.AntreaDestinationStatsElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
for _, ie := range infoelements.AntreaFlowEndSecondsElementList {
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for i := range infoelements.AntreaThroughputElementList {
// Add common throughput fields
ieName := infoelements.AntreaThroughputElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add source node specific throughput fields
ieName = infoelements.AntreaSourceThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add destination node specific throughput fields
ieName = infoelements.AntreaDestinationThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
elements = append(elements, ie)
}
for _, ie := range infoelements.AntreaFlowEndSecondsElementList {
for _, ie := range infoelements.AntreaLabelsElementList {
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for i := range infoelements.AntreaThroughputElementList {
// Add common throughput fields
ieName := infoelements.AntreaThroughputElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add source node specific throughput fields
ieName = infoelements.AntreaSourceThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
if e.aggregatorMode == flowaggregatorconfig.AggregatorModeProxy {
ie, err := e.createInfoElementForTemplateSet("originalObservationDomainId", ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add destination node specific throughput fields
ieName = infoelements.AntreaDestinationThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
ie, err = e.createInfoElementForTemplateSet("originalExporterIPv4Address", ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for _, ie := range infoelements.AntreaLabelsElementList {
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
ie, err = e.createInfoElementForTemplateSet("originalExporterIPv6Address", ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
e.set.ResetSet()
if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil {
return 0, err
Expand Down
25 changes: 18 additions & 7 deletions pkg/flowaggregator/exporter/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) {
templateIDv6: testTemplateIDv6,
registry: mockIPFIXRegistry,
set: mockTempSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
elemList := createElementList(isIPv6, mockIPFIXRegistry)
Expand Down Expand Up @@ -118,13 +119,14 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) {
RecordFormat: "IPFIX",
},
}
ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
config: config.FlowCollector,
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
set: mockSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
testTemplateID := testTemplateIDv4
Expand Down Expand Up @@ -182,12 +184,13 @@ func TestIPFIXExporter_AddRecord(t *testing.T) {
initIPFIXExportingProcess = initIPFIXExportingProcessSaved
}()

ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
set: mockSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
testTemplateID := testTemplateIDv4
Expand Down Expand Up @@ -216,9 +219,10 @@ func TestIPFIXExporter_initIPFIXExportingProcess_Error(t *testing.T) {
initIPFIXExportingProcess = initIPFIXExportingProcessSaved
}()

ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}

assert.Error(t, ipfixExporter.AddRecord(mockRecord, false))
Expand All @@ -231,13 +235,14 @@ func TestIPFIXExporter_sendRecord_Error(t *testing.T) {
mockSet := ipfixentitiestesting.NewMockSet(ctrl)
mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)

ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
exportingProcess: mockIPFIXExpProc,
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
set: mockSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
testTemplateID := testTemplateIDv4
Expand Down Expand Up @@ -308,7 +313,9 @@ func TestInitExportingProcess(t *testing.T) {
t.Run("tcp success", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
opt := &options.Options{
AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}
opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{}
flowaggregatorconfig.SetConfigDefaults(opt.Config)
listener, err := net.Listen("tcp", "127.0.0.1:0")
Expand All @@ -326,7 +333,9 @@ func TestInitExportingProcess(t *testing.T) {
t.Run("udp success", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
opt := &options.Options{
AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}
opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{}
flowaggregatorconfig.SetConfigDefaults(opt.Config)
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
Expand All @@ -346,7 +355,9 @@ func TestInitExportingProcess(t *testing.T) {
t.Run("tcp failure", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
opt := &options.Options{
AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}
opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{}
flowaggregatorconfig.SetConfigDefaults(opt.Config)
// dialing this address is guaranteed to fail (we use 0 as the port number)
Expand Down
Loading

0 comments on commit 8166339

Please sign in to comment.