Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Reduce contention on work submission #33687

Merged
merged 7 commits into from
Jan 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
Expand All @@ -36,6 +38,9 @@ public class BoundedQueueExecutor {

// Used to guard elementsOutstanding and bytesOutstanding.
private final Monitor monitor = new Monitor();
private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
private final Object decrementQueueDrainLock = new Object();
private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
private int elementsOutstanding = 0;
private long bytesOutstanding = 0;

Expand Down Expand Up @@ -236,10 +241,44 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
}

private void decrementCounters(long workBytes) {
monitor.enter();
--elementsOutstanding;
bytesOutstanding -= workBytes;
monitor.leave();
// All threads queue decrements and one thread grabs the monitor and updates
// counters. We do this to reduce contention on monitor which is locked by
// GetWork thread
decrementQueue.add(workBytes);
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
if (submittedToExistingBatch) {
// There is already a thread about to drain the decrement queue
// Current thread does not need to drain.
return;
}
synchronized (decrementQueueDrainLock) {
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
// By setting false here, we may allow another decrement to claim submission of the next batch
// and start waiting on the decrementQueueDrainLock.
//
// However this prevents races that would leave decrements in the queue and unclaimed and we
// are ensured there is at most one additional thread blocked. This helps prevent the executor
// from creating threads over the limit if many were contending on the lock while their
// decrements were already applied.
isDecrementBatchPending.set(false);
long bytesToDecrement = 0;
int elementsToDecrement = 0;
while (true) {
Long pollResult = decrementQueue.poll();
if (pollResult == null) {
break;
}
bytesToDecrement += pollResult;
++elementsToDecrement;
}
if (elementsToDecrement == 0) {
return;
}

monitor.enter();
elementsOutstanding -= elementsToDecrement;
bytesOutstanding -= bytesToDecrement;
monitor.leave();
}
}

private long bytesAvailable() {
Expand Down
Loading