map: Add Drain for traversing maps while removing entries #1349

Open
wants to merge 1 commit into
base: main
133 changes: 112 additions & 21 deletions map.go
@@ -1344,10 +1344,31 @@ func batchCount(keys, values any) (int, error) {
//
// It's not possible to guarantee that all keys in a map will be
// returned if there are concurrent modifications to the map.
//
// Iterating a hash map from which keys are being deleted is not
// safe. You may see the same key multiple times. Iteration may
// also abort with an error, see IsIterationAborted.
//
// Iterating a queue/stack map returns an error (NextKey invalid
Collaborator

Here and below:

Suggested change
// Iterating a queue/stack map returns an error (NextKey invalid
// Iterating a queue/stack map returns an error, use [Map.Drain] instead.

Nit: try to reduce your use of the term 'API', it's redundant and often implied. Less is more!

// argument): [Map.Drain] API should be used instead.
func (m *Map) Iterate() *MapIterator {
return newMapIterator(m)
}

// Drain traverses a map while also removing entries.
//
// It's safe to create multiple drainers at the same time,
// but their respective outputs will differ.
//
// Draining a map that does not support entry removal such as
// an array return an error (LookupAndDelete not supported):
Collaborator

The exact error returned is an implementation detail and may change over time unless it's a sentinel. If it's in godoc, it becomes a contract.

If it does return an error wrapping ErrNotSupported, this is valid documentation:

Draining a map that doesn't support deletions will return [ErrNotSupported].

// [Map.Iterate] API should be used instead.
func (m *Map) Drain() *MapIterator {
it := newMapIterator(m)
it.drain = true
return it
}

// Close the Map's underlying file descriptor, which could unload the
// Map from the kernel if it is not pinned or in use by a loaded Program.
func (m *Map) Close() error {
@@ -1602,6 +1623,12 @@ func marshalMap(m *Map, length int) ([]byte, error) {
return buf, nil
}

// isKeyValueMap returns true if map supports key-value pairs (ex. hash)
// and false in case of value-only maps (ex. queue).
func isKeyValueMap(m *Map) bool {
Collaborator

This would make more sense inverted, e.g. func (m *Map) isKeyless() bool since you're negating its only use below. Then the docstring can also become a bit more focused: isKeyless returns true if the map is value-only, like a [Queue].
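
A small sketch of the inverted helper, using a hypothetical `valueMap` type as a stand-in for Map (only the `keySize` field is modelled here):

```go
package main

import "fmt"

// valueMap is a stand-in type for this example only.
type valueMap struct {
	keySize uint32
}

// isKeyless returns true if the map is value-only, like a queue.
func (m *valueMap) isKeyless() bool {
	return m.keySize == 0
}

func main() {
	queue := &valueMap{keySize: 0}
	hash := &valueMap{keySize: 4}
	// The call site reads without a negation.
	fmt.Println(queue.isKeyless(), hash.isKeyless()) // true false
}
```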

return m.keySize != 0
}

// MapIterator iterates a Map.
//
// See Map.Iterate.
@@ -1611,7 +1638,7 @@ type MapIterator struct {
// of []byte to avoid allocations.
cursor any
count, maxEntries uint32
done bool
done, drain bool
err error
}

@@ -1622,12 +1649,56 @@ func newMapIterator(target *Map) *MapIterator {
}
}

// cursorToKeyOut copies the current value held in the cursor to the
// provided argument. In case of errors, returns false and sets a
// non-nil error in the MapIterator.
func (mi *MapIterator) cursorToKeyOut(keyOut interface{}) bool {
buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
} else {
Comment on lines +1657 to +1659
Member

Sorry if this is a bit off topic. @ti-mo Do you recall why we have this exception here? This is just moving the existing code added in 4609dc7. But we seem to take so much care to ensure safety elsewhere, including in sysenc.Unmarshal. But not here.

If for some reason the user passes an unsafe.Pointer in keyOut that points to less memory than len(buf), we end up writing into unknown parts of the heap.

Collaborator

@ti-mo, Dec 9, 2024

I'm not exactly sure, but the commit you linked does contain some hints around slices of types with trailing padding:

Allowing such a type creates an edge case: make([]padding, 1)
uses zero-allocation marshaling while make([]padding, 2)
doesn't, due to interior padding. It's simpler to skip such
types for now.

Passing in unsafe.Pointer sidesteps this limitation. Also see f0d238d. unsafe.Pointer is supported for most map operations, see marshalMapSyscallInput.
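
For readers unfamiliar with the padding issue in the quoted commit message, here is a small illustration using only the standard library. The `padded` struct is a made-up example type: its in-memory size includes trailing padding, while its packed encoding does not.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"unsafe"
)

// padded is an example type: 4 bytes + 1 byte of data, rounded up
// to a multiple of the uint32 alignment in memory.
type padded struct {
	A uint32
	B uint8
}

// sizes returns the in-memory size and the packed binary size of padded.
func sizes() (inMemory, packed int) {
	return int(unsafe.Sizeof(padded{})), binary.Size(padded{})
}

func main() {
	inMemory, packed := sizes()
	fmt.Println(inMemory, packed) // 8 5
}
```

In a slice with two or more such elements, the padding of each element except the last sits between elements, which is the "interior padding" the commit message refers to.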

mi.err = sysenc.Unmarshal(keyOut, buf)
}
return mi.err == nil
}

// fetchNextKey loads into the cursor the key following the provided one.
func (mi *MapIterator) fetchNextKey(key interface{}) bool {
Collaborator

Suggested change
func (mi *MapIterator) fetchNextKey(key interface{}) bool {
func (mi *MapIterator) nextKeyInto(key interface{}) bool {

mi.err = mi.target.NextKey(key, mi.cursor)
if mi.err == nil {
return true
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
} else {
mi.err = fmt.Errorf("get next key: %w", mi.err)
}

return false
}

// drainMapEntry removes and returns the key held in the cursor
// from the underlying map.
func (mi *MapIterator) drainMapEntry(valueOut interface{}) bool {
Collaborator

This is just a wrapper around LookupAndDelete:

Suggested change
func (mi *MapIterator) drainMapEntry(valueOut interface{}) bool {
func (mi *MapIterator) lookupAndDeleteInto(valueOut interface{}) bool {

mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
if mi.err == nil {
mi.count++
return true
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.err = nil
} else {
Collaborator

Nitpicking, but IMO this is important nonetheless: try to avoid else near the end of a block, it's often unnecessary.

In this case, it's also a little misleading when mixed in with early returns. The else clause would be executed if !errors.Is(..., ErrKeyNotExist), but also when err == nil, and the latter case is already checked above.

It's usually clearer to be explicit and let things fall through naturally:

	if errors.Is(mi.err, ErrKeyNotExist) {
		mi.err = nil
		return false
	}

	mi.err = fmt.Errorf("lookup and delete next key: %w", mi.err)
	return false

mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
Collaborator

Prefer natural language in error strings, these will be seen by the user:

Suggested change
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
mi.err = fmt.Errorf("lookup and delete next key: %w", mi.err)

}

return false
}

// Next decodes the next key and value.
//
// Iterating a hash map from which keys are being deleted is not
Collaborator

Why was this comment removed? This applies in general, and it's documentation on exported API, so we shouldn't just remove it.

// safe. You may see the same key multiple times. Iteration may
// also abort with an error, see IsIterationAborted.
//
// Returns false if there are no more entries. You must check
// the result of Err afterwards.
//
@@ -1636,6 +1707,38 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
if mi.err != nil || mi.done {
return false
}
if mi.drain {
return mi.nextDrain(keyOut, valueOut)
}
return mi.nextIterate(keyOut, valueOut)
}

func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
Collaborator

Suggested change
func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
func (mi *MapIterator) pop(keyOut, valueOut interface{}) bool {

This function pops a single entry, it doesn't drain the whole map.

// Handle value-only maps (ex. queue).
if !isKeyValueMap(mi.target) {
if keyOut != nil {
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
Collaborator

Suggested change
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key")

return false
}
return mi.drainMapEntry(valueOut)
}

if mi.cursor == nil {
mi.cursor = make([]byte, mi.target.keySize)
}

// Always retrieve first key in the map. This should ensure that the whole map
Collaborator

I don't think this is the case. Consider the following scenario:

  • drainer A: NextKey(0) yields a
  • drainer B: NextKey(0) yields a
  • drainer B: LookupAndDelete(a) succeeds
  • drainer A: LookupAndDelete(a) fails

The LookupAndDelete call can fail at any point due to a concurrent delete, for example when a BPF program deletes a key, which is fairly common. In the current implementation, I think iteration halts at that point, while there may still be keys left in the map.
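
The interleaving above can be reproduced deterministically with an in-memory stand-in. This is a sketch only: `fakeMap`, `firstKey`, `lookupAndDelete`, `pop` and the `interfere` hook are all hypothetical names for this example and do not reflect the library's implementation. It shows one possible way to tolerate the race, which is to fetch the first key again when the delete loses.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

var errKeyNotExist = errors.New("key does not exist")

// fakeMap is an in-memory stand-in for a kernel hash map.
type fakeMap struct {
	entries map[uint32]uint32
}

// firstKey mimics NextKey(nil): it returns the smallest key,
// or errKeyNotExist when the map is empty.
func (m *fakeMap) firstKey() (uint32, error) {
	if len(m.entries) == 0 {
		return 0, errKeyNotExist
	}
	keys := make([]uint32, 0, len(m.entries))
	for k := range m.entries {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	return keys[0], nil
}

// lookupAndDelete removes key and returns its value.
func (m *fakeMap) lookupAndDelete(key uint32) (uint32, error) {
	v, ok := m.entries[key]
	if !ok {
		return 0, errKeyNotExist
	}
	delete(m.entries, key)
	return v, nil
}

// pop removes a single entry. interfere, if non-nil, runs once between
// the two steps to simulate a concurrent deleter such as drainer B.
func pop(m *fakeMap, interfere func(key uint32)) (key, value uint32, ok bool) {
	for {
		k, err := m.firstKey()
		if err != nil {
			return 0, 0, false // map is empty
		}
		if interfere != nil {
			interfere(k)
			interfere = nil
		}
		v, err := m.lookupAndDelete(k)
		if errors.Is(err, errKeyNotExist) {
			continue // lost the race for k, try the next first key
		}
		return k, v, true
	}
}

func main() {
	m := &fakeMap{entries: map[uint32]uint32{1: 10, 2: 20}}
	// Drainer B deletes the key that drainer A just fetched.
	k, v, ok := pop(m, func(key uint32) { delete(m.entries, key) })
	fmt.Println(k, v, ok) // 2 20 true
}
```

In this sketch, stopping at the first failed delete would report the map as drained while key 2 is still present; retrying returns it instead.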

// is traversed, despite concurrent operations (ordering of items might differ).
for mi.err == nil && mi.fetchNextKey(nil) {
if mi.drainMapEntry(valueOut) {
return mi.cursorToKeyOut(keyOut)
}
}
return false
}

func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
Collaborator

Suggested change
func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
func (mi *MapIterator) next(keyOut, valueOut interface{}) bool {

var key interface{}

// For array-like maps NextKey returns nil only after maxEntries
// iterations.
@@ -1645,17 +1748,12 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
// is returned. If we pass an uninitialized []byte instead, it'll see a
// non-nil interface and try to marshal it.
mi.cursor = make([]byte, mi.target.keySize)
mi.err = mi.target.NextKey(nil, mi.cursor)
key = nil
} else {
mi.err = mi.target.NextKey(mi.cursor, mi.cursor)
key = mi.cursor
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
return false
} else if mi.err != nil {
mi.err = fmt.Errorf("get next key: %w", mi.err)
if !mi.fetchNextKey(key) {
return false
}

@@ -1677,14 +1775,7 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
return false
}

buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
} else {
mi.err = sysenc.Unmarshal(keyOut, buf)
}

return mi.err == nil
return mi.cursorToKeyOut(keyOut)
}

mi.err = fmt.Errorf("%w", ErrIterationAborted)
133 changes: 133 additions & 0 deletions map_test.go
@@ -1174,6 +1174,139 @@ func TestMapIteratorAllocations(t *testing.T) {
qt.Assert(t, qt.Equals(allocs, float64(0)))
}

func TestMapDrain(t *testing.T) {
for _, mapType := range []MapType{
Hash,
Queue,
} {
t.Run(mapType.String(), func(t *testing.T) {
var (
keySize, value uint32
keyPtr interface{}
values = []uint32{}
data = []uint32{0, 1}
)

if mapType == Queue {
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
keyPtr = nil
keySize = 0
}

if mapType == Hash {
testutils.SkipOnOldKernel(t, "5.14", "map type hash")
keyPtr = new(uint32)
keySize = 4
}

m, err := NewMap(&MapSpec{
Type: mapType,
KeySize: keySize,
ValueSize: 4,
MaxEntries: 2,
})
qt.Assert(t, qt.IsNil(err))
defer m.Close()

// Assert drain empty map.
entries := m.Drain()
qt.Assert(t, qt.IsFalse(entries.Next(keyPtr, &value)))
qt.Assert(t, qt.IsNil(entries.Err()))

for _, v := range data {
if keySize == 0 {
err = m.Put(nil, uint32(v))
} else {
err = m.Put(uint32(v), uint32(v))
}
qt.Assert(t, qt.IsNil(err))
}

entries = m.Drain()
for entries.Next(keyPtr, &value) {
values = append(values, value)
}
qt.Assert(t, qt.IsNil(entries.Err()))

sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
qt.Assert(t, qt.DeepEquals(values, data))
})
}
}

func TestDrainWrongMap(t *testing.T) {
arr, err := NewMap(&MapSpec{
Type: Array,
KeySize: 4,
ValueSize: 4,
MaxEntries: 10,
})
qt.Assert(t, qt.IsNil(err))
defer arr.Close()

var key, value uint32
entries := arr.Drain()

qt.Assert(t, qt.IsFalse(entries.Next(&key, &value)))
qt.Assert(t, qt.IsNotNil(entries.Err()))
fmt.Println(entries.Err())
}

func TestMapDrainerAllocations(t *testing.T) {
for _, mapType := range []MapType{
Hash,
Queue,
} {
t.Run(mapType.String(), func(t *testing.T) {
var (
keySize, value uint32
keyPtr interface{}
)

if mapType == Queue {
testutils.SkipOnOldKernel(t, "4.20", "map type queue")
keyPtr = nil
keySize = 0
}

if mapType == Hash {
testutils.SkipOnOldKernel(t, "5.14", "map type hash")
keyPtr = new(uint32)
keySize = 4
}

m, err := NewMap(&MapSpec{
Type: mapType,
KeySize: keySize,
ValueSize: 4,
MaxEntries: 10,
})
qt.Assert(t, qt.ErrorIs(err, nil))
defer m.Close()

for i := 0; i < int(m.MaxEntries()); i++ {
if keySize == 0 {
err = m.Put(nil, uint32(i))
} else {
err = m.Put(uint32(i), uint32(i))
}
if err != nil {
Collaborator

Suggested change
if err != nil {
qt.Assert(t, qt.IsNil(err))

t.Fatal(err)
}
}

iter := m.Drain()
allocs := testing.AllocsPerRun(int(m.MaxEntries()-1), func() {
if !iter.Next(keyPtr, &value) {
Collaborator

Suggested change
if !iter.Next(keyPtr, &value) {
qt.Assert(t, qt.IsTrue(iter.Next(keyPtr, &value)))

Or maybe qt.Assert allocates? Feel free to ignore.
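
Whether a call inside the closure allocates can be checked with testing.AllocsPerRun itself. The sketch below uses only the standard library; `measure` and `sink` are names made up for this example. It also shows that AllocsPerRun calls the function once as a warm-up before the measured runs, which is why the test passes `MaxEntries()-1` for a map holding `MaxEntries()` entries.

```go
package main

import (
	"fmt"
	"testing"
)

// sink forces the slice below to escape to the heap.
var sink []byte

// measure reports how often the first closure was called, and the
// average allocations per run of a quiet and a noisy closure.
func measure() (calls int, quiet, noisy float64) {
	quiet = testing.AllocsPerRun(9, func() { calls++ })
	noisy = testing.AllocsPerRun(9, func() { sink = make([]byte, 64) })
	return calls, quiet, noisy
}

func main() {
	calls, quiet, noisy := measure()
	// calls is 10: one warm-up call plus 9 measured runs.
	fmt.Println(calls, quiet, noisy)
}
```

Wrapping the suspect call in its own AllocsPerRun closure, as with the noisy case here, is one way to find out whether it would disturb the zero-allocation assertion.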

t.Fatal("Next failed while draining: %w", iter.Err())
}
})

qt.Assert(t, qt.Equals(allocs, float64(0)))
})
}
}

func TestMapBatchLookupAllocations(t *testing.T) {
testutils.SkipIfNotSupported(t, haveBatchAPI())
