From 79562123a47b88b2aa440ffc35252f2f2f4f4a51 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 11 Jan 2024 15:32:37 +0100 Subject: [PATCH] refactor: KubernetesHelper.printLogsAsync reused common tools Signed-off-by: Marc Nuri --- .../jkube/kit/common/util/AsyncUtil.java | 13 ++++- .../kit/common/util/KubernetesHelper.java | 54 +++++++++---------- .../kit/config/service/PodLogService.java | 7 ++- .../openshift/OpenshiftBuildService.java | 8 ++- 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java b/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java index 052ed2adba..cfed5dfe73 100644 --- a/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java +++ b/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java @@ -34,11 +34,11 @@ private static class ExecutorServiceHolder { public static final ExecutorService INSTANCE = Executors.newCachedThreadPool(); } - public static CompletableFuture async(Callable callable) { + public static CompletableFuture async(ThrowingFunction, T> function) { final CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { - future.complete(callable.call()); + future.complete(function.apply(future)); } catch (Exception ex) { future.completeExceptionally(ex); } @@ -51,6 +51,10 @@ public static CompletableFuture async(Callable callable) { return future; } + public static CompletableFuture async(Callable callable) { + return async(f -> callable.call()); + } + public static Function, CompletableFuture> await(Supplier supplier) { return predicate -> async(() -> { T ret; @@ -73,4 +77,9 @@ public static T get(CompletableFuture completableFuture, Duration duratio throw new IllegalStateException("Failure while waiting to get future ", e); } } + + @FunctionalInterface + public interface ThrowingFunction { + R apply(T t) throws Exception; + } } diff --git a/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/KubernetesHelper.java b/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/KubernetesHelper.java index 165289782e..2d71e19917 100644 --- a/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/KubernetesHelper.java +++ b/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/KubernetesHelper.java @@ -14,12 +14,10 @@ package org.eclipse.jkube.kit.common.util; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -35,7 +33,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -45,6 +44,8 @@ import io.fabric8.kubernetes.api.model.authorization.v1.SelfSubjectAccessReview; import io.fabric8.kubernetes.api.model.authorization.v1.SelfSubjectAccessReviewBuilder; import io.fabric8.kubernetes.client.utils.ApiVersionUtil; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; import org.eclipse.jkube.kit.common.KitLogger; import io.fabric8.kubernetes.api.model.Container; @@ -89,6 +90,8 @@ import io.fabric8.openshift.api.model.DeploymentConfigSpec; import org.apache.commons.lang3.StringUtils; +import static org.eclipse.jkube.kit.common.util.AsyncUtil.async; + public class KubernetesHelper { protected static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX"; @@ -383,32 +386,23 @@ public static String getBuildStatusPhase(Build build) { return null; } - public static void printLogsAsync(LogWatch logWatcher, final String failureMessage, final CountDownLatch terminateLatch, final KitLogger log) { - final InputStream in = logWatcher.getOutput(); - Thread thread = new Thread() { - @Override - public void run() { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { - while (true) { - String line = reader.readLine(); - if (line == null) { - return; - } - if (terminateLatch.getCount() <= 0L) { - return; - } - log.info("[[s]]%s", line); - } - } catch (IOException e) { - // Check again the latch which could be already count down to zero in between - // so that an IO exception occurs on read - if (terminateLatch.getCount() > 0L) { - log.error("%s : %s", failureMessage, e); - } - } - } - }; - thread.start(); + public static CompletableFuture printLogsAsync(LogWatch logWatcher, Consumer lineConsumer) { + final LineIterator it = IOUtils.lineIterator(logWatcher.getOutput(), StandardCharsets.UTF_8); + return async(cf -> { + while (it.hasNext()) { + final String line = it.nextLine(); + if (line == null) { + cf.complete(null); + } else { + lineConsumer.accept(line); + } + // Can be completed internally or externally + if (cf.isDone()) { + return null; + } + } + return null; + }); } public static String getBuildStatusReason(Build build) { diff --git a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java index d1a224dce5..47cd7bf909 100644 --- a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java +++ b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java @@ -282,7 +282,12 @@ private void watchLog(final LogWatch logWatcher, String podName, final String fa context.getNewPodLog().info("Press Ctrl-C to " + ctrlCMessage); context.getNewPodLog().info(""); - KubernetesHelper.printLogsAsync(logWatcher, failureMessage, this.logWatchTerminateLatch, log); + KubernetesHelper.printLogsAsync(logWatcher, line -> log.info("[[s]]%s", line)) + .whenComplete((v, t) -> { + if (t != null) { + log.error("%s: %s", failureMessage, t); + } + }); } private String containerNameMessage(String containerName) { diff --git a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/openshift/OpenshiftBuildService.java b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/openshift/OpenshiftBuildService.java index ec7ea89e1c..b6842cb34d 100644 --- a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/openshift/OpenshiftBuildService.java +++ b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/openshift/OpenshiftBuildService.java @@ -467,8 +467,12 @@ private void waitForOpenShiftBuildToComplete(OpenShiftClient client, Build build waitUntilPodIsReady(buildName + "-build", 120, log); log.info("Waiting for build " + buildName + " to complete..."); try (LogWatch logWatch = client.pods().inNamespace(applicableOpenShiftNamespace).withName(buildName + "-build").watchLog()) { - KubernetesHelper.printLogsAsync(logWatch, - "Failed to tail build log", logTerminateLatch, log); + KubernetesHelper.printLogsAsync(logWatch, line -> log.info("[[s]]%s", line)) + .whenComplete((v, t) -> { + if (t != null) { + log.error("Failed to tail build log: %s", t); + } + }); Watcher buildWatcher = getBuildWatcher(latch, buildName, buildHolder); try (Watch watcher = client.builds().inNamespace(applicableOpenShiftNamespace).withName(buildName).watch(buildWatcher)) { // Check if the build is already finished to avoid waiting indefinitely