diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java index f31f43125e9..1b1e64ef856 100644 --- a/heron/api/src/java/com/twitter/heron/api/Config.java +++ b/heron/api/src/java/com/twitter/heron/api/Config.java @@ -24,6 +24,8 @@ import javax.xml.bind.DatatypeConverter; import com.twitter.heron.common.basics.ByteAmount; +import com.twitter.heron.common.basics.PackingAlgorithmType; +import com.twitter.heron.common.basics.RepackingAlgorithmType; import com.twitter.heron.common.basics.TypeUtils; /** @@ -229,6 +231,38 @@ public class Config extends HashMap { public static final String TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS = "topology.update.reactivate.wait.secs"; + /** + * Packing algorithm used to calculate packing plan + */ + public static final String TOPOLOGY_PACKING_ALGORITHM = + "heron.class.packing.algorithm"; + + /** + * Repacking algorithm used to recalculate packing plan + */ + public static final String TOPOLOGY_REPACKING_ALGORITHM = + "heron.class.repacking.algorithm"; + + /** + * Amount of RAM per instance to be reserved for this topology. + * In bytes. + */ + public static final String TOPOLOGY_INSTANCE_RAM_REQUESTED = + "heron.resources.instance.ram"; + + /** + * Number of cpu cores per instance to be reserved for this topology + */ + public static final String TOPOLOGY_INSTANCE_CPU_REQUESTED = + "heron.resources.instance.cpu"; + + /** + * Amount of disk per instance to be reserved for this topology. + * In bytes. + */ + public static final String TOPOLOGY_INSTANCE_DISK_REQUESTED = + "heron.resources.instance.disk"; + private static final long serialVersionUID = 2550967708478837032L; // We maintain a list of all user exposed vars private static Set apiVars = new HashSet<>(); @@ -241,7 +275,6 @@ public class Config extends HashMap { apiVars.add(TOPOLOGY_MAX_SPOUT_PENDING); apiVars.add(TOPOLOGY_WORKER_CHILDOPTS); apiVars.add(TOPOLOGY_COMPONENT_JVMOPTS); - apiVars.add(TOPOLOGY_SERIALIZER_CLASSNAME); apiVars.add(TOPOLOGY_TICK_TUPLE_FREQ_SECS); apiVars.add(TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS); apiVars.add(TOPOLOGY_ENABLE_ACKING); @@ -265,6 +298,8 @@ public class Config extends HashMap { apiVars.add(TOPOLOGY_ADDITIONAL_CLASSPATH); apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS); apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS); + apiVars.add(TOPOLOGY_PACKING_ALGORITHM); + apiVars.add(TOPOLOGY_REPACKING_ALGORITHM); } public Config() { @@ -452,6 +487,28 @@ public static void setTopologyExactlyOnceEnabled(Map conf, boole conf.put(Config.TOPOLOGY_EXACTLYONCE_ENABLED, String.valueOf(exactOnce)); } + public static void setTopologyPackingAlgorithm(Map conf, + PackingAlgorithmType type) { + conf.put(Config.TOPOLOGY_PACKING_ALGORITHM, type.toString()); + } + + public static void setTopologyRepackingAlgorithm(Map conf, + RepackingAlgorithmType type) { + conf.put(Config.TOPOLOGY_REPACKING_ALGORITHM, type.toString()); + } + + public static void setInstanceCpuRequested(Map conf, float ncpus) { + conf.put(Config.TOPOLOGY_INSTANCE_CPU_REQUESTED, Float.toString(ncpus)); + } + + public static void setInstanceDiskRequested(Map conf, ByteAmount nbytes) { + conf.put(Config.TOPOLOGY_INSTANCE_DISK_REQUESTED, Long.toString(nbytes.asBytes())); + } + + public static void setInstanceRamRequested(Map conf, ByteAmount nbytes) { + conf.put(Config.TOPOLOGY_INSTANCE_RAM_REQUESTED, Long.toString(nbytes.asBytes())); + } + public void setDebug(boolean isOn) { setDebug(this, isOn); } @@ -589,4 +646,24 @@ public void setTopologyStatefulStartClean(boolean clean) { public void setTopologyExactlyOnceEnabled(boolean exactOnce) { setTopologyExactlyOnceEnabled(this, exactOnce); } + + public void setTopologyPackingAlgorithm(PackingAlgorithmType type) { + setTopologyPackingAlgorithm(this, type); + } + + public void setTopologyRepackingAlgorithm(RepackingAlgorithmType type) { + setTopologyRepackingAlgorithm(this, type); + } + + public void setInstanceCpuRequested(float ncpus) { + setInstanceCpuRequested(this, ncpus); + } + + public void setInstanceDiskRequested(ByteAmount nbytes) { + setInstanceDiskRequested(this, nbytes); + } + + public void setInstanceRamRequested(ByteAmount nbytes) { + setInstanceRamRequested(this, nbytes); + } } diff --git a/heron/common/src/java/com/twitter/heron/common/basics/PackingAlgorithmType.java b/heron/common/src/java/com/twitter/heron/common/basics/PackingAlgorithmType.java new file mode 100644 index 00000000000..c46a4629c34 --- /dev/null +++ b/heron/common/src/java/com/twitter/heron/common/basics/PackingAlgorithmType.java @@ -0,0 +1,39 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.common.basics; + +/*** + * This enum defines packing algorithm type + */ +public enum PackingAlgorithmType { + ROUNDROBIN, + RESOURCE_COMPLIANT_ROUNDROBIN, + FIRST_FIT_DECREASING; + + @Override + public String toString() { + switch (this) { + case ROUNDROBIN: + return "com.twitter.heron.packing.roundrobin.RoundRobinPacking"; + case RESOURCE_COMPLIANT_ROUNDROBIN: + return "com.twitter.heron.packing.roundrobin.ResourceCompliantRRPacking"; + case FIRST_FIT_DECREASING: + return "com.twitter.heron.packing.binpacking.FirstFitDecreasingPacking"; + default: + throw new RuntimeException(String.format("Unknown packing algorithm: %s", + this.toString())); + } + } +} diff --git a/heron/common/src/java/com/twitter/heron/common/basics/RepackingAlgorithmType.java b/heron/common/src/java/com/twitter/heron/common/basics/RepackingAlgorithmType.java new file mode 100644 index 00000000000..61815c4968e --- /dev/null +++ b/heron/common/src/java/com/twitter/heron/common/basics/RepackingAlgorithmType.java @@ -0,0 +1,36 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.common.basics; + +/*** + * This enum defines packing algorithm type + */ +public enum RepackingAlgorithmType { + RESOURCE_COMPLIANT_ROUNDROBIN, + FIRST_FIT_DECREASING; + + @Override + public String toString() { + switch (this) { + case RESOURCE_COMPLIANT_ROUNDROBIN: + return "com.twitter.heron.packing.roundrobin.ResourceCompliantRRPacking"; + case FIRST_FIT_DECREASING: + return "com.twitter.heron.packing.binpacking.FirstFitDecreasingPacking"; + default: + throw new RuntimeException(String.format("Unknown repacking algorithm: %s", + this.toString())); + } + } +}