From bf9fe93f76b895825d8852e010dffd5342e1f860 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Sun, 1 Oct 2017 10:32:49 -0500 Subject: [PATCH] changing watermark generator to be triggered by tick tuples for event time based windowing (#2370) * changing watermark generator to be triggered by tick tuples for event time based windowing * fixing unit test --- .../heron/api/bolt/WindowedBoltExecutor.java | 10 ++-- .../windowing/WaterMarkEventGenerator.java | 46 +++---------------- .../WaterMarkEventGeneratorTest.java | 5 +- 3 files changed, 16 insertions(+), 45 deletions(-) diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java index 79960b65dca..ace25cc32b7 100644 --- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java +++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java @@ -201,7 +201,9 @@ private WindowManager initWindowManager(WindowLifecycleListener } else { watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS; } - waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval, + // Use tick tuple to periodically generate watermarks + Config.setTickTupleFrequencyMs(topoConf, watermarkInterval); + waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, maxLagMs, getComponentStreams(context)); } else { if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) { @@ -310,8 +312,10 @@ protected void doPrepare(Map topoConf, TopologyContext context, public void execute(Tuple input) { if (TupleUtils.isTick(input)) { long currTime = System.currentTimeMillis(); - triggerPolicy.track(new TimerEvent<>(input, currTime)); - evictionPolicy.track(new TimerEvent<>(input, currTime)); + windowManager.add(new TimerEvent<>(input, currTime)); + if (isTupleTs()) { + waterMarkEventGenerator.run(); + } } else { if (isTupleTs()) { long ts = timestampExtractor.extractTimestamp(input); 
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java b/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java index ffef85243fc..7eb69c80ed8 100644 --- a/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java +++ b/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java @@ -17,14 +17,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; -import com.twitter.heron.api.exception.FailedException; import com.twitter.heron.api.generated.TopologyAPI; /** @@ -33,33 +26,27 @@ * across all the input streams (minus the lag). Once a watermark event is emitted * any tuple coming with an earlier timestamp can be considered as late events. */ -public class WaterMarkEventGenerator implements Runnable { - private static final Logger LOG = Logger.getLogger(WaterMarkEventGenerator.class.getName()); +public class WaterMarkEventGenerator { private final WindowManager windowManager; private final int eventTsLag; private final Set inputStreams; private final Map streamToTs; - private final ScheduledExecutorService executorService; - private final int interval; - private ScheduledFuture executorFuture; private volatile long lastWaterMarkTs; + private boolean started = false; /** * Creates a new WatermarkEventGenerator. 
* * @param windowManager The window manager this generator will submit watermark events to - * @param intervalMs The generator will check if it should generate a watermark event with this * interval * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is * considered late * @param inputStreams The input streams this generator is expected to handle */ - public WaterMarkEventGenerator(WindowManager windowManager, int intervalMs, int - eventTsLagMs, Set inputStreams) { + public WaterMarkEventGenerator(WindowManager windowManager, int eventTsLagMs, + Set inputStreams) { this.windowManager = windowManager; streamToTs = new ConcurrentHashMap<>(); - executorService = Executors.newSingleThreadScheduledExecutor(); - this.interval = intervalMs; this.eventTsLag = eventTsLagMs; this.inputStreams = inputStreams; } @@ -74,22 +61,16 @@ public boolean track(TopologyAPI.StreamId stream, long ts) { if (currentVal == null || ts > currentVal) { streamToTs.put(stream, ts); } - checkFailures(); return ts >= lastWaterMarkTs; } - @Override - @SuppressWarnings("IllegalCatch") public void run() { - try { + if (started) { long waterMarkTs = computeWaterMarkTs(); if (waterMarkTs > lastWaterMarkTs) { this.windowManager.add(new WaterMarkEvent<>(waterMarkTs)); lastWaterMarkTs = waterMarkTs; } - } catch (Throwable th) { - LOG.severe(String.format("Failed while processing watermark event\n%s", th)); - throw th; } } @@ -108,22 +89,7 @@ private long computeWaterMarkTs() { return ts - eventTsLag; } - private void checkFailures() { - if (executorFuture != null && executorFuture.isDone()) { - try { - executorFuture.get(); - } catch (InterruptedException ex) { - LOG.severe(String.format("Got exception:\n%s", ex)); - throw new FailedException(ex); - } catch (ExecutionException ex) { - LOG.severe(String.format("Got exception:\n%s", ex)); - throw new FailedException(ex.getCause()); - } - } - } - public void start() { - this.executorFuture = 
executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit - .MILLISECONDS); + started = true; } } diff --git a/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java b/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java index 4191f33cce7..01ef89a9ab2 100644 --- a/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java +++ b/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java @@ -48,7 +48,7 @@ public void add(Event event) { } }; // set watermark interval to a high value and trigger manually to fix timing issues - waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5, Collections + waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5, Collections .singleton(streamId("s1"))); waterMarkEventGenerator.start(); } @@ -77,7 +77,8 @@ public void testTrackTwoStreams() throws Exception { Set streams = new HashSet<>(); streams.add(streamId("s1")); streams.add(streamId("s2")); - waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 100000, 5, streams); + waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5, streams); + waterMarkEventGenerator.start(); waterMarkEventGenerator.track(streamId("s1"), 100); waterMarkEventGenerator.track(streamId("s1"), 110);