Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Catch exceptions in Streamlet operators #3044

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,16 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
@Override
public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
if (filterFn.test(obj)) {
collector.emit(new Values(obj));
try {
Boolean passed = filterFn.test(obj);
if (passed) {
collector.emit(new Values(obj));
}
collector.ack(tuple);
// SUPPRESS CHECKSTYLE IllegalCatch
} catch(Exception e) {
e.printStackTrace();
collector.fail(tuple);
}
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
Iterable<? extends T> result = flatMapFn.apply(obj);
for (T o : result) {
collector.emit(new Values(o));
try {
R obj = (R) tuple.getValue(0);
Iterable<? extends T> result = flatMapFn.apply(obj);
for (T o : result) {
collector.emit(new Values(o));
}
collector.ack(tuple);
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Exception e) {
e.printStackTrace();
collector.fail(tuple);
}
collector.ack(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,20 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
public void execute(TupleWindow inputWindow) {
Map<K, VR> reduceMap = new HashMap<>();
Map<K, Integer> windowCountMap = new HashMap<>();
for (Tuple tuple : inputWindow.get()) {
V tup = (V) tuple.getValue(0);
addMap(reduceMap, windowCountMap, tup);
try {
for (Tuple tuple : inputWindow.get()) {
V tup = (V) tuple.getValue(0);
addMap(reduceMap, windowCountMap, tup);
}
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Exception e) {
e.printStackTrace();
for (Tuple tuple : inputWindow.get()) {
collector.fail(tuple);
}
return; // Early out
}

long startWindow;
long endWindow;
if (inputWindow.getStartTimestamp() == null) {
Expand All @@ -86,6 +96,9 @@ public void execute(TupleWindow inputWindow) {
KeyedWindow<K> keyedWindow = new KeyedWindow<>(key, window);
collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key))));
}
for (Tuple tuple : inputWindow.get()) {
collector.ack(tuple);
}
}

private void addMap(Map<K, VR> reduceMap, Map<K, Integer> windowCountMap, V tup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,32 @@ public Map<String, Object> getComponentConfiguration() {
@Override
public void execute(TupleWindow inputWindow) {
Map<K, Pair<List<V1>, List<V2>>> joinMap = new HashMap<>();
for (Tuple tuple : inputWindow.get()) {
if (tuple.getSourceComponent().equals(leftComponent)) {
V1 tup = (V1) tuple.getValue(0);
if (tup != null) {
addMapLeft(joinMap, tup);
}
} else {
V2 tup = (V2) tuple.getValue(0);
if (tup != null) {
addMapRight(joinMap, tup);
try {
for (Tuple tuple : inputWindow.get()) {
if (tuple.getSourceComponent().equals(leftComponent)) {
V1 tup = (V1) tuple.getValue(0);
if (tup != null) {
addMapLeft(joinMap, tup);
}
} else {
V2 tup = (V2) tuple.getValue(0);
if (tup != null) {
addMapRight(joinMap, tup);
}
}
}
evaluateJoinMap(joinMap, inputWindow);

for (Tuple tuple : inputWindow.get()) {
collector.ack(tuple);
}
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Exception e) {
e.printStackTrace();
for (Tuple tuple : inputWindow.get()) {
collector.fail(tuple);
}
}
evaluateJoinMap(joinMap, inputWindow);
}

private void evaluateJoinMap(Map<K, Pair<List<V1>, List<V2>>> joinMap, TupleWindow tupleWindow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,14 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
@Override
public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
T result = mapFn.apply(obj);
collector.emit(new Values(result));
collector.ack(tuple);
try {
T result = mapFn.apply(obj);
collector.emit(new Values(result));
collector.ack(tuple);
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Exception e) {
e.printStackTrace();
collector.fail(tuple);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,20 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
public void execute(TupleWindow inputWindow) {
Map<K, V> reduceMap = new HashMap<>();
Map<K, Integer> windowCountMap = new HashMap<>();
for (Tuple tuple : inputWindow.get()) {
R tup = (R) tuple.getValue(0);
addMap(reduceMap, windowCountMap, tup);
try {
for (Tuple tuple : inputWindow.get()) {
R tup = (R) tuple.getValue(0);
addMap(reduceMap, windowCountMap, tup);
}
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Exception e) {
e.printStackTrace();
for (Tuple tuple : inputWindow.get()) {
collector.fail(tuple);
}
return; // Early out
}

long startWindow;
long endWindow;
if (inputWindow.getStartTimestamp() == null) {
Expand All @@ -87,6 +97,10 @@ public void execute(TupleWindow inputWindow) {
KeyedWindow<K> keyedWindow = new KeyedWindow<>(key, window);
collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key))));
}

for (Tuple tuple : inputWindow.get()) {
collector.ack(tuple);
}
}

private void addMap(Map<K, V> reduceMap, Map<K, Integer> windowCountMap, R tup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ public void prepare(Map<String, Object> map,
@Override
public void execute(Tuple tuple) {
R obj = (R) tuple.getValue(0);
serializableTransformer.transform(obj, x -> collector.emit(new Values(x)));
collector.ack(tuple);
try {
serializableTransformer.transform(obj, x -> collector.emit(new Values(x)));
collector.ack(tuple);
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Exception e) {
e.printStackTrace();
collector.fail(tuple);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
@Override
public void execute(Tuple tuple) {
I obj = (I) tuple.getValue(0);
collector.emit(new Values(obj));
collector.ack(tuple);
try {
collector.emit(new Values(obj));
collector.ack(tuple);
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Exception e) {
e.printStackTrace();
collector.fail(tuple);
}
}
}