From b8fb456b47ff9cd21fe7726bc1f761ed21639ad6 Mon Sep 17 00:00:00 2001 From: Runhang Li Date: Thu, 13 Jul 2017 11:09:31 -0700 Subject: [PATCH 1/2] API to control packing algorithm. --- .../java/com/twitter/heron/api/Config.java | 17 ++++++++ .../common/basics/PackingAlgorithmType.java | 39 +++++++++++++++++++ .../heron/examples/WordCountTopology.java | 2 + 3 files changed, 58 insertions(+) create mode 100644 heron/common/src/java/com/twitter/heron/common/basics/PackingAlgorithmType.java diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java index f31f43125e9..cc03b7df2d4 100644 --- a/heron/api/src/java/com/twitter/heron/api/Config.java +++ b/heron/api/src/java/com/twitter/heron/api/Config.java @@ -24,6 +24,7 @@ import javax.xml.bind.DatatypeConverter; import com.twitter.heron.common.basics.ByteAmount; +import com.twitter.heron.common.basics.PackingAlgorithmType; import com.twitter.heron.common.basics.TypeUtils; /** @@ -229,6 +230,12 @@ public class Config extends HashMap { public static final String TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS = "topology.update.reactivate.wait.secs"; + /** + * Packing algorithm used to calculate packing plan + */ + public static final String TOPOLOGY_PACKING_ALGORITHM = + "heron.class.packing.algorithm"; + private static final long serialVersionUID = 2550967708478837032L; // We maintain a list of all user exposed vars private static Set apiVars = new HashSet<>(); @@ -265,6 +272,7 @@ public class Config extends HashMap { apiVars.add(TOPOLOGY_ADDITIONAL_CLASSPATH); apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS); apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS); + apiVars.add(TOPOLOGY_PACKING_ALGORITHM); } public Config() { @@ -452,6 +460,11 @@ public static void setTopologyExactlyOnceEnabled(Map conf, boole conf.put(Config.TOPOLOGY_EXACTLYONCE_ENABLED, String.valueOf(exactOnce)); } + public static void setTopologyPackingAlgorithm(Map conf, + PackingAlgorithmType type) { + conf.put(Config.TOPOLOGY_PACKING_ALGORITHM, type.toString()); + } + public void setDebug(boolean isOn) { setDebug(this, isOn); } @@ -589,4 +602,8 @@ public void setTopologyStatefulStartClean(boolean clean) { public void setTopologyExactlyOnceEnabled(boolean exactOnce) { setTopologyExactlyOnceEnabled(this, exactOnce); } + + public void setTopologyPackingAlgorithm(PackingAlgorithmType type) { + setTopologyPackingAlgorithm(this, type); + } } diff --git a/heron/common/src/java/com/twitter/heron/common/basics/PackingAlgorithmType.java b/heron/common/src/java/com/twitter/heron/common/basics/PackingAlgorithmType.java new file mode 100644 index 00000000000..c46a4629c34 --- /dev/null +++ b/heron/common/src/java/com/twitter/heron/common/basics/PackingAlgorithmType.java @@ -0,0 +1,39 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.common.basics; + +/*** + * This enum defines packing algorithm type + */ +public enum PackingAlgorithmType { + ROUNDROBIN, + RESOURCE_COMPLIANT_ROUNDROBIN, + FIRST_FIT_DECREASING; + + @Override + public String toString() { + switch (this) { + case ROUNDROBIN: + return "com.twitter.heron.packing.roundrobin.RoundRobinPacking"; + case RESOURCE_COMPLIANT_ROUNDROBIN: + return "com.twitter.heron.packing.roundrobin.ResourceCompliantRRPacking"; + case FIRST_FIT_DECREASING: + return "com.twitter.heron.packing.binpacking.FirstFitDecreasingPacking"; + default: + throw new RuntimeException(String.format("Unknown packing algorithm: %s", + this.toString())); + } + } +} diff --git a/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java b/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java index 01a0a3cfded..55c14f412ab 100644 --- a/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java +++ b/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java @@ -19,6 +19,7 @@ import java.util.Random; import com.twitter.heron.common.basics.ByteAmount; +import com.twitter.heron.common.basics.PackingAlgorithmType; import backtype.storm.Config; import backtype.storm.StormSubmitter; @@ -183,6 +184,7 @@ public static void main(String[] args) throws AlreadyAliveException, InvalidTopo com.twitter.heron.api.Config.setComponentRam(conf, "word", ByteAmount.fromGigabytes(2)); com.twitter.heron.api.Config.setComponentRam(conf, "consumer", ByteAmount.fromGigabytes(3)); com.twitter.heron.api.Config.setContainerCpuRequested(conf, 6); + com.twitter.heron.api.Config.setTopologyPackingAlgorithm(conf, PackingAlgorithmType.FIRST_FIT_DECREASING); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } From 719b767a16acb4845ba78772e6ef6eb5c356b0f4 Mon Sep 17 00:00:00 2001 From: Runhang Li Date: Thu, 13 Jul 2017 13:46:01 -0700 Subject: [PATCH 2/2] API to control repacking and instance resources. --- .../java/com/twitter/heron/api/Config.java | 62 ++++++++++++++++++- .../common/basics/RepackingAlgorithmType.java | 36 +++++++++++ .../heron/examples/WordCountTopology.java | 2 - 3 files changed, 97 insertions(+), 3 deletions(-) create mode 100644 heron/common/src/java/com/twitter/heron/common/basics/RepackingAlgorithmType.java diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java index cc03b7df2d4..1b1e64ef856 100644 --- a/heron/api/src/java/com/twitter/heron/api/Config.java +++ b/heron/api/src/java/com/twitter/heron/api/Config.java @@ -25,6 +25,7 @@ import com.twitter.heron.common.basics.ByteAmount; import com.twitter.heron.common.basics.PackingAlgorithmType; +import com.twitter.heron.common.basics.RepackingAlgorithmType; import com.twitter.heron.common.basics.TypeUtils; /** @@ -236,6 +237,32 @@ public class Config extends HashMap { public static final String TOPOLOGY_PACKING_ALGORITHM = "heron.class.packing.algorithm"; + /** + * Repacking algorithm used to recalculate packing plan + */ + public static final String TOPOLOGY_REPACKING_ALGORITHM = + "heron.class.repacking.algorithm"; + + /** + * Amount of RAM per instance to be reserved for this topology. + * In bytes. + */ + public static final String TOPOLOGY_INSTANCE_RAM_REQUESTED = + "heron.resources.instance.ram"; + + /** + * Number of cpu cores per instance to be reserved for this topology + */ + public static final String TOPOLOGY_INSTANCE_CPU_REQUESTED = + "heron.resources.instance.cpu"; + + /** + * Amount of disk per instance to be reserved for this topology. + * In bytes. + */ + public static final String TOPOLOGY_INSTANCE_DISK_REQUESTED = + "heron.resources.instance.disk"; + private static final long serialVersionUID = 2550967708478837032L; // We maintain a list of all user exposed vars private static Set apiVars = new HashSet<>(); @@ -248,7 +275,6 @@ public class Config extends HashMap { apiVars.add(TOPOLOGY_MAX_SPOUT_PENDING); apiVars.add(TOPOLOGY_WORKER_CHILDOPTS); apiVars.add(TOPOLOGY_COMPONENT_JVMOPTS); - apiVars.add(TOPOLOGY_SERIALIZER_CLASSNAME); apiVars.add(TOPOLOGY_TICK_TUPLE_FREQ_SECS); apiVars.add(TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS); apiVars.add(TOPOLOGY_ENABLE_ACKING); @@ -273,6 +299,7 @@ public class Config extends HashMap { apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS); apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS); apiVars.add(TOPOLOGY_PACKING_ALGORITHM); + apiVars.add(TOPOLOGY_REPACKING_ALGORITHM); } public Config() { @@ -465,6 +492,23 @@ public static void setTopologyPackingAlgorithm(Map conf, conf.put(Config.TOPOLOGY_PACKING_ALGORITHM, type.toString()); } + public static void setTopologyRepackingAlgorithm(Map conf, + RepackingAlgorithmType type) { + conf.put(Config.TOPOLOGY_REPACKING_ALGORITHM, type.toString()); + } + + public static void setInstanceCpuRequested(Map conf, float ncpus) { + conf.put(Config.TOPOLOGY_INSTANCE_CPU_REQUESTED, Float.toString(ncpus)); + } + + public static void setInstanceDiskRequested(Map conf, ByteAmount nbytes) { + conf.put(Config.TOPOLOGY_INSTANCE_DISK_REQUESTED, Long.toString(nbytes.asBytes())); + } + + public static void setInstanceRamRequested(Map conf, ByteAmount nbytes) { + conf.put(Config.TOPOLOGY_INSTANCE_RAM_REQUESTED, Long.toString(nbytes.asBytes())); + } + public void setDebug(boolean isOn) { setDebug(this, isOn); } @@ -606,4 +650,20 @@ public void setTopologyExactlyOnceEnabled(boolean exactOnce) { public void setTopologyPackingAlgorithm(PackingAlgorithmType type) { setTopologyPackingAlgorithm(this, type); } + + public void setTopologyRepackingAlgorithm(RepackingAlgorithmType type) { + setTopologyRepackingAlgorithm(this, type); + } + + public void setInstanceCpuRequested(float ncpus) { + setInstanceCpuRequested(this, ncpus); + } + + public void setInstanceDiskRequested(ByteAmount nbytes) { + setInstanceDiskRequested(this, nbytes); + } + + public void setInstanceRamRequested(ByteAmount nbytes) { + setInstanceRamRequested(this, nbytes); + } } diff --git a/heron/common/src/java/com/twitter/heron/common/basics/RepackingAlgorithmType.java b/heron/common/src/java/com/twitter/heron/common/basics/RepackingAlgorithmType.java new file mode 100644 index 00000000000..61815c4968e --- /dev/null +++ b/heron/common/src/java/com/twitter/heron/common/basics/RepackingAlgorithmType.java @@ -0,0 +1,36 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.common.basics; + +/*** + * This enum defines packing algorithm type + */ +public enum RepackingAlgorithmType { + RESOURCE_COMPLIANT_ROUNDROBIN, + FIRST_FIT_DECREASING; + + @Override + public String toString() { + switch (this) { + case RESOURCE_COMPLIANT_ROUNDROBIN: + return "com.twitter.heron.packing.roundrobin.ResourceCompliantRRPacking"; + case FIRST_FIT_DECREASING: + return "com.twitter.heron.packing.binpacking.FirstFitDecreasingPacking"; + default: + throw new RuntimeException(String.format("Unknown repacking algorithm: %s", + this.toString())); + } + } +} diff --git a/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java b/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java index 55c14f412ab..01a0a3cfded 100644 --- a/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java +++ b/heron/examples/src/java/com/twitter/heron/examples/WordCountTopology.java @@ -19,7 +19,6 @@ import java.util.Random; import com.twitter.heron.common.basics.ByteAmount; -import com.twitter.heron.common.basics.PackingAlgorithmType; import backtype.storm.Config; import backtype.storm.StormSubmitter; @@ -184,7 +183,6 @@ public static void main(String[] args) throws AlreadyAliveException, InvalidTopo com.twitter.heron.api.Config.setComponentRam(conf, "word", ByteAmount.fromGigabytes(2)); com.twitter.heron.api.Config.setComponentRam(conf, "consumer", ByteAmount.fromGigabytes(3)); com.twitter.heron.api.Config.setContainerCpuRequested(conf, 6); - com.twitter.heron.api.Config.setTopologyPackingAlgorithm(conf, PackingAlgorithmType.FIRST_FIT_DECREASING); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); }