Skip to content

Commit

Permalink
fieldfilters: implement near zero-overhead copy-on-filter
Browse files Browse the repository at this point in the history
Our previous implementation of field filters relied on creating a deep copy of the event
before filtering, because otherwise clearing important fields could lead to segmentation
faults due to the way we cache process information or cause filters to interfere with each
other under multiple concurrent GetEventsRequests. Creating the deep copy fixed the above issues,
but absolutely killed performance.

This patch introduces a new implementation that lazily copies fields into a new protobuf
message only when they "pass" the filter. The result is a near zero-overhead field filter
implementation that actually significantly improves performance over the base case when
filtering many fields.

The following is output from the benchmark. Serialize is the base case with no filtering
while DeepCopy roughly approximates our old implementation. The new implementation is
nearly zero overhead in the worst case and performs significantly better when filtering
lots of fields since we save significant cycles on JSON serialization.

    ❯ go test -seed 7996182721713197025 -bench=. -benchtime=20000x
    goos: linux
    goarch: amd64
    pkg: github.com/cilium/tetragon/pkg/fieldfilters
    cpu: 12th Gen Intel(R) Core(TM) i9-12900K
    BenchmarkSerialize-24                                              20000             57269 ns/op
    --- BENCH: BenchmarkSerialize-24
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
    BenchmarkSerialize_DeepCopy-24                                     20000             67012 ns/op
    --- BENCH: BenchmarkSerialize_DeepCopy-24
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
    BenchmarkSerialize_DeepCopyProcess-24                              20000             61535 ns/op
    --- BENCH: BenchmarkSerialize_DeepCopyProcess-24
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
    BenchmarkSerialize_FieldFilters-24                                 20000             57372 ns/op
    --- BENCH: BenchmarkSerialize_FieldFilters-24
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
    BenchmarkSerialize_FieldFilters_NoProcessInfo-24                   20000             26625 ns/op
    --- BENCH: BenchmarkSerialize_FieldFilters_NoProcessInfo-24
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
    BenchmarkSerialize_FieldFilters_NoProcesInfoKeepExecid-24          20000             36356 ns/op
    --- BENCH: BenchmarkSerialize_FieldFilters_NoProcesInfoKeepExecid-24
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
        benchmark_test.go:56: configured random event generator (seed=7996182721713197025)
    PASS
    ok      github.com/cilium/tetragon/pkg/fieldfilters     19.345s

Signed-off-by: William Findlay <[email protected]>
  • Loading branch information
willfindlay authored and jrfastab committed Nov 16, 2023
1 parent 08a7d91 commit 19b2000
Show file tree
Hide file tree
Showing 19 changed files with 1,006 additions and 885 deletions.
6 changes: 5 additions & 1 deletion cmd/tetra/getevents/io_reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ func (i *ioReaderClient) GetEvents(ctx context.Context, in *tetragon.GetEventsRe
if err != nil {
return nil, err
}
ffs, err := fieldfilters.FieldFiltersFromGetEventsRequest(in)
if err != nil {
return nil, fmt.Errorf("failed to create field filters: %w", err)
}
i.allowlist = allowlist
i.fieldFilters = fieldfilters.FieldFiltersFromGetEventsRequest(in)
i.fieldFilters = ffs
if i.debug {
fmt.Fprintf(os.Stderr, "DEBUG: GetEvents request: %+v\n", in)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/iancoleman/strcase v0.3.0
github.com/jpillora/longestcommon v0.0.0-20161227235612-adb9d91ee629
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/mennanov/fmutils v0.2.1
github.com/mennanov/fieldmask-utils v1.1.0
github.com/opencontainers/runtime-spec v1.1.0
github.com/pelletier/go-toml v1.9.5
github.com/pkg/errors v0.9.1
Expand Down
542 changes: 3 additions & 539 deletions go.sum

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions pkg/fieldfilters/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ func BenchmarkSerialize_FieldFilters(b *testing.B) {
gen := newRandomEventGenerator(b, Seed)
encoder := getEncoder()
evs := gen.GenerateN(b)
ff := NewExcludeFieldFilter([]tetragon.EventType{}, []string{}, false)
ff, err := NewExcludeFieldFilter([]tetragon.EventType{}, []string{}, false)
require.NoError(b, err)
b.StartTimer()

var err error
for i := 0; i < b.N; i++ {
ev := evs[i]
ev, err = ff.Filter(ev)
Expand All @@ -145,10 +145,10 @@ func BenchmarkSerialize_FieldFilters_NoProcessInfo(b *testing.B) {
gen := newRandomEventGenerator(b, Seed)
encoder := getEncoder()
evs := gen.GenerateN(b)
ff := NewExcludeFieldFilter([]tetragon.EventType{}, []string{"process", "parent"}, false)
ff, err := NewExcludeFieldFilter([]tetragon.EventType{}, []string{"process", "parent"}, false)
require.NoError(b, err)
b.StartTimer()

var err error
for i := 0; i < b.N; i++ {
ev := evs[i]
ev, err = ff.Filter(ev)
Expand All @@ -165,10 +165,10 @@ func BenchmarkSerialize_FieldFilters_NoProcesInfoKeepExecid(b *testing.B) {
gen := newRandomEventGenerator(b, Seed)
encoder := getEncoder()
evs := gen.GenerateN(b)
ff := NewExcludeFieldFilter([]tetragon.EventType{}, []string{"process.pid", "process.binary", "process.uid", "process.cwd", "process.arguments", "process.flags", "process.start_time", "process.auid", "process.pod", "process.docker", "process.refcnt", "process.cap", "process.ns", "process.tid", "process.process_credentials", "process.binary_properties", "parent.pid", "parent.binary", "parent.uid", "parent.cwd", "parent.arguments", "parent.flags", "parent.start_time", "parent.auid", "parent.pod", "parent.docker", "parent.refcnt", "parent.cap", "parent.ns", "parent.tid", "parent.parent_credentials", "parent.binary_properties"}, false)
ff, err := NewExcludeFieldFilter([]tetragon.EventType{}, []string{"process.pid", "process.binary", "process.uid", "process.cwd", "process.arguments", "process.flags", "process.start_time", "process.auid", "process.pod", "process.docker", "process.refcnt", "process.cap", "process.ns", "process.tid", "process.process_credentials", "process.binary_properties", "parent.pid", "parent.binary", "parent.uid", "parent.cwd", "parent.arguments", "parent.flags", "parent.start_time", "parent.auid", "parent.pod", "parent.docker", "parent.refcnt", "parent.cap", "parent.ns", "parent.tid", "parent.parent_credentials", "parent.binary_properties"}, false)
require.NoError(b, err)
b.StartTimer()

var err error
for i := 0; i < b.N; i++ {
ev := evs[i]
ev, err = ff.Filter(ev)
Expand Down
93 changes: 55 additions & 38 deletions pkg/fieldfilters/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ package fieldfilters

import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"unicode"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/mennanov/fmutils"
"google.golang.org/protobuf/proto"
fieldmask_utils "github.com/mennanov/fieldmask-utils"
"google.golang.org/protobuf/reflect/protoreflect"
)

Expand Down Expand Up @@ -41,14 +41,17 @@ func ParseFieldFilterList(filters string) ([]*tetragon.FieldFilter, error) {
return results, nil
}

// Converts a string to snake case.
func fixupSnakeCaseString(s string) string {
// Converts a string to camel case.
func fixupSnakeCaseString(s string, upper bool) string {
var builder strings.Builder

for i, r := range s {
if s[i] == '_' {
continue
}
if i == 0 && upper {
r = unicode.ToUpper(r)
}
if i != 0 && s[i-1] == '_' {
r = unicode.ToUpper(r)
}
Expand Down Expand Up @@ -80,7 +83,7 @@ func fixupFieldFilterString(s string) string {
return s
}

dat["fields"] = fixupSnakeCaseString(fields)
dat["fields"] = fixupSnakeCaseString(fields, false)
enc.Encode(&dat)
}

Expand All @@ -90,33 +93,48 @@ func fixupFieldFilterString(s string) string {
// FieldFilter is a helper for filtering fields in events
type FieldFilter struct {
eventSet []tetragon.EventType
fields fmutils.NestedMask
action tetragon.FieldFilterAction
fields fieldmask_utils.FieldFilter
invertEventSet bool
}

// NewFieldFilter constructs a new FieldFilter from a set of fields.
func NewFieldFilter(eventSet []tetragon.EventType, fields []string, action tetragon.FieldFilterAction, invertEventSet bool) *FieldFilter {
func NewFieldFilter(eventSet []tetragon.EventType, fields []string, action tetragon.FieldFilterAction, invertEventSet bool) (*FieldFilter, error) {
var err error
var filter fieldmask_utils.FieldFilter
switch action {
case tetragon.FieldFilterAction_INCLUDE:
filter, err = fieldmask_utils.MaskFromPaths(fields, func(s string) string {
return fixupSnakeCaseString(s, true)
})
case tetragon.FieldFilterAction_EXCLUDE:
filter, err = fieldmask_utils.MaskInverseFromPaths(fields, func(s string) string {
return fixupSnakeCaseString(s, true)
})
default:
return nil, fmt.Errorf("invalid fieldfilter action: %v", action)
}
if err != nil {
return nil, err
}
return &FieldFilter{
eventSet: eventSet,
fields: fmutils.NestedMaskFromPaths(fields),
action: action,
fields: filter,
invertEventSet: invertEventSet,
}
}, nil
}

// NewIncludeFieldFilter constructs a new inclusion FieldFilter from a set of fields.
func NewIncludeFieldFilter(eventSet []tetragon.EventType, fields []string, invertEventSet bool) *FieldFilter {
func NewIncludeFieldFilter(eventSet []tetragon.EventType, fields []string, invertEventSet bool) (*FieldFilter, error) {
return NewFieldFilter(eventSet, fields, tetragon.FieldFilterAction_INCLUDE, invertEventSet)
}

// NewExcludeFieldFilter constructs a new exclusion FieldFilter from a set of fields.
func NewExcludeFieldFilter(eventSet []tetragon.EventType, fields []string, invertEventSet bool) *FieldFilter {
func NewExcludeFieldFilter(eventSet []tetragon.EventType, fields []string, invertEventSet bool) (*FieldFilter, error) {
return NewFieldFilter(eventSet, fields, tetragon.FieldFilterAction_EXCLUDE, invertEventSet)
}

// FieldFilterFromProto constructs a new FieldFilter from a Tetragon API field filter.
func FieldFilterFromProto(filter *tetragon.FieldFilter) *FieldFilter {
func FieldFilterFromProto(filter *tetragon.FieldFilter) (*FieldFilter, error) {
var fields []string

if filter.Fields != nil {
Expand All @@ -133,33 +151,31 @@ func FieldFilterFromProto(filter *tetragon.FieldFilter) *FieldFilter {

// FieldFiltersFromGetEventsRequest returns a list of EventFieldFilter for
// a GetEventsRequest.
func FieldFiltersFromGetEventsRequest(request *tetragon.GetEventsRequest) []*FieldFilter {
//
// nolint:revive // revive complains about stutter
func FieldFiltersFromGetEventsRequest(request *tetragon.GetEventsRequest) ([]*FieldFilter, error) {
var filters []*FieldFilter

for _, filter := range request.FieldFilters {
if filter == nil {
continue
}
filters = append(filters, FieldFilterFromProto(filter))

ff, err := FieldFilterFromProto(filter)
if err != nil {
return nil, err
}

filters = append(filters, ff)
}

return filters
return filters, nil
}

// Filter filters the fields in the GetEventsResponse, keeping fields specified in the
// inclusion filter and discarding fields specified in the exclusion filter. Exclusion
// takes precedence over inclusion and an empty filter set will keep all remaining fields.
func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) (*tetragon.GetEventsResponse, error) {
// We need to deep copy the event here to avoid issues caused by filtering out
// information that is shared between events through the event cache (e.g. process
// info). This can cause segmentation faults and other nasty bugs. Avoid all that by
// doing a deep copy here before filtering.
//
// FIXME: We need to fix this so that it doesn't kill performance by doing a deep
// copy. This will require architectural changes to both the field filters and the
// event cache.
event = proto.Clone(event).(*tetragon.GetEventsResponse)

if len(f.eventSet) > 0 {
// skip filtering by default unless the event set is inverted, in which case we
// want to filter by default and skip only if we have a match
Expand Down Expand Up @@ -189,24 +205,25 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) (*tetragon.GetEv
}
}

rft := event.ProtoReflect()
rft.Range(func(eventDesc protoreflect.FieldDescriptor, _ protoreflect.Value) bool {
if eventDesc.ContainingOneof() == nil || !rft.Has(eventDesc) {
src := event.ProtoReflect()
dst := src.New()
var filterErrs []error
src.Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool {
if fd.ContainingOneof() == nil || !src.Has(fd) {
return true
}
event := rft.Mutable(eventDesc).Message().Interface()
switch f.action {
case tetragon.FieldFilterAction_INCLUDE:
f.fields.Filter(event)
default:
f.fields.Prune(event)
event := src.Get(fd).Message().Interface()
dstEvent := dst.Mutable(fd).Message().Interface()
err := fieldmask_utils.StructToStruct(f.fields, event, dstEvent)
if err != nil {
filterErrs = append(filterErrs, err)
}
return true
})

if !rft.IsValid() {
if !src.IsValid() {
return nil, fmt.Errorf("invalid event after field filter")
}

return event, nil
return dst.Interface().(*tetragon.GetEventsResponse), errors.Join(filterErrs...)
}
62 changes: 36 additions & 26 deletions pkg/fieldfilters/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -68,7 +69,7 @@ func TestEventFieldFilters(t *testing.T) {
EventSet: []tetragon.EventType{},
Fields: &fieldmaskpb.FieldMask{
Paths: []string{
"process",
"parent",
"process.pid",
"process.uid",
"process.pod",
Expand All @@ -92,9 +93,11 @@ func TestEventFieldFilters(t *testing.T) {
}

// Construct the filter
filters := FieldFiltersFromGetEventsRequest(request)
filters, err := FieldFiltersFromGetEventsRequest(request)
require.NoError(t, err)
for _, filter := range filters {
ev, _ = filter.Filter(ev)
ev, err = filter.Filter(ev)
assert.NoError(t, err)
}

// These fields should all have been included and so should not be empty
Expand Down Expand Up @@ -124,19 +127,22 @@ func TestFieldFilterByEventType(t *testing.T) {
},
}

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXIT}, []string{"process.pid"}, false)
filter, err := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXIT}, []string{"process.pid"}, false)
require.NoError(t, err)
ev, _ = filter.Filter(ev)

assert.NotEmpty(t, ev.GetProcessExec().Process.Pid)

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process.pid"}, false)
filter, err = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process.pid"}, false)
require.NoError(t, err)
ev, _ = filter.Filter(ev)

assert.Empty(t, ev.GetProcessExec().Process.Pid)
}

func TestEmptyFieldFilter(t *testing.T) {
filter := NewIncludeFieldFilter([]tetragon.EventType{}, []string{}, false)
filter, err := NewIncludeFieldFilter([]tetragon.EventType{}, []string{}, false)
require.NoError(t, err)

ev := &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessExec{
Expand Down Expand Up @@ -248,7 +254,8 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {
},
}

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process", "parent"}, true)
filter, err := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process", "parent"}, true)
require.NoError(t, err)
assert.True(t, proto.Equal(ev, expected), "events are equal before filter")
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
Expand All @@ -268,7 +275,8 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {
},
}

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_KPROBE}, []string{"process", "parent"}, true)
filter, err = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_KPROBE}, []string{"process", "parent"}, true)
require.NoError(t, err)
assert.False(t, proto.Equal(ev, expected), "events are not equal before filter")
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
Expand Down Expand Up @@ -579,24 +587,26 @@ func TestSlimExecEventsFieldFilterExample(t *testing.T) {
},
}

filters := []*FieldFilter{
NewExcludeFieldFilter([]tetragon.EventType{}, []string{"parent"}, false),
NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{
"process.pid",
"process.uid",
"process.cwd",
"process.binary",
"process.arguments",
"process.flags",
"process.start_time",
"process.auid",
"process.pod",
"process.docker",
"process.refcnt",
"process.cap",
"process.ns",
}, true),
}
ff1, err := NewExcludeFieldFilter([]tetragon.EventType{}, []string{"parent"}, false)
require.NoError(t, err)
ff2, err := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{
"process.pid",
"process.uid",
"process.cwd",
"process.binary",
"process.arguments",
"process.flags",
"process.start_time",
"process.auid",
"process.pod",
"process.docker",
"process.refcnt",
"process.cap",
"process.ns",
}, true)
require.NoError(t, err)

filters := []*FieldFilter{ff1, ff2}

for _, filter := range filters {
for i, ev := range evs {
Expand Down
2 changes: 1 addition & 1 deletion pkg/fieldfilters/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestFixupSnakeCaseString(t *testing.T) {
s := "process.exec_id,process.binary,parent.exec_id,foo.bar_qux_baz,this_is_a_test"
expected := "process.execId,process.binary,parent.execId,foo.barQuxBaz,thisIsATest"

assert.Equal(t, expected, fixupSnakeCaseString(s))
assert.Equal(t, expected, fixupSnakeCaseString(s, false))
}

func TestParseFieldFilterList(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon
}

// Filter the GetEventsResponse fields
filters := fieldfilters.FieldFiltersFromGetEventsRequest(request)
filters, err := fieldfilters.FieldFiltersFromGetEventsRequest(request)
if err != nil {
return fmt.Errorf("failed to create field filters: %w", err)
}

for _, filter := range filters {
ev, err := filter.Filter(event)
Expand Down
4 changes: 4 additions & 0 deletions vendor/github.com/mennanov/fieldmask-utils/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 19b2000

Please sign in to comment.