diff --git a/go.mod b/go.mod index 36dd936cba..6320cbf213 100644 --- a/go.mod +++ b/go.mod @@ -30,4 +30,8 @@ require ( k8s.io/cri-api v0.23.1 ) -replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201130085533-a6e18916ab40 +replace ( + // github.com/aquasecurity/libbpfgo => github.com/aquasecurity/libbpfgo v0.2.2-libbpf-0.5.0.0.20211215154851-b168808861fe + github.com/aquasecurity/libbpfgo => /home/deparker/Code/libbpfgo + github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201130085533-a6e18916ab40 +) diff --git a/go.sum b/go.sum index 1d5e2bcce6..61f073a59b 100644 --- a/go.sum +++ b/go.sum @@ -174,8 +174,6 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= -github.com/aquasecurity/libbpfgo v0.2.2-libbpf-0.5.0 h1:Qecy9Qvj4TG0LK7sfuJWzd1QlwMozHo7H0AyZMGjLg8= -github.com/aquasecurity/libbpfgo v0.2.2-libbpf-0.5.0/go.mod h1:/+clceXE103FaXvVTIY2HAkQjxNtkra4DRWvZYr2SKw= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/pkg/profiler/profiler.go b/pkg/profiler/profiler.go index 9eb7fb1df6..74fb9da037 100644 --- a/pkg/profiler/profiler.go +++ b/pkg/profiler/profiler.go @@ -18,12 +18,13 @@ import ( "context" _ "embed" "encoding/binary" + "errors" "fmt" - "io" "os" "runtime" "strings" "sync" + "syscall" "time" "unsafe" @@ -51,8 +52,19 @@ var bpfObj []byte const ( stackDepth = 127 // Always needs to be sync with MAX_STACK_DEPTH in parca-agent.bpf.c doubleStackDepth = 254 + batchSize = 1024 ) +// stackCountKey mirrors the struct in parca-agent.bpf.c. +// +// TODO(derekparker) Perhaps in order to keep these in sync we should write a Go generator to +// create the C struct from the Go struct. +type stackCountKey struct { + pid uint32 + userStackID int32 + kernelStackID int32 +} + type CgroupProfiler struct { logger log.Logger ksymCache *ksym.KsymCache @@ -230,185 +242,182 @@ func (p *CgroupProfiler) Run(ctx context.Context) error { } func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts, stackTraces *bpf.BPFMap) error { - prof := &profile.Profile{ - SampleType: []*profile.ValueType{{ - Type: "samples", - Unit: "count", - }}, - TimeNanos: now.UnixNano(), - DurationNanos: int64(p.profilingDuration), - - // We sample at 100Hz, which is every 10 Million nanoseconds. 
- PeriodType: &profile.ValueType{ - Type: "cpu", - Unit: "nanoseconds", - }, - Period: 10000000, - } - - mapping := maps.NewMapping(p.pidMappingFileCache) - kernelMapping := &profile.Mapping{ - File: "[kernel.kallsyms]", - } - kernelFunctions := map[uint64]*profile.Function{} - userFunctions := map[[2]uint64]*profile.Function{} - - // 2 uint64 1 for PID and 1 for Addr - locations := []*profile.Location{} - kernelLocations := []*profile.Location{} - kernelAddresses := map[uint64]struct{}{} - locationIndices := map[[2]uint64]int{} - samples := map[[doubleStackDepth]uint64]*profile.Sample{} - - // TODO(brancz): What was this for? - //has_collision := false - - it := counts.Iterator() - byteOrder := byteorder.GetHostByteOrder() - - // TODO(brancz): Use libbpf batch functions. - for it.Next() { - // This byte slice is only valid for this iteration, so it must be - // copied if we want to do anything with it outside of this loop. - keyBytes := it.Key() - - r := bytes.NewBuffer(keyBytes) - - pidBytes := make([]byte, 4) - if _, err := io.ReadFull(r, pidBytes); err != nil { - return fmt.Errorf("read pid bytes: %w", err) + var ( + prof = &profile.Profile{ + SampleType: []*profile.ValueType{{ + Type: "samples", + Unit: "count", + }}, + TimeNanos: now.UnixNano(), + DurationNanos: int64(p.profilingDuration), + + // We sample at 100Hz, which is every 10 Million nanoseconds. + PeriodType: &profile.ValueType{ + Type: "cpu", + Unit: "nanoseconds", + }, + Period: 10000000, } - pid := byteOrder.Uint32(pidBytes) - userStackIDBytes := make([]byte, 4) - if _, err := io.ReadFull(r, userStackIDBytes); err != nil { - return fmt.Errorf("read user stack ID bytes: %w", err) + mapping = maps.NewMapping(p.pidMappingFileCache) + kernelMapping = &profile.Mapping{ + File: "[kernel.kallsyms]", } - userStackID := int32(byteOrder.Uint32(userStackIDBytes)) - - kernelStackIDBytes := make([]byte, 4) - if _, err := io.ReadFull(r, kernelStackIDBytes); err != nil { - return fmt.Errorf("read kernel stack ID bytes: %w", err) + kernelFunctions = map[uint64]*profile.Function{} + userFunctions = map[[2]uint64]*profile.Function{} + + // 2 uint64 1 for PID and 1 for Addr + locations = []*profile.Location{} + kernelLocations = []*profile.Location{} + kernelAddresses = map[uint64]struct{}{} + locationIndices = map[[2]uint64]int{} + samples = map[[doubleStackDepth]uint64]*profile.Sample{} + byteOrder = byteorder.GetHostByteOrder() + + // Variables needed for eBPF map batch iteration. + k = stackCountKey{} + keys = make([]stackCountKey, batchSize) + keysPtr = unsafe.Pointer(&keys[0]) + startKey = unsafe.Pointer(nil) + nextKey = unsafe.Pointer(&k) + ) + + for nextKey != nil { + if startKey != nil { + memsetCountKeys(keys, stackCountKey{}) } - kernelStackID := int32(byteOrder.Uint32(kernelStackIDBytes)) - - valueBytes, err := counts.GetValue(unsafe.Pointer(&keyBytes[0])) + vals, err := counts.GetValueAndDeleteBatch(keysPtr, startKey, nextKey, uint32(batchSize)) if err != nil { - return fmt.Errorf("get count value: %w", err) + if errors.Is(err, syscall.ENOENT) { + // Map is empty, nothing to do. + break + } + return err } - value := byteOrder.Uint64(valueBytes) + startKey = nextKey - stackBytes, err := stackTraces.GetValue(unsafe.Pointer(&userStackID)) - if err != nil { - //profile.MissingStacks++ - continue - } + for i, key := range keys { + var ( + pid = key.pid + userStackID = key.userStackID + kernelStackID = key.kernelStackID + ) - // Twice the stack depth because we have a user and a potential Kernel stack. 
- stack := [doubleStackDepth]uint64{} - err = binary.Read(bytes.NewBuffer(stackBytes), byteOrder, stack[:stackDepth]) - if err != nil { - return fmt.Errorf("read user stack trace: %w", err) - } + if pid == 0 { + break + } + + value := byteOrder.Uint64(vals[i]) - if kernelStackID >= 0 { - stackBytes, err = stackTraces.GetValue(unsafe.Pointer(&kernelStackID)) + stackBytes, err := stackTraces.GetValue(unsafe.Pointer(&userStackID)) if err != nil { - //profile.MissingStacks++ + // TODO(derekparker): Should we log or increment missing stack trace count? continue } - err = binary.Read(bytes.NewBuffer(stackBytes), byteOrder, stack[stackDepth:]) + // Twice the stack depth because we have a user and a potential Kernel stack. + stack := [doubleStackDepth]uint64{} + err = binary.Read(bytes.NewBuffer(stackBytes), byteOrder, stack[:stackDepth]) if err != nil { - return fmt.Errorf("read kernel stack trace: %w", err) + return fmt.Errorf("read user stack trace: %w", err) } - } - sample, ok := samples[stack] - if ok { - // We already have a sample with this stack trace, so just add - // it to the previous one. - sample.Value[0] += int64(value) - continue - } + if kernelStackID >= 0 { + stackBytes, err = stackTraces.GetValue(unsafe.Pointer(&kernelStackID)) + if err != nil { + //profile.MissingStacks++ + continue + } - sampleLocations := []*profile.Location{} - - // Kernel stack - for _, addr := range stack[stackDepth:] { - if addr != uint64(0) { - key := [2]uint64{0, addr} - // PID 0 not possible so we'll use it to identify the kernel. - locationIndex, ok := locationIndices[key] - if !ok { - locationIndex = len(locations) - l := &profile.Location{ - ID: uint64(locationIndex + 1), - Address: addr, - Mapping: kernelMapping, - } - locations = append(locations, l) - kernelLocations = append(kernelLocations, l) - kernelAddresses[addr] = struct{}{} - locationIndices[key] = locationIndex + err = binary.Read(bytes.NewBuffer(stackBytes), byteOrder, stack[stackDepth:]) + if err != nil { + return fmt.Errorf("read kernel stack trace: %w", err) } - sampleLocations = append(sampleLocations, locations[locationIndex]) } - } - // User stack - perfMap, err := p.perfCache.CacheForPid(pid) - if err != nil { - // We expect only a minority of processes to have a JIT and produce - // the perf map. - level.Debug(p.logger).Log("msg", "no perfmap", "err", err) - } - for _, addr := range stack[:stackDepth] { - if addr != uint64(0) { - key := [2]uint64{uint64(pid), addr} - locationIndex, ok := locationIndices[key] - if !ok { - locationIndex = len(locations) - m, err := mapping.PidAddrMapping(pid, addr) - if err != nil { - level.Debug(p.logger).Log("msg", "failed to get mapping", "err", err) - } - l := &profile.Location{ - ID: uint64(locationIndex + 1), - Address: addr, - Mapping: m, + sample, ok := samples[stack] + if ok { + // We already have a sample with this stack trace, so just add + // it to the previous one. + sample.Value[0] += int64(value) + continue + } + + sampleLocations := []*profile.Location{} + + // Kernel stack + for _, addr := range stack[stackDepth:] { + if addr != uint64(0) { + key := [2]uint64{0, addr} + // PID 0 not possible so we'll use it to identify the kernel. 
+ locationIndex, ok := locationIndices[key] + if !ok { + locationIndex = len(locations) + l := &profile.Location{ + ID: uint64(locationIndex + 1), + Address: addr, + Mapping: kernelMapping, + } + locations = append(locations, l) + kernelLocations = append(kernelLocations, l) + kernelAddresses[addr] = struct{}{} + locationIndices[key] = locationIndex } + sampleLocations = append(sampleLocations, locations[locationIndex]) + } + } - // Does this addr point to JITed code? - if perfMap != nil { - // TODO(zecke): Log errors other than perf.NoSymbolFound - jitFunction, ok := userFunctions[key] - if !ok { - if sym, err := perfMap.Lookup(addr); err == nil { - jitFunction = &profile.Function{Name: sym} - userFunctions[key] = jitFunction - } + // User stack + perfMap, err := p.perfCache.CacheForPid(pid) + if err != nil { + // We expect only a minority of processes to have a JIT and produce + // the perf map. + level.Debug(p.logger).Log("msg", "no perfmap", "err", err) + } + for _, addr := range stack[:stackDepth] { + if addr != uint64(0) { + key := [2]uint64{uint64(pid), addr} + locationIndex, ok := locationIndices[key] + if !ok { + locationIndex = len(locations) + m, err := mapping.PidAddrMapping(pid, addr) + if err != nil { + level.Debug(p.logger).Log("msg", "failed to get mapping", "err", err) } - if jitFunction != nil { - l.Line = []profile.Line{{Function: jitFunction}} + l := &profile.Location{ + ID: uint64(locationIndex + 1), + Address: addr, + Mapping: m, } - } - locations = append(locations, l) - locationIndices[key] = locationIndex + // Does this addr point to JITed code? + if perfMap != nil { + // TODO(zecke): Log errors other than perf.NoSymbolFound + jitFunction, ok := userFunctions[key] + if !ok { + if sym, err := perfMap.Lookup(addr); err == nil { + jitFunction = &profile.Function{Name: sym} + userFunctions[key] = jitFunction + } + } + if jitFunction != nil { + l.Line = []profile.Line{{Function: jitFunction}} + } + } + + locations = append(locations, l) + locationIndices[key] = locationIndex + } + sampleLocations = append(sampleLocations, locations[locationIndex]) } - sampleLocations = append(sampleLocations, locations[locationIndex]) } - } - sample = &profile.Sample{ - Value: []int64{int64(value)}, - Location: sampleLocations, + sample = &profile.Sample{ + Value: []int64{int64(value)}, + Location: sampleLocations, + } + samples[stack] = sample } - samples[stack] = sample - } - if it.Err() != nil { - return fmt.Errorf("failed iterator: %w", it.Err()) } // Build Profile from samples, locations and mappings. @@ -488,7 +497,7 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts, // can only delete the "previous" item once we've already iterated to // the next. - it = stackTraces.Iterator() + it := stackTraces.Iterator() var prev []byte = nil for it.Next() { if prev != nil { @@ -532,3 +541,16 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, now time.Time, counts, return nil } + +// memsetCountKeys will reset the given slice to the given value. +// This function makes use of the highly optimized copy builtin function +// and is able to fill the entire slice in O(log n) time. +func memsetCountKeys(in []stackCountKey, v stackCountKey) { + if len(in) == 0 { + return + } + in[0] = v + for bp := 1; bp < len(in); bp *= 2 { + copy(in[bp:], in[:bp]) + } +}
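A note on the memsetCountKeys helper added at the end of this patch: it uses the doubling-copy idiom to clear the key buffer between batches. copy is invoked O(log n) times, but the total amount of data moved is still O(n); the win over a plain loop is fewer, larger memmove calls rather than a better asymptotic bound. Below is a hypothetical, generic restatement of the same idiom with a small usage example. The fill name and the generic signature are illustrative only and are not part of this patch; it assumes Go 1.18+ for generics.

package main

import "fmt"

// fill resets every element of in to v using the same doubling-copy idiom as
// memsetCountKeys: seed the first element, then repeatedly copy the already
// filled prefix over the next (up to) equally sized chunk. copy runs O(log n)
// times, while the total data moved remains O(n).
func fill[T any](in []T, v T) {
	if len(in) == 0 {
		return
	}
	in[0] = v
	for bp := 1; bp < len(in); bp *= 2 {
		copy(in[bp:], in[:bp])
	}
}

func main() {
	type stackCountKey struct {
		pid           uint32
		userStackID   int32
		kernelStackID int32
	}

	keys := make([]stackCountKey, 8)
	keys[3] = stackCountKey{pid: 42, userStackID: 7, kernelStackID: -1}

	// Reset the buffer before reusing it for the next batch read.
	fill(keys, stackCountKey{})
	fmt.Println(keys) // every element is back to the zero value
}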
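For reviewers, the core of the profileLoop change is the switch from per-key iteration to libbpfgo's batched lookup-and-delete. The sketch below isolates that drain pattern from the surrounding profile construction. It mirrors the call shape used in this patch (keys buffer, startKey/nextKey cursors, ENOENT as the end-of-map signal) and is written as if it lived next to profileLoop in package profiler, reusing the stackCountKey, batchSize, and memsetCountKeys declarations the patch adds. The drainCounts name, the callback, and the exact cursor semantics are assumptions against the local libbpfgo checkout referenced in go.mod, not a settled public API.

package profiler

import (
	"encoding/binary"
	"errors"
	"fmt"
	"syscall"
	"unsafe"

	bpf "github.com/aquasecurity/libbpfgo"
)

// drainCounts reads and deletes all entries of the stack-count map in batches
// of batchSize, invoking handle for every populated key. It mirrors the
// GetValueAndDeleteBatch usage in profileLoop; the cursor handling assumes the
// batch API of the libbpfgo version pinned in go.mod.
func drainCounts(counts *bpf.BPFMap, bo binary.ByteOrder, handle func(stackCountKey, uint64)) error {
	var (
		k        stackCountKey
		keys     = make([]stackCountKey, batchSize)
		keysPtr  = unsafe.Pointer(&keys[0])
		startKey = unsafe.Pointer(nil)
		nextKey  = unsafe.Pointer(&k)
	)

	for {
		// Zero the key buffer so a short batch cannot leave stale keys behind.
		memsetCountKeys(keys, stackCountKey{})

		vals, err := counts.GetValueAndDeleteBatch(keysPtr, startKey, nextKey, uint32(batchSize))
		if err != nil {
			if errors.Is(err, syscall.ENOENT) {
				// The kernel reports ENOENT once the map has been drained.
				return nil
			}
			return fmt.Errorf("batch read of counts map: %w", err)
		}
		startKey = nextKey

		for i, key := range keys {
			if key.pid == 0 {
				// PID 0 marks the unfilled tail of a short batch.
				break
			}
			handle(key, bo.Uint64(vals[i]))
		}
	}
}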
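On the TODO attached to stackCountKey: short of writing a generator, a cheap guard against the Go struct drifting from the C struct in parca-agent.bpf.c is a layout test that pins the Go side to the 12-byte key the BPF program emits (u32 pid, s32 user stack id, s32 kernel stack id). The test below is a hypothetical suggestion, not part of this patch, and assumes the C struct keeps exactly those three 32-bit fields in that order.

package profiler

import (
	"testing"
	"unsafe"
)

// TestStackCountKeyLayout pins stackCountKey to the layout the BPF program
// writes: three consecutive 32-bit fields, 12 bytes total, no padding.
func TestStackCountKeyLayout(t *testing.T) {
	var k stackCountKey

	if got, want := unsafe.Sizeof(k), uintptr(12); got != want {
		t.Fatalf("stackCountKey size = %d, want %d", got, want)
	}
	if got := unsafe.Offsetof(k.userStackID); got != 4 {
		t.Fatalf("userStackID offset = %d, want 4", got)
	}
	if got := unsafe.Offsetof(k.kernelStackID); got != 8 {
		t.Fatalf("kernelStackID offset = %d, want 8", got)
	}
}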