diff --git a/WORKSPACE b/WORKSPACE index 9e586d719a6..18259ef6d77 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -277,6 +277,18 @@ maven_jar( artifact = "org.apache.reef:tang:" + reef_version ) +# added for trident prototype +maven_jar( + name = "org_clojure_clojure", + artifact = "org.clojure:clojure:1.7.0" +) + +# added for trident prototype +maven_jar( + name = "org_apache_storm_core", + artifact = "org.apache.storm:storm-core:1.0.0" +) + maven_jar( name = "org_apache_thrift_libthrift", artifact = "org.apache.thrift:libthrift:0.5.0-1", diff --git a/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java b/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java index 03a76617ff1..eccde5b82b7 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java @@ -76,6 +76,7 @@ public static void loggerInit(Level level, boolean isRedirectStdOutErr, String f if (rootLogger.getLevel().intValue() < Level.WARNING.intValue()) { // zookeeper logging scares me. if people want this, we can patch to config-drive this Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARNING); + Logger.getLogger("org.apache.storm.shade.org.apache.zookeeper").setLevel(Level.WARNING); } if (isRedirectStdOutErr) { diff --git a/heron/examples/src/java/BUILD b/heron/examples/src/java/BUILD index 9b5fad5280d..2becdc632f6 100644 --- a/heron/examples/src/java/BUILD +++ b/heron/examples/src/java/BUILD @@ -7,6 +7,7 @@ java_binary( "//heron/api/src/java:api-java", "//heron/common/src/java:basics-java", "//heron/storm/src/java:storm-compatibility-java", + "@org_apache_storm_core//jar", ], create_executable = 0, ) diff --git a/heron/examples/src/java/com/twitter/heron/examples/TridentWordCountTopology.java b/heron/examples/src/java/com/twitter/heron/examples/TridentWordCountTopology.java new file mode 100644 index 00000000000..a95f32b45e1 --- /dev/null +++ b/heron/examples/src/java/com/twitter/heron/examples/TridentWordCountTopology.java @@ -0,0 +1,88 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License +package com.twitter.heron.examples; + +import java.util.Collections; + +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.Count; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.testing.MemoryMapState; +import org.apache.storm.trident.testing.Split; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +/** + * Word count topology written against the Trident API, used to drive the Trident-on-Heron + * prototype (see heron/storm/src/java/org/apache/storm/trident/README.md). + */ +public class TridentWordCountTopology { + + public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { + if (args.length < 1) { + throw new RuntimeException("Specify topology name"); + } + + int parallelism = 1; + if (args.length > 1) { + parallelism = Integer.parseInt(args[1]); + } + + @SuppressWarnings("unchecked") + FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, + new Values("the cow jumped over the moon"), + new Values("the man went to the store and bought some candy"), + new Values("four score and seven years ago"), + new Values("how many apples can you eat")); + spout.setCycle(true); + + // This spout cycles through that set of sentences over and over to produce the sentence stream. + // Here's the code to do the streaming word count part of the computation: + TridentTopology topology = new TridentTopology(); + TridentState wordCounts = + topology.newStream("spout1", spout) + .each(new Fields("sentence"), new Split(), new Fields("word")) + .groupBy(new Fields("word")) + .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) + .parallelismHint(parallelism); + + Config conf = new Config(); + conf.setDebug(true); + + // TODO: for some reason this is automatically added to spouts but not bolts... 
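+ // Kryo must be able to serialize the TransactionAttempt metadata that rides on every Trident batch + // tuple, and the transactional-state settings below point at the locally running ZooKeeper from the + // README run instructions (localhost:2181); per the trident README, Trident supplies no workable + // defaults for these.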
+ conf.put(Config.TOPOLOGY_KRYO_REGISTER, Collections.singletonList( + "org.apache.storm.trident.topology.TransactionAttempt")); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Collections.singletonList("localhost")); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/transaction_root"); + conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 5000); + conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 5000); + conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 5); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5000); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, 5000); + //Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS + conf.put("topology.trident.batch.emit.interval.millis", 5000); + + StormTopology stormTopology = topology.build(); + StormSubmitter.submitTopology(args[0], conf, stormTopology); + } +} diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py index 6314112be53..4eab7da9892 100755 --- a/heron/executor/src/python/heron_executor.py +++ b/heron/executor/src/python/heron_executor.py @@ -491,7 +491,13 @@ def _get_java_instance_cmd(self, instance_info): '-XX:+HeapDumpOnOutOfMemoryError', '-XX:+UseConcMarkSweepGC', '-XX:ParallelGCThreads=4', - '-Xloggc:log-files/gc.%s.log' % instance_id] + '-Xloggc:log-files/gc.%s.log' % instance_id.replace("$", "")] + # Debugging hook: a global task id of -1 starts the instance suspended, waiting for a + # debugger on port 5005; otherwise attach a non-suspending debug agent on port 500<task id>. + if global_task_id == -1: + instance_cmd =\ + instance_cmd + ["-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"] + else: + instance_cmd = instance_cmd + [ + "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=500%d" % global_task_id] + instance_cmd = instance_cmd + self.instance_jvm_opts.split() if component_name in self.component_jvm_opts: instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split() diff --git a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java index d670367ce17..77050207d35 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java +++ b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java @@ -159,7 +159,7 @@ public static void main(String[] args) throws IOException { // Init the logging setting and redirect the stdout and stderr to logging // For now we just set the logging level as INFO; later we may accept an argument to set it. - Level loggingLevel = Level.INFO; + Level loggingLevel = Level.FINE; String loggingDir = systemConfig.getHeronLoggingDirectory(); // Log to file and TMaster diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java index 1153cef360b..4410c991d21 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java +++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java @@ -15,6 +15,7 @@ package com.twitter.heron.instance.bolt; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -126,8 +127,8 @@ private List<Integer> admitBoltTuple(String streamId, sendTuple(bldr, streamId, tuple); - // TODO:- remove this after changing the api - return null; + // TODO: this used to return null; modified to make Trident work. Verify this is correct.
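+ // Note: Trident's CoordinatedOutputCollector derives per-task emit counts from the task ids + // returned here, so an always-empty list keeps those counts at zero; that mismatch is one + // plausible cause of the $coord-bg0 batch failures called out in the trident README.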
+ return new ArrayList<>(); + } private void admitAckTuple(Tuple tuple) { diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD index c9ed8e2fd51..2efc128604a 100644 --- a/heron/storm/src/java/BUILD +++ b/heron/storm/src/java/BUILD @@ -7,6 +7,15 @@ storm_deps_files = [ "//heron/common/src/java:basics-java", "//heron/simulator/src/java:simulator-java", "@com_googlecode_json_simple_json_simple//jar", + "//heron/proto:proto_topology_java", # added for trident prototype + "@commons_io_commons_io//jar", # added for trident prototype + "@commons_lang_commons_lang//jar", # added for trident prototype + "@org_apache_storm_core//jar", # added for trident prototype + "@org_clojure_clojure//jar", # added for trident prototype + "@org_slf4j_slf4j_api//jar", # added for trident prototype + "@org_slf4j_slf4j_jdk14//jar", # added for trident prototype + "@org_yaml_snakeyaml//jar", # added for trident prototype + "@org_ow2_asm_asm_all//jar", # added for trident prototype, required at runtime "//third_party/java:kryo-neverlink", ] diff --git a/heron/storm/src/java/org/apache/storm/Config.java b/heron/storm/src/java/org/apache/storm/Config.java index e226a1caf5b..14682822305 100644 --- a/heron/storm/src/java/org/apache/storm/Config.java +++ b/heron/storm/src/java/org/apache/storm/Config.java @@ -27,6 +27,7 @@ import org.apache.storm.serialization.IKryoDecorator; import org.apache.storm.serialization.IKryoFactory; +import org.apache.storm.validation.ConfigValidationAnnotations; /** * Topology configs are specified as a plain old map. This class provides a @@ -269,6 +270,63 @@ public class Config extends HashMap<String, Object> { */ public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port"; + /** + * The ceiling of the interval between retries of a Zookeeper operation. + */ + @ConfigValidationAnnotations.isInteger + public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis"; + + /** + * A list of hosts of Exhibitor servers used to discover/maintain connection to ZooKeeper cluster. + * Any configured ZooKeeper servers will be used for the curator/exhibitor backup connection string. + */ + @ConfigValidationAnnotations.isStringList + public static final String STORM_EXHIBITOR_SERVERS = "storm.exhibitor.servers"; + + /** + * The port Storm will use to connect to each of the exhibitor servers. + */ + @ConfigValidationAnnotations.isInteger + @ConfigValidationAnnotations.isPositiveNumber + public static final String STORM_EXHIBITOR_PORT = "storm.exhibitor.port"; + + /** + * The URI path to use when polling the Exhibitor servers. + */ + @ConfigValidationAnnotations.isString + public static final String STORM_EXHIBITOR_URIPATH="storm.exhibitor.poll.uripath"; + + /** + * How often to poll Exhibitor cluster in millis. + */ + @ConfigValidationAnnotations.isInteger + public static final String STORM_EXHIBITOR_POLL="storm.exhibitor.poll.millis"; + + /** + * The number of times to retry an Exhibitor operation. + */ + @ConfigValidationAnnotations.isInteger + public static final String STORM_EXHIBITOR_RETRY_TIMES="storm.exhibitor.retry.times"; + + /** + * The interval between retries of an Exhibitor operation. + */ + @ConfigValidationAnnotations.isInteger + public static final String STORM_EXHIBITOR_RETRY_INTERVAL="storm.exhibitor.retry.interval"; + + /** + * The ceiling of the interval between retries of an Exhibitor operation. 
+ */ + @ConfigValidationAnnotations.isInteger + public static final String STORM_EXHIBITOR_RETRY_INTERVAL_CEILING="storm.exhibitor.retry.intervalceiling.millis"; + + /** + * How often a batch can be emitted in a Trident topology. + */ + @ConfigValidationAnnotations.isInteger + @ConfigValidationAnnotations.isPositiveNumber + public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis"; + /** * The root directory in ZooKeeper for metadata about TransactionalSpouts. */ diff --git a/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java b/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java index 8d167329518..49770723a6b 100644 --- a/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java +++ b/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java @@ -18,7 +18,10 @@ package org.apache.storm.generated; -public class GlobalStreamId { +import java.io.Serializable; + +public class GlobalStreamId implements Serializable { + private static final long serialVersionUID = 1873909238460677921L; private String componentId; // required private String streamId; // required @@ -60,4 +63,28 @@ public void set_streamId(String newStreamId) { public void unset_streamId() { this.streamId = null; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + GlobalStreamId that = (GlobalStreamId) o; + + if (componentId != null ? !componentId.equals(that.componentId) : that.componentId != null) + return false; + return streamId != null ? streamId.equals(that.streamId) : that.streamId == null; + } + + @Override + public int hashCode() { + int result = componentId != null ? componentId.hashCode() : 0; + result = 31 * result + (streamId != null ? streamId.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "GlobalStreamId{componentId='" + componentId + "', streamId='" + streamId + "'}"; + } } diff --git a/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java b/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java index b674200fb58..f180020a4bd 100644 --- a/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java +++ b/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java @@ -18,14 +18,20 @@ package org.apache.storm.task; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; import org.apache.storm.tuple.Fields; import org.json.simple.JSONAware; +import com.twitter.heron.api.generated.TopologyAPI; + // import org.apache.storm.generated.ComponentCommon; // import org.apache.storm.generated.GlobalStreamId; // import org.apache.storm.generated.Grouping; @@ -104,10 +110,9 @@ public Fields getComponentOutputFields(String componentId, String streamId) { /** * Gets the declared output fields for the specified global stream id. */ - /* public Fields getComponentOutputFields(GlobalStreamId id) { + return getComponentOutputFields(id.get_componentId(), id.get_streamId()); } - */ /** * Gets the declared inputs to the specified component. 
@@ -120,6 +125,61 @@ public Map<GlobalStreamId, Grouping> getSources(String componentId) { } */ + // TODO: this is total jank + public Map<GlobalStreamId, Grouping> getSources(String componentId) { + Map<TopologyAPI.StreamId, TopologyAPI.Grouping> heron = delegate.getSources(componentId); + Map<GlobalStreamId, Grouping> converted = new NoValueMap<>(); + for (TopologyAPI.StreamId heronStreamId : heron.keySet()) { + converted.put(new GlobalStreamId(heronStreamId.getComponentName(), heronStreamId.getId()), null); + } + return converted; + } + + // TODO: this is total jank + private static class NoValueMap<K, V> extends HashMap<K, V> { + private static final long serialVersionUID = -729425631561874300L; + + @Override + public Collection<V> values() { + throw new UnsupportedOperationException("Values not supported; this map is only used where callers want the keySet"); + } + + @Override + public Set<Entry<K, V>> entrySet() { + throw new UnsupportedOperationException("Entries not supported; this map is only used where callers want the keySet"); + } + + @Override + public V get(Object key) { + throw new UnsupportedOperationException("Lookups not supported; this map is only used where callers want the keySet"); + } + } + + /** + * Gets information about who is consuming the outputs of the specified component, + * and how. + * + * @return Map from stream id to component id to the Grouping used. + */ +// public Map<String, Map<String, TopologyAPI.Grouping>> getHeronTargets(String componentId) { +// return delegate.getTargets(componentId); +// } + + public Map<String, Map<String, Grouping>> getTargets(String componentId) { + Map<String, Map<String, TopologyAPI.Grouping>> heron = delegate.getTargets(componentId); + Map<String, Map<String, Grouping>> converted = new HashMap<>(); + for (String streamId : heron.keySet()) { + // TODO: this is total jank + Map<String, TopologyAPI.Grouping> heronGrouping = heron.get(streamId); + Map<String, Grouping> groupingConverted = new NoValueMap<>(); + for (String key : heronGrouping.keySet()) { + groupingConverted.put(key, null); + } + converted.put(streamId, groupingConverted); + } + return converted; + } + /** * Gets information about who is consuming the outputs of the specified component, * and how. 
diff --git a/heron/storm/src/java/org/apache/storm/task/TopologyContext.java b/heron/storm/src/java/org/apache/storm/task/TopologyContext.java index fb4da90fb4f..b88315cca1e 100644 --- a/heron/storm/src/java/org/apache/storm/task/TopologyContext.java +++ b/heron/storm/src/java/org/apache/storm/task/TopologyContext.java @@ -22,10 +22,13 @@ // import org.apache.storm.generated.Grouping; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; import org.apache.storm.hooks.ITaskHook; import org.apache.storm.hooks.ITaskHookDelegate; @@ -51,6 +54,7 @@ */ public class TopologyContext extends WorkerTopologyContext implements IMetricsContext { private com.twitter.heron.api.topology.TopologyContext delegate; + private Map _executorData; // Constructor to match the signature of the storm's TopologyContext // Note that here, we fake the clojure.lang.Atom by creating our own class @@ -67,11 +71,13 @@ public TopologyContext(StormTopology topology, Map stormConf, Map executorData, Map registeredMetrics, org.apache.storm.clojure.lang.Atom openOrPrepareWasCalled) { super((com.twitter.heron.api.topology.TopologyContext) null); + this._executorData = executorData; } public TopologyContext(com.twitter.heron.api.topology.TopologyContext delegate) { super(delegate); this.delegate = delegate; + this._executorData = new HashMap<>(); } /** @@ -188,16 +194,28 @@ public Map getThisSources() { } */ + public Map getThisSources() { + return getSources(getThisComponentId()); + } + /* * Gets information about who is consuming the outputs of this component, and how. * * @return Map from stream id to component id to the Grouping used. */ +// public Map> getThisHeronTargets() { +// return getHeronTargets(getThisComponentId()); +// } + /* + * Gets information about who is consuming the outputs of this component, and how. + * + * @return Map from stream id to component id to the Grouping used. 
+ */ public Map<String, Map<String, Grouping>> getThisTargets() { return getTargets(getThisComponentId()); } - */ + public void setTaskData(String name, Object data) { delegate.setTaskData(name, data); } @@ -206,15 +224,13 @@ public Object getTaskData(String name) { return delegate.getTaskData(name); } - /* - public void setExecutorData(String name, Object data) { - _executorData.put(name, data); - } + public void setExecutorData(String name, Object data) { + _executorData.put(name, data); + } - public Object getExecutorData(String name) { - return _executorData.get(name); - } - */ + public Object getExecutorData(String name) { + return _executorData.get(name); + } public void addTaskHook(ITaskHook newHook) { Collection hooks = delegate.getHooks(); diff --git a/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java b/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java index f4aaa65edbc..b42e2691b69 100644 --- a/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java +++ b/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java @@ -19,13 +19,18 @@ package org.apache.storm.topology; import java.util.Map; +import java.util.logging.Logger; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.grouping.CustomStreamGroupingDelegate; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; public class BoltDeclarerImpl implements BoltDeclarer { + private static final Logger LOG = Logger.getLogger(BoltDeclarerImpl.class.getName()); + private com.twitter.heron.api.topology.BoltDeclarer delegate; public BoltDeclarerImpl(com.twitter.heron.api.topology.BoltDeclarer delegate) { @@ -157,4 +162,60 @@ public BoltDeclarer customGrouping( delegate.customGrouping(componentId, streamId, new CustomStreamGroupingDelegate(grouping)); return this; } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { + throw new RuntimeException("partialKeyGrouping not supported"); + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { + throw new RuntimeException("partialKeyGrouping not supported"); + } + + @Override + public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) { + switch (grouping.getSetField()) { + case ALL: + return allGrouping(id.get_componentId(), id.get_streamId()); + case DIRECT: + return directGrouping(id.get_componentId(), id.get_streamId()); + case FIELDS: + return fieldsGrouping(id.get_componentId(), id.get_streamId(), new Fields(grouping.get_fields())); + case LOCAL_OR_SHUFFLE: + return localOrShuffleGrouping(id.get_componentId(), id.get_streamId()); + case SHUFFLE: + return shuffleGrouping(id.get_componentId(), id.get_streamId()); + case NONE: + return noneGrouping(id.get_componentId(), id.get_streamId()); + case CUSTOM_SERIALIZED:
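+ // The serialized payload carries a CustomStreamGrouping; the likely real fix (README issue #2) + // is to deserialize it and delegate to customGrouping(id.get_componentId(), id.get_streamId(), + // ...), rather than falling back to noneGrouping, which breaks routing for this stream.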
+ LOG.warning(String.format( + "%s.grouping(GlobalStreamId id, Grouping grouping) not supported for %s, swapping in " + + "noneGrouping. The tuple stream routing will be broken for streamId %s", + getClass().getName(), grouping.getSetField(), id)); + return noneGrouping(id.get_componentId(), id.get_streamId()); + case CUSTOM_OBJECT: + //grouping.get_custom_object(); + default: + throw new RuntimeException( + "grouping(GlobalStreamId id, Grouping grouping) not supported for " + + grouping.getSetField()); + } + } + + @Override + public BoltDeclarer setMemoryLoad(Number onHeap) { + // Heron does not support this + return this; + } + + @Override + public BoltDeclarer setMemoryLoad(Number onHeap, Number offHeap) { + // Heron does not support this + return this; + } + + @Override + public BoltDeclarer setCPULoad(Number amount) { + // Heron does not support this + return this; + } } diff --git a/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java index 93eeb27a3e4..a149024ea1c 100644 --- a/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java +++ b/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java @@ -21,7 +21,8 @@ import java.util.Map; @SuppressWarnings("rawtypes") -public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> { +public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> + extends ResourceDeclarer<T> { T addConfigurations(Map conf); T addConfiguration(String config, Object value); diff --git a/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java b/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java index ca781a2b7a8..0a766fed99e 100644 --- a/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java +++ b/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java @@ -18,6 +18,8 @@ package org.apache.storm.topology; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.tuple.Fields; @@ -57,5 +59,9 @@ public interface InputDeclarer<T extends InputDeclarer> { T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping); - // T grouping(GlobalStreamId id, Grouping grouping); + T partialKeyGrouping(String componentId, Fields fields); + + T partialKeyGrouping(String componentId, String streamId, Fields fields); + + T grouping(GlobalStreamId id, Grouping grouping); } diff --git a/heron/storm/src/java/org/apache/storm/topology/ResourceDeclarer.java b/heron/storm/src/java/org/apache/storm/topology/ResourceDeclarer.java new file mode 100644 index 00000000000..37dc917849c --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/topology/ResourceDeclarer.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.topology; + +/** + * This is a new base interface that can be used by anything that wants to mirror + * RAS's basic API. Trident uses this to allow setting resources in the Stream API. + */ +@SuppressWarnings("rawtypes") +public interface ResourceDeclarer<T extends ResourceDeclarer> { + T setMemoryLoad(Number onHeap); + T setMemoryLoad(Number onHeap, Number offHeap); + T setCPULoad(Number amount); +} + diff --git a/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java b/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java index 28dcc901889..29aa08fa2d9 100644 --- a/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java +++ b/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java @@ -63,4 +63,19 @@ public SpoutDeclarer setNumTasks(Number val) { // Heron does not support this return this; } + + @Override + public SpoutDeclarer setMemoryLoad(Number onHeap) { + // Heron does not support this + return this; + } + + @Override + public SpoutDeclarer setMemoryLoad(Number onHeap, Number offHeap) { + // Heron does not support this + return this; + } + + @Override + public SpoutDeclarer setCPULoad(Number amount) { + // Heron does not support this + return this; + } } diff --git a/heron/storm/src/java/org/apache/storm/trident/README.md b/heron/storm/src/java/org/apache/storm/trident/README.md new file mode 100644 index 00000000000..ca023e75ab7 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/trident/README.md @@ -0,0 +1,58 @@ +Hackweek prototype to get a sample trident topology working. + +tl;dr: TridentWordCountTopology runs on Heron and tuples are being transmitted. Bolt b-1 receives +sentences and splits them into words and counts them. Bolt b-0 does not seem to receive and aggregate +them. We need to understand why. + +To run: + +``` +$ cd path/to/zookeeper && bin/zkServer.sh start +$ ~/bin/heron kill local TridentWordCountTopology && rm -rf ~/.herondata/* +$ bazel run --config=darwin --verbose_failures -- scripts/packages/heron-client-install.sh --user && \ + ~/bin/heron submit local ~/.heron/examples/heron-examples.jar \ + com.twitter.heron.examples.TridentWordCountTopology TridentWordCountTopology +$ less ~/.herondata/topologies/local/billg/TridentWordCountTopology/log-files/container_1_b-1_4.log.0 +``` + +Notes: +- spout-spout1 emits sentences via FixedBatchSpout on stream s1 to bolt b1 +- b1 receives sentences on SubtopologyBolt and delegates to EachProcessor to Split to output words + to AppendCollector to AggregateProcessor to GroupedAggregator + +Current status: +- Topology compiles and can be submitted +- DAG and streams appear to be correctly represented +- Aggregation does not appear to be happening on the terminal bolt (b-0) +- Trident/Storm do not provide reasonable defaults for configs, and instances fail violently when + expected configs are not set. See TridentWordCountTopology. +- Many methods have been added/hacked to get the topology to run, but their implementations have + not been verified +- Failures on stream $coord-bg0 appear in the counters for bolt b-1, but the logs don't show anything + (see the accounting sketch below) + +Issues: +1. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.admitBoltTuple` changed to return an empty list instead of null; it still needs to return the real task ids 
2. `BoltDeclarerImpl.grouping(GlobalStreamId id, Grouping grouping)` doesn't support `CUSTOM_SERIALIZED` properly 
3. GeneralTopologyContext does a bunch of janky stuff with NoValueMap, for callers who need keySets only 
4. ZooKeeper ACLs are not implemented in Utils. 
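+The $coord bookkeeping that makes or breaks a batch lives in TridentBoltExecutor.checkFinish and is
+easy to reproduce standalone. A minimal sketch of that accounting (class and method names here are
+illustrative, not Storm or Heron APIs):
+
+```
+// Mirrors TridentBoltExecutor.TrackedBatch: a batch finishes only when every expected
+// coordination report has arrived and the reported emit counts match what was received.
+public class CoordAccounting {
+  int expectedTaskReports;    // upstream tasks that must report on $coord-<batchGroup>
+  int reportedTasks = 0;
+  int expectedTupleCount = 0; // sum of the "count" field of [id, count] coord tuples
+  int receivedTuples = 0;
+
+  void onBatchTuple() { receivedTuples++; }
+
+  void onCoordTuple(int count) { reportedTasks++; expectedTupleCount += count; }
+
+  boolean batchComplete() {
+    return reportedTasks == expectedTaskReports && receivedTuples == expectedTupleCount;
+  }
+
+  public static void main(String[] args) {
+    CoordAccounting batch = new CoordAccounting();
+    batch.expectedTaskReports = 1;
+    batch.onBatchTuple();
+    batch.onBatchTuple();
+    batch.onCoordTuple(2); // upstream task reports it emitted 2 tuples to us
+    System.out.println(batch.batchComplete()); // true; a zeroed emit count would print false
+  }
+}
+```
+
+If the Heron collector reports no task ids for emits (issue #1), expectedTupleCount never grows while
+receivedTuples does, so every non-empty batch would fail this check, which is consistent with the
+$coord-bg0 failure counters.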
TODO: +- Figure out why bolt b-1 is failing to process tuples on the $coord-bg0 stream. +- Bolt b-1 seems to receive sentences on stream s1 and split them into words in the code, but they +don't seem to be getting to b-0. Understand why: are they being emitted and received and the counters +are wrong, or are they never emitted? +- Understand MemoryMapState and see counts getting persisted in it. I suspect this should be done by b-0. +- Understand why direct grouping and `emitDirect` are needed +- Fix `admitBoltTuple` to return the real destination task ids instead of an empty list (see #1 above) +- Understand why `CUSTOM_SERIALIZED` is needed and how to support it (see #2 above) +- Figure out why `org.apache.storm.trident.topology.TransactionAttempt` is only registered as + `Config.TOPOLOGY_KRYO_REGISTER` in spouts and not bolts. +- Lots of additions were added to org.apache.storm code in heron. These implementations should all + be verified and in some cases fixed. Storm code in heron also seems to drift between versions. This + code should really be pinned to a given Storm version, to make it easier to upgrade to new Storm + versions. Because storm classes are copied and modified, managing storm versions with heron + modifications is really hard currently. +- To get things working, the org.apache.storm core jar was added as a dep to the project (see WORKSPACE), + since it has all the trident code. We wouldn't want to do this in the long haul. Instead we'd + probably want to include an artifact that only has the trident code. +- Do a FULL AUDIT of all changes in the branch before even thinking about merging any of it. This is + prototype hackweek code, people. diff --git a/heron/storm/src/java/org/apache/storm/trident/topology/MasterBatchCoordinator.java b/heron/storm/src/java/org/apache/storm/trident/topology/MasterBatchCoordinator.java new file mode 100644 index 00000000000..88b36e6cc59 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/trident/topology/MasterBatchCoordinator.java @@ -0,0 +1,289 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License +package org.apache.storm.trident.topology; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.trident.spout.ITridentSpout; +import org.apache.storm.trident.topology.state.TransactionalState; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.WindowedTimeThrottler; + +import com.twitter.heron.common.basics.TypeUtils; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +@SuppressWarnings({"unchecked", "rawtypes", "cast", "serial", "checkstyle:all"}) +public class MasterBatchCoordinator extends BaseRichSpout { +// public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class); + + public static final long INIT_TXID = 1L; + + + public static final String BATCH_STREAM_ID = "$batch"; + public static final String COMMIT_STREAM_ID = "$commit"; + public static final String SUCCESS_STREAM_ID = "$success"; + + private static final String CURRENT_TX = "currtx"; + private static final String CURRENT_ATTEMPTS = "currattempts"; + + private List<TransactionalState> _states = new ArrayList<>(); + + TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<>(); + TreeMap<Long, Integer> _attemptIds; + + private SpoutOutputCollector _collector; + Long _currTransaction; + int _maxTransactionActive; + + List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList<>(); + + + List<String> _managedSpoutIds; + List<ITridentSpout> _spouts; + WindowedTimeThrottler _throttler; + + boolean _active = true; + + public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) { + if(spoutIds.isEmpty()) { + throw new IllegalArgumentException("Must manage at least one spout"); + } + _managedSpoutIds = spoutIds; + _spouts = spouts; + } + + public List<String> getManagedSpoutIds(){ + return _managedSpoutIds; + } + + @Override + public void activate() { + _active = true; + } + + @Override + public void deactivate() { + _active = false; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1); + for(String spoutId: _managedSpoutIds) { + _states.add(TransactionalState.newCoordinatorState(conf, spoutId)); + } + _currTransaction = getStoredCurrTransaction(); + + _collector = collector; + // only heron-specific change in this class is to use TypeUtils + Number active = TypeUtils.getInteger(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)); + if(active==null) { + _maxTransactionActive = 1; + } else { + _maxTransactionActive = active.intValue(); + } + _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive); + + + for(int i=0; i<_spouts.size(); i++) { + String txId = _managedSpoutIds.get(i); + _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context)); + } + } + + @Override + public void close() { + for(TransactionalState state: _states) { + state.close(); + } + } + + @Override + public void nextTuple() { + sync(); + } + + @Override + public void ack(Object msgId) { + TransactionAttempt tx = (TransactionAttempt) msgId; + TransactionStatus status = _activeTx.get(tx.getTransactionId()); + if(status!=null && 
tx.equals(status.attempt)) { + if(status.status== AttemptStatus.PROCESSING) { + status.status = AttemptStatus.PROCESSED; + } else if(status.status== AttemptStatus.COMMITTING) { + _activeTx.remove(tx.getTransactionId()); + _attemptIds.remove(tx.getTransactionId()); + _collector.emit(SUCCESS_STREAM_ID, new Values(tx)); + _currTransaction = nextTransactionId(tx.getTransactionId()); + for(TransactionalState state: _states) { + state.setData(CURRENT_TX, _currTransaction); + } + } + sync(); + } + } + + @Override + public void fail(Object msgId) { + TransactionAttempt tx = (TransactionAttempt) msgId; + TransactionStatus stored = _activeTx.remove(tx.getTransactionId()); + if(stored!=null && tx.equals(stored.attempt)) { + _activeTx.tailMap(tx.getTransactionId()).clear(); + sync(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far, + // when it sees the earlier txid it should know to emit nothing + declarer.declareStream(BATCH_STREAM_ID, new Fields("tx")); + declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx")); + declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx")); + } + + private void sync() { + // note that sometimes the tuples active may be less than max_spout_pending, e.g. + // max_spout_pending = 3 + // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet), + // and there won't be a batch for tx 4 because there's max_spout_pending tx active + TransactionStatus maybeCommit = _activeTx.get(_currTransaction); + if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) { + maybeCommit.status = AttemptStatus.COMMITTING; + _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt); + } + + if(_active) { + if(_activeTx.size() < _maxTransactionActive) { + Long curr = _currTransaction; + for(int i=0; i<_maxTransactionActive; i++) { + if(!_activeTx.containsKey(curr) && isReady(curr)) { + // by using a monotonically increasing attempt id, downstream tasks + // can be memory efficient by clearing out state for old attempts + // as soon as they see a higher attempt id for a transaction + Integer attemptId = _attemptIds.get(curr); + if(attemptId==null) { + attemptId = 0; + } else { + attemptId++; + } + _attemptIds.put(curr, attemptId); + for(TransactionalState state: _states) { + state.setData(CURRENT_ATTEMPTS, _attemptIds); + } + + TransactionAttempt attempt = new TransactionAttempt(curr, attemptId); + _activeTx.put(curr, new TransactionStatus(attempt)); + _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt); + _throttler.markEvent(); + } + curr = nextTransactionId(curr); + } + } + } + } + + private boolean isReady(long txid) { + if(_throttler.isThrottled()) return false; + //TODO: make this strategy configurable?... 
right now it goes if anyone is ready + for(ITridentSpout.BatchCoordinator coord: _coordinators) { + if(coord.isReady(txid)) return true; + } + return false; + } + + @Override + public Map getComponentConfiguration() { + Config ret = new Config(); + ret.setMaxTaskParallelism(1); + ret.registerSerialization(TransactionAttempt.class); + return ret; + } + + private static enum AttemptStatus { + PROCESSING, + PROCESSED, + COMMITTING + } + + private static class TransactionStatus { + TransactionAttempt attempt; + AttemptStatus status; + + public TransactionStatus(TransactionAttempt attempt) { + this.attempt = attempt; + this.status = AttemptStatus.PROCESSING; + } + + @Override + public String toString() { + return attempt.toString() + " <" + status.toString() + ">"; + } + } + + + private Long nextTransactionId(Long id) { + return id + 1; + } + + private Long getStoredCurrTransaction() { + Long ret = INIT_TXID; + for(TransactionalState state: _states) { + Long curr = (Long) state.getData(CURRENT_TX); + if(curr!=null && curr.compareTo(ret) > 0) { + ret = curr; + } + } + return ret; + } + + private TreeMap getStoredCurrAttempts(long currTransaction, int maxBatches) { + TreeMap ret = new TreeMap(); + for(TransactionalState state: _states) { + Map attempts = (Map) state.getData(CURRENT_ATTEMPTS); + if(attempts==null) attempts = new HashMap(); + for(Entry e: attempts.entrySet()) { + // this is because json doesn't allow numbers as keys... + // TODO: replace json with a better form of encoding + Number txidObj; + if(e.getKey() instanceof String) { + txidObj = Long.parseLong((String) e.getKey()); + } else { + txidObj = (Number) e.getKey(); + } + long txid = ((Number) txidObj).longValue(); + int attemptId = ((Number) e.getValue()).intValue(); + Integer curr = ret.get(txid); + if(curr==null || attemptId > curr) { + ret.put(txid, attemptId); + } + } + } + ret.headMap(currTransaction).clear(); + ret.tailMap(currTransaction + maxBatches - 1).clear(); + return ret; + } +} diff --git a/heron/storm/src/java/org/apache/storm/trident/topology/TridentBoltExecutor.java b/heron/storm/src/java/org/apache/storm/trident/topology/TridentBoltExecutor.java new file mode 100644 index 00000000000..f30d69aaad1 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/trident/topology/TridentBoltExecutor.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.trident.topology; + +import org.apache.storm.Config; +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.coordination.BatchOutputCollectorImpl; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.FailedException; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.ReportedFailedException; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.TupleUtils; +import org.apache.storm.utils.Utils; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.storm.trident.spout.IBatchID; + +@SuppressWarnings({"unchecked", "rawtypes", "serial", "cast"}) +public class TridentBoltExecutor implements IRichBolt { + public static final String COORD_STREAM_PREFIX = "$coord-"; + + public static String COORD_STREAM(String batch) { + return COORD_STREAM_PREFIX + batch; + } + + public static class CoordType implements Serializable { + public boolean singleCount; + + protected CoordType(boolean singleCount) { + this.singleCount = singleCount; + } + + public static CoordType single() { + return new CoordType(true); + } + + public static CoordType all() { + return new CoordType(false); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CoordType)) return false; + + CoordType coordType = (CoordType) o; + + return singleCount == coordType.singleCount; + } + + @Override + public int hashCode() { + return (singleCount ? 
1 : 0); + } + + @Override + public String toString() { + return ""; + } + } + + public static class CoordSpec implements Serializable { + public GlobalStreamId commitStream = null; + public Map coords = new HashMap<>(); + + public CoordSpec() { + } + } + + public static class CoordCondition implements Serializable { + public GlobalStreamId commitStream; + public int expectedTaskReports; + Set targetTasks; + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + } + + Map _batchGroupIds; + Map _coordSpecs; + Map _coordConditions; + ITridentBatchBolt _bolt; + long _messageTimeoutMs; + long _lastRotate; + + RotatingMap _batches; + + // map from batchgroupid to coordspec + public TridentBoltExecutor(ITridentBatchBolt bolt, Map batchGroupIds, Map coordinationSpecs) { + _batchGroupIds = batchGroupIds; + _coordSpecs = coordinationSpecs; + _bolt = bolt; + } + + public static class TrackedBatch { + int attemptId; + BatchInfo info; + CoordCondition condition; + int reportedTasks = 0; + int expectedTupleCount = 0; + int receivedTuples = 0; + Map taskEmittedTuples = new HashMap<>(); + boolean failed = false; + boolean receivedCommit; + Tuple delayedAck = null; + + public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) { + this.info = info; + this.condition = condition; + this.attemptId = attemptId; + receivedCommit = condition.commitStream == null; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + } + + private static class CoordinatedOutputCollector implements IOutputCollector { + IOutputCollector _delegate; + + TrackedBatch _currBatch = null; + + public void setCurrBatch(TrackedBatch batch) { + _currBatch = batch; + } + + public CoordinatedOutputCollector(IOutputCollector delegate) { + _delegate = delegate; + } + + public List emit(String stream, Collection anchors, List tuple) { + List tasks = _delegate.emit(stream, anchors, tuple); + updateTaskCounts(tasks); + return tasks; + } + + public void emitDirect(int task, String stream, Collection anchors, List tuple) { + updateTaskCounts(Arrays.asList(task)); + _delegate.emitDirect(task, stream, anchors, tuple); + } + + public void ack(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } + + public void fail(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } + + public void resetTimeout(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } + + public void reportError(Throwable error) { + _delegate.reportError(error); + } + + + private void updateTaskCounts(List tasks) { + if(_currBatch!=null) { + Map taskEmittedTuples = _currBatch.taskEmittedTuples; + for(Integer task: tasks) { + int newCount = Utils.get(taskEmittedTuples, task, 0) + 1; + taskEmittedTuples.put(task, newCount); + } + } + } + } + + OutputCollector _collector; + CoordinatedOutputCollector _coordCollector; + BatchOutputCollector _coordOutputCollector; + TopologyContext _context; + + @Override + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + _messageTimeoutMs = 30000; // TODO support this context.maxTopologyMessageTimeout() * 1000L; + _lastRotate = System.currentTimeMillis(); + _batches = new RotatingMap<>(2); + _context = context; + _collector = collector; + _coordCollector = new CoordinatedOutputCollector(collector); + _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector)); + + _coordConditions = 
(Map) context.getExecutorData("__coordConditions"); + if(_coordConditions==null) { + _coordConditions = new HashMap<>(); + for(String batchGroup: _coordSpecs.keySet()) { + CoordSpec spec = _coordSpecs.get(batchGroup); + CoordCondition cond = new CoordCondition(); + cond.commitStream = spec.commitStream; + cond.expectedTaskReports = 0; + for(String comp: spec.coords.keySet()) { + CoordType ct = spec.coords.get(comp); + if(ct.equals(CoordType.single())) { + cond.expectedTaskReports+=1; + } else { + cond.expectedTaskReports+=context.getComponentTasks(comp).size(); + } + } + cond.targetTasks = new HashSet<>(); + // get all downstream consumers of this component of the stream COORD_STREAM(batchGroup) + for(String component: Utils.get(context.getThisTargets(), + COORD_STREAM(batchGroup), + new HashMap()).keySet()) { + cond.targetTasks.addAll(context.getComponentTasks(component)); + } + _coordConditions.put(batchGroup, cond); + } + context.setExecutorData("__coordConditions", _coordConditions); + } + _bolt.prepare(conf, context, _coordOutputCollector); + } + + private void failBatch(TrackedBatch tracked, FailedException e) { + if(e!=null && e instanceof ReportedFailedException) { + _collector.reportError(e); + } + tracked.failed = true; + if(tracked.delayedAck!=null) { + _collector.fail(tracked.delayedAck); + tracked.delayedAck = null; + } + } + + private void failBatch(TrackedBatch tracked) { + failBatch(tracked, null); + } + + private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) { + boolean success = true; + try { + _bolt.finishBatch(tracked.info); + String stream = COORD_STREAM(tracked.info.batchGroup); + for(Integer task: tracked.condition.targetTasks) { + _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0))); + } + if(tracked.delayedAck!=null) { + _collector.ack(tracked.delayedAck); + tracked.delayedAck = null; + } + } catch(FailedException e) { + failBatch(tracked, e); + success = false; + } + _batches.remove(tracked.info.batchId.getId()); + return success; + } + + private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) { + if(tracked.failed) { + failBatch(tracked); + _collector.fail(tuple); + return; + } + CoordCondition cond = tracked.condition; + boolean delayed = tracked.delayedAck==null && + (cond.commitStream!=null && type==TupleType.COMMIT + || cond.commitStream==null); + if(delayed) { + tracked.delayedAck = tuple; + } + boolean failed = false; + if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) { + if(tracked.receivedTuples == tracked.expectedTupleCount) { + finishBatch(tracked, tuple); + } else { + //TODO: add logging that not all tuples were received + failBatch(tracked); + _collector.fail(tuple); + failed = true; + } + } + + if(!delayed && !failed) { + _collector.ack(tuple); + } + + } + + @Override + public void execute(Tuple tuple) { + if(TupleUtils.isTick(tuple)) { + long now = System.currentTimeMillis(); + if(now - _lastRotate > _messageTimeoutMs) { + _batches.rotate(); + _lastRotate = now; + } + return; + } + + String batchGroup = _batchGroupIds.get(new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId())); + if(batchGroup==null) { + // this is so we can do things like have simple DRPC that doesn't need to use batch processing + _coordCollector.setCurrBatch(null); + _bolt.execute(null, tuple); + _collector.ack(tuple); + return; + } + IBatchID id = (IBatchID) tuple.getValue(0); + //get transaction id + //if it already 
exists and attempt id is greater than the attempt there + + + TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId()); +// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) { +// System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex() +// + " (" + _batches.size() + ")" + +// "\ntuple: " + tuple + +// "\nwith tracked " + tracked + +// "\nwith id " + id + +// "\nwith group " + batchGroup +// + "\n"); +// +// } + //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()); + + // this code here ensures that only one attempt is ever tracked for a batch, so when + // failures happen you don't get an explosion in memory usage in the tasks + if(tracked!=null) { + if(id.getAttemptId() > tracked.attemptId) { + _batches.remove(id.getId()); + tracked = null; + } else if(id.getAttemptId() < tracked.attemptId) { + // no reason to try to execute a previous attempt than we've already seen + return; + } + } + + if(tracked==null) { + tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId()); + _batches.put(id.getId(), tracked); + } + _coordCollector.setCurrBatch(tracked); + + //System.out.println("TRACKED: " + tracked + " " + tuple); + + TupleType t = getTupleType(tuple, tracked); + if(t==TupleType.COMMIT) { + tracked.receivedCommit = true; + checkFinish(tracked, tuple, t); + } else if(t==TupleType.COORD) { + int count = tuple.getInteger(1); + tracked.reportedTasks++; + tracked.expectedTupleCount+=count; + checkFinish(tracked, tuple, t); + } else { + tracked.receivedTuples++; + boolean success = true; + try { + _bolt.execute(tracked.info, tuple); + if(tracked.condition.expectedTaskReports==0) { + success = finishBatch(tracked, tuple); + } + } catch(FailedException e) { + failBatch(tracked, e); + } + if(success) { + _collector.ack(tuple); + } else { + _collector.fail(tuple); + } + } + _coordCollector.setCurrBatch(null); + } + + @Override + public void cleanup() { + _bolt.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + _bolt.declareOutputFields(declarer); + for(String batchGroup: _coordSpecs.keySet()) { + declarer.declareStream(COORD_STREAM(batchGroup), true, new Fields("id", "count")); + } + } + + @Override + public Map getComponentConfiguration() { + Map ret = _bolt.getComponentConfiguration(); + if(ret==null) ret = new HashMap<>(); + ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); + // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization + return ret; + } + + private TupleType getTupleType(Tuple tuple, TrackedBatch batch) { + CoordCondition cond = batch.condition; + if(cond.commitStream!=null + && new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()).equals(cond.commitStream)) { + return TupleType.COMMIT; + } else if(cond.expectedTaskReports > 0 + && tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX)) { + return TupleType.COORD; + } else { + return TupleType.REGULAR; + } + } + + static enum TupleType { + REGULAR, + COMMIT, + COORD + } +} \ No newline at end of file diff --git a/heron/storm/src/java/org/apache/storm/trident/topology/TridentTopologyBuilder.java b/heron/storm/src/java/org/apache/storm/trident/topology/TridentTopologyBuilder.java new file mode 100644 index 00000000000..5d3213bad1f --- /dev/null +++ 
b/heron/storm/src/java/org/apache/storm/trident/topology/TridentTopologyBuilder.java @@ -0,0 +1,756 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.topology; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.storm.generated.GlobalStreamId; +//import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.grouping.PartialKeyGrouping; +import org.apache.storm.topology.BaseConfigurationDeclarer; +import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.SpoutDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.trident.spout.BatchSpoutExecutor; +import org.apache.storm.trident.spout.IBatchSpout; +import org.apache.storm.trident.spout.ICommitterTridentSpout; +import org.apache.storm.trident.spout.ITridentSpout; +import org.apache.storm.trident.spout.RichSpoutBatchTriggerer; +import org.apache.storm.trident.spout.TridentSpoutCoordinator; +import org.apache.storm.trident.spout.TridentSpoutExecutor; +import org.apache.storm.trident.topology.TridentBoltExecutor.CoordSpec; +import org.apache.storm.trident.topology.TridentBoltExecutor.CoordType; +import org.apache.storm.tuple.Fields; + +// based on transactional topologies +@SuppressWarnings({"unchecked", "rawtypes", "checkstyle:all"}) +public class TridentTopologyBuilder { + Map _batchIds = new HashMap(); + Map _spouts = new HashMap(); + Map _batchPerTupleSpouts = new HashMap(); + Map _bolts = new HashMap(); + + + @SuppressWarnings({"unchecked", "rawtypes"}) + public SpoutDeclarer setBatchPerTupleSpout(String id, String streamName, IRichSpout spout, Integer parallelism, String batchGroup) { + Map batchGroups = new HashMap(); + batchGroups.put(streamName, batchGroup); + markBatchGroups(id, batchGroups); + SpoutComponent c = new SpoutComponent(spout, streamName, parallelism, batchGroup); + _batchPerTupleSpouts.put(id, c); + return new SpoutDeclarerImpl(c); + } + + public SpoutDeclarer setSpout(String id, String streamName, String txStateId, IBatchSpout spout, Integer parallelism, String batchGroup) { + return setSpout(id, streamName, txStateId, new BatchSpoutExecutor(spout), parallelism, batchGroup); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) { + Map batchGroups = new HashMap(); + batchGroups.put(streamName, batchGroup); + markBatchGroups(id, batchGroups); + + TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup); + _spouts.put(id, c); + return new SpoutDeclarerImpl(c); + } + + // map from stream name to batch id + public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set committerBatches, Map batchGroups) { + markBatchGroups(id, batchGroups); + Component c = 
+    // map from stream name to batch id
+    public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
+        markBatchGroups(id, batchGroups);
+        Component c = new Component(bolt, parallelism, committerBatches);
+        _bolts.put(id, c);
+        return new BoltDeclarerImpl(c);
+    }
+
+    String masterCoordinator(String batchGroup) {
+        return "$mastercoord-" + batchGroup;
+    }
+
+    static final String SPOUT_COORD_PREFIX = "$spoutcoord-";
+
+    public static String spoutCoordinator(String spoutId) {
+        return SPOUT_COORD_PREFIX + spoutId;
+    }
+
+    public static String spoutIdFromCoordinatorId(String coordId) {
+        return coordId.substring(SPOUT_COORD_PREFIX.length());
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
+        Map<GlobalStreamId, String> ret = new HashMap<>(_batchIds);
+        Set<String> allBatches = new HashSet(_batchIds.values());
+        for(String b: allBatches) {
+            ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.BATCH_STREAM_ID), b);
+            if(includeCommitStream) {
+                ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
+            }
+            // DO NOT include the success stream as part of the batch. it should not trigger coordination tuples,
+            // and is just a metadata tuple to assist in cleanup, should not trigger batch tracking
+        }
+
+        for(String id: _spouts.keySet()) {
+            TransactionalSpoutComponent c = _spouts.get(id);
+            if(c.batchGroupId != null) {
+                ret.put(new GlobalStreamId(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID), c.batchGroupId);
+            }
+        }
+
+        // this takes care of setting up coord streams for spouts and bolts
+        for(GlobalStreamId s: _batchIds.keySet()) {
+            String b = _batchIds.get(s);
+            ret.put(new GlobalStreamId(s.get_componentId(), TridentBoltExecutor.COORD_STREAM(b)), b);
+        }
+
+        return ret;
+    }
+
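+    // Lowers the Trident graph onto a plain TopologyBuilder: one MasterBatchCoordinator
+    // spout per batch group, a TridentSpoutCoordinator bolt per transactional spout, and
+    // a TridentBoltExecutor wrapping every bolt, all stitched together with the
+    // coordination streams computed above.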
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public StormTopology buildTopology() {
+        TopologyBuilder builder = new TopologyBuilder();
+        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
+        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
+
+        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
+        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
+
+        for(String id: _spouts.keySet()) {
+            TransactionalSpoutComponent c = _spouts.get(id);
+            if(c.spout instanceof IRichSpout) {
+
+                //TODO: wrap this to set the stream name
+                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
+            } else {
+                String batchGroup = c.batchGroupId;
+                if(!batchesToCommitIds.containsKey(batchGroup)) {
+                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
+                }
+                batchesToCommitIds.get(batchGroup).add(c.commitStateId);
+
+                if(!batchesToSpouts.containsKey(batchGroup)) {
+                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
+                }
+                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
+
+                BoltDeclarer scd =
+                    builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
+                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
+                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
+
+                for(Map<String, Object> m: c.componentConfs) {
+                    scd.addConfigurations(m);
+                }
+
+                Map<String, CoordSpec> specs = new HashMap();
+                specs.put(c.batchGroupId, new CoordSpec());
+                BoltDeclarer bd = builder.setBolt(id,
+                        new TridentBoltExecutor(
+                            new TridentSpoutExecutor(
+                                c.commitStateId,
+                                c.streamName,
+                                ((ITridentSpout) c.spout)),
+                            batchIdsForSpouts,
+                            specs),
+                        c.parallelism);
+                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
+                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
+                if(c.spout instanceof ICommitterTridentSpout) {
+                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
+                }
+                for(Map<String, Object> m: c.componentConfs) {
+                    bd.addConfigurations(m);
+                }
+            }
+        }
+
+        for(String id: _batchPerTupleSpouts.keySet()) {
+            SpoutComponent c = _batchPerTupleSpouts.get(id);
+            SpoutDeclarer d = builder.setSpout(id, new RichSpoutBatchTriggerer((IRichSpout) c.spout, c.streamName, c.batchGroupId), c.parallelism);
+
+            for(Map<String, Object> conf: c.componentConfs) {
+                d.addConfigurations(conf);
+            }
+        }
+
+        for(Map.Entry<String, List<String>> entry: batchesToCommitIds.entrySet()) {
+            String batch = entry.getKey();
+            List<String> commitIds = entry.getValue();
+            builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
+        }
+
+        for(String id: _bolts.keySet()) {
+            Component c = _bolts.get(id);
+
+            Map<String, CoordSpec> specs = new HashMap<>();
+
+            for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
+                String batch = batchIdsForBolts.get(s);
+                if (batch == null) {
+                    throw new RuntimeException(String.format(
+                        "Batch group id not found for stream id %s in batchIdsForBolts: %s",
+                        s, batchIdsForBolts));
+                }
+                if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
+                CoordSpec spec = specs.get(batch);
+                CoordType ct;
+                if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
+                    ct = CoordType.single();
+                } else {
+                    ct = CoordType.all();
+                }
+                spec.coords.put(s.get_componentId(), ct);
+            }
+
+            for(String b: c.committerBatches) {
+                String masterCoordinator = masterCoordinator(b);
+                CoordSpec spec = specs.get(b);
+                GlobalStreamId streamId = new GlobalStreamId(masterCoordinator, MasterBatchCoordinator.COMMIT_STREAM_ID);
+                spec.commitStream = streamId;
+            }
+
+            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
+            for(Map<String, Object> conf: c.componentConfs) {
+                d.addConfigurations(conf);
+            }
+
+            for(InputDeclaration inputDecl: c.declarations) {
+                inputDecl.declare(d);
+            }
+
+            Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
+            for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
+                for(String comp: entry.getValue()) {
+                    d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
+                }
+            }
+
+            for(String b: c.committerBatches) {
+                d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
+            }
+        }
+
+        return builder.createTopology();
+    }
+
+    private void markBatchGroups(String component, Map<String, String> batchGroups) {
+        for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
+            _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
+        }
+    }
+
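+    // Value holders for declared components. Parallelism, configs, and (for bolts)
+    // input declarations are kept here until buildTopology assembles the real topology.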
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static class SpoutComponent {
+        public Object spout;
+        public Integer parallelism;
+        public List<Map<String, Object>> componentConfs = new ArrayList<>();
+        String batchGroupId;
+        String streamName;
+
+        public SpoutComponent(Object spout, String streamName, Integer parallelism, String batchGroupId) {
+            this.spout = spout;
+            this.streamName = streamName;
+            this.parallelism = parallelism;
+            this.batchGroupId = batchGroupId;
+        }
+
+        @Override
+        public String toString() {
+            return ToStringBuilder.reflectionToString(this);
+        }
+    }
+
+    private static class TransactionalSpoutComponent extends SpoutComponent {
+        public String commitStateId;
+
+        public TransactionalSpoutComponent(Object spout, String streamName, Integer parallelism, String commitStateId, String batchGroupId) {
+            super(spout, streamName, parallelism, batchGroupId);
+            this.commitStateId = commitStateId;
+        }
+
+        @Override
+        public String toString() {
+            return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static class Component {
+        public ITridentBatchBolt bolt;
+        public Integer parallelism;
+        public List<InputDeclaration> declarations = new ArrayList<>();
+        public List<Map<String, Object>> componentConfs = new ArrayList<>();
+        public Set<String> committerBatches;
+
+        public Component(ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches) {
+            this.bolt = bolt;
+            this.parallelism = parallelism;
+            this.committerBatches = committerBatches;
+        }
+
+        @Override
+        public String toString() {
+            return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Map<String, Set<String>> getBoltBatchToComponentSubscriptions(String id) {
+        Map<String, Set<String>> ret = new HashMap();
+        for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
+            String b = _batchIds.get(s);
+            if(!ret.containsKey(b)) ret.put(b, new HashSet());
+            ret.get(b).add(s.get_componentId());
+        }
+        return ret;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    List<GlobalStreamId> getBoltSubscriptionStreams(String id) {
+        List<GlobalStreamId> ret = new ArrayList();
+        Component c = _bolts.get(id);
+        for(InputDeclaration d: c.declarations) {
+            ret.add(new GlobalStreamId(d.getComponent(), d.getStream()));
+        }
+        return ret;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private interface InputDeclaration {
+        void declare(InputDeclarer declarer);
+        String getComponent();
+        String getStream();
+    }
+
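+    // Declarer implementations handed back to callers. SpoutDeclarerImpl only records
+    // per-component configs; BoltDeclarerImpl additionally captures every grouping call
+    // as an InputDeclaration so the subscriptions can be replayed against the real
+    // TopologyBuilder declarer in buildTopology.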
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
+        SpoutComponent _component;
+
+        public SpoutDeclarerImpl(SpoutComponent component) {
+            _component = component;
+        }
+
+        @Override
+        public SpoutDeclarer addConfigurations(Map conf) {
+            _component.componentConfs.add(conf);
+            return this;
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
+        Component _component;
+
+        public BoltDeclarerImpl(Component component) {
+            _component = component;
+        }
+
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, fields);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, streamId, fields);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer globalGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer globalGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component, streamId);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component, streamId);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component, streamId);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer noneGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer noneGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component, streamId);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer allGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer allGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component, streamId);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
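+        // directGrouping is also what buildTopology relies on internally: each
+        // TridentBoltExecutor subscribes directly to the COORD_STREAM of every
+        // upstream component in its batch group.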
+        @Override
+        public BoltDeclarer directGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer directGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component, streamId);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            return customGrouping(componentId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, grouping);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return null;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, streamId, grouping);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public String getStream() {
+                    return streamId;
+                }
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.grouping(stream, grouping);
+                }
+
+                @Override
+                public String getComponent() {
+                    return stream.get_componentId();
+                }
+
+                @Override
+                public String getStream() {
+                    return stream.get_streamId();
+                }
+            });
+            return this;
+        }
+
+        private void addDeclaration(InputDeclaration declaration) {
+            _component.declarations.add(declaration);
+        }
+
+        @Override
+        public BoltDeclarer addConfigurations(Map conf) {
+            _component.componentConfs.add(conf);
+            return this;
+        }
+    }
+}
diff --git a/heron/storm/src/java/org/apache/storm/utils/Utils.java b/heron/storm/src/java/org/apache/storm/utils/Utils.java
index d67af5ed552..1183bfbcc36 100644
--- a/heron/storm/src/java/org/apache/storm/utils/Utils.java
+++ b/heron/storm/src/java/org/apache/storm/utils/Utils.java
@@ -18,14 +18,45 @@
 package org.apache.storm.utils;
 
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.net.URL;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.shade.org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
+import org.apache.storm.shade.org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
+import org.apache.storm.shade.org.apache.curator.ensemble.exhibitor.Exhibitors;
+import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.thrift.TBase;
+import org.apache.storm.thrift.TDeserializer;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.thrift.TSerializer;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
 import com.twitter.heron.common.basics.TypeUtils;
 
 // import org.json.simple.JSONValue;
+@SuppressWarnings({"unchecked", "rawtypes"})
 public final class Utils {
 
   public static final String DEFAULT_STREAM_ID =
       com.twitter.heron.api.utils.Utils.DEFAULT_STREAM_ID;
@@ -92,10 +123,188 @@ public static byte[] serialize(Object obj) {
     return com.twitter.heron.api.utils.Utils.serialize(obj);
   }
 
+  public static byte[] javaSerialize(Object obj) {
+    return serialize(obj);
+  }
+
   public static Object deserialize(byte[] serialized) {
     return com.twitter.heron.api.utils.Utils.deserialize(serialized);
   }
 
+  private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
+
+  public static byte[] thriftSerialize(TBase t) {
+    try {
+      TSerializer ser = threadSer.get();
+      if (ser == null) {
+        ser = new TSerializer();
+        threadSer.set(ser);
+      }
+      return ser.serialize(t);
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
+    try {
+      return Utils.thriftDeserialize(c, b, 0, b.length);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length) {
+    try {
+      T ret = c.newInstance();
+      TDeserializer des = getDes();
+      des.deserialize((TBase) ret, b, offset, length);
+      return ret;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>();
+
+  private static TDeserializer getDes() {
+    TDeserializer des = threadDes.get();
+    if (des == null) {
+      des = new TDeserializer();
+      threadDes.set(des);
+    }
+    return des;
+  }
+
+  private static ClassLoader cl = null;
+
+  public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
+    try {
+      ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+      ObjectInputStream ois = null;
+      if (null == cl) {
+        ois = new ObjectInputStream(bis);
+      } else {
+        // Use custom class loader set in testing environment
+        ois = new ClassLoaderObjectInputStream(cl, bis);
+      }
+      Object ret = ois.readObject();
+      ois.close();
+      return (T) ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static <T> String join(Iterable<T> coll, String sep) {
+    Iterator<T> it = coll.iterator();
+    StringBuilder ret = new StringBuilder();
+    while (it.hasNext()) {
+      ret.append(it.next());
+      if (it.hasNext()) {
+        ret.append(sep);
+      }
+    }
+    return ret.toString();
+  }
+
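+  // ZooKeeper/Curator helpers ported from Storm's Utils. Note that these resolve
+  // against the Curator and ZooKeeper classes shaded into storm-core
+  // (org.apache.storm.shade.*), not a standalone Curator dependency.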
+  public static CuratorFramework newCuratorStarted(Map conf, List<String> servers,
+                                                   Object port, String root,
+                                                   ZookeeperAuthInfo auth) {
+    CuratorFramework ret = newCurator(conf, servers, port, root, auth);
+    ret.start();
+    return ret;
+  }
+
+  public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port,
+                                                   ZookeeperAuthInfo auth) {
+    CuratorFramework ret = newCurator(conf, servers, port, auth);
+    ret.start();
+    return ret;
+  }
+
+  public static CuratorFramework newCurator(Map conf, List<String> servers, Object port,
+                                            ZookeeperAuthInfo auth) {
+    return newCurator(conf, servers, port, "", auth);
+  }
+
+  public static CuratorFramework newCurator(Map conf, List<String> servers, Object port,
+                                            String root, ZookeeperAuthInfo auth) {
+    List<String> serverPorts = new ArrayList<String>();
+    for (String zkServer : servers) {
+      serverPorts.add(zkServer + ":" + Utils.getInt(port));
+    }
+    String zkStr = StringUtils.join(serverPorts, ",") + root;
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+
+    setupBuilder(builder, zkStr, conf, auth);
+
+    return builder.build();
+  }
+
+  protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, final String zkStr,
+                                     Map conf, ZookeeperAuthInfo auth) {
+    List<String> exhibitorServers = getStrings(conf.get(Config.STORM_EXHIBITOR_SERVERS));
+    if (!exhibitorServers.isEmpty()) {
+      // use exhibitor servers
+      builder.ensembleProvider(new ExhibitorEnsembleProvider(
+          new Exhibitors(exhibitorServers, Utils.getInt(conf.get(Config.STORM_EXHIBITOR_PORT)),
+              new Exhibitors.BackupConnectionStringProvider() {
+                @Override
+                public String getBackupConnectionString() throws Exception {
+                  // use zk servers as backup if they exist
+                  return zkStr;
+                }
+              }),
+          new DefaultExhibitorRestClient(),
+          Utils.getString(conf.get(Config.STORM_EXHIBITOR_URIPATH)),
+          Utils.getInt(conf.get(Config.STORM_EXHIBITOR_POLL)),
+          new StormBoundedExponentialBackoffRetry(
+              Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL)),
+              Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL_CEILING)),
+              Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_TIMES)))));
+    } else {
+      builder.connectString(zkStr);
+    }
+    builder
+        .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+        .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+        .retryPolicy(new StormBoundedExponentialBackoffRetry(
+            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
+            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
+            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
+
+    if (auth != null && auth.scheme != null && auth.payload != null) {
+      builder.authorization(auth.scheme, auth.payload);
+    }
+  }
+
+  public static List<String> getStrings(final Object o) {
+    if (o == null) {
+      return new ArrayList<String>();
+    } else if (o instanceof String) {
+      return new ArrayList<String>() {
+        private static final long serialVersionUID = -2182685021645675803L;
+        { add((String) o); }
+      };
+    } else if (o instanceof Collection) {
+      List<String> answer = new ArrayList<String>();
+      for (Object v : (Collection) o) {
+        answer.add(v.toString());
+      }
+      return answer;
+    } else {
+      throw new IllegalArgumentException("Don't know how to convert to string list");
+    }
+  }
+
+  public static String getString(Object o) {
+    if (null == o) {
+      throw new IllegalArgumentException("Don't know how to convert null to String");
+    }
+    return o.toString();
+  }
+
+  public static List<ACL> getWorkerACL(Map conf) {
+    return null; //TODO: implement ACL support
+  }
+
   public static Integer getInt(Object o) {
     return TypeUtils.getInteger(o);
   }
@@ -115,4 +324,97 @@ public static byte[] toByteArray(ByteBuffer buffer) {
   public static <S, T> T get(Map<S, T> m, S key, T defaultValue) {
     return com.twitter.heron.api.utils.Utils.get(m, key, defaultValue);
   }
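+
+  // Config resolution mirrors Storm's: defaults.yaml from the classpath first,
+  // then storm.yaml (or the file named by -Dstorm.conf.file), with any
+  // command-line opts applied last.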
+  public static Map readStormConfig() {
+    Map ret = readDefaultConfig();
+    String confFile = System.getProperty("storm.conf.file");
+    Map storm;
+    if (confFile == null || confFile.equals("")) {
+      storm = findAndReadConfigFile("storm.yaml", false);
+    } else {
+      storm = findAndReadConfigFile(confFile, true);
+    }
+    ret.putAll(storm);
+    ret.putAll(readCommandLineOpts());
+    return ret;
+  }
+
+  public static Map readDefaultConfig() {
+    return findAndReadConfigFile("defaults.yaml", true);
+  }
+
+  private static Map findAndReadConfigFile(String name, boolean mustExist) {
+    InputStream in = null;
+    boolean confFileEmpty = false;
+    try {
+      in = getConfigFileInputStream(name);
+      if (null != in) {
+        Yaml yaml = new Yaml(new SafeConstructor());
+        Map ret = (Map) yaml.load(new InputStreamReader(in));
+        if (null != ret) {
+          return new HashMap(ret);
+        } else {
+          confFileEmpty = true;
+        }
+      }
+
+      if (mustExist) {
+        if (confFileEmpty) {
+          throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
+        } else {
+          throw new RuntimeException("Could not find config file on classpath " + name);
+        }
+      } else {
+        return new HashMap();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (null != in) {
+        try {
+          in.close();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  private static InputStream getConfigFileInputStream(String configFilePath)
+      throws IOException {
+    if (null == configFilePath) {
+      throw new IOException("Could not find config file, name not specified");
+    }
+
+    HashSet<URL> resources = new HashSet<>(findResources(configFilePath));
+    if (resources.isEmpty()) {
+      File configFile = new File(configFilePath);
+      if (configFile.exists()) {
+        return new FileInputStream(configFile);
+      }
+    } else if (resources.size() > 1) {
+      throw new IOException(
+          "Found multiple " + configFilePath
+              + " resources. You're probably bundling the Storm jars with your topology jar. "
+              + resources);
+    } else {
+      // LOG.debug("Using " + configFilePath + " from resources");
+      URL resource = resources.iterator().next();
+      return resource.openStream();
+    }
+    return null;
+  }
+
+  private static List<URL> findResources(String name) {
+    try {
+      Enumeration<URL> resources =
+          Thread.currentThread().getContextClassLoader().getResources(name);
+      List<URL> ret = new ArrayList<>();
+      while (resources.hasMoreElements()) {
+        ret.add(resources.nextElement());
+      }
+      return ret;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }