From 01ebd4c1e2b9f8b3dd4fd2382aa1092c3c9bfc9d Mon Sep 17 00:00:00 2001 From: Ron Federman <73110295+RonFed@users.noreply.github.com> Date: Mon, 14 Aug 2023 18:34:25 +0300 Subject: [PATCH] ringbuf: check for available data on timeout BPF programs can suppress notifications of user space ringbuf readers by specifying the BPF_RB_NO_WAKEUP flag. This is useful in high-throughput scenarios to emit a wakeup for every Nth event only. The problem is that some writes may stay unflushed in the buffer for a long time. Check whether the ring buffer is really empty when the deadline expires, and if it is not, read the pending data instead of returning os.ErrDeadlineExceeded. This way it's possible to express the maximum duration one is willing to wait for a flush. Co-authored-by: Lorenz Bauer Signed-off-by: Ron Federman --- internal/unix/types_linux.go | 2 + internal/unix/types_other.go | 2 + ringbuf/reader.go | 11 ++++- ringbuf/reader_test.go | 88 ++++++++++++++++++++++++++++-------- ringbuf/ring.go | 7 +++ 5 files changed, 89 insertions(+), 21 deletions(-) diff --git a/internal/unix/types_linux.go b/internal/unix/types_linux.go index 7c9705919..51ed7d059 100644 --- a/internal/unix/types_linux.go +++ b/internal/unix/types_linux.go @@ -85,6 +85,8 @@ const ( BPF_FS_MAGIC = linux.BPF_FS_MAGIC TRACEFS_MAGIC = linux.TRACEFS_MAGIC DEBUGFS_MAGIC = linux.DEBUGFS_MAGIC + BPF_RB_NO_WAKEUP = linux.BPF_RB_NO_WAKEUP + BPF_RB_FORCE_WAKEUP = linux.BPF_RB_FORCE_WAKEUP ) type Statfs_t = linux.Statfs_t diff --git a/internal/unix/types_other.go b/internal/unix/types_other.go index 5e86b5052..1760e9e79 100644 --- a/internal/unix/types_other.go +++ b/internal/unix/types_other.go @@ -89,6 +89,8 @@ const ( BPF_FS_MAGIC TRACEFS_MAGIC DEBUGFS_MAGIC + BPF_RB_NO_WAKEUP + BPF_RB_FORCE_WAKEUP ) type Statfs_t struct { diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 12736acb7..d5619fb91 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -186,7 +186,8 @@ func (r *Reader) SetDeadline(t time.Time) { // Read the next record from the BPF ringbuf. 
// // Returns os.ErrClosed if Close is called on the Reader, or os.ErrDeadlineExceeded -// if a deadline was set. +// if a deadline was set and no valid entry was present. A producer might use BPF_RB_NO_WAKEUP, +// in which case the deadline can expire while a valid entry is present; that entry is returned instead. func (r *Reader) Read() (Record, error) { var rec Record return rec, r.ReadInto(&rec) @@ -204,6 +205,11 @@ func (r *Reader) ReadInto(rec *Record) error { for { if !r.haveData { _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline) + if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() { + // The deadline expired but the ring still holds unread data, so ignore the timeout and read it. + // This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP. + err = nil + } if err != nil { return err } @@ -212,6 +218,8 @@ func (r *Reader) ReadInto(rec *Record) error { for { err := readRecord(r.ring, rec, r.header) + // Compare directly instead of using errors.Is, which is quite a bit slower. + // In a tight loop this can make a difference. if err == errBusy || err == errDiscard { continue } @@ -219,7 +227,6 @@ func (r *Reader) ReadInto(rec *Record) error { r.haveData = false break } - return err } } diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 964bd28fb..94f8180bf 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -12,9 +12,15 @@ import ( "github.com/cilium/ebpf/internal" "github.com/cilium/ebpf/internal/testutils" "github.com/cilium/ebpf/internal/testutils/fdtrace" + "github.com/cilium/ebpf/internal/unix" "github.com/google/go-cmp/cmp" ) +type sampleMessage struct { + size int + flags int32 +} + func TestMain(m *testing.M) { fdtrace.TestMain(m) } @@ -24,19 +30,19 @@ func TestRingbufReader(t *testing.T) { readerTests := []struct { name string - messages []int + messages []sampleMessage want map[int][]byte }{ { name: "send one short sample", - messages: []int{5}, + messages: []sampleMessage{{size: 5}}, want: map[int][]byte{ 5: {1, 2, 3, 4, 4}, }, }, { name: "send three 
short samples, the second is discarded", - messages: []int{5, 10, 15}, + messages: []sampleMessage{{size: 5}, {size: 10}, {size: 15}}, want: map[int][]byte{ 5: {1, 2, 3, 4, 4}, 15: {1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2}, @@ -45,7 +51,7 @@ func TestRingbufReader(t *testing.T) { } for _, tt := range readerTests { t.Run(tt.name, func(t *testing.T) { - prog, events := mustOutputSamplesProg(t, 0, tt.messages...) + prog, events := mustOutputSamplesProg(t, tt.messages...) rd, err := NewReader(events) if err != nil { @@ -80,7 +86,7 @@ func TestRingbufReader(t *testing.T) { } } -func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map, error) { +func outputSamplesProg(sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map, error) { events, err := ebpf.NewMap(&ebpf.MapSpec{ Type: ebpf.RingBuf, MaxEntries: 4096, @@ -90,9 +96,9 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma } var maxSampleSize int - for _, sampleSize := range sampleSizes { - if sampleSize > maxSampleSize { - maxSampleSize = sampleSize + for _, sampleMessage := range sampleMessages { + if sampleMessage.size > maxSampleSize { + maxSampleSize = sampleMessage.size } } @@ -108,16 +114,16 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma ) } - for sampleIdx, sampleSize := range sampleSizes { + for sampleIdx, sampleMessage := range sampleMessages { insns = append(insns, asm.LoadMapPtr(asm.R1, events.FD()), - asm.Mov.Imm(asm.R2, int32(sampleSize)), + asm.Mov.Imm(asm.R2, int32(sampleMessage.size)), asm.Mov.Imm(asm.R3, int32(0)), asm.FnRingbufReserve.Call(), asm.JEq.Imm(asm.R0, 0, "exit"), asm.Mov.Reg(asm.R5, asm.R0), ) - for i := 0; i < sampleSize; i++ { + for i := 0; i < sampleMessage.size; i++ { insns = append(insns, asm.LoadMem(asm.R4, asm.RFP, int16(i+1)*-1, asm.Byte), asm.StoreMem(asm.R5, int16(i), asm.R4, asm.Byte), @@ -128,13 +134,13 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, 
*ebpf.Ma if sampleIdx&1 != 0 { insns = append(insns, asm.Mov.Reg(asm.R1, asm.R5), - asm.Mov.Imm(asm.R2, flags), + asm.Mov.Imm(asm.R2, sampleMessage.flags), asm.FnRingbufDiscard.Call(), ) } else { insns = append(insns, asm.Mov.Reg(asm.R1, asm.R5), - asm.Mov.Imm(asm.R2, flags), + asm.Mov.Imm(asm.R2, sampleMessage.flags), asm.FnRingbufSubmit.Call(), ) } @@ -158,10 +164,10 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma return prog, events, nil } -func mustOutputSamplesProg(tb testing.TB, flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map) { +func mustOutputSamplesProg(tb testing.TB, sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map) { tb.Helper() - prog, events, err := outputSamplesProg(flags, sampleSizes...) + prog, events, err := outputSamplesProg(sampleMessages...) if err != nil { tb.Fatal(err) } @@ -177,7 +183,7 @@ func mustOutputSamplesProg(tb testing.TB, flags int32, sampleSizes ...int) (*ebp func TestReaderBlocking(t *testing.T) { testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") - prog, events := mustOutputSamplesProg(t, 0, 5) + prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5, flags: 0}) ret, _, err := prog.Test(internal.EmptyBPFContext) testutils.SkipIfNotSupported(t, err) if err != nil { @@ -234,10 +240,54 @@ func TestReaderBlocking(t *testing.T) { } } +func TestReaderNoWakeup(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5, flags: unix.BPF_RB_NO_WAKEUP}, // Read after timeout + sampleMessage{size: 6, flags: unix.BPF_RB_NO_WAKEUP}, // Discard + sampleMessage{size: 7, flags: unix.BPF_RB_NO_WAKEUP}) // Read won't block + + rd, err := NewReader(events) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + ret, _, err := prog.Test(internal.EmptyBPFContext) + testutils.SkipIfNotSupported(t, err) + if err != nil { + t.Fatal(err) + } + + if errno := syscall.Errno(-int32(ret)); errno != 0 { + 
t.Fatal("Expected 0 as return value, got", errno) + } + + rd.SetDeadline(time.Now()) + record, err := rd.Read() + + if err != nil { + t.Error("Expected no error from first Read, got:", err) + } + if len(record.RawSample) != 5 { + t.Errorf("Expected to read 5 bytes but got %d", len(record.RawSample)) + } + + record, err = rd.Read() + + if err != nil { + t.Error("Expected no error from second Read, got:", err) + } + if len(record.RawSample) != 7 { + t.Errorf("Expected to read 7 bytes but got %d", len(record.RawSample)) + } +} + func TestReaderSetDeadline(t *testing.T) { testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") - _, events := mustOutputSamplesProg(t, 0, 5) + _, events := mustOutputSamplesProg(t, sampleMessage{size: 5, flags: 0}) rd, err := NewReader(events) if err != nil { t.Fatal(err) @@ -267,7 +317,7 @@ func BenchmarkReader(b *testing.B) { for _, bm := range readerBenchmarks { b.Run(bm.name, func(b *testing.B) { - prog, events := mustOutputSamplesProg(b, bm.flags, 80) + prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: bm.flags}) rd, err := NewReader(events) if err != nil { @@ -299,7 +349,7 @@ func BenchmarkReader(b *testing.B) { func BenchmarkReadInto(b *testing.B) { testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer") - prog, events := mustOutputSamplesProg(b, 0, 80) + prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: 0}) rd, err := NewReader(events) if err != nil { diff --git a/ringbuf/ring.go b/ringbuf/ring.go index b3eeb6699..a3ecc005c 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -91,6 +91,13 @@ func (rr *ringReader) skipRead(skipBytes uint64) { rr.cons += clamp(rr.cons, atomic.LoadUint64(rr.prod_pos), skipBytes) } +func (rr *ringReader) isEmpty() bool { + cons := atomic.LoadUint64(rr.cons_pos) + prod := atomic.LoadUint64(rr.prod_pos) + + return prod == cons +} + func (rr *ringReader) Read(p []byte) (int, error) { prod := atomic.LoadUint64(rr.prod_pos)