Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further changes to make ES 5.0 work #597

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,49 +296,6 @@ public String nativeCommand(List<String> arguments) {
+ "\" nobody";
}

public List<String> esArguments(ClusterState clusterState, Protos.DiscoveryInfo discoveryInfo, Protos.SlaveID slaveID) {
List<String> args = new ArrayList<>();
List<Protos.TaskInfo> taskList = clusterState.getTaskList();
String hostAddress = "";
if (taskList.size() > 0) {
Protos.TaskInfo taskInfo = taskList.get(0);
String taskId = taskInfo.getTaskId().getValue();
InetSocketAddress transportAddress = clusterState.getGuiTaskList().get(taskId).getTransportAddress();
hostAddress = NetworkUtils.addressToString(transportAddress, getIsUseIpAddress()).replace("http://", "");
}
addIfNotEmpty(args, "--default.discovery.zen.ping.unicast.hosts", hostAddress);
args.add("--default.http.port=" + discoveryInfo.getPorts().getPorts(Discovery.CLIENT_PORT_INDEX).getNumber());
args.add("--default.transport.tcp.port=" + discoveryInfo.getPorts().getPorts(Discovery.TRANSPORT_PORT_INDEX).getNumber());
args.add("--default.cluster.name=" + getElasticsearchClusterName());
args.add("--default.node.master=true");
args.add("--default.node.data=true");
args.add("--default.node.local=false");
args.add("--default.index.number_of_replicas=0");
args.add("--default.index.auto_expand_replicas=0-all");
if (!isFrameworkUseDocker()) {
String taskSpecificDataDir = taskSpecificHostDir(slaveID);
args.add("--path.home=" + HOST_PATH_HOME); // Cannot be overidden
args.add("--default.path.data=" + taskSpecificDataDir);
args.add("--path.conf=" + HOST_PATH_CONF); // Cannot be overidden
} else {
args.add("--path.data=" + CONTAINER_PATH_DATA); // Cannot be overidden
}
args.add("--default.bootstrap.mlockall=true");
args.add("--default.network.bind_host=0.0.0.0");
args.add("--default.network.publish_host=_non_loopback:ipv4_");
args.add("--default.gateway.recover_after_nodes=1");
args.add("--default.gateway.expected_nodes=1");
args.add("--default.indices.recovery.max_bytes_per_sec=100mb");
args.add("--default.discovery.type=zen");
args.add("--default.discovery.zen.fd.ping_timeout=30s");
args.add("--default.discovery.zen.fd.ping_interval=1s");
args.add("--default.discovery.zen.fd.ping_retries=30");
args.add("--default.discovery.zen.ping.multicast.enabled=false");


return args;
}

public String taskSpecificHostDir(Protos.SlaveID slaveID) {
return getDataDir() + "/" + getElasticsearchClusterName() + "/" + slaveID.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,14 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer, Configuration config

LOGGER.info("Creating Elasticsearch task with resources: " + resources.toString());

final List<String> args = configuration.esArguments(clusterState, discovery, offer.getSlaveId());

return Protos.TaskInfo.newBuilder()
.setName(configuration.getTaskName())
.setData(toData(offer.getHostname(), hostAddress, clock.nowUTC()))
.setTaskId(Protos.TaskID.newBuilder().setValue(taskId(offer, clock)))
.setSlaveId(offer.getSlaveId())
.addAllResources(resources)
.setDiscovery(discovery)
.setCommand(nativeCommand(configuration, args, elasticSearchNodeId))
.setCommand(nativeCommand(configuration, new List<String>(), elasticSearchNodeId))
.build();
}

Expand All @@ -97,7 +95,6 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer, Configuration config
LOGGER.info("Creating Elasticsearch task with resources: " + resources.toString());

final Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(taskId(offer, clock)).build();
final List<String> args = configuration.esArguments(clusterState, discovery, offer.getSlaveId());
final Protos.ContainerInfo containerInfo = getContainer(configuration, taskId, elasticSearchNodeId, offer.getSlaveId());

return Protos.TaskInfo.newBuilder()
Expand All @@ -107,7 +104,7 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer, Configuration config
.setSlaveId(offer.getSlaveId())
.addAllResources(resources)
.setDiscovery(discovery)
.setCommand(dockerCommand(configuration, args, elasticSearchNodeId))
.setCommand(dockerCommand(configuration, new List<String>(), elasticSearchNodeId))
.setContainer(containerInfo)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ public List<Protos.Environment.Variable> getList() {
* @param configuration
*/
private void populateEnvMap(Configuration configuration) {
addToList(ES_HEAP, getHeapSpaceString(configuration));
addToList(ES_JAVA_OPTS, getHeapSpaceString(configuration));
if (configuration.isFrameworkUseDocker()) {
addToList(native_mesos_library_key, native_mesos_library_path);
}
addToList(ES_JAVA_OPTS, getHeapSpaceString(configuration, 192));
}

private void populateEnvMapForMesos(Configuration configuration, Long nodeId) {
Expand Down Expand Up @@ -104,24 +103,14 @@ private Protos.Environment.Variable getEnvProto(String key, String value) {
.setValue(value).build();
}

/**
* Gets the heap space settings. Will set heap space to (available - 256MB) or available/4, whichever is smaller.
* @param configuration The mesos cluster configuration
* @return A string representing the java heap space.
*/
private String getHeapSpaceString(Configuration configuration) {
int osRam = (int) Math.min(256.0, configuration.getMem() / 4.0);
return "" + ((int) configuration.getMem() - osRam) + "m";
}

/**
* Gets the heap space settings. Will set minimum heap space as 256, minimum or available/4, whichever is smaller. Max heap will be available space.
* @param configuration The mesos cluster configuration
* @param min The minimum heap space; used if smaller than 256 and smaller than available/4
* @return A string representing the java heap space.
*/
private String getHeapSpaceString(Configuration configuration, int min) {
int osRam = (int) Math.min(256.0, min, configuration.getMem() / 4.0);
return "-Xms" + osRam + "m -Xmx"+ configuration.getMem() + "m";
private String getHeapSpaceString(Configuration configuration) {
int osRam = (int) Math.min(256.0, configuration.getMem() / 4.0);
return "-Xms" + osRam + "m -Xmx" + osRam + "m";
}
}