Skip to content

Commit

Permalink
Refactor template as module (#49)
Browse files Browse the repository at this point in the history
* Refactor template as module
* Abstraction to allow custom template storage (eg: file, redis, http...)
* Works similarly to Transport and Format
  • Loading branch information
lspgn authored Apr 16, 2023
1 parent 6b3c5f1 commit c4c4ffb
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 104 deletions.
23 changes: 19 additions & 4 deletions cmd/goflow2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
_ "github.com/netsampler/goflow2/transport/file"
_ "github.com/netsampler/goflow2/transport/kafka"

// import various NetFlow/IPFIX templates
"github.com/netsampler/goflow2/decoders/netflow/templates"
_ "github.com/netsampler/goflow2/decoders/netflow/templates/file"
_ "github.com/netsampler/goflow2/decoders/netflow/templates/memory"

"github.com/netsampler/goflow2/utils"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
Expand All @@ -39,6 +44,8 @@ var (
LogLevel = flag.String("loglevel", "info", "Log level")
LogFmt = flag.String("logfmt", "normal", "Log formatter")

NetFlowTemplates = flag.String("netflow.templates", "memory", fmt.Sprintf("Choose the format (available: %s)", strings.Join(templates.GetTemplates(), ", ")))

Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", ")))
Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", ")))

Expand Down Expand Up @@ -95,6 +102,13 @@ func main() {
}
defer transporter.Close(ctx)

// the following is only useful when parsing NetFlowV9/IPFIX (template-based flow)
templateSystem, err := templates.FindTemplateSystem(ctx, *NetFlowTemplates)
if err != nil {
log.Fatal(err)
}
defer templateSystem.Close(ctx)

switch *LogFmt {
case "json":
log.SetFormatter(&log.JSONFormatter{})
Expand Down Expand Up @@ -154,10 +168,11 @@ func main() {
err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "netflow" {
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
TemplateSystem: templateSystem,
}
err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "nfl" {
Expand Down
91 changes: 15 additions & 76 deletions decoders/netflow/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@ package netflow

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"sync"

"github.com/netsampler/goflow2/decoders/netflow/templates"
"github.com/netsampler/goflow2/decoders/utils"
)

type FlowBaseTemplateSet map[uint16]map[uint32]map[uint16]interface{}

type NetFlowTemplateSystem interface {
GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error)
AddTemplate(version uint16, obsDomainId uint32, template interface{})
}

func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateRecord, error) {
var records []NFv9OptionsTemplateRecord
var err error
Expand Down Expand Up @@ -249,66 +243,11 @@ func DecodeDataSet(version uint16, payload *bytes.Buffer, listFields []Field) ([
return records, nil
}

func (ts *BasicTemplateSystem) GetTemplates() map[uint16]map[uint32]map[uint16]interface{} {
ts.templateslock.RLock()
tmp := ts.templates
ts.templateslock.RUnlock()
return tmp
}

func (ts *BasicTemplateSystem) AddTemplate(version uint16, obsDomainId uint32, template interface{}) {
ts.templateslock.Lock()
defer ts.templateslock.Unlock()
_, exists := ts.templates[version]
if exists != true {
ts.templates[version] = make(map[uint32]map[uint16]interface{})
}
_, exists = ts.templates[version][obsDomainId]
if exists != true {
ts.templates[version][obsDomainId] = make(map[uint16]interface{})
}
var templateId uint16
switch templateIdConv := template.(type) {
case IPFIXOptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case NFv9OptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case TemplateRecord:
templateId = templateIdConv.TemplateId
}
ts.templates[version][obsDomainId][templateId] = template
}

func (ts *BasicTemplateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
ts.templateslock.RLock()
defer ts.templateslock.RUnlock()
templatesVersion, okver := ts.templates[version]
if okver {
templatesObsDom, okobs := templatesVersion[obsDomainId]
if okobs {
template, okid := templatesObsDom[templateId]
if okid {
return template, nil
}
}
}
return nil, NewErrorTemplateNotFound(version, obsDomainId, templateId, "info")
}

type BasicTemplateSystem struct {
templates FlowBaseTemplateSet
templateslock *sync.RWMutex
}

func CreateTemplateSystem() *BasicTemplateSystem {
ts := &BasicTemplateSystem{
templates: make(FlowBaseTemplateSet),
templateslock: &sync.RWMutex{},
}
return ts
func DecodeMessage(payload *bytes.Buffer, templates templates.TemplateInterface) (interface{}, error) {
return DecodeMessageContext(context.Background(), payload, "", templates)
}

func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (interface{}, error) {
func DecodeMessageContext(ctx context.Context, payload *bytes.Buffer, templateKey string, tpli templates.TemplateInterface) (interface{}, error) {
var size uint16
packetNFv9 := NFv9Packet{}
packetIPFIX := IPFIXPacket{}
Expand Down Expand Up @@ -368,9 +307,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte

flowSet = templatefs

if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}

Expand All @@ -386,9 +325,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
}
flowSet = optsTemplatefs

if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}

Expand All @@ -404,9 +343,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
}
flowSet = templatefs

if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}

Expand All @@ -422,20 +361,20 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
}
flowSet = optsTemplatefs

if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}

} else if fsheader.Id >= 256 {
dataReader := bytes.NewBuffer(payload.Next(nextrelpos))

if templates == nil {
if tpli == nil {
continue
}

template, err := templates.GetTemplate(version, obsDomainId, fsheader.Id)
template, err := tpli.GetTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, fsheader.Id))

if err == nil {
switch templatec := template.(type) {
Expand Down
6 changes: 5 additions & 1 deletion decoders/netflow/netflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package netflow

import (
"bytes"
"context"
"testing"

"github.com/netsampler/goflow2/decoders/netflow/templates/memory"

"github.com/stretchr/testify/assert"
)

func TestDecodeNetFlowV9(t *testing.T) {
templates := CreateTemplateSystem()
templates := &memory.MemoryDriver{}
templates.Init(context.Background())

// Decode a template
template := []byte{
Expand Down
Loading

0 comments on commit c4c4ffb

Please sign in to comment.