Skip to content

Commit

Permalink
ringbuf: check for available data on timeout
Browse files Browse the repository at this point in the history
BPF programs can suppress wake-up notifications to user space ringbuf readers by
specifying the BPF_RB_NO_WAKEUP flag. This is useful in high-throughput scenarios
to emit a wake-up only for every Nth event. The problem is that some writes may
stay un-flushed in the buffer for a long time.

Check whether the ring buffer is really empty when the deadline expires. This
way it's possible to express the maximum duration one is willing to wait for
a flush.

Co-authored-by: Lorenz Bauer <[email protected]>
Signed-off-by: Ron Federman <[email protected]>
  • Loading branch information
RonFed and lmb authored Aug 14, 2023
1 parent dfd74c6 commit 01ebd4c
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 21 deletions.
2 changes: 2 additions & 0 deletions internal/unix/types_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
BPF_FS_MAGIC = linux.BPF_FS_MAGIC
TRACEFS_MAGIC = linux.TRACEFS_MAGIC
DEBUGFS_MAGIC = linux.DEBUGFS_MAGIC
BPF_RB_NO_WAKEUP = linux.BPF_RB_NO_WAKEUP
BPF_RB_FORCE_WAKEUP = linux.BPF_RB_FORCE_WAKEUP
)

type Statfs_t = linux.Statfs_t
Expand Down
2 changes: 2 additions & 0 deletions internal/unix/types_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ const (
BPF_FS_MAGIC
TRACEFS_MAGIC
DEBUGFS_MAGIC
BPF_RB_NO_WAKEUP
BPF_RB_FORCE_WAKEUP
)

type Statfs_t struct {
Expand Down
11 changes: 9 additions & 2 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func (r *Reader) SetDeadline(t time.Time) {
// Read the next record from the BPF ringbuf.
//
// Returns os.ErrClosed if Close is called on the Reader, or os.ErrDeadlineExceeded
// if a deadline was set.
// if a deadline was set and no valid entry was present. A producer might use BPF_RB_NO_WAKEUP,
// in which case the deadline may expire while a valid entry is already present; that entry is
// then read instead of returning os.ErrDeadlineExceeded.
func (r *Reader) Read() (Record, error) {
var rec Record
return rec, r.ReadInto(&rec)
Expand All @@ -204,6 +205,11 @@ func (r *Reader) ReadInto(rec *Record) error {
for {
if !r.haveData {
_, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline)
if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() {
// Ignoring this for reading a valid entry after timeout
// This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP
err = nil
}
if err != nil {
return err
}
Expand All @@ -212,14 +218,15 @@ func (r *Reader) ReadInto(rec *Record) error {

for {
err := readRecord(r.ring, rec, r.header)
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy || err == errDiscard {
continue
}
if err == errEOR {
r.haveData = false
break
}

return err
}
}
Expand Down
88 changes: 69 additions & 19 deletions ringbuf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ import (
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/testutils"
"github.com/cilium/ebpf/internal/testutils/fdtrace"
"github.com/cilium/ebpf/internal/unix"
"github.com/google/go-cmp/cmp"
)

// sampleMessage describes a single ring buffer sample emitted by the BPF
// program that the tests generate.
type sampleMessage struct {
// size is the number of bytes reserved in the ring buffer for the sample.
size int
// flags is the flags argument passed to the submit or discard helper for
// the sample, e.g. unix.BPF_RB_NO_WAKEUP.
flags int32
}

// TestMain is the entry point for this package's tests; it hands the whole
// test run over to fdtrace.TestMain.
// NOTE(review): presumably fdtrace tracks leaked file descriptors — confirm.
func TestMain(m *testing.M) {
fdtrace.TestMain(m)
}
Expand All @@ -24,19 +30,19 @@ func TestRingbufReader(t *testing.T) {

readerTests := []struct {
name string
messages []int
messages []sampleMessage
want map[int][]byte
}{
{
name: "send one short sample",
messages: []int{5},
messages: []sampleMessage{{size: 5}},
want: map[int][]byte{
5: {1, 2, 3, 4, 4},
},
},
{
name: "send three short samples, the second is discarded",
messages: []int{5, 10, 15},
messages: []sampleMessage{{size: 5}, {size: 10}, {size: 15}},
want: map[int][]byte{
5: {1, 2, 3, 4, 4},
15: {1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2},
Expand All @@ -45,7 +51,7 @@ func TestRingbufReader(t *testing.T) {
}
for _, tt := range readerTests {
t.Run(tt.name, func(t *testing.T) {
prog, events := mustOutputSamplesProg(t, 0, tt.messages...)
prog, events := mustOutputSamplesProg(t, tt.messages...)

rd, err := NewReader(events)
if err != nil {
Expand Down Expand Up @@ -80,7 +86,7 @@ func TestRingbufReader(t *testing.T) {
}
}

func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map, error) {
func outputSamplesProg(sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map, error) {
events, err := ebpf.NewMap(&ebpf.MapSpec{
Type: ebpf.RingBuf,
MaxEntries: 4096,
Expand All @@ -90,9 +96,9 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
}

var maxSampleSize int
for _, sampleSize := range sampleSizes {
if sampleSize > maxSampleSize {
maxSampleSize = sampleSize
for _, sampleMessage := range sampleMessages {
if sampleMessage.size > maxSampleSize {
maxSampleSize = sampleMessage.size
}
}

Expand All @@ -108,16 +114,16 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
)
}

for sampleIdx, sampleSize := range sampleSizes {
for sampleIdx, sampleMessage := range sampleMessages {
insns = append(insns,
asm.LoadMapPtr(asm.R1, events.FD()),
asm.Mov.Imm(asm.R2, int32(sampleSize)),
asm.Mov.Imm(asm.R2, int32(sampleMessage.size)),
asm.Mov.Imm(asm.R3, int32(0)),
asm.FnRingbufReserve.Call(),
asm.JEq.Imm(asm.R0, 0, "exit"),
asm.Mov.Reg(asm.R5, asm.R0),
)
for i := 0; i < sampleSize; i++ {
for i := 0; i < sampleMessage.size; i++ {
insns = append(insns,
asm.LoadMem(asm.R4, asm.RFP, int16(i+1)*-1, asm.Byte),
asm.StoreMem(asm.R5, int16(i), asm.R4, asm.Byte),
Expand All @@ -128,13 +134,13 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
if sampleIdx&1 != 0 {
insns = append(insns,
asm.Mov.Reg(asm.R1, asm.R5),
asm.Mov.Imm(asm.R2, flags),
asm.Mov.Imm(asm.R2, sampleMessage.flags),
asm.FnRingbufDiscard.Call(),
)
} else {
insns = append(insns,
asm.Mov.Reg(asm.R1, asm.R5),
asm.Mov.Imm(asm.R2, flags),
asm.Mov.Imm(asm.R2, sampleMessage.flags),
asm.FnRingbufSubmit.Call(),
)
}
Expand All @@ -158,10 +164,10 @@ func outputSamplesProg(flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Ma
return prog, events, nil
}

func mustOutputSamplesProg(tb testing.TB, flags int32, sampleSizes ...int) (*ebpf.Program, *ebpf.Map) {
func mustOutputSamplesProg(tb testing.TB, sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map) {
tb.Helper()

prog, events, err := outputSamplesProg(flags, sampleSizes...)
prog, events, err := outputSamplesProg(sampleMessages...)
if err != nil {
tb.Fatal(err)
}
Expand All @@ -177,7 +183,7 @@ func mustOutputSamplesProg(tb testing.TB, flags int32, sampleSizes ...int) (*ebp
func TestReaderBlocking(t *testing.T) {
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

prog, events := mustOutputSamplesProg(t, 0, 5)
prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5, flags: 0})
ret, _, err := prog.Test(internal.EmptyBPFContext)
testutils.SkipIfNotSupported(t, err)
if err != nil {
Expand Down Expand Up @@ -234,10 +240,54 @@ func TestReaderBlocking(t *testing.T) {
}
}

// TestReaderNoWakeup checks that samples submitted with BPF_RB_NO_WAKEUP can
// still be read once the reader's deadline has expired, instead of Read
// failing with a deadline error while data is sitting in the ring.
func TestReaderNoWakeup(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	// The generated program discards every odd-indexed sample, so only the
	// 5 byte and the 7 byte samples are expected to reach the reader.
	prog, events := mustOutputSamplesProg(t,
		sampleMessage{size: 5, flags: unix.BPF_RB_NO_WAKEUP}, // Read after timeout
		sampleMessage{size: 6, flags: unix.BPF_RB_NO_WAKEUP}, // Discard
		sampleMessage{size: 7, flags: unix.BPF_RB_NO_WAKEUP}) // Read won't block

	rd, err := NewReader(events)
	if err != nil {
		t.Fatal(err)
	}
	defer rd.Close()

	ret, _, err := prog.Test(internal.EmptyBPFContext)
	testutils.SkipIfNotSupported(t, err)
	if err != nil {
		t.Fatal(err)
	}

	if errno := syscall.Errno(-int32(ret)); errno != 0 {
		t.Fatal("Expected 0 as return value, got", errno)
	}

	// Use a deadline that is already due: no wakeup was requested by the
	// producer, so the reader has to notice the pending data on timeout.
	rd.SetDeadline(time.Now())
	record, err := rd.Read()
	if err != nil {
		// The record carries no meaningful data on error, so stop here
		// rather than reporting a second, misleading length mismatch.
		t.Fatal("Expected no error from first Read, got:", err)
	}
	if len(record.RawSample) != 5 {
		t.Errorf("Expected to read 5 bytes but got %d", len(record.RawSample))
	}

	record, err = rd.Read()
	if err != nil {
		t.Fatal("Expected no error from second Read, got:", err)
	}
	if len(record.RawSample) != 7 {
		t.Errorf("Expected to read 7 bytes but got %d", len(record.RawSample))
	}
}

func TestReaderSetDeadline(t *testing.T) {
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

_, events := mustOutputSamplesProg(t, 0, 5)
_, events := mustOutputSamplesProg(t, sampleMessage{size: 5, flags: 0})
rd, err := NewReader(events)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -267,7 +317,7 @@ func BenchmarkReader(b *testing.B) {

for _, bm := range readerBenchmarks {
b.Run(bm.name, func(b *testing.B) {
prog, events := mustOutputSamplesProg(b, bm.flags, 80)
prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: bm.flags})

rd, err := NewReader(events)
if err != nil {
Expand Down Expand Up @@ -299,7 +349,7 @@ func BenchmarkReader(b *testing.B) {
func BenchmarkReadInto(b *testing.B) {
testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")

prog, events := mustOutputSamplesProg(b, 0, 80)
prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: 0})

rd, err := NewReader(events)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ func (rr *ringReader) skipRead(skipBytes uint64) {
rr.cons += clamp(rr.cons, atomic.LoadUint64(rr.prod_pos), skipBytes)
}

// isEmpty reports whether the consumer position is equal to the producer
// position, i.e. whether there is nothing left in the ring to consume.
//
// Both positions are fetched with atomic loads, consumer first.
func (rr *ringReader) isEmpty() bool {
	return atomic.LoadUint64(rr.cons_pos) == atomic.LoadUint64(rr.prod_pos)
}

func (rr *ringReader) Read(p []byte) (int, error) {
prod := atomic.LoadUint64(rr.prod_pos)

Expand Down

0 comments on commit 01ebd4c

Please sign in to comment.