This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Add APIs to control packing/repacking algorithm and change instance resources #2059

Open
wants to merge 2 commits into
base: master
79 changes: 78 additions & 1 deletion heron/api/src/java/com/twitter/heron/api/Config.java
@@ -24,6 +24,8 @@
import javax.xml.bind.DatatypeConverter;

import com.twitter.heron.common.basics.ByteAmount;
import com.twitter.heron.common.basics.PackingAlgorithmType;
import com.twitter.heron.common.basics.RepackingAlgorithmType;
import com.twitter.heron.common.basics.TypeUtils;

/**
@@ -229,6 +231,38 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS =
"topology.update.reactivate.wait.secs";

/**
* Packing algorithm used to calculate packing plan
*/
public static final String TOPOLOGY_PACKING_ALGORITHM =
Member

I really doubt we need the packing algorithm to be configurable by users. In the original design, it's a system-wide property that should only be set by the Heron administrator.

Also, how are the RAM, CPU, and disk resource configs used? It seems these configs don't distinguish between components; they apply to the instances of every component in a topology. Correct me if my understanding is wrong here.

Contributor

I could see having multiple packing implementations supported, where power users could specify a non-default one based on their needs, or even write one of their own.

"heron.class.packing.algorithm";

/**
* Repacking algorithm used to recalculate packing plan
*/
public static final String TOPOLOGY_REPACKING_ALGORITHM =
"heron.class.repacking.algorithm";

/**
* Amount of RAM per instance to be reserved for this topology.
* In bytes.
*/
public static final String TOPOLOGY_INSTANCE_RAM_REQUESTED =
"heron.resources.instance.ram";
Contributor

Please append .bytes to the key to make the units explicit. Same for the disk setting.
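
To illustrate the suggestion above, here is a minimal sketch of what unit-suffixed keys could look like. The key names `heron.resources.instance.ram.bytes` and `heron.resources.instance.disk.bytes`, the class `ResourceKeysSketch`, and its getter are hypothetical; the PR as submitted uses the unsuffixed keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not part of the PR: config keys that carry their unit.
final class ResourceKeysSketch {
  static final String INSTANCE_RAM_BYTES = "heron.resources.instance.ram.bytes";
  static final String INSTANCE_DISK_BYTES = "heron.resources.instance.disk.bytes";

  private ResourceKeysSketch() { }

  // Stores the value as a decimal string, mirroring the setters in this PR.
  static void setInstanceRamBytes(Map<String, Object> conf, long bytes) {
    conf.put(INSTANCE_RAM_BYTES, Long.toString(bytes));
  }

  // Parses the stored string back into a byte count.
  static long getInstanceRamBytes(Map<String, Object> conf) {
    return Long.parseLong((String) conf.get(INSTANCE_RAM_BYTES));
  }

  public static void main(String[] args) {
    Map<String, Object> conf = new HashMap<>();
    setInstanceRamBytes(conf, 1024L * 1024L * 1024L);
    System.out.println(INSTANCE_RAM_BYTES + " = " + getInstanceRamBytes(conf));
  }
}
```

With the unit in the key, a reader of a YAML file or a config dump does not need to consult the Javadoc to know how to interpret the number.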


/**
* Number of cpu cores per instance to be reserved for this topology
*/
public static final String TOPOLOGY_INSTANCE_CPU_REQUESTED =
"heron.resources.instance.cpu";

/**
* Amount of disk per instance to be reserved for this topology.
* In bytes.
*/
public static final String TOPOLOGY_INSTANCE_DISK_REQUESTED =
"heron.resources.instance.disk";

private static final long serialVersionUID = 2550967708478837032L;
// We maintain a list of all user exposed vars
private static Set<String> apiVars = new HashSet<>();
@@ -241,7 +275,6 @@ public class Config extends HashMap<String, Object> {
apiVars.add(TOPOLOGY_MAX_SPOUT_PENDING);
apiVars.add(TOPOLOGY_WORKER_CHILDOPTS);
apiVars.add(TOPOLOGY_COMPONENT_JVMOPTS);
apiVars.add(TOPOLOGY_SERIALIZER_CLASSNAME);
apiVars.add(TOPOLOGY_TICK_TUPLE_FREQ_SECS);
apiVars.add(TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
apiVars.add(TOPOLOGY_ENABLE_ACKING);
@@ -265,6 +298,8 @@ public class Config extends HashMap<String, Object> {
apiVars.add(TOPOLOGY_ADDITIONAL_CLASSPATH);
apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS);
apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS);
apiVars.add(TOPOLOGY_PACKING_ALGORITHM);
apiVars.add(TOPOLOGY_REPACKING_ALGORITHM);
}

public Config() {
@@ -452,6 +487,28 @@ public static void setTopologyExactlyOnceEnabled(Map<String, Object> conf, boole
conf.put(Config.TOPOLOGY_EXACTLYONCE_ENABLED, String.valueOf(exactOnce));
}

public static void setTopologyPackingAlgorithm(Map<String, Object> conf,
PackingAlgorithmType type) {
Contributor

The packing type is just a Class. It needs to be extensible so that users can implement their own, so an enum won't work.

public static void setTopologyPackingClass(Map<String, Object> conf,
    Class<? extends IPacking> packingClass);
public static void setTopologyRepackingClass(Map<String, Object> conf,
    Class<? extends IRepacking> repackingClass);

Contributor Author (@objmagic, Jul 13, 2017)

I was worried that doing this would pull in a lot of dependencies from the schedulers, but I can try.

Contributor

This would cause a problem, since IPacking and IRepacking are in spi, which depends on api, but not the other way around. We could consider moving these interfaces into api. @kramasamy do you have an opinion on this one?

Contributor Author

You are right. I was not clear in my message above: yes, I was worried about circular dependencies as well when reading BUILD files.

Contributor

Regarding packaging of IPacking and IRepacking, is the distribution of interfaces between api and spi packages needed? Given the current structure, moving these interfaces may create confusion. These interfaces are closely linked to other components like IScheduler.

Could we avoid future moves of user-facing interfaces by keeping all interfaces implemented by custom code in the api package?

Contributor

I think the intent (which makes sense to me) is that api holds the topology-level APIs that topology developers might implement or specify (e.g., IBolt, ISpout). The spi package holds the system-level APIs that administering teams would implement, for a new scheduler or state manager for example.

I think the packing algorithms could be implemented, or at least specified, by topology authors at the topology level. Hence it makes sense for those interfaces to move to api, but not the others per se.

Contributor

Thanks @billonahill, I agree with the interpretation of the api and spi packages.

I think packing configuration is rarely provided by the topology developer. It is more commonly specified by the administrator or by whoever deploys the topology, via YAML config files or command-line arguments. To me this fits the definition of a system-level API. WDYT?

Contributor

I think the packing impls should be defaulted at the system level, but I certainly see the need to be able to override them at the topology level. We've recommended to some topology owners to specify a different packing impl to change the way resources are allocated for their specific job, which seems reasonable to me.
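
One way to reconcile the extensibility request with the api/spi dependency direction discussed in this thread is to pass only a fully qualified class name through the API and resolve it reflectively where the packing interfaces are visible. The sketch below is an assumption, not the PR's code: `PackingStub`, `RoundRobinStub`, and `PackingClassConfigSketch` are hypothetical stand-ins so that the example compiles on its own; only the key `heron.class.packing.algorithm` comes from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an spi-side interface such as IPacking.
interface PackingStub {
  String describe();
}

// Hypothetical user-supplied implementation.
class RoundRobinStub implements PackingStub {
  @Override
  public String describe() {
    return "round robin";
  }
}

final class PackingClassConfigSketch {
  static final String TOPOLOGY_PACKING_ALGORITHM = "heron.class.packing.algorithm";

  private PackingClassConfigSketch() { }

  // API side: only a String crosses the boundary, so api needs no compile-time
  // dependency on spi types.
  static void setTopologyPackingClass(Map<String, Object> conf, String className) {
    conf.put(TOPOLOGY_PACKING_ALGORITHM, className);
  }

  // System side: resolve the class and check its type where the interface is visible.
  static PackingStub loadPacking(Map<String, Object> conf)
      throws ReflectiveOperationException {
    String className = (String) conf.get(TOPOLOGY_PACKING_ALGORITHM);
    Class<? extends PackingStub> clazz =
        Class.forName(className).asSubclass(PackingStub.class);
    return clazz.getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Map<String, Object> conf = new HashMap<>();
    setTopologyPackingClass(conf, RoundRobinStub.class.getName());
    System.out.println(loadPacking(conf).describe());
  }
}
```

The trade-off is that a misspelled or incompatible class name is only detected when the class is loaded, not at compile time, which is the safety the `Class<? extends IPacking>` signature proposed above would have provided.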

conf.put(Config.TOPOLOGY_PACKING_ALGORITHM, type.toString());
}

public static void setTopologyRepackingAlgorithm(Map<String, Object> conf,
RepackingAlgorithmType type) {
conf.put(Config.TOPOLOGY_REPACKING_ALGORITHM, type.toString());
}

public static void setInstanceCpuRequested(Map<String, Object> conf, float ncpus) {
Contributor

s/ncpus/cpus/g here and below

conf.put(Config.TOPOLOGY_INSTANCE_CPU_REQUESTED, Float.toString(ncpus));
}

public static void setInstanceDiskRequested(Map<String, Object> conf, ByteAmount nbytes) {
Contributor

s/nbytes/byteAmount/g here and below.

conf.put(Config.TOPOLOGY_INSTANCE_DISK_REQUESTED, Long.toString(nbytes.asBytes()));
}

public static void setInstanceRamRequested(Map<String, Object> conf, ByteAmount nbytes) {
conf.put(Config.TOPOLOGY_INSTANCE_RAM_REQUESTED, Long.toString(nbytes.asBytes()));
}

public void setDebug(boolean isOn) {
setDebug(this, isOn);
}
@@ -589,4 +646,24 @@ public void setTopologyStatefulStartClean(boolean clean) {
public void setTopologyExactlyOnceEnabled(boolean exactOnce) {
setTopologyExactlyOnceEnabled(this, exactOnce);
}

public void setTopologyPackingAlgorithm(PackingAlgorithmType type) {
setTopologyPackingAlgorithm(this, type);
}

public void setTopologyRepackingAlgorithm(RepackingAlgorithmType type) {
setTopologyRepackingAlgorithm(this, type);
}

public void setInstanceCpuRequested(float ncpus) {
setInstanceCpuRequested(this, ncpus);
}

public void setInstanceDiskRequested(ByteAmount nbytes) {
setInstanceDiskRequested(this, nbytes);
}

public void setInstanceRamRequested(ByteAmount nbytes) {
setInstanceRamRequested(this, nbytes);
}
}
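
Each setter added in this file comes in two forms: a static method that writes into any `Map<String, Object>`, and an instance method that delegates to it with `this`. A minimal sketch of that pattern follows, assuming a hypothetical class `MiniConfigSketch` in place of the real `Config`; only the key string is taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of the Config pattern used in this diff.
class MiniConfigSketch extends HashMap<String, Object> {
  private static final long serialVersionUID = 1L;

  static final String INSTANCE_CPU = "heron.resources.instance.cpu";

  // Static form: works on any map, including one built outside a Config object.
  static void setInstanceCpuRequested(Map<String, Object> conf, float cpus) {
    // Values are stored as strings, as in the setters above.
    conf.put(INSTANCE_CPU, Float.toString(cpus));
  }

  // Instance form: delegates to the static form, passing this map.
  void setInstanceCpuRequested(float cpus) {
    setInstanceCpuRequested(this, cpus);
  }

  public static void main(String[] args) {
    MiniConfigSketch conf = new MiniConfigSketch();
    conf.setInstanceCpuRequested(1.5f);
    System.out.println(conf.get(INSTANCE_CPU));
  }
}
```

Because the value is stored as a string, whatever reads it back has to parse it, for example with `Float.parseFloat`.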
@@ -0,0 +1,39 @@
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.heron.common.basics;

/**
 * This enum defines packing algorithm type
 */
public enum PackingAlgorithmType {
  ROUNDROBIN,
  RESOURCE_COMPLIANT_ROUNDROBIN,
  FIRST_FIT_DECREASING;

  @Override
  public String toString() {
    switch (this) {
      case ROUNDROBIN:
        return "com.twitter.heron.packing.roundrobin.RoundRobinPacking";
      case RESOURCE_COMPLIANT_ROUNDROBIN:
        return "com.twitter.heron.packing.roundrobin.ResourceCompliantRRPacking";
      case FIRST_FIT_DECREASING:
        return "com.twitter.heron.packing.binpacking.FirstFitDecreasingPacking";
      default:
        throw new RuntimeException(String.format("Unknown packing algorithm: %s",
            this.name()));  // name(), not toString(), which would recurse here
    }
  }
}
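
An alternative to a switch inside `toString()` is to store the class name in a field on each constant, which removes the need for a default branch. This is a sketch only: the enum `PackingAlgorithmSketch`, its `getClassName()` accessor, and the demo class are hypothetical names, while the three class-name strings are copied from the diff.

```java
// Hypothetical alternative to the switch-based toString() in this diff.
enum PackingAlgorithmSketch {
  ROUNDROBIN("com.twitter.heron.packing.roundrobin.RoundRobinPacking"),
  RESOURCE_COMPLIANT_ROUNDROBIN(
      "com.twitter.heron.packing.roundrobin.ResourceCompliantRRPacking"),
  FIRST_FIT_DECREASING(
      "com.twitter.heron.packing.binpacking.FirstFitDecreasingPacking");

  private final String className;

  PackingAlgorithmSketch(String className) {
    this.className = className;
  }

  // Explicit accessor, so callers do not have to rely on toString().
  String getClassName() {
    return className;
  }

  @Override
  public String toString() {
    return className;
  }
}

class PackingAlgorithmSketchDemo {
  public static void main(String[] args) {
    for (PackingAlgorithmSketch type : PackingAlgorithmSketch.values()) {
      System.out.println(type.name() + " -> " + type.getClassName());
    }
  }
}
```

Each new constant must supply its class name to compile, so there is no unreachable "unknown" case to maintain.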
@@ -0,0 +1,36 @@
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.heron.common.basics;

/**
 * This enum defines repacking algorithm type
 */
public enum RepackingAlgorithmType {
  RESOURCE_COMPLIANT_ROUNDROBIN,
  FIRST_FIT_DECREASING;

  @Override
  public String toString() {
    switch (this) {
      case RESOURCE_COMPLIANT_ROUNDROBIN:
        return "com.twitter.heron.packing.roundrobin.ResourceCompliantRRPacking";
      case FIRST_FIT_DECREASING:
        return "com.twitter.heron.packing.binpacking.FirstFitDecreasingPacking";
      default:
        throw new RuntimeException(String.format("Unknown repacking algorithm: %s",
            this.name()));  // name(), not toString(), which would recurse here
    }
  }
}