Skip to content

Commit

Permalink
pkg/agent: Use eBPF map batch operations
Browse files Browse the repository at this point in the history
This patch adds support for using eBPF batch operations on maps which
should dramatically improve the performance of the parca-agent by
enabling it to batch lookup and delete keys/values in maps in a single
operation.

This replaces the use of iterators in favor of iteration in batches.

Fixes parca-dev#74
  • Loading branch information
derekparker committed Jan 13, 2022
1 parent 6387a51 commit 8f549ff
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 111 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,8 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201130085533-a6e18916ab40
replace (
// github.com/aquasecurity/libbpfgo => github.com/aquasecurity/libbpfgo v0.2.2-libbpf-0.5.0.0.20211215154851-b168808861fe
github.com/aquasecurity/libbpfgo => /home/deparker/Code/libbpfgo
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201130085533-a6e18916ab40
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/aquasecurity/libbpfgo v0.2.3-libbpf-0.6.1 h1:YGFburh0Fo4X7Ke7IgUzcyqjWWJLeWlaHXKsXi/09NQ=
github.com/aquasecurity/libbpfgo v0.2.3-libbpf-0.6.1/go.mod h1:/+clceXE103FaXvVTIY2HAkQjxNtkra4DRWvZYr2SKw=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down
194 changes: 86 additions & 108 deletions pkg/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"context"
_ "embed"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"runtime"
"strings"
"sync"
"syscall"
"time"
"unsafe"

Expand Down Expand Up @@ -51,8 +52,19 @@ var bpfObj []byte
const (
stackDepth = 127 // Always needs to be sync with MAX_STACK_DEPTH in parca-agent.bpf.c
doubleStackDepth = 254
batchSize = 1024
)

// stackCountKey mirrors the struct in parca-agent.bpf.c.
//
// TODO(derekparker) Perhaps in order to keep these in sync we should write a Go generator to
// create the C struct from the Go struct.
type stackCountKey struct {
pid uint32
userStackID int32
kernelStackID int32
}

type CgroupProfiler struct {
logger log.Logger
ksymCache *ksym.KsymCache
Expand Down Expand Up @@ -202,6 +214,15 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
return fmt.Errorf("get counts map: %w", err)
}

// Allocate this here so it's only allocated once instead of every
// time that p.profileLoop is called below. This is because, as of now,
// this slice will be around 122Kb. We allocate enough to read the entire
// map instead of using the batch iteration feature because it vastly
// simplifies the code in profileLoop and the batch operations are a bit tricky to get right.
// If allocating this much memory upfront is a problem we can always revisit and use
// smaller batch sizes.
countKeys := make([]stackCountKey, counts.GetMaxEntries())

stackTraces, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
Expand All @@ -220,7 +241,7 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {

t := time.Now()

err := p.profileLoop(ctx, t, counts, stackTraces)
err := p.profileLoop(ctx, t, counts, countKeys, stackTraces)
if err != nil {
level.Debug(p.logger).Log("msg", "profile loop error", "err", err)
}
Expand All @@ -229,80 +250,72 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
}
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts, stackTraces *bpf.BPFMap) error {
prof := &profile.Profile{
SampleType: []*profile.ValueType{{
Type: "samples",
Unit: "count",
}},
TimeNanos: now.UnixNano(),
DurationNanos: int64(p.profilingDuration),

// We sample at 100Hz, which is every 10 Million nanoseconds.
PeriodType: &profile.ValueType{
Type: "cpu",
Unit: "nanoseconds",
},
Period: 10000000,
}

mapping := maps.NewMapping(p.pidMappingFileCache)
kernelMapping := &profile.Mapping{
File: "[kernel.kallsyms]",
}
kernelFunctions := map[uint64]*profile.Function{}
userFunctions := map[[2]uint64]*profile.Function{}

// 2 uint64 1 for PID and 1 for Addr
locations := []*profile.Location{}
kernelLocations := []*profile.Location{}
kernelAddresses := map[uint64]struct{}{}
locationIndices := map[[2]uint64]int{}
samples := map[[doubleStackDepth]uint64]*profile.Sample{}
func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts *bpf.BPFMap, keys []stackCountKey, stackTraces *bpf.BPFMap) error {
var (
prof = &profile.Profile{
SampleType: []*profile.ValueType{{
Type: "samples",
Unit: "count",
}},
TimeNanos: now.UnixNano(),
DurationNanos: int64(p.profilingDuration),

// We sample at 100Hz, which is every 10 Million nanoseconds.
PeriodType: &profile.ValueType{
Type: "cpu",
Unit: "nanoseconds",
},
Period: 10000000,
}

// TODO(brancz): What was this for?
//has_collision := false
mapping = maps.NewMapping(p.pidMappingFileCache)
kernelMapping = &profile.Mapping{
File: "[kernel.kallsyms]",
}
kernelFunctions = map[uint64]*profile.Function{}
userFunctions = map[[2]uint64]*profile.Function{}

it := counts.Iterator()
byteOrder := byteorder.GetHostByteOrder()
// 2 uint64 1 for PID and 1 for Addr
locations = []*profile.Location{}
kernelLocations = []*profile.Location{}
kernelAddresses = map[uint64]struct{}{}
locationIndices = map[[2]uint64]int{}
samples = map[[doubleStackDepth]uint64]*profile.Sample{}
byteOrder = byteorder.GetHostByteOrder()

// TODO(brancz): Use libbpf batch functions.
for it.Next() {
// This byte slice is only valid for this iteration, so it must be
// copied if we want to do anything with it outside of this loop.
keyBytes := it.Key()
// Variables needed for eBPF map batch iteration.
keysPtr = unsafe.Pointer(&keys[0])
nextKey = uintptr(1)
)

r := bytes.NewBuffer(keyBytes)
memsetCountKeys(keys, stackCountKey{})

pidBytes := make([]byte, 4)
if _, err := io.ReadFull(r, pidBytes); err != nil {
return fmt.Errorf("read pid bytes: %w", err)
vals, err := counts.GetValueAndDeleteBatch(keysPtr, nil, unsafe.Pointer(&nextKey), counts.GetMaxEntries())
if err != nil {
if !errors.Is(err, syscall.ENOENT) { // Map is empty or we got all keys in the last batch.
return err
}
pid := byteOrder.Uint32(pidBytes)
}

userStackIDBytes := make([]byte, 4)
if _, err := io.ReadFull(r, userStackIDBytes); err != nil {
return fmt.Errorf("read user stack ID bytes: %w", err)
}
userStackID := int32(byteOrder.Uint32(userStackIDBytes))
for i, key := range keys {
var (
pid = key.pid
userStackID = key.userStackID
kernelStackID = key.kernelStackID
)

kernelStackIDBytes := make([]byte, 4)
if _, err := io.ReadFull(r, kernelStackIDBytes); err != nil {
return fmt.Errorf("read kernel stack ID bytes: %w", err)
if pid == 0 {
break
}
kernelStackID := int32(byteOrder.Uint32(kernelStackIDBytes))

valueBytes, err := counts.GetValue(unsafe.Pointer(&keyBytes[0]))
if err != nil {
return fmt.Errorf("get count value: %w", err)
}
value := byteOrder.Uint64(valueBytes)
value := byteOrder.Uint64(vals[i])

stackBytes, err := stackTraces.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
//profile.MissingStacks++
// TODO(derekparker): Should we log or increment missing stack trace count?
continue
}
stackTraces.DeleteKey(unsafe.Pointer(&userStackID))

// Twice the stack depth because we have a user and a potential Kernel stack.
stack := [doubleStackDepth]uint64{}
Expand All @@ -317,6 +330,7 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts,
//profile.MissingStacks++
continue
}
stackTraces.DeleteKey(unsafe.Pointer(&kernelStackID))

err = binary.Read(bytes.NewBuffer(stackBytes), byteOrder, stack[stackDepth:])
if err != nil {
Expand Down Expand Up @@ -407,9 +421,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts,
}
samples[stack] = sample
}
if it.Err() != nil {
return fmt.Errorf("failed iterator: %w", it.Err())
}

// Build Profile from samples, locations and mappings.
for _, s := range samples {
Expand Down Expand Up @@ -484,51 +495,18 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts,
level.Error(p.logger).Log("msg", "failed to send profile", "err", err)
}

// BPF iterators need the previous value to iterate to the next, so we
// can only delete the "previous" item once we've already iterated to
// the next.

it = stackTraces.Iterator()
var prev []byte = nil
for it.Next() {
if prev != nil {
err := stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
level.Warn(p.logger).Log("msg", "failed to delete stack trace", "err", err)
}
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
}
if prev != nil {
err := stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
level.Warn(p.logger).Log("msg", "failed to delete stack trace", "err", err)
}
}

it = counts.Iterator()
prev = nil
for it.Next() {
if prev != nil {
err := counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
level.Warn(p.logger).Log("msg", "failed to delete count", "err", err)
}
}
return nil
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
// memsetCountKeys will reset the given slice to the given value.
// This function makes use of the highly optimized copy builtin function
// and is able to fill the entire slice in O(log n) time.
func memsetCountKeys(in []stackCountKey, v stackCountKey) {
if len(in) == 0 {
return
}
if prev != nil {
err := counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
level.Warn(p.logger).Log("msg", "failed to delete count", "err", err)
}
in[0] = v
for bp := 1; bp < len(in); bp *= 2 {
copy(in[bp:], in[:bp])
}

return nil
}

0 comments on commit 8f549ff

Please sign in to comment.