- Easy integration with HPC batch systems like PBS/Torque.
- Support for Docker and Singularity containers in the Transformation/Executables catalog.
- Integration with Jupyter notebooks: In addition to the DAX API for creating workflows, Pegasus has a Python API for running and monitoring workflows from Jupyter notebooks or other Python code.
- For short tasks where the overhead of running them on a cluster is too high, Pegasus offers task clustering, which combines short tasks into a larger job. This feature needs further testing to determine how useful it is in practice.
- Workflows are defined as code. Interfaces exist for Python, R, Java, and Perl, so anyone using HPC infrastructure should be able to define the DAGs without much trouble. Defining a workflow is like writing another script for your project. This works, but it lacks what Snakemake offers: a self-documenting format that translates easily from individual commands to an automated workflow.
- In Pegasus, the workflows are independent of the underlying physical infrastructure (location of data, executables, cluster end-points) where they are run. This makes them portable, but it results in a more abstract workflow that you then have to configure with the physical infrastructure parameters. There are Site (temp and output storage locations), Transformation (executable locations), and Replica (input locations) "catalogs" (XML files) that need to be configured for a workflow. Thus, defining the workflow is abstract and cumbersome, and it results in extra jobs for file management being added to the workflow. This diagram illustrates this nicely:
- While a bit cumbersome, the catalogs streamline access to data on remote hosts. You declare where your inputs, outputs, and scratch locations are in the catalogs, and Pegasus takes care of any file transfers that need to occur when you deploy the workflow on an HPC system. This, however, is not very relevant to us, since users will largely be executing workflows on HPC login nodes, where the same storage is mounted as on the compute nodes.
- A Web app allows you to monitor a job while it runs or review how it ran. There is a lot of detail, perhaps too much, and it is difficult to get an at-a-glance picture of what a specific workflow is doing. For example, a simple `split`, followed by a few `ls` calls, generates multiple jobs (jobs are the subunits of the workflow), not only for the actual commands, but also for cleaning up, creating directories, and registering the workflow. To see what each job does you have to drill down into it, view its tasks (tasks are the subunits of a job), and only then can you see the actual command represented by the job. This structure is a little cumbersome, but you do get a detailed, transparent view of the workflow.
- There are a number of command-line tools that allow you to monitor progress, debug, and view several summaries of the workflow from the command line. These are more streamlined than the Web interface, while still offering enough detail.
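The task clustering mentioned in the list above can be pictured as simple batching. The following is a minimal sketch in plain Python, not Pegasus code: the `cluster_tasks` helper, the task strings, and the fixed cluster size are all assumptions made for illustration.

```python
def cluster_tasks(tasks, cluster_size):
    """Group short tasks into batches of at most `cluster_size` tasks."""
    if cluster_size < 1:
        raise ValueError("cluster_size must be at least 1")
    return [tasks[i:i + cluster_size] for i in range(0, len(tasks), cluster_size)]


# Hypothetical short tasks: one word count per chunk of a split file.
tasks = ["wc part.%s" % c for c in "abcd"]
clustered_jobs = cluster_tasks(tasks, cluster_size=2)

for number, job in enumerate(clustered_jobs, start=1):
    # Each clustered job runs its tasks one after another in one submission.
    print("job %d: %s" % (number, " && ".join(job)))
```

With four tasks and a cluster size of two, the batch system sees two submissions instead of four, which is where the overhead saving would come from.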
Pegasus has direct support for execution on a batch HPC system powered by PBS/Torque, through the Site Catalog.
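To make the split between the abstract workflow and the physical infrastructure more concrete, here is a minimal sketch in plain Python. It does not use the Pegasus API or its catalog file formats; the dictionaries, the paths, and the `plan_job` helper are hypothetical stand-ins for the Replica, Transformation, and Site catalogs.

```python
# Hypothetical stand-ins for the three catalogs (not the Pegasus file formats).
replica_catalog = {"pegasus.html": "file:///home/tutorial/input/pegasus.html"}
transformation_catalog = {"split": "/usr/bin/split", "wc": "/usr/bin/wc"}
site_catalog = {
    "condorpool": {"scratch": "/scratch/tutorial", "output": "/home/tutorial/output"}
}


def plan_job(transformation, inputs, site):
    """Resolve one abstract job into concrete paths using the catalogs."""
    scratch = site_catalog[site]["scratch"]
    # Inputs known to the replica catalog must be staged into scratch first.
    stage_in = [
        (replica_catalog[name], "%s/%s" % (scratch, name))
        for name in inputs
        if name in replica_catalog
    ]
    return {
        "executable": transformation_catalog[transformation],
        "stage_in": stage_in,
        "workdir": scratch,
    }


concrete = plan_job("split", ["pegasus.html"], "condorpool")
print(concrete)
```

The abstract job only names `split` and `pegasus.html`; everything site-specific comes from the lookups, which is why moving to another site means editing catalogs rather than the workflow.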
import os
import pwd
import sys
import time

from Pegasus.DAX3 import *

# The name of the DAX file is the first argument
if len(sys.argv) != 2:
    sys.stderr.write("Usage: %s DAXFILE\n" % (sys.argv[0]))
    sys.exit(1)
daxfile = sys.argv[1]

USER = pwd.getpwuid(os.getuid())[0]

# Create an abstract DAG
dax = ADAG("split")

# Add some workflow-level metadata
dax.metadata("creator", "%s@%s" % (USER, os.uname()[1]))
dax.metadata("created", time.ctime())

webpage = File("pegasus.html")

# The split job that splits the webpage into smaller chunks
split = Job("split")
split.addArguments("-l", "100", "-a", "1", webpage, "part.")
split.uses(webpage, link=Link.INPUT)
# Associate the label with the job. All jobs with the same label
# are run with PMC when doing job clustering
split.addProfile(Profile("pegasus", "label", "p1"))
dax.addJob(split)

# We do a parameter sweep on the first 4 chunks created
for c in "abcd":
    part = File("part.%s" % c)
    split.uses(part, link=Link.OUTPUT, transfer=False, register=False)

    count = File("count.txt.%s" % c)

    wc = Job("wc")
    wc.addProfile(Profile("pegasus", "label", "p1"))
    wc.addArguments("-l", part)
    wc.setStdout(count)
    wc.uses(part, link=Link.INPUT)
    wc.uses(count, link=Link.OUTPUT, transfer=True, register=True)
    dax.addJob(wc)

    # Add the dependency: each wc job runs after split
    dax.depends(wc, split)

# Write the DAX to the requested file
with open(daxfile, "w") as f:
    dax.writeXML(f)
print("Generated dax %s" % daxfile)
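For orientation, the dependency structure that the generator above encodes can be reproduced in a few lines of plain Python. This is only a sketch: the job names are made up for readability (Pegasus assigns its own IDs), and the ordering uses Kahn's algorithm rather than anything Pegasus-specific.

```python
from collections import deque

# child -> parents, mirroring dax.depends(wc, split) for the four chunks.
# Job names are illustrative; Pegasus generates its own job IDs.
parents = {"split": []}
for c in "abcd":
    parents["wc_%s" % c] = ["split"]


def topological_order(parents):
    """Return jobs in an order where every parent precedes its children."""
    remaining = {job: len(deps) for job, deps in parents.items()}
    children = {job: [] for job in parents}
    for job, deps in parents.items():
        for dep in deps:
            children[dep].append(job)
    ready = deque(sorted(job for job, count in remaining.items() if count == 0))
    order = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for child in sorted(children[job]):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(order) != len(parents):
        raise ValueError("dependency cycle detected")
    return order


order = topological_order(parents)
print(order)
```

The four `wc` jobs have no dependencies on each other, so once `split` finishes they are all eligible to run in parallel.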
pegasus-plan --conf pegasus.properties \
    --dax $DAXFILE \
    --dir $DIR/submit \
    --input-dir $DIR/input \
    --output-dir $DIR/output \
    --cleanup leaf \
    --force \
    --sites condorpool \
    --submit
.
├── README.md
├── daxgen.py
├── generate_dax.sh
├── input
│   └── pegasus.html
├── output
│   ├── count.txt.a
│   ├── count.txt.b
│   ├── count.txt.c
│   └── count.txt.d
├── pegasus.properties
├── plan_cluster_dax.sh
├── plan_dax.sh
├── rc.txt
├── scratch
│   └── tutorial
│       └── pegasus
│           └── split
├── sites.xml
├── sites.xml~
├── split.dax
├── submit
│   └── tutorial
│       └── pegasus
│           └── split
│               └── run0001
│                   ├── 00
│                   │   └── 00
│                   │       ├── cleanup_split_0_local.err
│                   │       ├── cleanup_split_0_local.in
│                   │       ├── cleanup_split_0_local.out
│                   │       ├── cleanup_split_0_local.sub
│                   │       ├── create_dir_split_0_local.err.000
│                   │       ├── create_dir_split_0_local.in
│                   │       ├── create_dir_split_0_local.out.000
│                   │       ├── create_dir_split_0_local.sub
│                   │       ├── register_local_1_0.err.000
│                   │       ├── register_local_1_0.in
│                   │       ├── register_local_1_0.out.000
│                   │       ├── register_local_1_0.sub
│                   │       ├── split_ID0000001.err.000
│                   │       ├── split_ID0000001.out.000
│                   │       ├── split_ID0000001.sh
│                   │       ├── split_ID0000001.sub
│                   │       ├── stage_in_remote_local_0_0.err.000
│                   │       ├── stage_in_remote_local_0_0.in
│                   │       ├── stage_in_remote_local_0_0.out.000
│                   │       ├── stage_in_remote_local_0_0.sub
│                   │       ├── stage_out_local_local_1_0.err.000
│                   │       ├── stage_out_local_local_1_0.in
│                   │       ├── stage_out_local_local_1_0.out.000
│                   │       ├── stage_out_local_local_1_0.sub
│                   │       ├── wc_ID0000002.err.000
│                   │       ├── wc_ID0000002.meta
│                   │       ├── wc_ID0000002.out.000
│                   │       ├── wc_ID0000002.sh
│                   │       ├── wc_ID0000002.sub
│                   │       ├── wc_ID0000003.err.000
│                   │       ├── wc_ID0000003.meta
│                   │       ├── wc_ID0000003.out.000
│                   │       ├── wc_ID0000003.sh
│                   │       ├── wc_ID0000003.sub
│                   │       ├── wc_ID0000004.err.000
│                   │       ├── wc_ID0000004.meta
│                   │       ├── wc_ID0000004.out.000
│                   │       ├── wc_ID0000004.sh
│                   │       ├── wc_ID0000004.sub
│                   │       ├── wc_ID0000005.err.000
│                   │       ├── wc_ID0000005.meta
│                   │       ├── wc_ID0000005.out.000
│                   │       ├── wc_ID0000005.sh
│                   │       └── wc_ID0000005.sub
│                   ├── braindump.txt
│                   ├── catalogs
│                   │   ├── rc.txt
│                   │   ├── sites.xml
│                   │   └── tc.txt
│                   ├── jobstate.log
│                   ├── monitord-notifications.log
│                   ├── monitord.done
│                   ├── monitord.info
│                   ├── monitord.log
│                   ├── monitord.started
│                   ├── monitord.subwf
│                   ├── pegasus-worker-4.8.0-x86_64_rhel_7.tar.gz
│                   ├── pegasus.7943275248767313075.properties
│                   ├── split-0.cache
│                   ├── split-0.dag
│                   ├── split-0.dag.condor.sub
│                   ├── split-0.dag.dagman.log
│                   ├── split-0.dag.dagman.out
│                   ├── split-0.dag.lib.err
│                   ├── split-0.dag.lib.out
│                   ├── split-0.dag.metrics
│                   ├── split-0.dag.metrics.out
│                   ├── split-0.dag.nodes.log
│                   ├── split-0.dot
│                   ├── split-0.exitcode.log
│                   ├── split-0.log
│                   ├── split-0.metadata
│                   ├── split-0.metrics
│                   ├── split-0.notify
│                   ├── split-0.stampede.db
│                   ├── split-0.static.bp
│                   └── split.dax
└── tc.txt
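The listing shows how much of a run is bookkeeping. As a rough illustration, the sketch below sorts the `.sub` submit files from the listing into compute jobs and auxiliary jobs by file-name prefix. The prefixes are simply read off the listing, and the `classify` helper is hypothetical, not part of the Pegasus tooling.

```python
# Submit files copied from the run0001/00/00 directory in the listing.
submit_files = [
    "cleanup_split_0_local.sub",
    "create_dir_split_0_local.sub",
    "register_local_1_0.sub",
    "split_ID0000001.sub",
    "stage_in_remote_local_0_0.sub",
    "stage_out_local_local_1_0.sub",
    "wc_ID0000002.sub",
    "wc_ID0000003.sub",
    "wc_ID0000004.sub",
    "wc_ID0000005.sub",
]

AUXILIARY_PREFIXES = ("cleanup_", "create_dir_", "register_", "stage_in_", "stage_out_")


def classify(filename):
    """Label a submit file as an auxiliary job or a compute job."""
    return "auxiliary" if filename.startswith(AUXILIARY_PREFIXES) else "compute"


counts = {"auxiliary": 0, "compute": 0}
for name in submit_files:
    counts[classify(name)] += 1

print(counts)
```

For this small workflow, half of the submitted jobs exist only for file management and registration, which matches the impression the Web app gives.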
- Web: Docs, BitBucket
- License: MIT License
- Has native support for batch cluster HPC environments. Each task can be configured with its required nodes, cores, memory, etc., but all tasks must be executed in the batch system environment.
- Has support for running jobs from Docker and Singularity containers.
- Has a Python API for starting and stopping jobs, which makes it possible to use with Jupyter notebooks.
- Limited commercial cloud support.
- The easiest to use, with the most user-friendly syntax. At the same time, Python code is supported in the `Snakefile` itself, so power users can write custom Python logic.
- The resulting `Snakefile` and configuration form a self-documenting experiment that translates closely to how you would run scripts on the command line.
- The biggest disadvantage is there is no job monitoring.
- Very limited visualization of resulting DAGs (but useful).
configfile: "config.json"

rule all:
    input:
        '{data_dir}/share-counts/share-counts-by-url.tab'.format(**config),
        '{data_dir}/share-counts/share-counts-by-domain.tab'.format(**config),

rule create_news_sources_list:
    input:
        '{data_dir}/pageranks/part-r-00000'.format(**config),
        '{data_dir}/top500.tab'.format(**config)
    output:
        '{data_dir}/news-sources.tab'.format(**config)
    shell:
        'python {code_dir}/create_news_sources_list.py {{input}} {{output}}'.format(**config)

rule strip_tweets:
    input:
        '{data_dir}/tweets/raw'.format(**config),
        '{data_dir}/news-sources.tab'.format(**config),
    output:
        '{data_dir}/tweets/news'.format(**config)
    threads:
        config['default_num_threads']
    shell:
        'python {code_dir}/strip_tweets.py {{threads}} {{input}} {{output}}'.format(**config)

rule count_shares:
    input:
        '{data_dir}/tweets/news'.format(**config)
    output:
        '{data_dir}/share-counts/share-counts-by-url'.format(**config),
        '{data_dir}/share-counts/share-counts-by-domain'.format(**config)
    threads:
        config['default_num_threads']
    shell:
        'python {code_dir}/count_shares.py {{threads}} {{input}} {{output}}'.format(**config)

rule reduce_share_counts:
    input:
        '{data_dir}/share-counts/share-counts-by-{{agg}}'.format(**config)
    output:
        '{data_dir}/share-counts/share-counts-by-{{agg}}.tab'.format(**config)
    shell:
        'python {code_dir}/reduce.py --value_convert_fn=int --sort=reverse sum {{input}} {{output}}'.format(**config)
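One detail of this Snakefile that is easy to miss is the brace escaping. Because each string passes through `str.format` before Snakemake sees it, single braces are filled from `config` immediately, while doubled braces survive as single braces for Snakemake to fill later. A short sketch in plain Python, assuming a hypothetical `config` with made-up values:

```python
# Hypothetical configuration values; the real ones live in config.json.
config = {"data_dir": "/data", "code_dir": "scripts"}

# {data_dir} is substituted now; {{agg}} is left behind as the wildcard {agg}.
pattern = '{data_dir}/share-counts/share-counts-by-{{agg}}.tab'.format(**config)
print(pattern)

# The same applies to shell commands: {{input}} and {{output}} become
# {input} and {output}, which Snakemake fills in when the rule runs.
command = 'python {code_dir}/reduce.py sum {{input}} {{output}}'.format(**config)
print(command)
```

Forgetting to double a brace would make `str.format` look for an `agg` or `input` key in `config` and fail with a `KeyError` before Snakemake ever parses the rule.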
$ snakemake --cores 16
$ tree
.
├── README.md
├── Snakefile
├── config.json
└── scripts
    ├── config.py
    ├── count_shares.py
    ├── create_news_sources_list.py
    ├── reduce.py
    ├── strip_tweets.py
    └── urls.py
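How `rule all` ends up triggering `reduce_share_counts` twice comes down to wildcard matching: each requested file is matched against rule output patterns, and `agg` is bound accordingly. The sketch below imitates that with a regular expression. It is an approximation for illustration only (the `match_wildcards` helper and the `/data` path are made up), not Snakemake's actual implementation.

```python
import re


def match_wildcards(pattern, target):
    """Match `target` against an output pattern such as 'by-{agg}.tab'.

    Returns a dict of wildcard values, or None if the target does not match.
    """
    regex = ""
    position = 0
    for found in re.finditer(r"\{(\w+)\}", pattern):
        # Escape the literal text, then add a named group for the wildcard.
        regex += re.escape(pattern[position:found.start()])
        regex += "(?P<%s>.+)" % found.group(1)
        position = found.end()
    regex += re.escape(pattern[position:])
    result = re.fullmatch(regex, target)
    return result.groupdict() if result else None


output_pattern = "/data/share-counts/share-counts-by-{agg}.tab"
for target in ("/data/share-counts/share-counts-by-url.tab",
               "/data/share-counts/share-counts-by-domain.tab"):
    print(target, "->", match_wildcards(output_pattern, target))
```

Each match yields a value for `agg`, which is then substituted into the rule's input pattern to find the file that `count_shares` must produce first.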
- Less friendly to HPC and batch system environments, although with some limited coding we can provide a solution. A SLURM integration for luigi already exists and would be very similar to a PBS/Torque one. An equivalent for Airflow is on the list of planned features for 2017. luigi also has a wrapper, sciluigi, that makes working with scientific workflows easier.
- A major difference is that Airflow has its own scheduler, while luigi relies on cron jobs for scheduling.
- DAGs are defined as Python code. Familiarity with Python and OOP is required.
- Both provide useful command-line logging.
- GUI for monitoring jobs is more extensive in Airflow.
"""
Code that goes along with the Airflow tutorial located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Defaults applied to every task unless overridden on the operator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'bash_queue',
    'pool': 'backfill',
    'priority_weight': 10,
    'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# t2 and t3 both run after t1
t2.set_upstream(t1)
t3.set_upstream(t1)
$ python workflow1.py
$ airflow list_dags
$ airflow list_tasks workflow1
$ airflow list_tasks workflow1 --tree
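The two `set_upstream` calls are what turn three independent tasks into a DAG. The toy model below, which deliberately does not import Airflow, mimics that bookkeeping with a hypothetical `Task` class to show the resulting structure: `print_date` runs first, and `sleep` and `templated` both wait for it.

```python
class Task:
    """Toy stand-in for an Airflow operator; it only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def set_upstream(self, other):
        # Record the edge in both directions: other -> self.
        self.upstream.append(other)
        other.downstream.append(self)


t1 = Task("print_date")
t2 = Task("sleep")
t3 = Task("templated")
t2.set_upstream(t1)
t3.set_upstream(t1)

for task in (t1, t2, t3):
    waits_for = [parent.task_id for parent in task.upstream]
    print(task.task_id, "waits for", waits_for or "nothing")
```

Since `sleep` and `templated` share a parent but not each other, a scheduler is free to run them concurrently once `print_date` succeeds.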
from collections import defaultdict
from heapq import nlargest

import luigi
import luigi.contrib.postgres

# Streams and AggregateArtistsHadoop are other tasks from the same
# luigi example module and are not shown here.


class AggregateArtists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        return [Streams(date) for date in self.date_interval]

    def run(self):
        # Count how many streams each artist has across all input files
        artist_count = defaultdict(int)

        for input in self.input():
            with input.open('r') as in_file:
                for line in in_file:
                    timestamp, artist, track = line.strip().split()
                    artist_count[artist] += 1

        with self.output().open('w') as out_file:
            for artist, count in artist_count.items():
                print(artist, count, file=out_file)


class Top10Artists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

    def output(self):
        return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)

    def run(self):
        # Keep the ten artists with the highest stream counts
        top_10 = nlargest(10, self._input_iterator())
        with self.output().open('w') as out_file:
            for streams, artist in top_10:
                print(self.date_interval.date_a, self.date_interval.date_b,
                      artist, streams, file=out_file)

    def _input_iterator(self):
        with self.input().open('r') as in_file:
            for line in in_file:
                artist, streams = line.strip().split()
                yield int(streams), int(artist)


class ArtistToplistToDatabase(luigi.contrib.postgres.CopyToTable):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    host = "localhost"
    database = "toplists"
    user = "luigi"
    password = "abc123"  # ;)
    table = "top10"

    columns = [("date_from", "DATE"),
               ("date_to", "DATE"),
               ("artist", "TEXT"),
               ("streams", "INT")]

    def requires(self):
        return Top10Artists(self.date_interval, self.use_hadoop)
$ luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
$ luigi --module examples.top_artists Top10Artists --local-scheduler --date-interval 2012-07
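Stripped of the luigi scaffolding, the two `run` methods are a count followed by a top-N selection. The sketch below runs that logic on a few in-memory lines so it can be tried without luigi installed; the sample records are invented, and only the standard library is used.

```python
from collections import defaultdict
from heapq import nlargest

# Invented sample input in the "timestamp artist track" layout that
# AggregateArtists expects.
stream_lines = [
    "1 1001 5",
    "2 1002 7",
    "3 1001 9",
    "4 1003 2",
    "5 1001 4",
    "6 1002 8",
]

# Equivalent of AggregateArtists.run: count streams per artist.
artist_count = defaultdict(int)
for line in stream_lines:
    timestamp, artist, track = line.strip().split()
    artist_count[artist] += 1

# Equivalent of Top10Artists.run, limited to the top two for this tiny input.
top = nlargest(2, ((streams, int(artist)) for artist, streams in artist_count.items()))
for streams, artist in top:
    print(artist, streams)
```

What luigi adds on top of this logic is the `requires` and `output` plumbing, which lets it skip any task whose output file already exists.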