Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SendDataRecords to IPFIX Exporter #391

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/entities/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
)

const (
MaxSocketMsgSize int = 65535
MsgHeaderLength int = 16
MaxSocketMsgSize = 65535
MsgHeaderLength = 16
// Typically all networks should support a datagram size of 512B without fragmentation.
MinSupportedMsgSize = 512
)

// Message represents IPFIX message.
Expand Down
26 changes: 17 additions & 9 deletions pkg/exporter/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@
package exporter

import (
"bytes"
"fmt"
"time"

"github.com/vmware/go-ipfix/pkg/entities"
)

func CreateIPFIXMsg(set entities.Set, obsDomainID uint32, seqNumber uint32, exportTime time.Time) ([]byte, error) {
var buf bytes.Buffer
if _, err := WriteIPFIXMsgToBuffer(set, obsDomainID, seqNumber, exportTime, &buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// WriteIPFIXMsgToBuffer serializes the set as an IPFIX message and writes it to the provided
// buffer. It returns the message length and an error if applicable.
func WriteIPFIXMsgToBuffer(set entities.Set, obsDomainID uint32, seqNumber uint32, exportTime time.Time, buf *bytes.Buffer) (int, error) {
// Create a new message and use it to send the set.
msg := entities.NewMessage(false)

Expand All @@ -30,7 +41,7 @@ func CreateIPFIXMsg(set entities.Set, obsDomainID uint32, seqNumber uint32, expo
msgLen := entities.MsgHeaderLength + set.GetSetLength()
if msgLen > entities.MaxSocketMsgSize {
yuntanghsu marked this conversation as resolved.
Show resolved Hide resolved
// This is applicable for both TCP and UDP sockets.
return nil, fmt.Errorf("message size exceeds max socket buffer size")
return msgLen, fmt.Errorf("message size exceeds max socket buffer size")
}

// Set the fields in the message header.
Expand All @@ -42,15 +53,12 @@ func CreateIPFIXMsg(set entities.Set, obsDomainID uint32, seqNumber uint32, expo
msg.SetExportTime(uint32(exportTime.Unix()))
msg.SetSequenceNum(seqNumber)

bytesSlice := make([]byte, msgLen)
copy(bytesSlice[:entities.MsgHeaderLength], msg.GetMsgHeader())
copy(bytesSlice[entities.MsgHeaderLength:entities.MsgHeaderLength+entities.SetHeaderLen], set.GetHeaderBuffer())
index := entities.MsgHeaderLength + entities.SetHeaderLen
buf.Grow(msgLen)
buf.Write(msg.GetMsgHeader())
buf.Write(set.GetHeaderBuffer())
for _, record := range set.GetRecords() {
len := record.GetRecordLength()
copy(bytesSlice[index:index+len], record.GetBuffer())
index += len
buf.Write(record.GetBuffer())
}

return bytesSlice, nil
return msgLen, nil
}
207 changes: 150 additions & 57 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type templateValue struct {
// 2. Only one observation point per observation domain is supported,
// so observation point ID not defined.
// 3. Supports only TCP and UDP; one session at a time. SCTP is not supported.
// 4. UDP needs to send MTU size packets as per RFC7011. We are not honoring that,
// and relying on IP fragmentation and assuming data loss in the network is minimal.
// We will revisit this if there are any issues, and get PathMTU from the user
// as part of exporter input.
// 4. UDP needs to send PMTU size packets as per RFC7011. In order to guarantee
// this, maxMsgSize should be set correctly. maxMsgSize is the maximum
// payload (IPFIX message) size, not the maximum packet size. You need to
// compute maxMsgSize based on the desired maximum packet size. If
// maxMsgSize is not set correctly, the message may be fragmented.
type ExportingProcess struct {
connToCollector net.Conn
obsDomainID uint32
Expand All @@ -59,6 +60,7 @@ type ExportingProcess struct {
templateMutex sync.Mutex
sendJSONRecord bool
jsonBufferLen int
maxMsgSize int
wg sync.WaitGroup
isClosed atomic.Bool
stopCh chan struct{}
Expand Down Expand Up @@ -86,24 +88,22 @@ type ExporterInput struct {
ObservationDomainID uint32
TempRefTimeout uint32
// TLSClientConfig is set to use an encrypted connection to the collector.
TLSClientConfig *ExporterTLSClientConfig
IsIPv6 bool
SendJSONRecord bool
JSONBufferLen int
TLSClientConfig *ExporterTLSClientConfig
IsIPv6 bool
SendJSONRecord bool
// JSONBufferLen is recommended for sending json records. If not given a
// valid value, we use a default of 5000B
JSONBufferLen int
// For UDP, this should be set by taking into account the PMTU and
// header sizes.
MaxMsgSize int
CheckConnInterval time.Duration
}

// InitExportingProcess takes in collector address(net.Addr format), obsID(observation ID)
// and tempRefTimeout(template refresh timeout). tempRefTimeout is applicable only
// for collectors listening over UDP; unit is seconds. For TCP, you can pass any
// value and it will be ignored. For UDP, if 0 is passed, 600s is used as the default.
//
// PathMTU is recommended for UDP transport. If not given a valid value, i.e., either
// 0 or a value more than 1500, we consider a default value of 512B as per RFC7011.
// PathMTU is optional for TCP as we use max socket buffer size of 65535. It can
// be provided as 0.
// JSONBufferLen is recommended for sending json record. If not given a valid value,
// we consider a default 5000B.
func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
var conn net.Conn
var err error
Expand Down Expand Up @@ -162,6 +162,22 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
stopCh: make(chan struct{}),
}

if expProc.sendJSONRecord {
if input.JSONBufferLen <= 0 {
expProc.jsonBufferLen = defaultJSONBufferLen
} else {
expProc.jsonBufferLen = input.JSONBufferLen
}
} else {
if input.MaxMsgSize == 0 {
expProc.maxMsgSize = entities.MaxSocketMsgSize
} else if input.MaxMsgSize < entities.MinSupportedMsgSize {
return nil, fmt.Errorf("maxMsgSize cannot be less than 512B")
} else {
expProc.maxMsgSize = input.MaxMsgSize
}
}

// Start a goroutine to check whether the collector has already closed the TCP connection.
if input.CollectorProtocol == "tcp" {
interval := input.CheckConnInterval
Expand Down Expand Up @@ -218,28 +234,22 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
}
}()
}
if expProc.sendJSONRecord {
if input.JSONBufferLen <= 0 {
expProc.jsonBufferLen = defaultJSONBufferLen
} else {
expProc.jsonBufferLen = input.JSONBufferLen
}
}
return expProc, nil
}

func (ep *ExportingProcess) SendSet(set entities.Set) (int, error) {
func (ep *ExportingProcess) sendSet(set entities.Set, doDataRecSanityCheck bool, buf *bytes.Buffer) (int, error) {
// Iterate over all records in the set.
setType := set.GetSetType()
if setType == entities.Undefined {
return 0, fmt.Errorf("set type is not properly defined")
}
for _, record := range set.GetRecords() {
if setType == entities.Template {
if setType == entities.Template {
for _, record := range set.GetRecords() {
ep.updateTemplate(record.GetTemplateID(), record.GetOrderedElementList(), record.GetMinDataRecordLen())
} else if setType == entities.Data {
err := ep.dataRecSanityCheck(record)
if err != nil {
}
} else if setType == entities.Data && doDataRecSanityCheck {
for _, record := range set.GetRecords() {
if err := ep.dataRecSanityCheck(record); err != nil {
return 0, fmt.Errorf("error when doing sanity check:%v", err)
}
}
Expand All @@ -249,21 +259,96 @@ func (ep *ExportingProcess) SendSet(set entities.Set) (int, error) {

var bytesSent int
var err error
if !ep.sendJSONRecord {
bytesSent, err = ep.createAndSendIPFIXMsg(set)
} else {
if ep.sendJSONRecord {
if setType == entities.Data {
bytesSent, err = ep.createAndSendJSONMsg(set)
_, bytesSent, err = ep.createAndSendJSONRecords(set.GetRecords(), buf)
}
} else {
bytesSent, err = ep.createAndSendIPFIXMsg(set, buf)
}
if err != nil {
return bytesSent, err
return bytesSent, err
}

// SendSet sends the provided set and returns the number of bytes written and an error if applicable.
func (ep *ExportingProcess) SendSet(set entities.Set) (int, error) {
return ep.sendSet(set, true, &bytes.Buffer{})
}

// SendDataRecords is a specialized version of SendSet which can send a list of data records more
// efficiently. All the data records must be for the same template ID. This function performs fewer
// sanity checks on the data records, compared to SendSet. This function can also take a reusable
// buffer as a parameter to avoid repeated memory allocations. You can use nil as the buffer if you
// want this function to be responsible for allocation. SendDataRecords returns the number of
// records successfully sent, the total number of bytes sent, and an error if applicable.
func (ep *ExportingProcess) SendDataRecords(templateID uint16, records []entities.Record, buf *bytes.Buffer) (int, int, error) {
if buf == nil {
if ep.sendJSONRecord {
buf = bytes.NewBuffer(make([]byte, 0, ep.jsonBufferLen))
} else {
buf = bytes.NewBuffer(make([]byte, 0, ep.maxMsgSize))
}
} else {
buf.Reset()
}
return bytesSent, nil

if ep.sendJSONRecord {
return ep.createAndSendJSONRecords(records, buf)
}

recordsSent := 0
bytesSent := 0

set := entities.NewSet(false)

for recordsSent < len(records) {
// length will always match set.GetSetLength
length := entities.MsgHeaderLength + entities.SetHeaderLen
if err := set.PrepareSet(entities.Data, templateID); err != nil {
return 0, 0, err
}
numRecordsInSet := 0
for idx := recordsSent; idx < len(records); idx++ {
record := records[idx]
recordLength := record.GetRecordLength()
// If the record fits in the current message, add it to the set and continue to the next record.
if length+recordLength <= ep.maxMsgSize {
if err := set.AddRecordV3(record); err != nil {
return recordsSent, bytesSent, fmt.Errorf("error when adding record to data set: %w", err)
}
numRecordsInSet += 1
length += recordLength
continue
}
// There is no record in the set currently, yet this new record cannot fit!
if numRecordsInSet == 0 {
return recordsSent, bytesSent, fmt.Errorf("record exceeds max size")
yuntanghsu marked this conversation as resolved.
Show resolved Hide resolved
}
// Break out of the loop and send the set
break
}
// Time to send the set / message. Note that it is guaranteed that numRecordsInSet > 1.
// We choose not to invoke dataRecSanityCheck on the individual records.
n, err := ep.sendSet(set, false, buf)
bytesSent += n
if err != nil {
return recordsSent, bytesSent, fmt.Errorf("error when sending data set: %w", err)
}
recordsSent += numRecordsInSet
// We have more records to send, so prepare shared data structures.
if recordsSent < len(records) {
set.ResetSet()
buf.Reset()
}
}

return recordsSent, bytesSent, nil
}

// GetMsgSizeLimit returns the maximum IPFIX message size that this exporter is allowed to write to
// the connection. If the exporter is configured to send marshalled JSON records instead, this
// function will return 0.
func (ep *ExportingProcess) GetMsgSizeLimit() int {
return entities.MaxSocketMsgSize
return ep.maxMsgSize
}

// CloseConnToCollector closes the connection to the collector.
Expand Down Expand Up @@ -307,32 +392,41 @@ func (ep *ExportingProcess) NewTemplateID() uint16 {

// createAndSendIPFIXMsg takes in a set as input, creates the IPFIX message, and sends it out.
// TODO: This method will change when we support sending multiple sets.
func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set) (int, error) {
func (ep *ExportingProcess) createAndSendIPFIXMsg(set entities.Set, buf *bytes.Buffer) (int, error) {
if set.GetSetType() == entities.Data {
ep.seqNumber = ep.seqNumber + set.GetNumberOfRecords()
}
bytesSlice, err := CreateIPFIXMsg(set, ep.obsDomainID, ep.seqNumber, time.Now())
n, err := WriteIPFIXMsgToBuffer(set, ep.obsDomainID, ep.seqNumber, time.Now(), buf)
if err != nil {
return 0, err
}
if n > ep.maxMsgSize {
return 0, fmt.Errorf("IPFIX message length %d exceeds maximum size of %d", n, ep.maxMsgSize)
}

// Send the message on the exporter connection.
bytesSent, err := ep.connToCollector.Write(bytesSlice)
bytesSent, err := ep.connToCollector.Write(buf.Bytes())

if err != nil {
return bytesSent, fmt.Errorf("error when sending message on the connection: %v", err)
} else if bytesSent != len(bytesSlice) {
} else if bytesSent != n {
return bytesSent, fmt.Errorf("could not send the complete message on the connection")
}

return bytesSent, nil
}

// createAndSendJSONMsg takes in a set as input, creates the JSON record, and sends it out.
func (ep *ExportingProcess) createAndSendJSONMsg(set entities.Set) (int, error) {
var bytesSent int
for _, record := range set.GetRecords() {
elements := make(map[string]interface{})
// createAndSendJSONRecords takes in a slice of records as input, marshals each record to JSON using
// the provided buffer, and writes it to the connection. It returns the number of records sent, the
// total number of bytes sent, and an error if applicable.
func (ep *ExportingProcess) createAndSendJSONRecords(records []entities.Record, buf *bytes.Buffer) (int, int, error) {
buf.Grow(ep.jsonBufferLen)
recordsSent := 0
bytesSent := 0
elements := make(map[string]interface{})
message := make(map[string]interface{}, 2)
for _, record := range records {
clear(elements)
orderedElements := record.GetOrderedElementList()
for _, element := range orderedElements {
switch element.GetDataType() {
Expand Down Expand Up @@ -363,34 +457,33 @@ func (ep *ExportingProcess) createAndSendJSONMsg(set entities.Set) (int, error)
case entities.DateTimeMilliseconds:
elements[element.GetName()] = element.GetUnsigned64Value()
case entities.DateTimeMicroseconds, entities.DateTimeNanoseconds:
return bytesSent, fmt.Errorf("API does not support micro and nano seconds types yet")
return recordsSent, bytesSent, fmt.Errorf("API does not support micro and nano seconds types yet")
case entities.MacAddress:
elements[element.GetName()] = element.GetMacAddressValue()
case entities.Ipv4Address, entities.Ipv6Address:
elements[element.GetName()] = element.GetIPAddressValue()
case entities.String:
elements[element.GetName()] = element.GetStringValue()
default:
return bytesSent, fmt.Errorf("API supports only valid information elements with datatypes given in RFC7011")
return recordsSent, bytesSent, fmt.Errorf("API supports only valid information elements with datatypes given in RFC7011")
}
}
message := make(map[string]interface{}, 2)
message["ipfix"] = elements
message["@timestamp"] = time.Now().Format(time.RFC3339)
writer := bytes.NewBuffer(make([]byte, 0, ep.jsonBufferLen))
encoder := json.NewEncoder(writer)
err := encoder.Encode(message)
if err != nil {
return bytesSent, fmt.Errorf("error when encoding message to JSON: %v", err)
encoder := json.NewEncoder(buf)
if err := encoder.Encode(message); err != nil {
return recordsSent, bytesSent, fmt.Errorf("error when encoding message to JSON: %v", err)
}
// Send the message on the exporter connection.
bytes, err := ep.connToCollector.Write(writer.Bytes())
bytes, err := ep.connToCollector.Write(buf.Bytes())
bytesSent += bytes
if err != nil {
return bytes, fmt.Errorf("error when sending message on the connection: %v", err)
return recordsSent, bytesSent, fmt.Errorf("error when sending message on the connection: %v", err)
}
bytesSent += bytes
recordsSent += 1
buf.Reset()
}
return bytesSent, nil
return recordsSent, bytesSent, nil
}

func (ep *ExportingProcess) updateTemplate(id uint16, elements []entities.InfoElementWithValue, minDataRecLen uint16) {
Expand Down
Loading
Loading