Skip to content

Commit

Permalink
Fix up bazel-buildfarm to pass all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chenj-hub committed Aug 14, 2024
1 parent d6c1859 commit 3977b90
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import build.buildfarm.cas.ContentAddressableStorage;
import build.buildfarm.cas.cfc.CASFileCache;
import build.buildfarm.common.BuildfarmExecutors;
import build.buildfarm.common.DigestUtil;
import build.buildfarm.common.io.Directories;
import build.buildfarm.common.io.Dirent;
import build.buildfarm.worker.ExecDirException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ private void insertFileToCasMember(Digest digest, DigestFunction.Value digestFun
}
}

private long writeToCasMember(Digest digest, InputStream in)
throws IOException, InterruptedException {
private long writeToCasMember(Digest digest, DigestFunction.Value digestFunction, InputStream in)
throws IOException, InterruptedException {
// create a write for inserting into another CAS member.
Expand Down
65 changes: 2 additions & 63 deletions src/main/java/build/buildfarm/worker/shard/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE;
Expand Down Expand Up @@ -63,8 +62,6 @@
import build.buildfarm.worker.resources.LocalResourceSetUtils;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.common.options.OptionsParsingException;
import com.google.longrunning.Operation;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
Expand All @@ -90,18 +87,11 @@
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import javax.annotation.Nullable;
import javax.naming.ConfigurationException;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@Log
public final class Worker extends LoggingMain {
Expand Down Expand Up @@ -146,7 +136,6 @@ public final class Worker extends LoggingMain {
private LoadingCache<String, Instance> workerStubs;
private AtomicBoolean released = new AtomicBoolean(true);

@Autowired private ApplicationContext springContext;
/**
* The method will prepare the worker for graceful shutdown when the worker is ready. Note on
* using stderr here instead of log. By the time this is called in PreDestroy, the log is no
Expand Down Expand Up @@ -196,43 +185,6 @@ private Worker() {
super("BuildFarmShardWorker");
}

private void exitPostPipelineFailure() {
// Shutdown the worker if a pipeline fails. By means of the spring lifecycle
// hooks - e.g. the `PreDestroy` hook here - it will attempt to gracefully
// spin down the pipeline

// By calling these spring shutdown facilities; we're open to the risk that
// a subsystem may be hanging a criticial thread indeffinitly. Deadline the
// shutdown workflow to ensure we don't leave a zombie worker in this
// situation
ScheduledExecutorService shutdownDeadlineExecutor = newSingleThreadScheduledExecutor();

// This may be shorter than the action timeout; assume we have interrupted
// actions in a fatal uncaught exception.
int forceShutdownDeadline = 60;
ScheduledFuture<?> termFuture =
shutdownDeadlineExecutor.schedule(
new Runnable() {
public void run() {
log.log(
Level.SEVERE,
String.format(
"Force terminating due to shutdown deadline exceeded (%d seconds)",
forceShutdownDeadline));
System.exit(1);
}
},
forceShutdownDeadline,
SECONDS);

// Consider defining exit codes to better afford out of band instance
// recovery
int code = SpringApplication.exit(springContext, () -> 1);
termFuture.cancel(false);
shutdownDeadlineExecutor.shutdown();
System.exit(code);
}

private Operation stripOperation(Operation operation) {
return instance.stripOperation(operation);
}
Expand Down Expand Up @@ -674,22 +626,9 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep
healthStatusManager.setStatus(
HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING);
PrometheusPublisher.startHttpServer(configs.getPrometheusPort());
startFailsafeRegistration();

// An executor can also be used as storage worker as scheduler treats all new workers as storage workers
// TODO (Congt) Fix it upstream and revert it.
if (configs.getWorker().getCapabilities().isCas()) {
startFailsafeRegistration();
} else {
log.log(INFO, "Skipping worker registration");
}

// Listen for pipeline unhandled exceptions
ExecutorService pipelineExceptionExecutor = newSingleThreadExecutor();
SettableFuture<Void> pipelineExceptionFuture = SettableFuture.create();
pipelineExceptionFuture.addListener(this::exitPostPipelineFailure, pipelineExceptionExecutor);

pipeline.start(pipelineExceptionFuture);

pipeline.start(null);
healthCheckMetric.labels("start").inc();
executionSlotsTotal.set(configs.getWorker().getExecuteStageWidth());
inputFetchSlotsTotal.set(configs.getWorker().getInputFetchStageWidth());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void noErrorWhenContextCancelled() throws Exception {
when(instance.getBlobWrite(
eq(Compressor.Value.IDENTITY),
eq(cancelledDigest),
eq(DigestFunction.Value.UNKNOWN),
eq(uuid),
any(RequestMetadata.class)))
.thenReturn(write);
Expand All @@ -137,6 +138,7 @@ public void noErrorWhenContextCancelled() throws Exception {
.getBlobWrite(
eq(Compressor.Value.IDENTITY),
eq(cancelledDigest),
eq(DigestFunction.Value.UNKNOWN),
eq(uuid),
any(RequestMetadata.class));
verifyNoInteractions(responseObserver);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/build/buildfarm/worker/PipelineTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void stageExitsOnInterrupt() throws InterruptedException {
Pipeline pipeline = new Pipeline();
TestStage stage = new TestStage("test");
pipeline.add(stage, 1);
pipeline.start();
pipeline.start(null);
pipeline.join();
}

Expand Down Expand Up @@ -147,7 +147,7 @@ public void stageContinuesOnException() throws InterruptedException {
Pipeline pipeline = new Pipeline();
ContinueStage stage = new ContinueStage("test");
pipeline.add(stage, 1);
pipeline.start();
pipeline.start(null);

boolean didNotThrow = false;
try {
Expand Down

0 comments on commit 3977b90

Please sign in to comment.