diff --git a/heron/config/src/yaml/BUILD b/heron/config/src/yaml/BUILD index b7cf4359e48..866db128766 100644 --- a/heron/config/src/yaml/BUILD +++ b/heron/config/src/yaml/BUILD @@ -17,6 +17,11 @@ filegroup( srcs = glob(["conf/local/*.yaml"]), ) +filegroup( + name = "conf-ecs-yaml", + srcs = glob(["conf/ecs/*.yaml"]), +) + filegroup( name = "conf-aurora-yaml", srcs = glob(["conf/aurora/*"]), diff --git a/heron/config/src/yaml/conf/ecs/README b/heron/config/src/yaml/conf/ecs/README new file mode 100644 index 00000000000..f6308aa8159 --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/README @@ -0,0 +1,6 @@ +This folder contains sample configs needed for running Heron on an AWS ECS cluster. +Please follow the steps in this Google doc for the detailed setup and workflow: + +https://docs.google.com/document/d/1ecbCuA46cIKPfY0SP0F1dcRlei4DIPz3pZ6ZSZ5zZgc/edit + +Then you can run Heron on AWS! diff --git a/heron/config/src/yaml/conf/ecs/cleanUp.sh b/heron/config/src/yaml/conf/ecs/cleanUp.sh new file mode 100755 index 00000000000..b7190e3b299 --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/cleanUp.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +aws autoscaling delete-launch-configuration --launch-configuration-name ecs-heron-launch-configuration +echo "done delete-launch-configuration" +aws iam remove-role-from-instance-profile --instance-profile-name ecs-heron-instance-profile --role-name ecs-heron-role +echo "done iam remove-role-from-instance-profile" +aws iam delete-instance-profile --instance-profile-name ecs-heron-instance-profile +echo "done iam delete-instance-profile" +aws iam delete-role-policy --role-name ecs-heron-role --policy-name ecs-heron-policy +echo "done iam delete-role-policy " +aws iam delete-role --role-name ecs-heron-role +echo "done iam delete-role " +aws ec2 delete-key-pair --key-name ecs-heron-keypair +rm -f ecs-heron-key*.pem + +GROUP_ID=$(aws ec2 describe-security-groups --query 'SecurityGroups[?GroupName==`ecs-heron-securitygroup`].GroupId' --output text) +aws ec2 delete-security-group --group-id "$GROUP_ID" +echo "done delete security-groups " + +aws ecs delete-cluster --cluster ecs-heron-cluster +echo "done delete cluster " diff --git a/heron/config/src/yaml/conf/ecs/ecs-heron-policy.json b/heron/config/src/yaml/conf/ecs/ecs-heron-policy.json new file mode 100644 index 00000000000..27583a9828e --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/ecs-heron-policy.json @@ -0,0 +1,28 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "ecs:CreateCluster", + "ecs:DeregisterContainerInstance", + "ecs:DiscoverPollEndpoint", + "ecs:Poll", + "ecs:RegisterContainerInstance", + "ecs:Submit*", + "ecs:ListClusters", + "ecs:ListContainerInstances", + "ecs:DescribeContainerInstances", + "ecs:ListServices", + "ecs:DescribeTasks", + "ecs:DescribeServices", + "ec2:DescribeInstances", + "ec2:DescribeTags", + "autoscaling:DescribeAutoScalingInstances" + ], + "Resource": [ + "*" + ] + } + ] +} diff --git a/heron/config/src/yaml/conf/ecs/ecs-heron-role.json b/heron/config/src/yaml/conf/ecs/ecs-heron-role.json new file mode 100644 index 00000000000..f025fed4763 --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/ecs-heron-role.json @@ -0,0 +1,8 @@ +{ + "Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Principal": {"Service": "ec2.amazonaws.com"}, + "Action": "sts:AssumeRole" + } +} diff --git a/heron/config/src/yaml/conf/ecs/ecs_compose_template.yaml b/heron/config/src/yaml/conf/ecs/ecs_compose_template.yaml new file mode 100755 index
00000000000..305c9f058cb --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/ecs_compose_template.yaml @@ -0,0 +1,17 @@ +version: '2' +services: + CONTAINER_NUMBER: + image: ananthgs/onlyheronandubuntu + #command: ["sh", "-c", "mkdir /s3; cd /s3 ;aws s3 cp s3://herondockercal/TOPOLOGY_NAME/topology.tar.gz /s3 ;aws s3 cp s3://herondockercal/heron-core-testbuild-ubuntu14.04.tar.gz /s3 ;cd /s3; tar -zxvf topology.tar.gz; tar -zxvf heron-core-testbuild-ubuntu14.04.tar.gz; HERON_EXECUTOR ;"] + command: ["sh", "-c", "mkdir /s3; cd /s3 ;aws s3 cp s3://herondockercal/TOPOLOGY_NAME/topology.tar.gz /s3 ;aws s3 cp s3://herondockercal/heron-core.tar.gz /s3 ;cd /s3; tar -zxvf topology.tar.gz; tar -zxvf heron-core.tar.gz; HERON_EXECUTOR ;"] + networks: + - heron + ports:FREEPORTS + volumes: + - "herondata:/root/.herondata" +networks: + heron: + driver: bridge +volumes: + herondata: + driver: local diff --git a/heron/config/src/yaml/conf/ecs/heron_internals.yaml b/heron/config/src/yaml/conf/ecs/heron_internals.yaml new file mode 100644 index 00000000000..caf8a21ad69 --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/heron_internals.yaml @@ -0,0 +1,272 @@ +################################################################################ +# Default values for various configs used inside Heron. +################################################################################ +# All the config associated with time is in the unit of milli-seconds, +# unless otherwise specified. +################################################################################ +# All the config associated with data size is in the unit of bytes, unless +# otherwise specified. +################################################################################ + +################################################################################ +# System level configs +################################################################################ + +### heron.* configs are general configurations over all componenets + +# The relative path to the logging directory +heron.logging.directory: "log-files" + +# The maximum log file size in MB +heron.logging.maximum.size.mb: 100 + +# The maximum number of log files +heron.logging.maximum.files: 5 + +# The interval in seconds after which to check if the tmaster location has been fetched or not +heron.check.tmaster.location.interval.sec: 120 + +# The interval in seconds to prune logging files in C++ +heron.logging.prune.interval.sec: 300 + +# The interval in seconds to flush log files in C++ +heron.logging.flush.interval.sec: 10 + +# The threshold level to log error +heron.logging.err.threshold: 3 + +# The interval in seconds for different components to export metrics to metrics manager +heron.metrics.export.interval.sec: 60 + +# The maximum count of exceptions in one MetricPublisherPublishMessage protobuf +heron.metrics.max.exceptions.per.message.count: 1024 + +################################################################################ +# Configs related to Stream Manager, starts with heron.streammgr.* +################################################################################ + +# Maximum size in bytes of a packet to be send out from stream manager +heron.streammgr.packet.maximum.size.bytes: 102400 + +# The tuple cache (used for batching) can be drained in two ways: +# (a) Time based +# (b) size based + +# The frequency in ms to drain the tuple cache in stream manager +heron.streammgr.cache.drain.frequency.ms: 10 + +# The sized based threshold in MB for draining the tuple cache 
+heron.streammgr.cache.drain.size.mb: 100 + +# For efficient acknowledgements +heron.streammgr.xormgr.rotatingmap.nbuckets: 3 + +# The reconnect interval to other stream managers in secs for stream manager client +heron.streammgr.client.reconnect.interval.sec: 1 + +# The reconnect interval to tamster in second for stream manager client +heron.streammgr.client.reconnect.tmaster.interval.sec: 10 + +# The maximum packet size in MB of stream manager's network options +heron.streammgr.network.options.maximum.packet.mb: 100 + +# The interval in seconds to send heartbeat +heron.streammgr.tmaster.heartbeat.interval.sec: 10 + +# Maximum batch size in MB to read by stream manager from socket +heron.streammgr.connection.read.batch.size.mb: 1 + +# Maximum batch size in MB to write by stream manager to socket +heron.streammgr.connection.write.batch.size.mb: 1 + +# Number of times we should wait to see a buffer full while enqueueing data +# before declaring start of back pressure +heron.streammgr.network.backpressure.threshold: 3 + +# High water mark on the num in MB that can be left outstanding on a connection +heron.streammgr.network.backpressure.highwatermark.mb: 100 + +# Low water mark on the num in MB that can be left outstanding on a connection +heron.streammgr.network.backpressure.lowwatermark.mb: 50 + +################################################################################ +# Configs related to Topology Master, starts with heron.tmaster.* +################################################################################ + +# The maximum interval in minutes of metrics to be kept in tmaster +heron.tmaster.metrics.collector.maximum.interval.min: 180 + +# The maximum time to retry to establish the tmaster +heron.tmaster.establish.retry.times: 30 + +# The interval to retry to establish the tmaster +heron.tmaster.establish.retry.interval.sec: 1 + +# Maximum packet size in MB of tmaster's network options to connect to stream managers +heron.tmaster.network.master.options.maximum.packet.mb: 16 + +# Maximum packet size in MB of tmaster's network options to connect to scheduler +heron.tmaster.network.controller.options.maximum.packet.mb: 1 + +# Maximum packet size in MB of tmaster's network options for stat queries +heron.tmaster.network.stats.options.maximum.packet.mb: 1 + +# The interval for tmaster to purge metrics from socket +heron.tmaster.metrics.collector.purge.interval.sec: 60 + +# The maximum # of exceptions to be stored in tmetrics collector, to prevent potential OOM +heron.tmaster.metrics.collector.maximum.exception: 256 + +# Should the metrics reporter bind on all interfaces +heron.tmaster.metrics.network.bindallinterfaces: False + +# The timeout in seconds for stream mgr, compared with (current time - last heartbeat time) +heron.tmaster.stmgr.state.timeout.sec: 60 + +################################################################################ +# Configs related to Topology Master, starts with heron.metricsmgr.* +################################################################################ + +# The size of packets to read from socket will be determined by the minimal of: +# (a) time based +# (b) size based + +# Time based, the maximum batch time in ms for metricsmgr to read from socket +heron.metricsmgr.network.read.batch.time.ms: 16 + +# Size based, the maximum batch size in bytes to read from socket +heron.metricsmgr.network.read.batch.size.bytes: 32768 + +# The size of packets to write to socket will be determined by the minimum of +# (a) time based +# (b) size based + +# 
Time based, the maximum batch time in ms for metricsmgr to write to socket +heron.metricsmgr.network.write.batch.time.ms: 16 + +# Size based, the maximum batch size in bytes to write to socket +heron.metricsmgr.network.write.batch.size.bytes: 32768 + +# The maximum socket's send buffer size in bytes +heron.metricsmgr.network.options.socket.send.buffer.size.bytes: 6553600 + +# The maximum socket's received buffer size in bytes of metricsmgr's network options +heron.metricsmgr.network.options.socket.received.buffer.size.bytes: 8738000 + +################################################################################ +# Configs related to Heron Instance, starts with heron.instance.* +################################################################################ + +# The queue capacity (num of items) in bolt for buffer packets to read from stream manager +heron.instance.internal.bolt.read.queue.capacity: 128 + +# The queue capacity (num of items) in bolt for buffer packets to write to stream manager +heron.instance.internal.bolt.write.queue.capacity: 128 + +# The queue capacity (num of items) in spout for buffer packets to read from stream manager +heron.instance.internal.spout.read.queue.capacity: 1024 + +# The queue capacity (num of items) in spout for buffer packets to write to stream manager +heron.instance.internal.spout.write.queue.capacity: 128 + +# The queue capacity (num of items) for metrics packets to write to metrics manager +heron.instance.internal.metrics.write.queue.capacity: 128 + +# The size of packets read from stream manager will be determined by the minimal of +# (a) time based +# (b) size based + +# Time based, the maximum batch time in ms for instance to read from stream manager per attempt +heron.instance.network.read.batch.time.ms: 16 + +# Size based, the maximum batch size in bytes to read from stream manager +heron.instance.network.read.batch.size.bytes: 32768 + +# The size of packets written to stream manager will be determined by the minimum of +# (a) time based +# (b) size based + +# Time based, the maximum batch time in ms for instance to write to stream manager per attempt +heron.instance.network.write.batch.time.ms: 16 + +# Size based, the maximum batch size in bytes to write to stream manager +heron.instance.network.write.batch.size.bytes: 32768 + +# The maximum socket's send buffer size in bytes +heron.instance.network.options.socket.send.buffer.size.bytes: 6553600 + +# The maximum socket's received buffer size in bytes of instance's network options +heron.instance.network.options.socket.received.buffer.size.bytes: 8738000 + +# The maximum # of data tuple to batch in a HeronDataTupleSet protobuf +heron.instance.set.data.tuple.capacity: 1024 + +# The maximum size in bytes of data tuple to batch in a HeronDataTupleSet protobuf +heron.instance.set.data.tuple.size.bytes: 8388608 + +# The maximum # of control tuple to batch in a HeronControlTupleSet protobuf +heron.instance.set.control.tuple.capacity: 1024 + +# The maximum time in ms for a spout to do acknowledgement per attempt, the ack batch could +# also break if there are no more ack tuples to process +heron.instance.ack.batch.time.ms: 128 + +# The maximum time in ms for an spout instance to emit tuples per attempt +heron.instance.emit.batch.time.ms: 16 + +# The maximum batch size in bytes for an spout to emit tuples per attempt +heron.instance.emit.batch.size.bytes: 32768 + +# The maximum time in ms for an bolt instance to execute tuples per attempt +heron.instance.execute.batch.time.ms: 16 + +# The maximum 
batch size in bytes for an bolt instance to execute tuples per attempt +heron.instance.execute.batch.size.bytes: 32768 + +# The time interval for an instance to check the state change, +# for example, the interval a spout uses to check whether activate/deactivate is invoked +heron.instance.state.check.interval.sec: 5 + +# The time to wait before the instance exits forcibly when uncaught exception happens +heron.instance.force.exit.timeout.ms: 2000 + +# Interval in seconds to reconnect to the stream manager, including the request timeout in connecting +heron.instance.reconnect.streammgr.interval.sec: 5 +heron.instance.reconnect.streammgr.times: 60 + +# Interval in seconds to reconnect to the metrics manager, including the request timeout in connecting +heron.instance.reconnect.metricsmgr.interval.sec: 5 +heron.instance.reconnect.metricsmgr.times: 60 + +# The interval in second for an instance to sample its system metrics, for instance, cpu load. +heron.instance.metrics.system.sample.interval.sec: 10 + +heron.instance.slave.fetch.pplan.interval.sec: 1 + +# For efficient acknowledgement +heron.instance.acknowledgement.nbuckets: 10 + +################################################################################ +# For dynamically tuning the available sizes in the interval read & write queues +# to provide high performance while avoiding GC issues +################################################################################ + +# The expected size on read queue in bolt +heron.instance.tuning.expected.bolt.read.queue.size: 8 + +# The expected size on write queue in bolt +heron.instance.tuning.expected.bolt.write.queue.size: 8 + +# The expected size on read queue in spout +heron.instance.tuning.expected.spout.read.queue.size: 512 + +# The exepected size on write queue in spout +heron.instance.tuning.expected.spout.write.queue.size: 8 + +# The expected size on metrics write queue +heron.instance.tuning.expected.metrics.write.queue.size: 8 + +heron.instance.tuning.current.sample.weight: 0.8 + +# Interval in ms to tune the size of in & out data queue in instance +heron.instance.tuning.interval.ms: 100 diff --git a/heron/config/src/yaml/conf/ecs/metrics_sinks.yaml b/heron/config/src/yaml/conf/ecs/metrics_sinks.yaml new file mode 100644 index 00000000000..e58150482ea --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/metrics_sinks.yaml @@ -0,0 +1,81 @@ +########### These all have default values as shown + +# We would specify the unique sink-id first +sinks: + - file-sink + - tmaster-sink + +########### Now we would specify the detailed configuration for every unique sink +########### Syntax: sink-id: - option(s) + +########### option class is required as we need to instantiate a new instance by reflection +########### option flush-frequency-ms is required to invoke flush() at interval +########### option sink-restart-attempts, representsing # of times to restart a sink when it throws exceptions and dies. +########### If this option is missed, default value 0 would be supplied; negative value represents to restart it forever. 
+ +########### Other options would be constructed as an immutable map passed to IMetricsSink's init(Map conf) as argument, +########### We would be able to fetch value by conf.get(options), for instance: +########### We could get "com.twitter.heron.metricsmgr.sink.FileSink" if conf.get("class") is called inside file-sink's instance + +### Config for file-sink +file-sink: + class: "com.twitter.heron.metricsmgr.sink.FileSink" + flush-frequency-ms: 60000 # 1 min + sink-restart-attempts: -1 # Forever + filename-output: "metrics.json" # File for metrics to write to + file-maximum: 5 # maximum number of file saved in disk + +### Config for tmaster-sink +tmaster-sink: + class: "com.twitter.heron.metricsmgr.sink.tmaster.TMasterSink" + flush-frequency-ms: 60000 + sink-restart-attempts: -1 # Forever + tmaster-location-check-interval-sec: 5 + tmaster-client: + reconnect-interval-second: 5 # The re-connect interval to TMaster from TMasterClient + # The size of packets written to TMaster will be determined by the minimal of: (a) time based (b) size based + network-write-batch-size-bytes: 32768 # Size based, the maximum batch size in bytes to write to TMaster + network-write-batch-time-ms: 16 # Time based, the maximum batch time in ms for Metrics Manager to write to TMaster per attempt + network-read-batch-size-bytes: 32768 # Size based, the maximum batch size in bytes to write to TMaster + network-read-batch-time-ms: 16 # Time based, the maximum batch time in ms for Metrics Manager to write to TMaster per attempt + socket-send-buffer-size-bytes: 6553600 # The maximum socket's send buffer size in bytes + socket-received-buffer-size-bytes: 8738000 # The maximum socket's received buffer size in bytes + tmaster-metrics-type: + "__emit-count": SUM + "__execute-count": SUM + "__fail-count": SUM + "__ack-count": SUM + "__complete-latency": AVG + "__execute-latency": AVG + "__process-latency": AVG + "__jvm-uptime-secs": LAST + "__jvm-process-cpu-load": LAST + "__jvm-memory-used-mb": LAST + "__jvm-memory-mb-total": LAST + "__jvm-gc-collection-time-ms": LAST + "__server/__time_spent_back_pressure_initiated": SUM + "__time_spent_back_pressure_by_compid": SUM + +### Config for scribe-sink +# scribe-sink: +# class: "com.twitter.heron.metricsmgr.sink.ScribeSink" +# flush-frequency-ms: 60000 +# sink-restart-attempts: -1 # Forever +# scribe-host: "127.0.0.1" # The host of scribe to be exported metrics to +# scribe-port: 1463 # The port of scribe to be exported metrics to +# scribe-category: "scribe-category" # The category of the scribe to be exported metrics to +# service-namespace: "heron" # The service name of the metrics in scribe-category +# scribe-timeout-ms: 200 # The timeout in seconds for metrics manager to write metrics to scribe +# scribe-connect-server-attempts: 2 # The maximum retry attempts to connect to scribe server +# scribe-retry-attempts: 5 # The maximum retry attempts to write metrics to scribe +# scribe-retry-interval-ms: 100 # The interval to retry to write metrics to scribe + +### Config for graphite-sink +### Currently the graphite-sink is disabled +# graphite-sink: +# class: "com.twitter.heron.metricsmgr.sink.GraphiteSink" +# flush-frequency-ms: 60000 +# graphite_host: "127.0.0.1" # The host of graphite to be exported metrics to +# graphite_port: 2004 # The port of graphite to be exported metrics to +# metrics_prefix: "heron" # The prefix of every metrics +# server_max_reconnect-attempts: 20 # The max reconnect attempts when failing to connect to graphite server diff --git 
a/heron/config/src/yaml/conf/ecs/packing.yaml b/heron/config/src/yaml/conf/ecs/packing.yaml new file mode 100644 index 00000000000..f3021ca03ef --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/packing.yaml @@ -0,0 +1,2 @@ +# packing algorithm for packing instances into containers +heron.class.packing.algorithm: com.twitter.heron.packing.roundrobin.RoundRobinPacking diff --git a/heron/config/src/yaml/conf/ecs/scheduler.yaml b/heron/config/src/yaml/conf/ecs/scheduler.yaml new file mode 100644 index 00000000000..d3d8a73007f --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/scheduler.yaml @@ -0,0 +1,18 @@ +# scheduler class for distributing the topology for execution +heron.class.scheduler: com.twitter.heron.scheduler.ecs.EcsScheduler + +# launcher class for submitting and launching the topology +heron.class.launcher: com.twitter.heron.scheduler.ecs.EcsLauncher + +# location of java - pick it up from shell environment +#heron.directory.sandbox.java.home: /usr/lib/jvm/java-8-oracle +#heron.directory.sandbox.java.home: ${JAVA_HOME} + +heron.ecs.topology.binary.file: heron-examples.jar + +heron.scheduler.ecs.working.directory: ${HOME}/.herondata/topologies/${CLUSTER}/${ROLE}/${TOPOLOGY} + +# location of the ecs compose template file +heron.ecs.compose.template.file: ${HOME}/.heron/conf/ecs/ecs_compose_template.yaml + +heron.ecs.ami.instance: http://169.254.169.254/latest/meta-data \ No newline at end of file diff --git a/heron/config/src/yaml/conf/ecs/set-ecs-cluster-name.sh b/heron/config/src/yaml/conf/ecs/set-ecs-cluster-name.sh new file mode 100644 index 00000000000..fafe4a335d3 --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/set-ecs-cluster-name.sh @@ -0,0 +1,2 @@ +#!/bin/bash +echo ECS_CLUSTER=ecs-heron-cluster >> /etc/ecs-heron/ecs.config diff --git a/heron/config/src/yaml/conf/ecs/setupEcs.sh b/heron/config/src/yaml/conf/ecs/setupEcs.sh new file mode 100755 index 00000000000..16b5f67203d --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/setupEcs.sh @@ -0,0 +1,92 @@ +#!/bin/bash + +##### PLEASE NOTE: USE THIS AS A REFERENCE. YOU MAY NEED TO CHANGE REGION/CLUSTER/AMI/KEYPAIR VALUES + + +REGIONS="" + +SCOPE_AAS_PROBE_TOKEN="$1" + +if [ -z "$(which aws)" ]; then + echo "error: Cannot find aws-cli, please make sure it's installed" + exit 1 +fi + +if [ -z "$(which ecs-cli)" ]; then + echo "error: Cannot find ecs-cli, please make sure it's installed" + exit 1 +fi + +REGION=$(aws configure list 2> /dev/null | grep region | awk '{ print $2 }') +if [ -z "$REGION" ]; then + echo "error: Region not set, please make sure to run 'aws configure'" + exit 1 +fi + +AMI=$(aws --region $REGION ec2 describe-images --filters Name=root-device-type,Values=ebs Name=architecture,Values=x86_64 Name=virtualization-type,Values=hvm Name=name,Values=*ubuntu-xenial-16.04-amd64-server-20161214 --query 'Images[*].{ID:ImageId}' --output text) + +# Check that setup wasn't already run +CLUSTER_STATUS=$(aws ecs describe-clusters --clusters ecs-heron-cluster --query 'clusters[0].status' --output text) +if [ "$CLUSTER_STATUS" != "None" -a "$CLUSTER_STATUS" != "INACTIVE" ]; then + echo "error: ECS cluster ecs-heron-cluster is active, cleanup first" + exit 1 +fi + +set -euo pipefail + +# Cluster +echo -n "Creating ECS cluster (ecs-heron-cluster) .. " +aws ecs create-cluster --cluster-name ecs-heron-cluster > /dev/null +echo "done" + +# Security group +echo -n "Creating Security Group (ecs-heron-securitygroup) .. 
" +SECURITY_GROUP_ID=$(aws ec2 create-security-group --group-name ecs-heron-securitygroup --description 'ECS Heron' --query 'GroupId' --output text) +# Wait for the group to get associated with the VPC +sleep 5 +#opening all ports as port assignment will be random +aws ec2 authorize-security-group-ingress --group-id $SECURITY_GROUP_ID --protocol tcp --port 0-65535 --cidr 0.0.0.0/0 + +# Key pair +echo -n "Creating Key Pair (ecs-heron-keypair, file ecs-heron-keypair.pem) .. " +aws ec2 create-key-pair --key-name ecs-heron-keypair --query 'KeyMaterial' --output text > ecs-heron-keypair.pem +chmod 600 ecs-heron-keypair.pem +echo "done" + +# IAM role +echo -n "Creating IAM role (ecs-heron-role) .. " +aws iam create-role --role-name ecs-heron-role --assume-role-policy-document file://ecs-heron-role.json > /dev/null +aws iam put-role-policy --role-name ecs-heron-role --policy-name ecs-heron-policy --policy-document file://ecs-heron-policy.json +aws iam create-instance-profile --instance-profile-name ecs-heron-instance-profile > /dev/null +# Wait for the instance profile to be ready, otherwise we get an error when trying to use it +while ! aws iam get-instance-profile --instance-profile-name ecs-heron-instance-profile 2>&1 > /dev/null; do + sleep 2 +done +aws iam add-role-to-instance-profile --instance-profile-name ecs-heron-instance-profile --role-name ecs-heron-role +echo "done" + + +# Launch configuration +echo -n "Creating Launch Configuration (ecs-heron-launch-configuration) .. " +# Wait for the role to be ready, otherwise we get: +# A client error (ValidationError) occurred when calling the CreateLaunchConfiguration operation: You are not authorized to perform this operation. + +sleep 15 + +TMP_USER_DATA_FILE=$(mktemp /tmp/ecs-heron-user-data-XXXX) +trap 'rm $TMP_USER_DATA_FILE' EXIT +cp set-ecs-cluster-name.sh $TMP_USER_DATA_FILE +if [ -n "$SCOPE_AAS_PROBE_TOKEN" ]; then + echo "echo SERVICE_TOKEN=$SCOPE_AAS_PROBE_TOKEN >> /etc/ecs-heron/scope.config" >> $TMP_USER_DATA_FILE +fi + +aws autoscaling create-launch-configuration --image-id $AMI --launch-configuration-name ecs-heron-launch-configuration --key-name ecs-heron-keypair --security-groups $SECURITY_GROUP_ID --instance-type t2.micro --user-data file://$TMP_USER_DATA_FILE --iam-instance-profile ecs-heron-instance-profile --associate-public-ip-address --instance-monitoring Enabled=false +echo "done" + +#set up the ecs cli config +ecs-cli configure --force --region $REGION --access-key ecs-heron-keypair --cluster ecs-heron-cluster + +#create a CloudFormation template +ecs-cli up --force --keypair ecs-heron-keypair --capability-iam --size 2 --instance-type m4.large + +echo "Setup is ready! 
Please submit the ECS HERON Topology" diff --git a/heron/config/src/yaml/conf/ecs/statemgr.yaml b/heron/config/src/yaml/conf/ecs/statemgr.yaml new file mode 100644 index 00000000000..9cf0649858b --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/statemgr.yaml @@ -0,0 +1,42 @@ +# local state manager class for managing state in a persistent fashion +heron.class.state.manager: com.twitter.heron.statemgr.zookeeper.curator.CuratorStateManager + +#Un-comment the line below and add ZK IP address +#heron.statemgr.connection.string: "127.0.0.1:2181" +heron.statemgr.connection.string: "54.215.208.158:2181" + + +# path of the root address to store the state in a local file system +heron.statemgr.root.path: "/heron" + +#################################################################### +# Following are config for Zk State Manager +#################################################################### +heron.statemgr.zookeeper.is.initialize.tree: True + +heron.statemgr.zookeeper.session.timeout.ms: 30000 + +heron.statemgr.zookeeper.connection.timeout.ms: 30000 + +heron.statemgr.zookeeper.retry.count: 10 + +heron.statemgr.zookeeper.retry.interval.ms: 10000 + +#################################################################### +# Following are config for tunneling +#################################################################### +# Whether we need tunnel if no direct access on zk server +heron.statemgr.is.tunnel.needed: False + +# The connection timeout in ms when trying to connect to zk server +heron.statemgr.tunnel.connection.timeout.ms: 1000 + +# The count of retry to check whether has direct access on zk server +heron.statemgr.tunnel.connection.retry.count: 2 + +# The interval in ms between two retry checking whether has direct access on zk server +heron.statemgr.tunnel.retry.interval.ms: 1000 + +# The count of retry to verify whether seting up a tunnel process +heron.statemgr.tunnel.verify.count: 10 + diff --git a/heron/config/src/yaml/conf/ecs/uploader.yaml b/heron/config/src/yaml/conf/ecs/uploader.yaml new file mode 100644 index 00000000000..51710786e38 --- /dev/null +++ b/heron/config/src/yaml/conf/ecs/uploader.yaml @@ -0,0 +1,32 @@ +# uploader class for transferring the topology jar/tar files to storage +heron.class.uploader: com.twitter.heron.uploader.s3.S3Uploader + +# S3 bucket to put the jar file into +#heron.uploader.s3.bucket: bucketname +heron.uploader.s3.bucket: herondockercal + +# By default the path prefix will be empty and the full path would be s3://#{bucket}/#{topology_name}/topology.tar.gz +# This allows you to prepend a prefix to the path to specify a sub-folder in which cased the full path would be: +# s3://#{bucket}/#{prefix}/#{topology_name}/topology.tar.gz +# heron.uploader.s3.path_prefix: path/prefix + +# By default, assume AWS S3. However, you can specify a custom url if you are using a +# S3 compatible storage layer (or using a reverse proxy for accessing S3). 
+# heron.uploader.s3.uri: hostname:port +# If you want to access S3 through a http proxy, use +# heron.upload.s3.proxy_uri: http://username:password@hostname:port +# username and password are optional + +# Specifies a custom region - see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html#US_EAST_1 +#heron.uploader.s3.region : meh +heron.uploader.s3.region : us-west-1 + +# AWS Credentials +# By default the S3 Uploader will use the Default Credential Provider Chain for accessing the S3 bucket - +# see http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#using-the-default-credential-provider-chain +# However you have the option to directly specify aws credentials: +#heron.uploader.s3.access_key: xxxxxx +#heron.uploader.s3.secret_key: xxxxxx +# Alternatively to directly specifying aws credentials, you can specify the aws profile +# in case you have multiple AWS profiles in your credentials file (~/.aws/credentials): +# heron.uploader.s3.aws_profile: profile_name diff --git a/heron/config/src/yaml/conf/localzk/README b/heron/config/src/yaml/conf/localzk/README index f2d19eb6017..f6308aa8159 100644 --- a/heron/config/src/yaml/conf/localzk/README +++ b/heron/config/src/yaml/conf/localzk/README @@ -1,11 +1,6 @@ -This folder contains sample configs needed for using zookeeper in LocalScheduler. -In order to run LocalScheduler, you need to set up a running zookeeper server basing on the config inside statemgr.yaml: -1. Set up the appropriate connection string. -2. Create following required nodes in zookeeper (one time effort): - /{heron.statemgr.root.path}/tmasters - /{heron.statemgr.root.path}/topologies - /{heron.statemgr.root.path}/pplans - /{heron.statemgr.root.path}/executionstate - /{heron.statemgr.root.path}/schedulers +This folder contains sample configs needed for running Heron on an AWS ECS cluster. +Please follow the steps in this Google doc for the detailed setup and workflow: + -Then you can run LocalScheduler with zookeeper state manager. +https://docs.google.com/document/d/1ecbCuA46cIKPfY0SP0F1dcRlei4DIPz3pZ6ZSZ5zZgc/edit + +Then you can run Heron on AWS! diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py index b1fb0a62409..a63a9d46421 100755 --- a/heron/executor/src/python/heron_executor.py +++ b/heron/executor/src/python/heron_executor.py @@ -30,8 +30,8 @@ import time import yaml import socket +import urllib2 import traceback - from functools import partial @@ -55,7 +55,8 @@ def print_usage(): " " " " " " - " ") + " " + " ") def id_map(prefix, container_plans, add_zero_id=False): ids = {} @@ -106,8 +107,41 @@ def log_pid_for_process(process_name, pid): Log.info('Logging pid %d to file %s' %(pid, filename)) atomic_write_file(filename, str(pid)) -def is_docker_environment(): - return os.path.isfile('/.dockerenv') +def isEcsAmiInstance(ecs_ami): + meta = ecs_ami + '/ami-id' + req = urllib2.Request(meta) + + try: + response = urllib2.urlopen(req).read() + if 'ami' in response: + #_msg = 'I am in AWS running on {}'.format(response) + return 1 + else: + #_msg = 'I am in dev - no AWS AMI' + return 0 + except Exception: + #_msg = 'no metadata, not in AWS' + return 0 + +def getHost(ecs_ami): + if not ecs_ami: + return socket.gethostname() + else: + l_host = '' + # Needed for Docker environments since the hostname of a docker container is the container's + # id within docker, rather than the host's hostname.
NOTE: this 'HOST' env variable is not + # guaranteed to be set in all Docker executor environments (outside of Marathon) + if os.path.isfile('/.dockerenv'): + # Need to set the HOST environment variable if the docker container is running as an AWS ECS task + if isEcsAmiInstance(ecs_ami): + l_host = subprocess.Popen(["curl", ecs_ami + "/local-ipv4"] + , stdout=subprocess.PIPE).communicate()[0] + else: + l_host = os.environ.get('HOST') if 'HOST' in os.environ else socket.gethostname() + else: + l_host = socket.gethostname() + return l_host + def stdout_log_fn(cmd): """Simple function callback that is used to log the streaming output of a subprocess command @@ -117,6 +151,7 @@ def stdout_log_fn(cmd): # Log the messages to stdout and strip off the newline because Log.info adds one automatically return lambda line: Log.info("%s stdout: %s", cmd, line.rstrip('\n')) + class ProcessInfo(object): def __init__(self, process, name, command, attempts=1): """ @@ -160,13 +195,7 @@ def init_parsed_args(self, args): base64.b64decode(parsed_args.instance_jvm_opts.lstrip('"'). rstrip('"').replace('&equals;', '=')) self.classpath = parsed_args.classpath - # Needed for Docker environments since the hostname of a docker container is the container's - # id within docker, rather than the host's hostname. NOTE: this 'HOST' env variable is not - # guaranteed to be set in all Docker executor environments (outside of Marathon) - if is_docker_environment(): - self.master_host = os.environ.get('HOST') if 'HOST' in os.environ else socket.gethostname() - else: - self.master_host = socket.gethostname() + self.master_host = getHost(parsed_args.ecsAmiUrl) self.master_port = parsed_args.master_port self.tmaster_controller_port = parsed_args.tmaster_controller_port self.tmaster_stats_port = parsed_args.tmaster_stats_port @@ -270,6 +299,7 @@ def parse_args(args): parser.add_argument("metricscachemgr_classpath") parser.add_argument("metricscachemgr_masterport") parser.add_argument("metricscachemgr_statsport") + parser.add_argument("ecsAmiUrl", nargs='?', default="") parsed_args, unknown_args = parser.parse_known_args(args[1:]) diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java index 3d103aedad9..61ad97f12a1 100644 --- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java +++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java @@ -62,7 +62,7 @@ public static String[] schedulerCommand( List commands = new ArrayList<>(); // The java executable should be "{JAVA_HOME}/bin/java" - String javaExecutable = String.format("%s/%s", Context.clusterJavaHome(config), "bin/java"); + String javaExecutable = String.format("%s/%s", Context.javaHome(config), "bin/java"); commands.add(javaExecutable); commands.add("-cp"); @@ -88,7 +88,7 @@ public static String[] schedulerCommand( * @param freePorts list of free ports * @return String[] representing the arguments to start heron-scheduler */ - public static String[] schedulerCommandArgs( + public static String[] schedulerCommandArgs( Config config, Config runtime, List freePorts) { // First let us have some safe checks if (freePorts.size() < PORTS_REQUIRED_FOR_SCHEDULER) { diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD index bf8f9a7632d..a33e400adf4 100644 --- a/heron/schedulers/src/java/BUILD +++ b/heron/schedulers/src/java/BUILD @@ -6,6 +6,9 @@ common_deps_files = [
"//heron/common/src/java:basics-java", "@commons_io_commons_io//jar", "@com_google_guava_guava//jar", + "@com_fasterxml_jackson_core_jackson_annotations//jar", + "@com_fasterxml_jackson_core_jackson_core//jar", + "@com_fasterxml_jackson_core_jackson_databind//jar", ] spi_deps_files = [ @@ -65,6 +68,25 @@ genrule( cmd = "cp $< $@", ) +java_library( + name='ecs-scheduler-java', + srcs = glob(["**/ecs/*.java"]), + deps = scheduler_deps_files, +) + +java_binary( + name='ecs-scheduler-unshaded', + srcs = glob(["**/ecs/*.java"]), + deps = scheduler_deps_files, +) + +genrule( + name = "heron-ecs-scheduler", + srcs = [":ecs-scheduler-unshaded_deploy.jar"], + outs = ["heron-ecs-scheduler.jar"], + cmd = "cp $< $@", +) + java_library( name='aurora-scheduler-java', srcs = glob(["**/aurora/*.java"]), diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsContext.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsContext.java new file mode 100644 index 00000000000..722ced29a7c --- /dev/null +++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsContext.java @@ -0,0 +1,76 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.scheduler.ecs; + +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; +import com.twitter.heron.spi.common.TokenSub; + +public class EcsContext extends Context { + + public static String ecsClusterBinary(Config config) { + String ecsClusterBinary = config.getStringValue( + EcsKey.ECS_CLUSTER_BINARY.value(), EcsKey.ECS_CLUSTER_BINARY.getDefaultString()); + return TokenSub.substitute(config, ecsClusterBinary); + } + public static String workingDirectory(Config config) { + String workingDirectory = config.getStringValue( + EcsKey.WORKING_DIRECTORY.value(), EcsKey.WORKING_DIRECTORY.getDefaultString()); + return TokenSub.substitute(config, workingDirectory); + } + + public static String ecsComposeTemplate(Config config) { + String composeTemplate = config.getStringValue( + EcsKey.ECS_COMPOSE_TEMPLATE.value(), EcsKey.ECS_COMPOSE_TEMPLATE.getDefaultString()); + return TokenSub.substitute(config, composeTemplate); + } + + public static String AmiInstanceUrl(Config config) { + String amiInstanceUrl = config.getStringValue( + EcsKey.ECS_AMI_INSTANCE.value(), EcsKey.ECS_AMI_INSTANCE.getDefaultString()); + return TokenSub.substitute(config, amiInstanceUrl); + } + public static String composeupCmd(Config config) { + String amiInstanceUrl = config.getStringValue( + EcsKey.ECS_COMPOSE_UPCMD.value(), EcsKey.ECS_COMPOSE_UPCMD.getDefaultString()); + return TokenSub.substitute(config, amiInstanceUrl); + } + public static String composeStopCmd(Config config) { + String amiInstanceUrl = config.getStringValue( + EcsKey.ECS_COMPOSE_STOP.value(), EcsKey.ECS_COMPOSE_STOP.getDefaultString()); + return TokenSub.substitute(config, amiInstanceUrl); + } + public static String composeListCmd(Config config) { + String amiInstanceUrl = 
config.getStringValue( + EcsKey.ECS_COMPOSE_LIST.value(), EcsKey.ECS_COMPOSE_LIST.getDefaultString()); + return TokenSub.substitute(config, amiInstanceUrl); + } + public static String composeListby(Config config) { + String amiInstanceUrl = config.getStringValue( + EcsKey.ECS_LIST_BY.value(), EcsKey.ECS_LIST_BY.getDefaultString()); + return TokenSub.substitute(config, amiInstanceUrl); + } + public static String composeFamilyName(Config config) { + String amiInstanceUrl = config.getStringValue( + EcsKey.ECS_GET_FAMILY.value(), EcsKey.ECS_GET_FAMILY.getDefaultString()); + return TokenSub.substitute(config, amiInstanceUrl); + } + public static String composeTaskTag(Config config) { + String amiInstanceUrl = config.getStringValue( + EcsKey.ECS_TASK_TAG.value(), EcsKey.ECS_TASK_TAG.getDefaultString()); + return TokenSub.substitute(config, amiInstanceUrl); + } +} + diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsKey.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsKey.java new file mode 100644 index 00000000000..490680c6c8d --- /dev/null +++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsKey.java @@ -0,0 +1,63 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
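
The EcsContext accessors above all follow the same pattern: look up the corresponding EcsKey entry, fall back to its default, and run TokenSub over the result. A minimal usage sketch, assuming a fully loaded cluster Config such as the one passed to ILauncher.initialize; the EcsSettingsExample class is hypothetical and not part of this patch:

import com.twitter.heron.scheduler.ecs.EcsContext;
import com.twitter.heron.spi.common.Config;

final class EcsSettingsExample {
  static void printEcsSettings(Config config) {
    // ${HOME}/.herondata/topologies/${CLUSTER}/${ROLE}/${TOPOLOGY}, with tokens substituted
    String workingDir = EcsContext.workingDirectory(config);
    // ${HOME}/.heron/conf/ecs/ecs_compose_template.yaml, with ${HOME} substituted
    String template = EcsContext.ecsComposeTemplate(config);
    // "ecs-cli compose --project-name " unless scheduler.yaml overrides it
    String composeUp = EcsContext.composeupCmd(config);
    System.out.println(workingDir + "\n" + template + "\n" + composeUp);
  }
}
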
+ +package com.twitter.heron.scheduler.ecs; + +import com.twitter.heron.spi.common.Key; + +public enum EcsKey { + // config key for specifying the destination topology binary file + ECS_CLUSTER_BINARY("heron.ecs.topology.binary.file", "heron-examples.jar"), + ECS_COMPOSE_TEMPLATE("heron.ecs.compose.template.file", + "${HOME}/.heron/conf/ecs/ecs_compose_template.yaml"), + ECS_AMI_INSTANCE("heron.ecs.ami.instance", "http://169.254.169.254/latest/meta-data/local-ipv4"), + ECS_COMPOSE_UPCMD("heron.ecs.compose.up", "ecs-cli compose --project-name "), + ECS_COMPOSE_STOP("heron.ecs.compose.up", "aws ecs stop-task --cluster ecs-heron-cluster --task "), + //ECS_COMPOSE_LIST("heron.ecs.compose.up", "ecs-cli ps") + ECS_COMPOSE_LIST("heron.ecs.compose.up", "aws ecs list-tasks --family "), + ECS_LIST_BY("heron.ecs.list.by", "families"), + ECS_TASK_TAG("heron.ecs.task.tag", "taskArns"), + ECS_GET_FAMILY("heron.ecs.family.name", + "aws ecs list-task-definition-families --family-prefix ecscompose-"), + WORKING_DIRECTORY("heron.scheduler.ecs.working.directory", + "${HOME}/.herondata/topologies/${CLUSTER}/${ROLE}/${TOPOLOGY}"); + + + private final String value; + private final Key.Type type; + private final Object defaultValue; + + EcsKey(String value, String defaultValue) { + this.value = value; + this.type = Key.Type.STRING; + this.defaultValue = defaultValue; + } + + public String value() { + return value; + } + + public Object getDefault() { + return defaultValue; + } + + public String getDefaultString() { + if (type != Key.Type.STRING) { + throw new IllegalAccessError(String.format( + "Config Key %s is type %s, getDefaultString() not supported", this.name(), this.type)); + } + return (String) this.defaultValue; + } + +} diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsLauncher.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsLauncher.java new file mode 100644 index 00000000000..daf15d8247a --- /dev/null +++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsLauncher.java @@ -0,0 +1,127 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
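
The EcsKey defaults above (notably ECS_COMPOSE_TEMPLATE and ECS_COMPOSE_UPCMD) drive how a per-container compose file is generated. A condensed sketch of the placeholder substitution that EcsScheduler.getDockerFileContent performs later in this patch; the class, method, and argument names here are illustrative stand-ins for the values the scheduler computes:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

final class ComposeTemplateExample {
  static String render(String templatePath, String topologyName, int container,
                       String executorCommand, String portMappings) throws IOException {
    String yaml = new String(Files.readAllBytes(Paths.get(templatePath)));
    return yaml
        .replaceAll("TOPOLOGY_NAME", topologyName)               // S3 folder holding topology.tar.gz
        .replaceAll("CONTAINER_NUMBER", "executor" + container)  // unique compose service name
        .replace("HERON_EXECUTOR", executorCommand)              // full heron-executor command line
        .replace("FREEPORTS", portMappings);                     // the "- \"port:port\"" list entries
  }
}
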
+ +package com.twitter.heron.scheduler.ecs; + + +import java.io.File; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.twitter.heron.common.basics.SysUtils; +import com.twitter.heron.scheduler.utils.LauncherUtils; +import com.twitter.heron.scheduler.utils.Runtime; +import com.twitter.heron.scheduler.utils.SchedulerUtils; +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; +import com.twitter.heron.spi.packing.PackingPlan; +import com.twitter.heron.spi.scheduler.ILauncher; +import com.twitter.heron.spi.scheduler.IScheduler; +import com.twitter.heron.spi.utils.ShellUtils; + +public class EcsLauncher implements ILauncher { + protected static final Logger LOG = Logger.getLogger(EcsLauncher.class.getName()); + + private Config config; + private Config runtime; + private String topologyWorkingDirectory; + + @Override + public void initialize(Config mConfig, Config mRuntime) { + this.config = mConfig; + this.runtime = mRuntime; + this.topologyWorkingDirectory = EcsContext.workingDirectory(config); + } + + @Override + public void close() { + // Do nothing + } + + //@Override + public boolean launch_old(PackingPlan packing) { + LauncherUtils launcherUtils = LauncherUtils.getInstance(); + Config ytruntime = launcherUtils.createConfigWithPackingDetails(runtime, packing); + return launcherUtils.onScheduleAsLibrary(config, ytruntime, getScheduler(), packing); + } + + @Override + public boolean launch(PackingPlan packing) { + LOG.log(Level.FINE, "Launching topology for local cluster {0}", + EcsContext.cluster(config)); + + if (!setupWorkingDirectory()) { + LOG.severe("Failed to setup working directory"); + return false; + } + + String[] schedulerCmd = getSchedulerCommand(); + Process p = startScheduler(schedulerCmd); + if (p == null) { + LOG.severe("Failed to start SchedulerMain using: " + Arrays.toString(schedulerCmd)); + return false; + } + + LOG.log(Level.FINE, String.format( + "To check the status and logs of the topology, use the working directory %s", + EcsContext.workingDirectory(config))); + + return true; + + } + + protected String[] getSchedulerCommand() { + List freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_SCHEDULER); + for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_SCHEDULER; i++) { + freePorts.add(SysUtils.getFreePort()); + } + + return SchedulerUtils.schedulerCommand(config, runtime, freePorts); + } + + protected Process startScheduler(String[] schedulerCmd) { + return ShellUtils.runASyncProcess(EcsContext.verbose(config), schedulerCmd, + new File(topologyWorkingDirectory)); + } + protected boolean setupWorkingDirectory() { + // get the path of core release URI + String coreReleasePackageURI = EcsContext.corePackageUri(config); + + // form the target dest core release file name + String coreReleaseFileDestination = Paths.get( + topologyWorkingDirectory, "heron-core.tar.gz").toString(); + + // Form the topology package's URI + String topologyPackageURI = Runtime.topologyPackageUri(runtime).toString(); + + // form the target topology package file name + String topologyPackageDestination = Paths.get( + topologyWorkingDirectory, "topology.tar.gz").toString(); + + return SchedulerUtils.setupWorkingDirectory( + topologyWorkingDirectory, + coreReleasePackageURI, + coreReleaseFileDestination, + topologyPackageURI, + topologyPackageDestination, + Context.verbose(config)); + } + protected IScheduler 
getScheduler() { + return new EcsScheduler(); + } +} diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsScheduler.java new file mode 100644 index 00000000000..ac3215778c1 --- /dev/null +++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/ecs/EcsScheduler.java @@ -0,0 +1,273 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.twitter.heron.scheduler.ecs; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +//import java.util.Arrays; +import java.util.List; +import java.util.logging.Logger; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.io.IOUtils; + +import com.twitter.heron.common.basics.SysUtils; +import com.twitter.heron.proto.scheduler.Scheduler; +import com.twitter.heron.scheduler.utils.SchedulerUtils; +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; +import com.twitter.heron.spi.packing.PackingPlan; +import com.twitter.heron.spi.scheduler.IScheduler; +import com.twitter.heron.spi.utils.ShellUtils; + +public class EcsScheduler implements IScheduler { + private static final Logger LOG = Logger.getLogger(EcsScheduler.class.getName()); + private Config config; + private Config runtime; + private StringBuilder nfreePorts; + private volatile boolean isTopologyKilled = false; + private File tempDockerFile = null; + + @Override + public void initialize(Config mConfig, Config mRuntime) { + if (Config.toClusterMode(mConfig) != null) { + this.config = Config.toClusterMode(mConfig); + } else { + this.config = mConfig; + } + this.runtime = mRuntime; + } + + @Override + public void close() { + } + + + protected int startExecutorSyncProcess(int container) { + String executingInShell = new String(); + executingInShell = getExecutorCommand(container)[0]; + return ShellUtils.runProcess(executingInShell, null); + } + @VisibleForTesting + protected void startExecutor(final int container) { + LOG.info("Starting a new executor for container: " + container); + int shellOutput = startExecutorSyncProcess(container); + LOG.info("output value for the executor container: " + + container + String.valueOf(shellOutput)); + } + + protected String[] getExecutorCommand(int container) { + List freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR); + Integer localFreePort = null; + nfreePorts = new StringBuilder(); + for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) { + localFreePort = SysUtils.getFreePort(); + freePorts.add(localFreePort); + nfreePorts.append("\n - \""); + nfreePorts.append(localFreePort); + nfreePorts.append(":"); + 
nfreePorts.append(localFreePort); + nfreePorts.append("\""); + } + String[] executorCmd = SchedulerUtils.executorCommand(config, runtime, container, freePorts); + String finalExecCommand = setClusterValues(formHeronExecCommand(executorCmd)); + String ecsTaskProject = EcsContext.topologyName(config) + "_" + container; + FileOutputStream dockerFilestream = null; + String content = null; + try { + tempDockerFile = File.createTempFile("docker", ".yml"); + content = getDockerFileContent(finalExecCommand, container); + tempDockerFile.setWritable(true); + dockerFilestream = new FileOutputStream(tempDockerFile); + IOUtils.write(content, dockerFilestream); + IOUtils.closeQuietly(dockerFilestream); + } catch (IOException e) { + LOG.severe("Unable to create ecs task for container: " + container); + } finally { + IOUtils.closeQuietly(dockerFilestream); + } + String finalCommand = String.format("%s %s --file %s up", + EcsContext.composeupCmd(config), + ecsTaskProject, tempDockerFile); + LOG.info("final Ecs Task command " + finalCommand); + //tempDockerFile.deleteOnExit(); + return new String[] {finalCommand}; + } + + protected String setClusterValues(String localExecCommand) { + String clusterExecCommand = localExecCommand.replace(Context.topologyBinaryFile(config), + EcsContext.ecsClusterBinary(config)); + clusterExecCommand = clusterExecCommand.replaceAll("\"", "'"); + return clusterExecCommand; + } + + protected String getDockerFileContent(String execCommand, int container) throws IOException { + + String commandBuiler = new String(Files.readAllBytes( + Paths.get(EcsContext.ecsComposeTemplate(config)))); + commandBuiler = commandBuiler.replaceAll("TOPOLOGY_NAME", + EcsContext.topologyName(config)); + commandBuiler = commandBuiler.replaceAll("CONTAINER_NUMBER", + "executor" + String.valueOf(container)); + commandBuiler = commandBuiler.replace("HERON_EXECUTOR", execCommand); + commandBuiler = commandBuiler.replace("FREEPORTS", nfreePorts); + System.out.println("commandBuiler :\n" + commandBuiler); + return commandBuiler; + } + + private String formHeronExecCommand(String[] inStringArray) { + StringBuilder builder = new StringBuilder(); + for (String string : inStringArray) { + if (builder.length() > 0) { + builder.append(" "); + } + builder.append(string); + } + builder.append(" "); + builder.append(EcsContext.AmiInstanceUrl(config)); + String stringToReturn = builder.toString(); + return stringToReturn; + } + + /** + * Schedule the provided packed plan + */ + @Override + public boolean onSchedule(PackingPlan packing) { + LOG.info("Starting to deploy topology: " + EcsContext.topologyName(config)); + LOG.info("Starting executor for TMaster"); + startExecutor(0); + // for each container, run its own executor + for (PackingPlan.ContainerPlan container : packing.getContainers()) { + startExecutor(container.getId()); + } + LOG.info("Executor for each container have been started."); + return true; + } + + @Override + public List getJobLinks() { + List list = new ArrayList(); + StringBuilder familyListcmd = new StringBuilder(); + familyListcmd.append(EcsContext.composeFamilyName(config)); + familyListcmd.append(EcsContext.topologyName(config)); + LOG.info(String.format("final list cmd: %s", familyListcmd)); + StringBuilder stdout = new StringBuilder(); + StringBuilder stderr = new StringBuilder(); + List familyString; + int status = ShellUtils.runProcess(familyListcmd.toString(), stdout); + if (status != 0) { + LOG.severe(String.format( + "Failed to run process. 
Command=%s, STDOUT=%s, STDERR=%s", + familyListcmd.toString(), stdout, stderr)); + } else { + String listjsonString = stdout.toString(); + try { + familyString = parseJsonName(listjsonString, EcsContext.composeListby(config)); + for (String familyName : familyString) { + StringBuilder listout = new StringBuilder(); + StringBuilder listerr = new StringBuilder(); + StringBuilder taskListcmd = new StringBuilder(); + taskListcmd.append(EcsContext.composeListCmd(config)); + taskListcmd.append(" "); + taskListcmd.append(familyName); + int jobListstatus = ShellUtils.runProcess(taskListcmd.toString(), listout); + if (jobListstatus != 0) { + LOG.severe(String.format( + "Failed to run process. Command=%s, STDOUT=%s, STDERR=%s", + taskListcmd.toString(), listout, listerr)); + } else { + List taskString = parseJsonName(listout.toString(), + EcsContext.composeTaskTag(config)); + for (String taskId : taskString) { + list.add(taskId); + } + } + } + } catch (JsonParseException e) { + LOG.severe("Unable to get list due to Parsing issues"); + } catch (IOException ioe) { + LOG.severe("Unable to get list due to IO issues"); + } + } + return list; + } + + private List parseJsonName(String jString, String jName) throws JsonParseException, + IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode actualObj = mapper.readTree(jString); + //JsonNode jsonNode1 = actualObj.get("families"); + List jsonList = new ArrayList(); + final JsonNode arrNode = new ObjectMapper().readTree(jString).get(jName); + if (arrNode.isArray()) { + for (final JsonNode objNode : arrNode) { + String taskDefn = objNode.asText(); + jsonList.add(taskDefn.toString()); + } + } else { + jsonList.add(arrNode.textValue()); + } + return jsonList; + } + + @Override + public boolean onKill(Scheduler.KillTopologyRequest request) { + StringBuilder stdout = new StringBuilder(); + StringBuilder stderr = new StringBuilder(); + + List taskList = getJobLinks(); + for (String taskId : taskList) { + StringBuilder killJob = new StringBuilder(); + killJob.append(EcsContext.composeStopCmd(config)); + killJob.append(" "); + killJob.append(taskId); + int status = ShellUtils.runProcess(killJob.toString(), null); + if (status != 0) { + LOG.severe(String.format( + "Failed to run process. Command=%s, STDOUT=%s, STDERR=%s", + EcsContext.composeStopCmd(config), stdout, stderr)); + isTopologyKilled = false; + } else { + LOG.info("Topology Task stop Successful"); + isTopologyKilled = true; + } + } + return isTopologyKilled; + } + + @Override + public boolean onRestart(Scheduler.RestartTopologyRequest request) { + // TODO(ananthgs): Need to see if re-starting each task is good. 
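    // One possible approach, sketched here as comments rather than implemented: stop every
    // running task for the topology (exactly what onKill above does) and then re-run
    // "ecs-cli compose ... up" per container via startExecutor, assuming the packing plan
    // is retained by the scheduler. Until something along those lines is in place, the
    // scheduler only logs and returns false: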
+ LOG.severe("Topology onRestart not implemented by this scheduler Please use kill & start."); + return false; + } + + @Override + public boolean onUpdate(Scheduler.UpdateTopologyRequest request) { + // TODO(ananthgs): Need to decide how to get pplans and update + LOG.severe("Topology onUpdate not implemented by this scheduler."); + return false; + } + +} diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java index f4535f5e491..da7faeb2243 100644 --- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java +++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java @@ -49,6 +49,7 @@ public void initialize(Config mConfig, Config mRuntime) { // get the topology working directory this.topologyWorkingDirectory = LocalContext.workingDirectory(config); + } @Override diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD index 02f20754330..68cf9a78599 100644 --- a/heron/schedulers/tests/java/BUILD +++ b/heron/schedulers/tests/java/BUILD @@ -62,6 +62,10 @@ marathon_deps_files = [ "//heron/schedulers/src/java:marathon-scheduler-java", ] +ecs_deps_files = [ + "//heron/schedulers/src/java:ecs-scheduler-java", +] + java_library( name = "aurora-tests", srcs = glob(["**/aurora/*.java"]), @@ -161,3 +165,18 @@ java_tests( runtime_deps = [ ":marathon-tests" ], size = "small", ) + +java_library( + name = "ecs-tests", + srcs = glob(["**/ecs/*.java"]), + deps = scheduler_deps_files + ecs_deps_files, +) + +java_tests( + test_classes = [ + "com.twitter.heron.scheduler.ecs.EcsSchedulerTest", + "com.twitter.heron.scheduler.ecs.EcsLauncherTest", + ], + runtime_deps = [ ":ecs-tests" ], + size = "small", +) \ No newline at end of file diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/ecs/EcsSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/ecs/EcsSchedulerTest.java new file mode 100644 index 00000000000..745db610107 --- /dev/null +++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/ecs/EcsSchedulerTest.java @@ -0,0 +1,142 @@ +// Copyright 2016 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package com.twitter.heron.scheduler.ecs;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.common.basics.PackageType;
+import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.ConfigLoader;
+import com.twitter.heron.spi.common.Key;
+import com.twitter.heron.spi.packing.PackingPlan;
+import com.twitter.heron.spi.utils.PackingTestUtils;
+import com.twitter.heron.spi.utils.TopologyTests;
+
+@RunWith(PowerMockRunner.class)
+// EcsContext is prepared as well so that PowerMockito.mockStatic(EcsContext.class) works.
+@PrepareForTest({SchedulerUtils.class, ConfigLoader.class, EcsContext.class})
+public class EcsSchedulerTest {
+
+  private static final String TOPOLOGY_NAME = "testTopology";
+  private static final String CLUSTER = "testCluster";
+  private static final String ROLE = "testRole";
+  private static final String ENVIRON = "testEnviron";
+  private static final String BUILD_VERSION = "live";
+  private static final String BUILD_USER = "user";
+  private static final String TOPOLOGY_DEFINITION_FILE = "topologyDefFile";
+  private static final String TOPOLOGY_BINARY_FILE = "topologyBinFile";
+  private static final String TOPOLOGY_PACKAGE_TYPE = "tar";
+  private static final String[] EXECUTOR_CMD = {"executor", "_", "cmd"};
+  private static final String DOCKER_CONTENT = "testDockerContent";
+  private static final String TEST_DATA_PATH =
+      "/__main__/heron/spi/tests/java/com/twitter/heron/spi/common/testdata";
+  private final String heronHome =
+      Paths.get(System.getenv("JAVA_RUNFILES"), TEST_DATA_PATH).toString();
+  private final String configPath = Paths.get(heronHome, "local").toString();
+  private static final String TEST_DOCKER_PATH =
+      "/__main__/heron/config/src/yaml/conf/ecs/ecs_compose_template.yaml";
+  private final String dockerPath = Paths.get(heronHome, "").toString();
+  private static EcsScheduler scheduler;
+  private Config clusterConfig;
+  private Config runtime;
+  private EcsContext ecsContext;
+  private Set<PackingPlan.ContainerPlan> containerPlans;
+
+  @Before
+  public void before() throws Exception {
+    scheduler = Mockito.spy(EcsScheduler.class);
+
+    Config rawConfig = ConfigLoader.loadConfig(
+        heronHome, configPath, "/release/file", "/override/file");
+    clusterConfig = Mockito.spy(Config.toClusterMode(rawConfig));
+
+    runtime = Mockito.mock(Config.class);
+
+    scheduler.initialize(clusterConfig, runtime);
+  }
+
+  @After
+  public void after() throws Exception {
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    scheduler.close();
+  }
+
+  @Test
+  public void testOnSchedule() throws Exception {
+    Mockito.doReturn(TOPOLOGY_DEFINITION_FILE).
+        when(clusterConfig).get(Key.TOPOLOGY_DEFINITION_FILE);
+    Mockito.doReturn(TOPOLOGY_BINARY_FILE).when(clusterConfig).get(Key.TOPOLOGY_BINARY_FILE);
+    Mockito.doReturn(PackageType.getPackageType("test.tar")).
+        when(clusterConfig).get(Key.TOPOLOGY_PACKAGE_TYPE);
+
+    TopologyAPI.Topology topology = TopologyTests.createTopology(
+        TOPOLOGY_NAME, new com.twitter.heron.api.Config(), new HashMap<>(),
+        new HashMap<>());
+    Mockito.when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(topology);
+    Mockito.when(runtime.get(Key.TOPOLOGY_DEFINITION_FILE)).thenReturn(TOPOLOGY_DEFINITION_FILE);
+
+    ecsContext = Mockito.mock(EcsContext.class);
+    PowerMockito.mockStatic(EcsContext.class);
+    Mockito.when(EcsContext.ecsComposeTemplate(clusterConfig)).thenReturn(TEST_DOCKER_PATH);
+
+    PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
+    Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
+    containers.add(PackingTestUtils.testContainerPlan(1));
+    Mockito.when(packingPlan.getContainers()).thenReturn(containers);
+
+    containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
+    PackingPlan validPlan = new PackingPlan(TOPOLOGY_NAME, containers);
+    Assert.assertTrue(scheduler.onSchedule(validPlan));
+  }
+}
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index d551ef86fcc..7c8d9a68a5c 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -250,6 +250,14 @@ pkg_tar(
     ]
 )
 
+pkg_tar(
+    name = "heron-client-conf-ecs",
+    package_dir = "conf/ecs",
+    files = [
+        "//heron/config/src/yaml:conf-ecs-yaml",
+    ]
+)
+
 pkg_tar(
     name = "heron-client-conf",
     package_dir = "conf",
@@ -275,6 +283,7 @@ pkg_tar(
         ":heron-client-conf-yarn",
         ":heron-client-conf-mesos",
        ":heron-client-conf-marathon",
+        ":heron-client-conf-ecs",
        ":heron-client-dist",
        ":heron-client-examples",
        ":heron-client-lib-third_party",
diff --git a/third_party/java/BUILD b/third_party/java/BUILD
index 22caa0dfd23..537b0b1584c 100644
--- a/third_party/java/BUILD
+++ b/third_party/java/BUILD
@@ -2,6 +2,21 @@ licenses(["notice"])
 
 package(default_visibility = ["//visibility:public"])
 
+java_library(
+  name = "jackson-databind",
+  srcs = [ "Empty.java" ],
+  exports = [
+    "@com_fasterxml_jackson_core_jackson-annotations//jar",
+    "@com_fasterxml_jackson_core_jackson-core//jar",
+    "@com_fasterxml_jackson_core_jackson-databind//jar",
+  ],
+  deps = [
+    "@com_fasterxml_jackson_core_jackson_annotations//jar",
+    "@com_fasterxml_jackson_core_jackson_core//jar",
+    "@com_fasterxml_jackson_core_jackson_databind//jar",
+  ],
+)
+
 java_library(
   name = "aws-java-sdk",
   srcs = [ "Empty.java" ],
diff --git a/tools/rules/heron_client.bzl b/tools/rules/heron_client.bzl
index f458a050e60..94836b0bb4a 100644
--- a/tools/rules/heron_client.bzl
+++ b/tools/rules/heron_client.bzl
@@ -17,6 +17,11 @@ def heron_client_local_files():
     "//heron/config/src/yaml:conf-local-yaml",
   ]
 
+def heron_client_ecs_files():
+  return [
+    "//heron/config/src/yaml:conf-ecs-yaml",
+  ]
+
 def heron_client_aurora_files():
   return [
     "//heron/config/src/yaml:conf-aurora-yaml",
   ]
@@ -36,6 +41,7 @@ def heron_client_lib_scheduler_files():
     "//heron/schedulers/src/java:heron-yarn-scheduler",
     "//heron/schedulers/src/java:heron-mesos-scheduler",
     "//heron/schedulers/src/java:heron-marathon-scheduler",
+    "//heron/schedulers/src/java:heron-ecs-scheduler",
     "//heron/packing/src/java:heron-roundrobin-packing",
     "//heron/packing/src/java:heron-binpacking-packing",
   ]
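
Note on the new third_party/java:jackson-databind target above: it is pulled in because the scheduler's getJobLinks()/parseJsonName() path parses JSON printed by the ECS CLI with Jackson. The standalone sketch below shows the same parsing pattern in isolation; the field name "families" and the sample payload are illustrative assumptions (the real field names come from EcsContext config values such as composeListby), not the exact CLI output.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class ParseJsonNameSketch {
      public static void main(String[] args) throws IOException {
        // Illustrative payload shaped like an ECS "list task definition families" response.
        String json = "{\"families\": [\"mytopo_executor0\", \"mytopo_executor1\"]}";

        JsonNode node = new ObjectMapper().readTree(json).get("families");

        List<String> names = new ArrayList<>();
        if (node.isArray()) {
          for (JsonNode element : node) {    // one entry per task-definition family
            names.add(element.asText());
          }
        } else {
          names.add(node.textValue());       // tolerate a single scalar value
        }
        System.out.println(names);           // prints: [mytopo_executor0, mytopo_executor1]
      }
    }
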
diff --git a/tools/rules/heron_core.bzl b/tools/rules/heron_core.bzl
index 24d501dcb77..763b0abd7fc 100644
--- a/tools/rules/heron_core.bzl
+++ b/tools/rules/heron_core.bzl
@@ -33,6 +33,7 @@ def heron_core_lib_scheduler_files():
   return [
     "//heron/scheduler-core/src/java:heron-scheduler",
     "//heron/schedulers/src/java:heron-local-scheduler",
+    "//heron/schedulers/src/java:heron-ecs-scheduler",
    "//heron/schedulers/src/java:heron-slurm-scheduler",
    "//heron/schedulers/src/java:heron-mesos-scheduler",
    "//heron/schedulers/src/java:heron-marathon-scheduler",
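
A note for reviewers on the compose flow wired up by EcsScheduler.startExecutor() and getDockerFileContent(): the scheduler only does plain string substitution on the template placeholders (TOPOLOGY_NAME, CONTAINER_NUMBER, HERON_EXECUTOR, FREEPORTS) before writing the result to a temp .yml file and running the configured compose-up command against it. The sketch below reproduces that substitution in isolation; the miniature template, the executor command, and the port list are made-up stand-ins, not the shipped template or the real SchedulerUtils.executorCommand output.

    public final class ComposeTemplateSketch {
      public static void main(String[] args) {
        // Tiny stand-in for the compose template; only the placeholders matter here.
        String template = String.join("\n",
            "  CONTAINER_NUMBER:",
            "    command: [\"sh\", \"-c\", \"HERON_EXECUTOR\"]",
            "    ports:FREEPORTS");

        // Made-up inputs: in the scheduler these come from SchedulerUtils.executorCommand()
        // and from the free-port probe that fills the nfreePorts builder.
        String execCommand = "heron-executor <flags elided>";
        String freePorts = "\n      - \"6001:6001\"\n      - \"6002:6002\"";
        int container = 1;

        String rendered = template
            .replaceAll("TOPOLOGY_NAME", "mytopo")            // no-op for this mini template
            .replaceAll("CONTAINER_NUMBER", "executor" + container)
            .replace("HERON_EXECUTOR", execCommand)
            .replace("FREEPORTS", freePorts);

        // The scheduler would write this text to a temp .yml file and hand it to
        // the compose-up command configured via EcsContext.composeupCmd().
        System.out.println(rendered);
      }
    }
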