Skip to content

Commit

Permalink
[Dataflow Streaming] Code micro optimizations (1/N) (#33580)
Browse files Browse the repository at this point in the history
* Replace Optional.ofNullable with a null check
* ActiveWorkState: Replace AtomicReference with a mutex-guarded field; the critical path accessing activeGetWorkBudget is already under the mutex
* WindmillTimerInternals: Remove ExposedByteArrayOutputStream from timerTag()
* Override hashCode() in ShardedKey
* Override hashCode() in WindmillComputationKey
  • Loading branch information
arunpandianp authored Jan 22, 2025
1 parent c761112 commit 9761271
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
@NotThreadSafe
@Internal
public class StreamingModeExecutionContext extends DataflowExecutionContext<StepContext> {

private static final Logger LOG = LoggerFactory.getLogger(StreamingModeExecutionContext.class);

private final String computationId;
Expand Down Expand Up @@ -191,7 +192,7 @@ public boolean throwExceptionsForLargeOutput() {
}

public boolean workIsFailed() {
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
return work != null && work.isFailed();
}

public void start(
Expand Down Expand Up @@ -553,6 +554,7 @@ protected DataflowExecutionState createState(
}

private static class ScopedReadStateSupplier implements Supplier<Closeable> {

private final ExecutionState readState;
private final @Nullable ExecutionStateTracker stateTracker;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoValue;
import java.util.Objects;
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
Expand Down Expand Up @@ -45,4 +46,10 @@ public final String toString() {
return String.format(
"%s: %s-%d", computationId(), TextFormat.escapeBytes(key()), shardingKey());
}

/**
 * Returns a hash code computed from {@code shardingKey()} and {@code computationId()} only.
 *
 * <p>{@code key()} is deliberately not hashed, so instances that differ only in {@code key()}
 * share a hash code.
 */
@Override
public final int hashCode() {
// Sharding key collisions are unexpected, so avoid the cost of hashing the full key.
return Objects.hash(shardingKey(), computationId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
Expand All @@ -33,8 +32,6 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -54,6 +51,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class WindmillTimerInternals implements TimerInternals {

private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));

Expand Down Expand Up @@ -406,36 +404,27 @@ private static boolean useNewTimerTagEncoding(TimerData timerData) {
*/
public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
String tagString;
ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
try {
if (useNewTimerTagEncoding(timerData)) {
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(
timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.append('+')
.append(timerData.getTimerFamilyId())
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
} else {
// Timers without a timerFamily have an empty string as their timerFamily
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(
timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
}
return ByteString.readFrom(new ExposedByteArrayInputStream(out.toByteArray()));
} catch (IOException e) {
throw new RuntimeException(e);
if (useNewTimerTagEncoding(timerData)) {
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.append('+')
.append(timerData.getTimerFamilyId())
.toString();
} else {
// Timers without a timerFamily have an empty string as their timerFamily
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.toString();
}
return ByteString.copyFromUtf8(tagString);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand All @@ -57,6 +56,7 @@
@ThreadSafe
@Internal
public final class ActiveWorkState {

private static final Logger LOG = LoggerFactory.getLogger(ActiveWorkState.class);

/* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown for observability.*/
Expand All @@ -77,14 +77,15 @@ public final class ActiveWorkState {
* activated in {@link #activateWorkForKey(ExecutableWork)}, and decremented when work is
* completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, WorkId)}.
*/
private final AtomicReference<GetWorkBudget> activeGetWorkBudget;
@GuardedBy("this")
private GetWorkBudget activeGetWorkBudget;

private ActiveWorkState(
Map<ShardedKey, Deque<ExecutableWork>> activeWork,
WindmillStateCache.ForComputation computationStateCache) {
this.activeWork = activeWork;
this.computationStateCache = computationStateCache;
this.activeGetWorkBudget = new AtomicReference<>(GetWorkBudget.noBudget());
this.activeGetWorkBudget = GetWorkBudget.noBudget();
}

static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
Expand Down Expand Up @@ -219,14 +220,12 @@ synchronized ImmutableList<RefreshableWork> getRefreshableWork(Instant refreshDe
.collect(toImmutableList());
}

private void incrementActiveWorkBudget(Work work) {
activeGetWorkBudget.updateAndGet(
getWorkBudget -> getWorkBudget.apply(1, work.getSerializedWorkItemSize()));
private synchronized void incrementActiveWorkBudget(Work work) {
activeGetWorkBudget = activeGetWorkBudget.apply(1, work.getSerializedWorkItemSize());
}

private void decrementActiveWorkBudget(Work work) {
activeGetWorkBudget.updateAndGet(
getWorkBudget -> getWorkBudget.subtract(1, work.getSerializedWorkItemSize()));
private synchronized void decrementActiveWorkBudget(Work work) {
activeGetWorkBudget = activeGetWorkBudget.subtract(1, work.getSerializedWorkItemSize());
}

/**
Expand Down Expand Up @@ -331,8 +330,8 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
* means that the work is received from Windmill, being processed or queued to be processed in
* {@link ActiveWorkState}, and not committed back to Windmill.
*/
GetWorkBudget currentActiveWorkBudget() {
return activeGetWorkBudget.get();
synchronized GetWorkBudget currentActiveWorkBudget() {
return activeGetWorkBudget;
}

synchronized void printActiveWork(PrintWriter writer, Instant now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public static ShardedKey create(ByteString key, long shardingKey) {
public final String toString() {
return String.format("%016x", shardingKey());
}

/**
 * Returns a hash code computed from {@code shardingKey()} only, via {@link Long#hashCode(long)}.
 *
 * <p>The key bytes are deliberately not hashed, so instances that differ only in their key
 * share a hash code.
 */
@Override
public final int hashCode() {
// Sharding key collisions are unexpected, so avoid the cost of hashing the full key.
return Long.hashCode(shardingKey());
}
}

0 comments on commit 9761271

Please sign in to comment.