Fix default batcher to correctly call all done callbacks exactly once
Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu committed Feb 4, 2025
1 parent a6aaf37 commit 659002a
Showing 13 changed files with 339 additions and 248 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix-default-batcher.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix default batcher to correctly call all done callbacks exactly once

# One or more tracking issues or pull requests related to the change
issues: [12247]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/batch_sender_test.go
@@ -196,13 +196,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
}, 50*time.Millisecond, 10*time.Millisecond)
}, 500*time.Millisecond, 10*time.Millisecond)

// big request should be broken down into two requests, both are sent right away.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
}, 50*time.Millisecond, 10*time.Millisecond)
}, 500*time.Millisecond, 10*time.Millisecond)

// request that cannot be split should be dropped.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -215,7 +215,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {

assert.Eventually(t, func() bool {
return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
}, 50*time.Millisecond, 10*time.Millisecond)
}, 500*time.Millisecond, 10*time.Millisecond)
})
}

4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -87,8 +87,8 @@ func NewQueueSender(
return err
}
if !usePullingBasedExporterQueueBatcher.IsEnabled() {
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req internal.Request, done exporterqueue.DoneCallback) {
done(exportFunc(ctx, req))
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req internal.Request, done exporterqueue.Done) {
done.OnDone(exportFunc(ctx, req))
}))
if err != nil {
return nil, err
10 changes: 7 additions & 3 deletions exporter/exporterqueue/bounded_memory_queue.go
@@ -11,7 +11,11 @@ import (
"go.opentelemetry.io/collector/component"
)

var noopDone DoneCallback = func(error) {}
type noopDone struct{}

func (*noopDone) OnDone(error) {}

var noopDoneInst = &noopDone{}

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
@@ -36,7 +40,7 @@ func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] {
}
}

func (q *boundedMemoryQueue[T]) Read(context.Context) (context.Context, T, DoneCallback, bool) {
func (q *boundedMemoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool) {
ctx, req, ok := q.sizedQueue.pop()
return ctx, req, noopDone, ok
return ctx, req, noopDoneInst, ok
}
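
Because the in-memory queue has nothing to clean up when an item completes, every Read can hand back the same shared no-op instance. As a hedged aside (not part of this diff), a compile-time assertion is a common way to document that the new type keeps satisfying the Done interface:

// Hypothetical compile-time check: the build breaks if *noopDone ever stops
// implementing the Done interface defined in queue.go.
var _ Done = (*noopDone)(nil)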
14 changes: 7 additions & 7 deletions exporter/exporterqueue/bounded_memory_queue_test.go
@@ -115,9 +115,9 @@ func TestQueueUsage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)})
consumed := &atomic.Int64{}
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done Done) {
consumed.Add(1)
done(nil)
done.OnDone(nil)
})
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < 10; j++ {
@@ -147,9 +147,9 @@ func TestBlockingQueueUsage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true})
consumed := &atomic.Int64{}
ac := newConsumerQueue(q, 10, func(_ context.Context, _ uint64, done DoneCallback) {
ac := newConsumerQueue(q, 10, func(_ context.Context, _ uint64, done Done) {
consumed.Add(1)
done(nil)
done.OnDone(nil)
})
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
wg := &sync.WaitGroup{}
@@ -182,7 +182,7 @@ func consume[T any](q readableQueue[T], consumeFunc func(context.Context, T) err
if !ok {
return false
}
done(consumeFunc(ctx, req))
done.OnDone(consumeFunc(ctx, req))
return true
}

@@ -205,9 +205,9 @@ func BenchmarkOffer(b *testing.B) {
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(10 * b.N)})
consumed := &atomic.Int64{}
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) {
ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done Done) {
consumed.Add(1)
done(nil)
done.OnDone(nil)
})
require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost()))
b.ResetTimer()
13 changes: 11 additions & 2 deletions exporter/exporterqueue/persistent_queue.go
@@ -273,7 +273,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
return nil
}

func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, DoneCallback, bool) {
func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Done, bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

@@ -301,7 +301,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
}
pq.hasMoreSpace.Signal()

return context.Background(), req, func(processErr error) { pq.onDone(index, processErr) }, true
return context.Background(), req, indexDone[T]{index: index, pq: pq}, true
}
}

@@ -554,3 +554,12 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
}
return val, nil
}

type indexDone[T any] struct {
index uint64
pq *persistentQueue[T]
}

func (id indexDone[T]) OnDone(err error) {
id.pq.onDone(id.index, err)
}
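
Replacing the per-Read closure with the small indexDone value keeps the completion path identical (OnDone still routes to pq.onDone with the captured index) while giving it a named type. As a hedged illustration of the exactly-once contract this commit is titled after, a test helper along these lines could wrap any Done and fail loudly on a double completion; it is not part of this commit, it assumes the sync/atomic import, and it would live in the same package as Done:

// Hypothetical test helper: forwards to the wrapped Done and panics if OnDone
// is ever invoked a second time for the same queue item.
type onceDone struct {
	wrapped Done
	called  atomic.Bool
}

func (o *onceDone) OnDone(err error) {
	if o.called.Swap(true) {
		panic("OnDone called more than once for the same queue item")
	}
	o.wrapped.OnDone(err)
}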
12 changes: 6 additions & 6 deletions exporter/exporterqueue/persistent_queue_test.go
@@ -229,8 +229,8 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[uint64], capaci
unmarshaler: uint64Unmarshaler,
set: exportertest.NewNopSettings(),
})
ac := newConsumerQueue(pq, numConsumers, func(ctx context.Context, item uint64, done DoneCallback) {
done(consumeFunc(ctx, item))
ac := newConsumerQueue(pq, numConsumers, func(ctx context.Context, item uint64, done Done) {
done.OnDone(consumeFunc(ctx, item))
})
host := &mockHost{ext: map[component.ID]component.Component{
{}: storagetest.NewMockStorageExtension(nil),
@@ -425,9 +425,9 @@ func TestPersistentBlockingQueue(t *testing.T) {
set: exportertest.NewNopSettings(),
})
consumed := &atomic.Int64{}
ac := newConsumerQueue(pq, 10, func(_ context.Context, _ uint64, done DoneCallback) {
ac := newConsumerQueue(pq, 10, func(_ context.Context, _ uint64, done Done) {
consumed.Add(1)
done(nil)
done.OnDone(nil)
})
host := &mockHost{ext: map[component.ID]component.Component{
{}: storagetest.NewMockStorageExtension(nil),
@@ -671,7 +671,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
secondDone(nil)
secondDone.OnDone(nil)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
@@ -915,7 +915,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {
assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed())
require.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed())
done(nil)
done.OnDone(nil)
assert.True(t, ps.client.(*storagetest.MockStorageClient).IsClosed())
}

11 changes: 7 additions & 4 deletions exporter/exporterqueue/queue.go
@@ -18,13 +18,16 @@ import (
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
var ErrQueueIsFull = errors.New("sending queue is full")

// DoneCallback represents the callback that will be called when the read request is completely processed by the
// Done represents the callback that will be called when the read request is completely processed by the
// downstream components.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type DoneCallback func(processErr error)
type Done interface {
// OnDone needs to be called when processing of the queue item is done.
OnDone(error)
}

type ConsumeFunc[T any] func(context.Context, T, DoneCallback)
type ConsumeFunc[T any] func(context.Context, T, Done)

// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue
// (boundedMemoryQueue) or via a disk-based queue (persistentQueue)
@@ -49,7 +52,7 @@ type readableQueue[T any] interface {
// finished, the done callback must be called to clean up the storage.
// The function blocks until an item is available or if the queue is stopped.
// If the queue is stopped returns false, otherwise true.
Read(context.Context) (context.Context, T, DoneCallback, bool)
Read(context.Context) (context.Context, T, Done, bool)
}

// Settings defines settings for creating a queue.
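
To make the Read contract concrete, here is a minimal hedged sketch of a consumer loop (modeled on the consume helper in the test diff above; the q variable and process function are illustrative, not part of this commit). Read blocks until an item is available, returns false once the queue is stopped, and the returned Done must be completed exactly once per item:

// Illustrative consumer loop, assuming a readableQueue[T] named q and a
// hypothetical process function. OnDone is called exactly once per item,
// whether processing succeeded or failed.
for {
	ctx, req, done, ok := q.Read(context.Background())
	if !ok {
		return // queue was stopped; no more items will be delivered
	}
	done.OnDone(process(ctx, req))
}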
2 changes: 1 addition & 1 deletion exporter/internal/queue/batcher.go
@@ -15,7 +15,7 @@ import (
// Batcher is in charge of reading items from the queue and send them out asynchronously.
type Batcher interface {
component.Component
Consume(context.Context, internal.Request, exporterqueue.DoneCallback)
Consume(context.Context, internal.Request, exporterqueue.Done)
}

func NewBatcher(batchCfg exporterbatcher.Config,
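
The Batcher is where exactly-once completion gets subtle: a single queue item may be split across several batch sends, or several items merged into one send, yet each item's Done must still fire exactly once. The remaining changed files in this commit are not shown above, so the sketch below is only one illustrative way to keep that invariant with a reference-counted wrapper; none of these names are taken from the actual implementation, and it assumes the errors, sync, and go.opentelemetry.io/collector/exporter/exporterqueue imports.

// Hypothetical helper, not from this commit: refCountDone wraps a queue item's
// Done and forwards a single combined result only after a fixed number of
// partial sends have all reported back.
type refCountDone struct {
	wrapped exporterqueue.Done
	mu      sync.Mutex
	pending int
	err     error
}

func (r *refCountDone) OnDone(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.err = errors.Join(r.err, err)
	r.pending--
	if r.pending == 0 {
		// Every split piece has completed; notify the original Done exactly once.
		r.wrapped.OnDone(r.err)
	}
}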