Skip to content

Commit

Permalink
refactor: temporary usage of local CompletableFuture
Browse files Browse the repository at this point in the history
PodLogService uses an instance variable to keep state
derived from a method that can be invoked multiple times
and breaking the functionality.

This change (first of a series) moves to use a local variable instead.

Signed-off-by: Marc Nuri <[email protected]>
  • Loading branch information
manusa committed Jan 11, 2024
1 parent 7956212 commit 754436a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ public static <T> T get(CompletableFuture<T> completableFuture, Duration duratio
}
}

public static <T> T get(CompletableFuture<T> completableFuture) {
try {
return completableFuture.get();
} catch (ExecutionException e) {
throw new IllegalStateException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

@FunctionalInterface
public interface ThrowingFunction<T, R> {
R apply(T t) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.eclipse.jkube.kit.common.KitLogger;
import org.eclipse.jkube.kit.common.util.AsyncUtil;
import org.eclipse.jkube.kit.common.util.KubernetesHelper;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -41,6 +42,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

Expand All @@ -65,7 +67,6 @@ public class PodLogService {
private Watch podWatcher;
private LogWatch logWatcher;
private final Map<String, Pod> addedPods = new ConcurrentHashMap<>();
private final CountDownLatch terminateLatch = new CountDownLatch(1);
private String watchingPodName;
private CountDownLatch logWatchTerminateLatch;

Expand Down Expand Up @@ -157,9 +158,10 @@ private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector s
}
}
}
final CompletableFuture<Void> logsRetrieved = new CompletableFuture<>();
// we may have missed the ADDED event so lets simulate one
if (latestPod != null) {
onPod(Watcher.Action.ADDED, latestPod, kc, ctrlCMessage, followLog);
onPod(logsRetrieved, Watcher.Action.ADDED, latestPod, kc, ctrlCMessage, followLog);
}
if (!watchAddedPodsOnly && !runningPod) {
log.warn("No pod is running yet. Are you sure you deployed your app using Eclipse JKube apply/deploy mechanism?");
Expand All @@ -168,7 +170,7 @@ private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector s
podWatcher = pods.watch(new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod pod) {
onPod(action, pod, kc, ctrlCMessage, followLog);
onPod(logsRetrieved, action, pod, kc, ctrlCMessage, followLog);
}

@Override
Expand All @@ -177,18 +179,12 @@ public void onClose(WatcherException e) {
}
});

if (waitInCurrentThread) {
while (terminateLatch.getCount() > 0) {
try {
terminateLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (waitInCurrentThread && !logsRetrieved.isDone()) {
AsyncUtil.get(logsRetrieved);
}
}

private void onPod(Watcher.Action action, Pod pod, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog) {
private void onPod(CompletableFuture<Void> logsRetrieved, Watcher.Action action, Pod pod, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog) {
String name = KubernetesHelper.getName(pod);
if (action.equals(Watcher.Action.DELETED)) {
addedPods.remove(name);
Expand All @@ -211,11 +207,11 @@ private void onPod(Watcher.Action action, Pod pod, NamespacedKubernetesClient ku
}

if (watchPod != null && KubernetesHelper.isPodRunning(watchPod)) {
watchLogOfPodName(kubernetes, ctrlCMessage, followLog, watchPod, KubernetesHelper.getName(watchPod));
watchLogOfPodName(logsRetrieved, kubernetes, ctrlCMessage, followLog, watchPod, KubernetesHelper.getName(watchPod));
}
}

private void watchLogOfPodName(NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog, Pod pod, String name) {
private void watchLogOfPodName(CompletableFuture<Void> logsRetrieved, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog, Pod pod, String name) {
if (watchingPodName == null || !watchingPodName.equals(name)) {
if (logWatcher != null) {
log.info("Closing log watcher for %s as now watching %s", watchingPodName, name);
Expand Down Expand Up @@ -250,7 +246,7 @@ private void watchLogOfPodName(NamespacedKubernetesClient kubernetes, String ctr
log.info("[[s]]%s", line);
}
}
terminateLatch.countDown();
logsRetrieved.complete(null);
}
}
}
Expand Down

0 comments on commit 754436a

Please sign in to comment.