diff --git a/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java b/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java index bb667fcd0c..80fe18f0f5 100644 --- a/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java +++ b/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java @@ -40,6 +40,7 @@ import build.buildfarm.cas.ContentAddressableStorage; import build.buildfarm.cas.cfc.CASFileCache; import build.buildfarm.common.BuildfarmExecutors; +import build.buildfarm.common.DigestUtil; import build.buildfarm.common.io.Directories; import build.buildfarm.common.io.Dirent; import build.buildfarm.worker.ExecDirException; diff --git a/src/main/java/build/buildfarm/worker/shard/RemoteCasWriter.java b/src/main/java/build/buildfarm/worker/shard/RemoteCasWriter.java index ba9f3d9a5e..8db3874f8f 100644 --- a/src/main/java/build/buildfarm/worker/shard/RemoteCasWriter.java +++ b/src/main/java/build/buildfarm/worker/shard/RemoteCasWriter.java @@ -81,8 +81,6 @@ private void insertFileToCasMember(Digest digest, DigestFunction.Value digestFun } } - private long writeToCasMember(Digest digest, InputStream in) - throws IOException, InterruptedException { private long writeToCasMember(Digest digest, DigestFunction.Value digestFunction, InputStream in) throws IOException, InterruptedException { // create a write for inserting into another CAS member. diff --git a/src/main/java/build/buildfarm/worker/shard/Worker.java b/src/main/java/build/buildfarm/worker/shard/Worker.java index 0548f13519..e5a2ed53f3 100644 --- a/src/main/java/build/buildfarm/worker/shard/Worker.java +++ b/src/main/java/build/buildfarm/worker/shard/Worker.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.Executors.newSingleThreadExecutor; -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.logging.Level.INFO; import static java.util.logging.Level.SEVERE; @@ -63,8 +62,6 @@ import build.buildfarm.worker.resources.LocalResourceSetUtils; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.SettableFuture; -import com.google.devtools.common.options.OptionsParsingException; import com.google.longrunning.Operation; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; @@ -90,18 +87,11 @@ import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import javax.annotation.Nullable; import javax.naming.ConfigurationException; import lombok.extern.java.Log; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.ComponentScan; @Log public final class Worker extends LoggingMain { @@ -146,7 +136,6 @@ public final class Worker extends LoggingMain { private LoadingCache workerStubs; private AtomicBoolean released = new AtomicBoolean(true); - @Autowired private ApplicationContext springContext; /** * The method will prepare the worker for graceful shutdown when the worker is ready. Note on * using stderr here instead of log. By the time this is called in PreDestroy, the log is no @@ -196,43 +185,6 @@ private Worker() { super("BuildFarmShardWorker"); } - private void exitPostPipelineFailure() { - // Shutdown the worker if a pipeline fails. By means of the spring lifecycle - // hooks - e.g. the `PreDestroy` hook here - it will attempt to gracefully - // spin down the pipeline - - // By calling these spring shutdown facilities; we're open to the risk that - // a subsystem may be hanging a criticial thread indeffinitly. Deadline the - // shutdown workflow to ensure we don't leave a zombie worker in this - // situation - ScheduledExecutorService shutdownDeadlineExecutor = newSingleThreadScheduledExecutor(); - - // This may be shorter than the action timeout; assume we have interrupted - // actions in a fatal uncaught exception. - int forceShutdownDeadline = 60; - ScheduledFuture termFuture = - shutdownDeadlineExecutor.schedule( - new Runnable() { - public void run() { - log.log( - Level.SEVERE, - String.format( - "Force terminating due to shutdown deadline exceeded (%d seconds)", - forceShutdownDeadline)); - System.exit(1); - } - }, - forceShutdownDeadline, - SECONDS); - - // Consider defining exit codes to better afford out of band instance - // recovery - int code = SpringApplication.exit(springContext, () -> 1); - termFuture.cancel(false); - shutdownDeadlineExecutor.shutdown(); - System.exit(code); - } - private Operation stripOperation(Operation operation) { return instance.stripOperation(operation); } @@ -674,22 +626,9 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep healthStatusManager.setStatus( HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING); PrometheusPublisher.startHttpServer(configs.getPrometheusPort()); + startFailsafeRegistration(); - // An executor can also be used as storage worker as scheduler treats all new workers as storage workers - // TODO (Congt) Fix it upstream and revert it. - if (configs.getWorker().getCapabilities().isCas()) { - startFailsafeRegistration(); - } else { - log.log(INFO, "Skipping worker registration"); - } - - // Listen for pipeline unhandled exceptions - ExecutorService pipelineExceptionExecutor = newSingleThreadExecutor(); - SettableFuture pipelineExceptionFuture = SettableFuture.create(); - pipelineExceptionFuture.addListener(this::exitPostPipelineFailure, pipelineExceptionExecutor); - - pipeline.start(pipelineExceptionFuture); - + pipeline.start(null); healthCheckMetric.labels("start").inc(); executionSlotsTotal.set(configs.getWorker().getExecuteStageWidth()); inputFetchSlotsTotal.set(configs.getWorker().getInputFetchStageWidth()); diff --git a/src/test/java/build/buildfarm/common/services/WriteStreamObserverTest.java b/src/test/java/build/buildfarm/common/services/WriteStreamObserverTest.java index 5834ab1a87..4a3c864b24 100644 --- a/src/test/java/build/buildfarm/common/services/WriteStreamObserverTest.java +++ b/src/test/java/build/buildfarm/common/services/WriteStreamObserverTest.java @@ -115,6 +115,7 @@ public void noErrorWhenContextCancelled() throws Exception { when(instance.getBlobWrite( eq(Compressor.Value.IDENTITY), eq(cancelledDigest), + eq(DigestFunction.Value.UNKNOWN), eq(uuid), any(RequestMetadata.class))) .thenReturn(write); @@ -137,6 +138,7 @@ public void noErrorWhenContextCancelled() throws Exception { .getBlobWrite( eq(Compressor.Value.IDENTITY), eq(cancelledDigest), + eq(DigestFunction.Value.UNKNOWN), eq(uuid), any(RequestMetadata.class)); verifyNoInteractions(responseObserver); diff --git a/src/test/java/build/buildfarm/worker/PipelineTest.java b/src/test/java/build/buildfarm/worker/PipelineTest.java index 0901f4dad1..209f81ca43 100644 --- a/src/test/java/build/buildfarm/worker/PipelineTest.java +++ b/src/test/java/build/buildfarm/worker/PipelineTest.java @@ -110,7 +110,7 @@ public void stageExitsOnInterrupt() throws InterruptedException { Pipeline pipeline = new Pipeline(); TestStage stage = new TestStage("test"); pipeline.add(stage, 1); - pipeline.start(); + pipeline.start(null); pipeline.join(); } @@ -147,7 +147,7 @@ public void stageContinuesOnException() throws InterruptedException { Pipeline pipeline = new Pipeline(); ContinueStage stage = new ContinueStage("test"); pipeline.add(stage, 1); - pipeline.start(); + pipeline.start(null); boolean didNotThrow = false; try {