Skip to content

Commit

Permalink
profiler: Batch API fixes
Browse files Browse the repository at this point in the history
This PR is based off [@kakkoyun's work](parca-dev#326)
to use libbpf(go)'s batch APIs.

**Context**
The main issue we found while working with this API was that they were erroring with `EPERM`.
After some debugging, we realised that libbpf wasn't handling errors in the way we
expected.

The debugging write-up and more context can be found [here](aquasecurity/libbpfgo#159),
and the fix is in [this PR](aquasecurity/libbpfgo#157).

After landing these changes upstream, we pointed to the updated libbpfgo version and added
some [regression tests](parca-dev#381) to ensure that libbpfgo
behaves as expected and to make it easier to write further compatibility tests in the future.

Note: error handling in the rest of the batch APIs is still unfixed; this is tracked in aquasecurity/libbpfgo#162.
  • Loading branch information
kakkoyun authored and javierhonduco committed May 5, 2022
1 parent 227837a commit 85fdfd4
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 52 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
.PHONY: $(bpf_compile_tools)
$(bpf_compile_tools): % : check_%

# TODO(kakkoyun): To prevent out of sync libbpf dependency, we nmight want to try directly linking/updating the submodule in the libbpf-go.
# TODO(kakkoyun): To prevent out of sync libbpf dependency, we might want to try directly linking/updating the submodule in the libbpf-go.
# - Determining the location of the go module cache dir and initializing the submodule in there and linking in here, should be doable.
$(LIBBPF_SRC):
test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/write_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
}

if len(batch) > 0 {
level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch))
level.Debug(b.logger).Log("msg", "batch write client sent profiles", "count", len(batch))
}
return nil
}
Expand Down
130 changes: 80 additions & 50 deletions pkg/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,18 @@ type stackCountKey struct {

type bpfMaps struct {
counts *bpf.BPFMap
traces *bpf.BPFMap
stacks *bpf.BPFMap
}

// clean removes every given stack ID from the stacks map, one key at a time.
//
// Deletion is best-effort: a failure never stops the loop. Failures other
// than ENOENT are logged at debug level; ENOENT is ignored silently.
func (m bpfMaps) clean(stacks []int32, logger log.Logger) {
	for i := range stacks {
		// Copy the ID into a local so its address can be handed to DeleteKey.
		id := stacks[i]
		err := m.stacks.DeleteKey(unsafe.Pointer(&id))
		if err == nil || errors.Is(err, syscall.ENOENT) {
			continue
		}
		level.Debug(logger).Log("msg", "failed to delete stack trace", "errno", err)
	}
}

type metrics struct {
Expand Down Expand Up @@ -246,6 +257,11 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
ctx, p.cancel = context.WithCancel(ctx)
p.mtx.Unlock()

// @nocommit, enabling this here to get the right error codes.
// once the changes land upstream, we don't have to enable it
// (javierhonduco: explain this better).
// bpf.SetStrictMode(bpf.LibbpfStrictModeDirectErrs)

m, err := bpf.NewModuleFromBufferArgs(bpf.NewModuleArgs{
BPFObjBuff: bpfObj,
BPFObjName: "parca",
Expand Down Expand Up @@ -317,11 +333,11 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
return fmt.Errorf("get counts map: %w", err)
}

traces, err := m.GetMap("stack_traces")
stacks, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
}
p.bpfMaps = &bpfMaps{counts: counts, traces: traces}
p.bpfMaps = &bpfMaps{counts: counts, stacks: stacks}

// Allocate this here, so it's only allocated once instead of every
// time that p.profileLoop is called below. This is because, as of now,
Expand Down Expand Up @@ -376,52 +392,41 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)

// Variables needed for eBPF map batch iteration.
countKeysPtr = unsafe.Pointer(&p.countKeys[0])
nextCountKey = uintptr(1)
nextCountKey = uintptr(0)
)

// Reset count keys before collecting new traces from the kernel.
memsetCountKeys(p.countKeys, stackCountKey{})

batchSize := 0
it := p.bpfMaps.counts.Iterator()
for it.Next() {
batchSize++
}
if err := it.Err(); err != nil {
return fmt.Errorf("iterate over counts map: %w", err)
}

if batchSize == 0 {
return nil
}
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)
time.Sleep(1 * time.Second)

var (
values [][]byte
err error
)
values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize))

batchSize := p.bpfMaps.counts.GetMaxEntries()
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)

values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), batchSize)
processedCount := len(values)

if err != nil {
switch {
case errors.Is(err, syscall.EPERM):
level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than existed", "err", err)
// return fmt.Errorf("get value and delete batch: requested number of items is probably greater than existed: %w", err)
return nil

case errors.Is(err, syscall.ENOENT):
level.Debug(p.logger).Log("msg", "no values in batch")
return nil

default:
return fmt.Errorf("get value and delete batch: %w", err)
}
level.Error(p.logger).Log("msg", "get value and delete batch failed with", "err", err)
return nil
}
if len(values) == 0 {
level.Debug(p.logger).Log("msg", "no values in batch")

if processedCount == 0 {
level.Error(p.logger).Log("msg", "no values in batch")
return nil
}

if nextCountKey != uintptr(0) {
level.Debug(p.logger).Log("msg", "Next batch should be null", "nextBatch", nextCountKey)
}

level.Debug(p.logger).Log("msg", "get value and delete batch", "batchSize", batchSize, "processedCount", processedCount)

stacksKeys := make(map[int32]bool, processedCount)

for i, key := range p.countKeys {
var (
pid = key.PID
Expand All @@ -433,23 +438,32 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
continue
}

// Don't go over more stacks than we've fetched.
if i > processedCount {
break
}

// Twice the stack depth because we have a user and a potential Kernel stack.
// Read order matters, since we read from the key buffer.
stack := stack{}
userErr := p.getAndDeleteUserStack(userStackID, &stack)
userErr := p.readUserStack(userStackID, &stack)
if userErr != nil {
if errors.Is(userErr, errUnrecoverable) {
return userErr
}
level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr)
}
kernelErr := p.getAndDeleteKernelStack(kernelStackID, &stack)
stacksKeys[userStackID] = true

kernelErr := p.readKernelStack(kernelStackID, &stack)
if kernelErr != nil {
if errors.Is(kernelErr, errUnrecoverable) {
return kernelErr
}
level.Debug(p.logger).Log("msg", "failed to read kernel stack", "err", kernelErr)
}
stacksKeys[kernelStackID] = true

if userErr != nil && kernelErr != nil {
// Both stacks are missing. Nothing to do.
continue
Expand Down Expand Up @@ -529,6 +543,29 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
samples[stack] = sample
}

// Delete all stacks.
//
// The capacity will be difficult to estimate without counting as it's
// likely that there will be more than we need due to duplicated stack IDs.
stacksKeysVector := make([]int32, 0, len(stacksKeys)/2)
for key, _ := range stacksKeys {
stacksKeysVector = append(stacksKeysVector, key)
}

// TODO(javierhonduco): Getting -ENOTSUPP, perhaps my kernel doesn't support it?
// Needs more investigation.
//
// processed, err := p.bpfMaps.stacks.DeleteKeyBatch(unsafe.Pointer(&stacksKeysVector[0]), uint32(len(stacksKeysVector)))
// level.Debug(p.logger).Log("msg", "deleted counts map in batch", "deleted", processed)
// if err != nil {
// return fmt.Errorf("failed to delete stacks in batch: %w", err)
//
// }

// Remove the stacktraces one by one. We need to do it at the end as we might
// be deleting entries we need in subsequent iterations.
p.bpfMaps.clean(stacksKeysVector, p.logger)

prof, err := p.buildProfile(ctx, captureTime, samples, locations, kernelLocations, userLocations, mappings, kernelMapping)
if err != nil {
return fmt.Errorf("failed to build profile: %w", err)
Expand Down Expand Up @@ -684,14 +721,14 @@ func (p *CgroupProfiler) resolveKernelFunctions(kernelLocations []*profile.Locat
return kernelFunctions, nil
}

// getAndDeleteUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error {
// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
if userStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
return errors.New("user stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&userStackID))
stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("user").Inc()
return fmt.Errorf("read user stack trace: %w", err)
Expand All @@ -701,21 +738,17 @@ func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack)
return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
}

if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}

return nil
}

// getAndDeleteKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error {
// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) error {
if kernelStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc()
return errors.New("kernel stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&kernelStackID))
stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&kernelStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("kernel").Inc()
return fmt.Errorf("read kernel stack trace: %w", err)
Expand All @@ -725,9 +758,6 @@ func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *sta
return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
}

if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}
return nil
}

Expand Down

0 comments on commit 85fdfd4

Please sign in to comment.