From 754436a54f3c276123e3cc273eba5605ce9edbbe Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 11 Jan 2024 18:26:07 +0100 Subject: [PATCH] refactor: temporary usage of local CompletableFuture PodLogService uses an instance variable to keep state derived from a method that can be invoked multiple times and breaking the functionality. This change (first of a series) moves to use a local variable instead. Signed-off-by: Marc Nuri --- .../jkube/kit/common/util/AsyncUtil.java | 11 ++++++++ .../kit/config/service/PodLogService.java | 26 ++++++++----------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java b/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java index cfed5dfe73..3a9a33bbd5 100644 --- a/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java +++ b/jkube-kit/common/src/main/java/org/eclipse/jkube/kit/common/util/AsyncUtil.java @@ -78,6 +78,17 @@ public static T get(CompletableFuture completableFuture, Duration duratio } } + public static T get(CompletableFuture completableFuture) { + try { + return completableFuture.get(); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + @FunctionalInterface public interface ThrowingFunction { R apply(T t) throws Exception; diff --git a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java index 47cd7bf909..f057bead25 100644 --- a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java +++ b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java @@ -32,6 +32,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import org.eclipse.jkube.kit.common.KitLogger; +import org.eclipse.jkube.kit.common.util.AsyncUtil; import org.eclipse.jkube.kit.common.util.KubernetesHelper; import org.apache.commons.lang3.StringUtils; @@ -41,6 +42,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -65,7 +67,6 @@ public class PodLogService { private Watch podWatcher; private LogWatch logWatcher; private final Map addedPods = new ConcurrentHashMap<>(); - private final CountDownLatch terminateLatch = new CountDownLatch(1); private String watchingPodName; private CountDownLatch logWatchTerminateLatch; @@ -157,9 +158,10 @@ private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector s } } } + final CompletableFuture logsRetrieved = new CompletableFuture<>(); // we may have missed the ADDED event so lets simulate one if (latestPod != null) { - onPod(Watcher.Action.ADDED, latestPod, kc, ctrlCMessage, followLog); + onPod(logsRetrieved, Watcher.Action.ADDED, latestPod, kc, ctrlCMessage, followLog); } if (!watchAddedPodsOnly && !runningPod) { log.warn("No pod is running yet. Are you sure you deployed your app using Eclipse JKube apply/deploy mechanism?"); @@ -168,7 +170,7 @@ private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector s podWatcher = pods.watch(new Watcher() { @Override public void eventReceived(Action action, Pod pod) { - onPod(action, pod, kc, ctrlCMessage, followLog); + onPod(logsRetrieved, action, pod, kc, ctrlCMessage, followLog); } @Override @@ -177,18 +179,12 @@ public void onClose(WatcherException e) { } }); - if (waitInCurrentThread) { - while (terminateLatch.getCount() > 0) { - try { - terminateLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + if (waitInCurrentThread && !logsRetrieved.isDone()) { + AsyncUtil.get(logsRetrieved); } } - private void onPod(Watcher.Action action, Pod pod, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog) { + private void onPod(CompletableFuture logsRetrieved, Watcher.Action action, Pod pod, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog) { String name = KubernetesHelper.getName(pod); if (action.equals(Watcher.Action.DELETED)) { addedPods.remove(name); @@ -211,11 +207,11 @@ private void onPod(Watcher.Action action, Pod pod, NamespacedKubernetesClient ku } if (watchPod != null && KubernetesHelper.isPodRunning(watchPod)) { - watchLogOfPodName(kubernetes, ctrlCMessage, followLog, watchPod, KubernetesHelper.getName(watchPod)); + watchLogOfPodName(logsRetrieved, kubernetes, ctrlCMessage, followLog, watchPod, KubernetesHelper.getName(watchPod)); } } - private void watchLogOfPodName(NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog, Pod pod, String name) { + private void watchLogOfPodName(CompletableFuture logsRetrieved, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog, Pod pod, String name) { if (watchingPodName == null || !watchingPodName.equals(name)) { if (logWatcher != null) { log.info("Closing log watcher for %s as now watching %s", watchingPodName, name); @@ -250,7 +246,7 @@ private void watchLogOfPodName(NamespacedKubernetesClient kubernetes, String ctr log.info("[[s]]%s", line); } } - terminateLatch.countDown(); + logsRetrieved.complete(null); } } }