Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr update law v0.1.20 #56

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
run: apt-get -y update

- name: Install missing software
run: apt-get install -y git python3-pip && pip install black==23.3.0
run: apt-get install -y git python3-pip && pip install black==24.4.2

- uses: actions/checkout@v2

Expand Down
2 changes: 1 addition & 1 deletion law
Submodule law updated 67 files
+5 −1 .github/actions/build-image/action.yml
+0 −150 .github/workflows/deploy_images_c7.yml
+2 −2 .github/workflows/publish_pypi.yml
+7 −23 README.md
+1 −1 docker/Dockerfile_alma9_py310
+1 −1 docker/Dockerfile_alma9_py311
+1 −1 docker/Dockerfile_alma9_py37
+1 −1 docker/Dockerfile_alma9_py38
+1 −1 docker/Dockerfile_alma9_py39
+0 −99 docker/Dockerfile_centos7_py27
+0 −82 docker/Dockerfile_centos7_py310
+0 −161 docker/Dockerfile_centos7_py36
+0 −82 docker/Dockerfile_centos7_py37
+0 −82 docker/Dockerfile_centos7_py38
+0 −82 docker/Dockerfile_centos7_py39
+1 −1 docker/Dockerfile_centos8_py310
+1 −1 docker/Dockerfile_centos8_py311
+1 −1 docker/Dockerfile_centos8_py37
+1 −1 docker/Dockerfile_centos8_py38
+1 −1 docker/Dockerfile_centos8_py39
+1 −0 docs/api/target/index.rst
+27 −0 docs/api/target/mirrored.rst
+1 −0 docs/contrib/index.rst
+13 −0 docs/contrib/pandas.rst
+18 −5 law.cfg.example
+2 −0 law/__init__.py
+1 −1 law/__version__.py
+16 −11 law/cli/completion.sh
+2 −0 law/config.py
+74 −6 law/contrib/arc/workflow.py
+25 −12 law/contrib/awkward/formatter.py
+2 −0 law/contrib/cms/job.py
+63 −3 law/contrib/cms/workflow.py
+6 −0 law/contrib/coffea/formatter.py
+0 −2 law/contrib/gfal/target.py
+72 −5 law/contrib/glite/workflow.py
+10 −1 law/contrib/hdf5/formatter.py
+7 −0 law/contrib/htcondor/htcondor_wrapper.sh
+63 −20 law/contrib/htcondor/job.py
+119 −19 law/contrib/htcondor/workflow.py
+45 −27 law/contrib/keras/formatter.py
+93 −16 law/contrib/lsf/workflow.py
+6 −0 law/contrib/matplotlib/formatter.py
+9 −4 law/contrib/numpy/formatter.py
+12 −0 law/contrib/pandas/__init__.py
+89 −0 law/contrib/pandas/formatter.py
+9 −1 law/contrib/pyarrow/formatter.py
+3 −3 law/contrib/pyarrow/util.py
+21 −2 law/contrib/root/formatter.py
+4 −4 law/contrib/root/util.py
+1 −1 law/contrib/singularity/sandbox.py
+121 −15 law/contrib/slurm/workflow.py
+56 −27 law/contrib/tensorflow/formatter.py
+53 −20 law/job/base.py
+7 −1 law/job/law_job.sh
+49 −2 law/patches.py
+145 −80 law/target/collection.py
+9 −3 law/target/file.py
+64 −6 law/target/formatter.py
+26 −21 law/target/local.py
+402 −0 law/target/mirrored.py
+2 −2 law/target/remote/base.py
+3 −1 law/task/base.py
+61 −10 law/util.py
+70 −7 law/workflow/base.py
+47 −2 law/workflow/local.py
+100 −97 law/workflow/remote.py
5 changes: 0 additions & 5 deletions lawluigi_configs/KingMaker_law.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ ping_interval: 20
wait_interval: 20
max_reschedules: 3

[job]
job_file_dir = $ANALYSIS_DATA_PATH/jobs
job_file_dir_cleanup: False
job_file_dir_mkdtemp: True

[target]
default_wlcg_fs = wlcg_fs

Expand Down
54 changes: 15 additions & 39 deletions processor/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
import socket
from law.util import interruptable_popen
from rich.console import Console
from law.util import merge_dicts
from datetime import datetime
from law.contrib.htcondor.job import HTCondorJobManager
from tempfile import mkdtemp
from getpass import getuser
from law.config import Config

try:
from luigi.parameter import UnconsumedParameterWarning
Expand Down Expand Up @@ -375,31 +372,17 @@ def get_submission_os(self):
# print(f"Running on {distro} {os_version}, using image {image}")
return image

def htcondor_create_job_manager(self, **kwargs):
kwargs = merge_dicts(self.htcondor_job_manager_defaults, kwargs)
return HTCondorJobManager(**kwargs)

def htcondor_output_directory(self):
# Add identification-str to prevent interference between different tasks of the same class
# Expand path to account for use of env variables (like $USER)
if self.is_local_output:
return law.LocalDirectoryTarget(
self.local_path("htcondor_files"),
law.LocalFileSystem(
None,
base=f"{os.path.expandvars(self.local_output_path)}",
),
)
return law.LocalDirectoryTarget(self.local_path("job_files"))

return law.wlcg.WLCGDirectoryTarget(
self.remote_path("htcondor_files"),
law.wlcg.WLCGFileSystem(None, base=os.path.expandvars(self.wlcg_path)),
)
def htcondor_log_directory(self):
log_path = os.path.join(self.htcondor_output_directory().abspath, "logs")
return law.LocalDirectoryTarget(log_path)

def htcondor_create_job_file_factory(self):
factory = super(HTCondorWorkflow, self).htcondor_create_job_file_factory()
# Print location of job dir
console.log(f"HTCondor job directory is: {factory.dir}")
path = self.htcondor_output_directory().abspath
factory = super().htcondor_create_job_file_factory(dir=path, mkdtemp=False)
console.log(f"HTCondor job directory is: {path}")
return factory

def htcondor_bootstrap_file(self):
Expand All @@ -421,22 +404,15 @@ def htcondor_job_config(self, config, job_num, branches):

analysis_name = os.getenv("ANA_NAME")
task_name = self.__class__.__name__
_cfg = Config.instance()
job_file_dir = _cfg.get_expanded("job", "job_file_dir")
logdir = os.path.join(
os.path.dirname(job_file_dir), "logs", self.production_tag
)
for file_ in ["Log", "Output", "Error"]:
os.makedirs(os.path.join(logdir, file_), exist_ok=True)

# Write job config file
config.custom_content = []
config.log = os.path.join(logdir, "Log", task_name + ".txt")
config.stdout = os.path.join(logdir, "Output", task_name + ".txt")
config.stderr = os.path.join(logdir, "Error", task_name + ".txt")

# config.custom_content.append(("stream_error", "True")) # Remove before commit
# config.custom_content.append(("stream_output", "True")) #
log_base_path = self.htcondor_log_directory().abspath
config.log = os.path.join(log_base_path, "Log_$(JobId).txt")
config.custom_log_file = os.path.join("All_$(JobId).txt")
# config.stdout = "Out_$(JobId).txt"
# config.stderr = "Err_$(JobId).txt"
# config.custom_content.append(("stream_error", "True")) # Remove before commit. Streamed files will end up in
# config.custom_content.append(("stream_output", "True")) # `self.htcondor_create_job_file_factory().dir`
if self.htcondor_requirements:
config.custom_content.append(("Requirements", self.htcondor_requirements))
config.custom_content.append(("universe", self.htcondor_universe))
Expand Down Expand Up @@ -556,7 +532,7 @@ def htcondor_job_config(self, config, job_num, branches):
)
config.render_variables["LOCAL_TIMESTAMP"] = startup_time
config.render_variables["LOCAL_PWD"] = startup_dir
# only needed for $ANA_NAME=ML_train see setup.sh line 158
# only needed for $ANA_NAME=ML_train see setup.sh line 207
if os.getenv("MODULE_PYTHONPATH"):
config.render_variables["MODULE_PYTHONPATH"] = os.getenv(
"MODULE_PYTHONPATH"
Expand Down
58 changes: 1 addition & 57 deletions processor/tasks/CROWNBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import json
import shutil
from framework import console
from law.config import Config
from framework import HTCondorWorkflow, Task
from law.task.base import WrapperTask
from rich.table import Table
Expand Down Expand Up @@ -176,54 +175,7 @@ class CROWNExecuteBase(HTCondorWorkflow, law.LocalWorkflow):
files_per_task = luigi.IntParameter()

def htcondor_output_directory(self):
"""
The function `htcondor_output_directory` returns a WLCGDirectoryTarget object that represents a
directory in the WLCG file system.
:return: The code is returning a `law.wlcg.WLCGDirectoryTarget` object.
"""
# Add identification-str to prevent interference between different tasks of the same class
# Expand path to account for use of env variables (like $USER)
if self.is_local_output:
return law.LocalDirectoryTarget(
self.local_path(f"htcondor_files/{self.nick}"),
fs=law.LocalFileSystem(
None,
base=f"{os.path.expandvars(self.local_output_path)}",
),
)

return law.wlcg.WLCGDirectoryTarget(
self.remote_path(f"htcondor_files/{self.nick}"),
fs=law.wlcg.WLCGFileSystem(
None,
base=f"{os.path.expandvars(self.wlcg_path)}",
),
)

def htcondor_create_job_file_factory(self):
"""
The function `htcondor_create_job_file_factory` creates a job file factory for HTCondor workflows.
:return: The method is returning the factory object that is created by calling the
`htcondor_create_job_file_factory` method of the superclass `HTCondorWorkflow`.
"""
class_name = self.__class__.__name__
if "Friend" in class_name:
task_name = [class_name + self.nick, self.friend_name]
else:
task_name = [class_name + self.nick]
_cfg = Config.instance()
job_file_dir = _cfg.get_expanded("job", "job_file_dir")
job_files = os.path.join(
job_file_dir,
self.production_tag,
"_".join(task_name),
"files",
)
factory = super(HTCondorWorkflow, self).htcondor_create_job_file_factory(
dir=job_files,
mkdtemp=False,
)
return factory
return law.LocalDirectoryTarget(self.local_path(f"htcondor_files/{self.nick}"))

def htcondor_job_config(self, config, job_num, branches):
class_name = self.__class__.__name__
Expand All @@ -237,14 +189,6 @@ def htcondor_job_config(self, config, job_num, branches):
)
config = super().htcondor_job_config(config, job_num, branches)
config.custom_content.append(("JobBatchName", condor_batch_name_pattern))
for type in ["log", "stdout", "stderr"]:
logfilepath = getattr(config, type)
# split the filename, and add the sample nick as an additional folder
logfolder, logfile = os.path.split(logfilepath)
logfolder = os.path.join(logfolder, self.nick)
# create the new path
os.makedirs(logfolder, exist_ok=True)
setattr(config, type, os.path.join(logfolder, logfile))
return config

def modify_polling_status_line(self, status_line):
Expand Down
30 changes: 15 additions & 15 deletions processor/tasks/CROWNMultiFriends.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@ def workflow_requires(self):
)
requirements["friend_tarball"] = CROWNBuildMultiFriend.req(self)
for friend in self.friend_dependencies:
requirements[
f"CROWNFriends_{self.nick}_{self.friend_mapping[friend]}"
] = CROWNFriends(
nick=self.nick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=self.all_eras,
shifts=self.shifts,
all_sample_types=self.all_sample_types,
era=self.era,
sample_type=self.sample_type,
scopes=self.scopes,
friend_name=self.friend_mapping[friend],
friend_config=friend,
requirements[f"CROWNFriends_{self.nick}_{self.friend_mapping[friend]}"] = (
CROWNFriends(
nick=self.nick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=self.all_eras,
shifts=self.shifts,
all_sample_types=self.all_sample_types,
era=self.era,
sample_type=self.sample_type,
scopes=self.scopes,
friend_name=self.friend_mapping[friend],
friend_config=friend,
)
)
return requirements

Expand Down
56 changes: 28 additions & 28 deletions processor/tasks/FriendQuantitiesMap.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ def workflow_requires(self):
scopes=self.scopes,
)
for friend in self.friend_dependencies:
requirements[
f"CROWNFriends_{self.nick}_{self.friend_mapping[friend]}"
] = CROWNFriends(
nick=self.nick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=self.all_eras,
all_sample_types=self.all_sample_types,
era=self.era,
sample_type=self.sample_type,
scopes=self.scopes,
friend_name=self.friend_mapping[friend],
friend_config=friend,
requirements[f"CROWNFriends_{self.nick}_{self.friend_mapping[friend]}"] = (
CROWNFriends(
nick=self.nick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=self.all_eras,
all_sample_types=self.all_sample_types,
era=self.era,
sample_type=self.sample_type,
scopes=self.scopes,
friend_name=self.friend_mapping[friend],
friend_config=friend,
)
)
return requirements

Expand All @@ -65,20 +65,20 @@ def requires(self):
scopes=self.scopes,
)
for friend in self.friend_dependencies:
requirements[
f"CROWNFriends_{self.nick}_{self.friend_mapping[friend]}"
] = CROWNFriends(
nick=self.nick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=self.all_eras,
all_sample_types=self.all_sample_types,
era=self.era,
sample_type=self.sample_type,
scopes=self.scopes,
friend_name=self.friend_mapping[friend],
friend_config=friend,
requirements[f"CROWNFriends_{self.nick}_{self.friend_mapping[friend]}"] = (
CROWNFriends(
nick=self.nick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=self.all_eras,
all_sample_types=self.all_sample_types,
era=self.era,
sample_type=self.sample_type,
scopes=self.scopes,
friend_name=self.friend_mapping[friend],
friend_config=friend,
)
)
return requirements

Expand Down
30 changes: 15 additions & 15 deletions processor/tasks/ProduceFriends.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@ def requires(self):

requirements = {}
for samplenick in data["details"]:
requirements[
f"CROWNFriends_{samplenick}_{self.friend_name}"
] = CROWNFriends(
nick=samplenick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=data["eras"],
shifts=self.shifts,
all_sample_types=data["sample_types"],
scopes=self.scopes,
era=data["details"][samplenick]["era"],
sample_type=data["details"][samplenick]["sample_type"],
friend_config=self.friend_config,
friend_name=self.friend_name,
requirements[f"CROWNFriends_{samplenick}_{self.friend_name}"] = (
CROWNFriends(
nick=samplenick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=data["eras"],
shifts=self.shifts,
all_sample_types=data["sample_types"],
scopes=self.scopes,
era=data["details"][samplenick]["era"],
sample_type=data["details"][samplenick]["sample_type"],
friend_config=self.friend_config,
friend_name=self.friend_name,
)
)

return requirements
Expand Down
34 changes: 17 additions & 17 deletions processor/tasks/ProduceMultiFriends.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ def requires(self):

requirements = {}
for samplenick in data["details"]:
requirements[
f"CROWNFriends_{samplenick}_{self.friend_name}"
] = CROWNMultiFriends(
nick=samplenick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=data["eras"],
shifts=self.shifts,
all_sample_types=data["sample_types"],
scopes=self.scopes,
era=data["details"][samplenick]["era"],
sample_type=data["details"][samplenick]["sample_type"],
friend_config=self.friend_config,
friend_name=self.friend_name,
friend_dependencies=self.friend_dependencies,
friend_mapping=self.friend_mapping,
requirements[f"CROWNFriends_{samplenick}_{self.friend_name}"] = (
CROWNMultiFriends(
nick=samplenick,
analysis=self.analysis,
config=self.config,
production_tag=self.production_tag,
all_eras=data["eras"],
shifts=self.shifts,
all_sample_types=data["sample_types"],
scopes=self.scopes,
era=data["details"][samplenick]["era"],
sample_type=data["details"][samplenick]["sample_type"],
friend_config=self.friend_config,
friend_name=self.friend_name,
friend_dependencies=self.friend_dependencies,
friend_mapping=self.friend_mapping,
)
)
return requirements

Expand Down
Loading