diff --git a/perf/reader.go b/perf/reader.go index 47d1b3218..1aec79d50 100644 --- a/perf/reader.go +++ b/perf/reader.go @@ -48,6 +48,10 @@ type Record struct { // The number of samples which could not be output, since // the ring buffer was full. LostSamples uint64 + + // The minimum number of bytes remaining in the per-CPU buffer after this Record has been read. + // Negative for overwritable buffers. + Remaining int } // Read a record from a reader and tag it as being from the given CPU. @@ -158,6 +162,8 @@ type Reader struct { paused bool overwritable bool + + bufferSize int } // ReaderOptions control the behaviour of the user @@ -216,6 +222,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) // bpf_perf_event_output checks which CPU an event is enabled on, // but doesn't allow using a wildcard like -1 to specify "all CPUs". // Hence we have to create a ring for each CPU. + bufferSize := 0 for i := 0; i < nCPU; i++ { ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark, opts.Overwritable) if errors.Is(err, unix.ENODEV) { @@ -224,6 +231,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) pauseFds = append(pauseFds, -1) continue } + bufferSize = ring.size() if err != nil { return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err) @@ -251,6 +259,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) eventHeader: make([]byte, perfEventHeaderSize), pauseFds: pauseFds, overwritable: opts.Overwritable, + bufferSize: bufferSize, } if err = pr.Resume(); err != nil { return nil, err @@ -430,6 +439,11 @@ func (pr *Reader) Resume() error { return nil } +// BufferSize is the size in bytes of each per-CPU buffer +func (pr *Reader) BufferSize() int { + return pr.bufferSize +} + // NB: Has to be preceded by a call to ring.loadHead. func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error { defer ring.writeTail() @@ -439,6 +453,7 @@ func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error { if pr.overwritable && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) { return errEOR } + rec.Remaining = ring.remaining() return err } diff --git a/perf/reader_test.go b/perf/reader_test.go index e3c0e6457..d693c4965 100644 --- a/perf/reader_test.go +++ b/perf/reader_test.go @@ -38,9 +38,15 @@ func TestPerfReader(t *testing.T) { } defer rd.Close() - outputSamples(t, events, 5) + qt.Assert(t, rd.BufferSize(), qt.Equals, 4096) - checkRecord(t, rd) + outputSamples(t, events, 5, 5) + + _, rem := checkRecord(t, rd) + qt.Assert(t, rem >= 5, qt.IsTrue, qt.Commentf("expected at least 5 Remaining")) + + _, rem = checkRecord(t, rd) + qt.Assert(t, rem, qt.Equals, 0, qt.Commentf("expected zero Remaining")) rd.SetDeadline(time.Now().Add(4 * time.Millisecond)) _, err = rd.Read() @@ -155,7 +161,7 @@ func outputSamplesProg(tb testing.TB, events *ebpf.Map, sampleSizes ...byte) *eb return prog } -func checkRecord(tb testing.TB, rd *Reader) (id int) { +func checkRecord(tb testing.TB, rd *Reader) (id int, remaining int) { tb.Helper() rec, err := rd.Read() @@ -172,7 +178,7 @@ func checkRecord(tb testing.TB, rd *Reader) (id int) { // padding is ignored since it's value is undefined. - return int(rec.RawSample[1]) + return int(rec.RawSample[1]), rec.Remaining } func TestPerfReaderLostSample(t *testing.T) { @@ -305,8 +311,9 @@ func TestPerfReaderOverwritable(t *testing.T) { nextID := maxEvents for i := 0; i < maxEvents; i++ { - id := checkRecord(t, rd) + id, rem := checkRecord(t, rd) qt.Assert(t, id, qt.Equals, nextID) + qt.Assert(t, rem, qt.Equals, -1) nextID-- } } diff --git a/perf/ring.go b/perf/ring.go index 6daa4a6e9..ddf3519f2 100644 --- a/perf/ring.go +++ b/perf/ring.go @@ -127,6 +127,7 @@ func createPerfEvent(cpu, watermark int, overwritable bool) (int, error) { type ringReader interface { loadHead() size() int + remaining() int writeTail() Read(p []byte) (int, error) } @@ -157,6 +158,10 @@ func (rr *forwardReader) size() int { return len(rr.ring) } +func (rr *forwardReader) remaining() int { + return int((rr.head - rr.tail) & rr.mask) +} + func (rr *forwardReader) writeTail() { // Commit the new tail. This lets the kernel know that // the ring buffer has been consumed. @@ -244,6 +249,12 @@ func (rr *reverseReader) size() int { return len(rr.ring) } +func (rr *reverseReader) remaining() int { + // remaining data is inaccurate for overwritable buffers + // once an overwrite happens, so return -1 here. + return -1 +} + func (rr *reverseReader) writeTail() { // We do not care about tail for over writable perf buffer. // So, this function is noop. diff --git a/ringbuf/reader.go b/ringbuf/reader.go index d5619fb91..ea11c823f 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -44,6 +44,9 @@ func (rh *ringbufHeader) dataLen() int { type Record struct { RawSample []byte + + // The minimum number of bytes remaining in the ring buffer after this Record has been read. + Remaining int } // Read a record from an event ring. @@ -98,6 +101,7 @@ func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error { rd.storeConsumer() rec.RawSample = rec.RawSample[:header.dataLen()] + rec.Remaining = rd.remaining() return nil } @@ -231,3 +235,8 @@ func (r *Reader) ReadInto(rec *Record) error { } } } + +// BufferSize returns the size in bytes of the ring buffer +func (r *Reader) BufferSize() int { + return r.ring.size() +} diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 94f8180bf..bed7d0096 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -59,6 +59,10 @@ func TestRingbufReader(t *testing.T) { } defer rd.Close() + if uint32(rd.BufferSize()) != 2*events.MaxEntries() { + t.Errorf("expected %d BufferSize, got %d", events.MaxEntries(), rd.BufferSize()) + } + ret, _, err := prog.Test(internal.EmptyBPFContext) testutils.SkipIfNotSupported(t, err) if err != nil { @@ -77,6 +81,15 @@ func TestRingbufReader(t *testing.T) { t.Fatal("Can't read samples:", err) } raw[len(record.RawSample)] = record.RawSample + if len(raw) == len(tt.want) { + if record.Remaining != 0 { + t.Errorf("expected 0 Remaining, got %d", record.Remaining) + } + } else { + if record.Remaining == 0 { + t.Error("expected non-zero Remaining, got 0") + } + } } if diff := cmp.Diff(tt.want, raw); diff != "" { diff --git a/ringbuf/ring.go b/ringbuf/ring.go index a3ecc005c..6dd04a93e 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -98,6 +98,17 @@ func (rr *ringReader) isEmpty() bool { return prod == cons } +func (rr *ringReader) size() int { + return cap(rr.ring) +} + +func (rr *ringReader) remaining() int { + cons := atomic.LoadUint64(rr.cons_pos) + prod := atomic.LoadUint64(rr.prod_pos) + + return int((prod - cons) & rr.mask) +} + func (rr *ringReader) Read(p []byte) (int, error) { prod := atomic.LoadUint64(rr.prod_pos)