Skip to content

Commit

Permalink
feat: AddRingBuf() using ring_buffer__add
Browse files Browse the repository at this point in the history
libbpf provides ring_buffer__add to handle multiple ringbuf event
callbacks

- single RingBuffer object now handles multiple slots
- implements AddRingBuf() for ring_buffer__add wrapper
- updates ringbuffers selftest to show how to handle it

Reference
- https://github.com/torvalds/linux/blob/eb6a9339efeb6f3d2b5c86fdf2382cdc293eca2c/tools/testing/selftests/bpf/progs/test_ringbuf_multi.c
- https://github.com/torvalds/linux/blob/eb6a9339efeb6f3d2b5c86fdf2382cdc293eca2c/tools/testing/selftests/bpf/prog_tests/ringbuf_multi.c#41
  • Loading branch information
mighty1231 committed May 20, 2024
1 parent 59f5984 commit f4f8407
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 26 deletions.
27 changes: 17 additions & 10 deletions buf-ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type RingBuffer struct {
rb *C.struct_ring_buffer
bpfMap *BPFMap
slot uint
slots []uint
stop chan struct{}
closed bool
wg sync.WaitGroup
Expand Down Expand Up @@ -50,20 +50,25 @@ func (rb *RingBuffer) Stop() {
// may have stopped at this point. Failure to drain it will
// result in a deadlock: the channel will fill up and the poll
// goroutine will block in the callback.
eventChan := eventChannels.get(rb.slot).(chan []byte)
go func() {
// revive:disable:empty-block
for range eventChan {
}
// revive:enable:empty-block
}()
for _, slot := range rb.slots {
eventChan := eventChannels.get(slot).(chan []byte)
go func() {
// revive:disable:empty-block
for range eventChan {
}
// revive:enable:empty-block
}()
}

// Wait for the poll goroutine to exit
rb.wg.Wait()

// Close the channel -- this is useful for the consumer but
// also to terminate the drain goroutine above.
close(eventChan)
for _, slot := range rb.slots {
eventChan := eventChannels.get(slot).(chan []byte)
close(eventChan)
}

// Reset pb.stop to allow multiple safe calls to Stop()
rb.stop = nil
Expand All @@ -76,7 +81,9 @@ func (rb *RingBuffer) Close() {

rb.Stop()
C.ring_buffer__free(rb.rb)
eventChannels.remove(rb.slot)
for _, slot := range rb.slots {
eventChannels.remove(slot)
}
rb.closed = true
}

Expand Down
11 changes: 11 additions & 0 deletions libbpfgo.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx)
return rb;
}

int cgo_add_ring_buf(struct ring_buffer* rb, int map_fd, uintptr_t ctx)
{
int ret = ring_buffer__add(rb, map_fd, ringbufferCallback, (void *) ctx);
if (ret != 0) {
fprintf(stderr, "Failed to add ring buffer: %s\n", strerror(-ret));
return ret;
}

return ret;
}

struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx)
{
struct perf_buffer_opts pb_opts = {};
Expand Down
1 change: 1 addition & 0 deletions libbpfgo.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
void cgo_libbpf_set_print_fn();

struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx);
int cgo_add_ring_buf(struct ring_buffer* rb, int map_fd, uintptr_t ctx);
struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx);

void cgo_bpf_map__initial_value(struct bpf_map *map, void *value);
Expand Down
26 changes: 25 additions & 1 deletion module.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,18 +336,42 @@ func (m *Module) InitRingBuf(mapName string, eventsChan chan []byte) (*RingBuffe

rbC, errno := C.cgo_init_ring_buf(C.int(bpfMap.FileDescriptor()), C.uintptr_t(slot))
if rbC == nil {
eventChannels.remove(uint(slot))
return nil, fmt.Errorf("failed to initialize ring buffer: %w", errno)
}

ringBuf := &RingBuffer{
rb: rbC,
bpfMap: bpfMap,
slot: uint(slot),
slots: []uint{uint(slot)},
}
m.ringBufs = append(m.ringBufs, ringBuf)
return ringBuf, nil
}

func (m *Module) AddRingBuf(ringBuf *RingBuffer, mapName string, eventsChan chan []byte) (bool, error) {
bpfMap, err := m.GetMap(mapName)
if err != nil {
return false, err
}

if eventsChan == nil {
return false, fmt.Errorf("events channel can not be nil")
}

slot := eventChannels.put(eventsChan)
if slot == -1 {
return false, fmt.Errorf("max ring buffers reached")
}
ringBuf.slots = append(ringBuf.slots, uint(slot))

ret, errno := C.cgo_add_ring_buf(ringBuf.rb, C.int(bpfMap.FileDescriptor()), C.uintptr_t(slot))
if ret != 0 {
return false, fmt.Errorf("failed to add ring buffer: %w", errno)
}
return true, nil
}

func (m *Module) InitPerfBuf(mapName string, eventsChan chan []byte, lostChan chan uint64, pageCnt int) (*PerfBuffer, error) {
bpfMap, err := m.GetMap(mapName)
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions selftest/ringbuffers/main.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} events SEC(".maps");
} events1 SEC(".maps"),
events2 SEC(".maps");

long ringbuffer_flags = 0;

Expand All @@ -18,13 +19,22 @@ int kprobe__sys_mmap(struct pt_regs *ctx)
int *process;

// Reserve space on the ringbuffer for the sample
process = bpf_ringbuf_reserve(&events, sizeof(int), ringbuffer_flags);
process = bpf_ringbuf_reserve(&events1, sizeof(int), ringbuffer_flags);
if (!process) {
return 0;
}

*process = 2021;

bpf_ringbuf_submit(process, ringbuffer_flags);

process = bpf_ringbuf_reserve(&events2, sizeof(int), ringbuffer_flags);
if (!process) {
return 0;
}

*process = 2024;

bpf_ringbuf_submit(process, ringbuffer_flags);
return 0;
}
46 changes: 33 additions & 13 deletions selftest/ringbuffers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func resizeMap(module *bpf.Module, name string, size uint32) error {
m, err := module.GetMap("events")
m, err := module.GetMap(name)
if err != nil {
return err
}
Expand All @@ -39,7 +39,7 @@ func main() {
}
defer bpfModule.Close()

if err = resizeMap(bpfModule, "events", 8192); err != nil {
if err = resizeMap(bpfModule, "events1", 8192); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}
Expand All @@ -58,32 +58,52 @@ func main() {
os.Exit(-1)
}

eventsChannel := make(chan []byte)
rb, err := bpfModule.InitRingBuf("events", eventsChannel)
eventsChannel1 := make(chan []byte)
rb, err := bpfModule.InitRingBuf("events1", eventsChannel1)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}

eventsChannel2 := make(chan []byte)
ret, err := bpfModule.AddRingBuf(rb, "events2", eventsChannel2)
if ret == false {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}

rb.Poll(300)

numberOfEventsReceived := 0
numberOfEvent1Received := 0
numberOfEvent2Received := 0
go func() {
for {
syscall.Mmap(999, 999, 999, 1, 1)
time.Sleep(time.Second / 2)
}
}()

recvLoop:
for {
b := <-eventsChannel
if binary.LittleEndian.Uint32(b) != 2021 {
fmt.Fprintf(os.Stderr, "invalid data retrieved\n")
os.Exit(-1)
}
numberOfEventsReceived++
if numberOfEventsReceived > 5 {
break recvLoop
select {
case b := <-eventsChannel1:
if binary.LittleEndian.Uint32(b) != 2021 {
fmt.Fprintf(os.Stderr, "invalid data retrieved\n")
os.Exit(-1)
}
numberOfEvent1Received++
if numberOfEvent1Received > 5 && numberOfEvent2Received > 5 {
break recvLoop
}
case b := <-eventsChannel2:
if binary.LittleEndian.Uint32(b) != 2024 {
fmt.Fprintf(os.Stderr, "invalid data retrieved\n")
os.Exit(-1)
}
numberOfEvent2Received++
if numberOfEvent1Received > 5 && numberOfEvent2Received > 5 {
break recvLoop
}
}
}

Expand Down

0 comments on commit f4f8407

Please sign in to comment.