Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support shape_touched from Dask #900

Merged
merged 33 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b8fc7fe
wip: initial commit
agoose77 Oct 3, 2023
1b4bd50
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 3, 2023
daa8529
fix: rename function
agoose77 Oct 4, 2023
df97041
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 4, 2023
66c8710
fix: use report_necessary_buffers
agoose77 Oct 4, 2023
f31815b
Merge branch 'master' into agoose77/feat-support-shape-touched
agoose77 Oct 4, 2023
2353a23
fix: properly parse form keys
agoose77 Oct 4, 2023
17a6f2f
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 4, 2023
bd07d03
hack: convert Content to array
agoose77 Oct 5, 2023
a6848a0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2023
9c90205
fix: ensure layout nodes converted to arrays
agoose77 Oct 5, 2023
379a508
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 5, 2023
04b5a1a
adjust coffea pins to latest releases and pre-releases
lgray Oct 7, 2023
f19c11b
use pytorch-only triton image
lgray Oct 7, 2023
7051d2e
streamline version requirements
lgray Oct 7, 2023
3fe957b
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 7, 2023
d3fb1c9
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 7, 2023
33d2e68
fix: don't import protocol
agoose77 Oct 8, 2023
9d94cb0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 8, 2023
c49a8bc
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 10, 2023
0d9c913
remove deprecated interface definition
lgray Oct 11, 2023
bb4df59
Update tests/test_jetmet_tools.py
agoose77 Oct 11, 2023
e0694ad
Update tests/test_jetmet_tools.py
agoose77 Oct 11, 2023
e7384f9
remove further remnants of old remapping interface
lgray Oct 11, 2023
92efdb2
refactor: make key translation obvious
agoose77 Oct 11, 2023
8ae3cd5
fix typo from refactor
lgray Oct 12, 2023
45a0060
update pins (note uncapped numpy and numba skooch)
lgray Oct 14, 2023
14d2cc2
try to convince pip to upgrade numpy upon installing coffea
lgray Oct 14, 2023
ab3599e
be more insistent
lgray Oct 14, 2023
faff41e
numba 0.58 pins numpy from above < 1.26
lgray Oct 14, 2023
864f709
clean up usage of quoted ",!offsets"
lgray Oct 17, 2023
9b96f7b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 17, 2023
0a525d0
flake8 lint
lgray Oct 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
python -m pip install xgboost
python -m pip install tritonclient[grpc,http]
# install checked out coffea
python -m pip install -q -e '.[dev,parsl,dask,spark]'
python -m pip install -q -e '.[dev,parsl,dask,spark]' --upgrade --upgrade-strategy eager
python -m pip list
java -version
- name: Install dependencies (MacOS)
Expand All @@ -80,7 +80,7 @@ jobs:
python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
python -m pip install xgboost
# install checked out coffea
python -m pip install -q -e '.[dev,dask,spark]'
python -m pip install -q -e '.[dev,dask,spark]' --upgrade --upgrade-strategy eager
python -m pip list
java -version
- name: Install dependencies (Windows)
Expand All @@ -91,14 +91,14 @@ jobs:
python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
python -m pip install xgboost
# install checked out coffea
python -m pip install -q -e '.[dev,dask]'
python -m pip install -q -e '.[dev,dask]' --upgrade --upgrade-strategy eager
python -m pip list
java -version

- name: Start triton server with example model
if: matrix.os == 'ubuntu-latest'
run: |
docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-py3 tritonserver --model-repository=/models
docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-pyt-python-py3 tritonserver --model-repository=/models

- name: Test with pytest
run: |
Expand Down
14 changes: 7 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ classifiers = [
"Topic :: Utilities",
]
dependencies = [
"awkward>=2.4.2",
"uproot>=5.0.10",
"awkward>=2.4.6",
"uproot>=5.1.1",
"dask[array]>=2023.4.0",
"dask-awkward>=2023.7.1,!=2023.8.0",
"dask-histogram>=2023.6.0",
"correctionlib>=2.0.0",
"dask-awkward>=2023.10.0",
"dask-histogram>=2023.10.0",
"correctionlib>=2.3.3",
"pyarrow>=6.0.0",
"fsspec",
"matplotlib>=3",
"numba>=0.57.0",
"numpy>=1.22.0,<1.25", # < 1.25 for numba 0.57 series
"numba>=0.58.0",
"numpy>=1.22.0,<1.26", # < 1.26 for numba 0.58 series
"scipy>=1.1.0",
"tqdm>=4.27.0",
"lz4",
Expand Down
86 changes: 48 additions & 38 deletions src/coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import dask_awkward
import fsspec
import uproot
from dask_awkward import ImplementsFormTransformation

from coffea.nanoevents.mapping import (
CachedMapping,
Expand Down Expand Up @@ -68,7 +67,7 @@ def _key_formatter(prefix, form_key, form, attribute):
return prefix + f"/{attribute}/{form_key}"


class _map_schema_base(ImplementsFormTransformation):
class _map_schema_base: # ImplementsFormMapping, ImplementsFormMappingInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to inherit the protocol(s)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They weren't exported yet! I think a new release includes them. Let me check.

def __init__(
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None
):
Expand All @@ -77,24 +76,50 @@ def __init__(
self.metadata = metadata
self.version = version

def extract_form_keys_base_columns(self, form_keys):
base_columns = []
for form_key in form_keys:
base_columns.extend(
def keys_for_buffer_keys(self, buffer_keys):
base_columns = set()
for buffer_key in buffer_keys:
form_key, attribute = self.parse_buffer_key(buffer_key)
operands = urllib.parse.unquote(form_key).split(",")

it_operands = iter(operands)
next(it_operands)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function behaves like extract_form_key_base_columns, but supports operands e.g. attribute lookup.


base_columns.update(
[
acolumn
for acolumn in urllib.parse.unquote(form_key).split(",")
if not acolumn.startswith("!")
name
for name, maybe_transform in zip(operands, it_operands)
if maybe_transform == "!load"
]
)
return list(set(base_columns))
return base_columns

def parse_buffer_key(self, buffer_key):
prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2)
if attribute == "offsets":
return (form_key[: -len("%2C%21offsets")], attribute)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to also be in the previous code as well, but some of these string splitting and urllib name mangling things were put into https://github.com/CoffeaTeam/coffea/blob/master/src/coffea/nanoevents/util.py and could be used here

else:
return (form_key, attribute)

@property
def buffer_key(self):
return partial(self._key_formatter, "")

def _key_formatter(self, prefix, form_key, form, attribute):
if attribute == "offsets":
form_key += "%2C%21offsets"
return prefix + f"/{attribute}/{form_key}"


class _TranslatedMapping:
def __init__(self, func, mapping):
self._func = func
self._mapping = mapping

def __getitem__(self, index):
return self._mapping[self._func(index)]


class _map_schema_uproot(_map_schema_base):
def __init__(
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None
Expand Down Expand Up @@ -125,9 +150,12 @@ def __call__(self, form):
},
"form_key": None,
}
return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)
return (
awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form),
self,
)

def create_column_mapping_and_key(self, tree, start, stop, interp_options):
def load_buffers(self, tree, keys, start, stop, interp_options):
from functools import partial

from coffea.nanoevents.util import tuple_to_key
Expand All @@ -147,8 +175,15 @@ def create_column_mapping_and_key(self, tree, start, stop, interp_options):
use_ak_forth=True,
)
mapping.preload_column_source(partition_key[0], partition_key[1], tree)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've deliberately tried not to touch this logic at all. Hopefully, by presenting the correct interface to uproot, internal refactoring can happen here in future without any side effects.

buffer_key = partial(self._key_formatter, tuple_to_key(partition_key))

return mapping, partial(self._key_formatter, tuple_to_key(partition_key))
# The buffer-keys that dask-awkward knows about will not include the
# partition key. Therefore, we must translate the keys here.
def translate_key(index):
form_key, attribute = self.parse_buffer_key(index)
return buffer_key(form_key=form_key, attribute=attribute, form=None)

return _TranslatedMapping(translate_key, mapping)


class _map_schema_parquet(_map_schema_base):
Expand All @@ -174,31 +209,6 @@ def __call__(self, form):

return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)

def create_column_mapping_and_key(self, columns, start, stop, interp_options):
from functools import partial

from coffea.nanoevents.util import tuple_to_key

uuid = "NO_UUID"
obj_path = "NO_OBJECT_PATH"

partition_key = (
str(uuid),
obj_path,
f"{start}-{stop}",
)
uuidpfn = {uuid: columns}
mapping = PreloadedSourceMapping(
PreloadedOpener(uuidpfn),
start,
stop,
cache={},
access_log=None,
)
mapping.preload_column_source(partition_key[0], partition_key[1], columns)

return mapping, partial(self._key_formatter, tuple_to_key(partition_key))


class NanoEventsFactory:
"""A factory class to build NanoEvents objects"""
Expand Down
21 changes: 13 additions & 8 deletions src/coffea/nanoevents/mapping/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,19 @@ def __getitem__(self, key):
if len(stack) != 1:
raise RuntimeError(f"Syntax error in form key {nodes}")
out = stack.pop()
try:
out = numpy.array(out)
except ValueError:
if self._debug:
print(out)
raise RuntimeError(
f"Left with non-bare array after evaluating form key {nodes}"
)
import awkward

if isinstance(out, awkward.contents.Content):
out = awkward.to_numpy(out)
else:
try:
out = numpy.array(out)
except ValueError:
if self._debug:
print(out)
raise RuntimeError(
f"Left with non-bare array after evaluating form key {nodes}"
)
return out

@abstractmethod
Expand Down
15 changes: 12 additions & 3 deletions src/coffea/nanoevents/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ def to_layout(array):
return array.layout


def ensure_array(arraylike):
if isinstance(arraylike, (awkward.contents.Content, awkward.Array)):
return awkward.to_numpy(arraylike)
elif isinstance(arraylike, awkward.index.Index):
return arraylike.data
else:
return numpy.asarray(arraylike)


def data(stack):
"""Extract content from array
(currently a noop, can probably take place of !content)
Expand Down Expand Up @@ -96,7 +105,7 @@ def counts2offsets(stack):
Signature: counts,!counts2offsets
Outputs an array with length one larger than input
"""
counts = numpy.array(stack.pop())
counts = ensure_array(stack.pop())
offsets = numpy.empty(len(counts) + 1, dtype=numpy.int64)
offsets[0] = 0
numpy.cumsum(counts, out=offsets[1:])
Expand All @@ -123,11 +132,11 @@ def local2global(stack):
Signature: index,target_offsets,!local2global
Outputs a content array with same shape as index content
"""
target_offsets = numpy.asarray(stack.pop())
target_offsets = ensure_array(stack.pop())
index = stack.pop()
index = index.mask[index >= 0] + target_offsets[:-1]
index = index.mask[index < target_offsets[1:]]
out = numpy.array(awkward.flatten(awkward.fill_none(index, -1), axis=None))
out = ensure_array(awkward.flatten(awkward.fill_none(index, -1), axis=None))
if out.dtype != numpy.int64:
raise RuntimeError
stack.append(out)
Expand Down
6 changes: 3 additions & 3 deletions src/coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ def _work_function(
import dask_awkward

to_compute = processor_instance.process(events)
materialized = dask_awkward.necessary_columns(to_compute)
# materialized = dask_awkward.report_necessary_buffers(to_compute)
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
out = dask.compute(to_compute, scheduler="single-threaded")[0]
except Exception as e:
raise Exception(f"Failed processing file: {item!r}") from e
Expand All @@ -1734,11 +1734,11 @@ def _work_function(
metrics = {}
if isinstance(file, uproot.ReadOnlyDirectory):
metrics["bytesread"] = file.file.source.num_requested_bytes
# metrics["data_and_shape_buffers"] = set(materialized)
# metrics["shape_only_buffers"] = set(materialized)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these placeholders for something in the future?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to get tests to run - executor is going to be significantly changed in a subsequent PR.

if schema is not None and issubclass(schema, schemas.BaseSchema):
metrics["columns"] = set(materialized)
metrics["entries"] = len(events)
else:
metrics["columns"] = set(materialized)
metrics["entries"] = events.size
metrics["processtime"] = toc - tic
return {"out": out, "metrics": metrics, "processed": {item}}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_jetmet_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,9 @@ def test_corrected_jets_factory(optimization_enabled):
**{name: evaluator[name] for name in jec_stack_names[5:6]}
)

print(dak.necessary_columns(jets.eta))
print(dak.report_necessary_columns(jets.eta))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lgray note that report_necessary_columns reports whatever the IO layer thinks a "column" is. It doesn't necessarily mean "dotted field" right now, rather I envisage it talking about the concept of a "key" or "column" that can be read from the IO source.

@douglasdavis this is something I didn't fully think about when we made necessary_columns shim report_necessary_columns.

The difference will be that for remapped forms, the report_necessary_columns (and by proxy, necessary_columns) will report the underlying ROOT file keys, not the remapped keys.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It therefore might mean that we should remove that alias for necessary_columns.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually prefer the new behavior since users will probably expect to get a list of columns in the file they're reading! But seeing the remapped keys is nice for debugging.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it so that we get a version or two of dask-awkward before dak.necessary_columns is gone and give migration instructions? I know a few end-users that use that function.

Copy link
Contributor Author

@agoose77 agoose77 Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lgray I'm not 100% clear if we're on the same page.

report_necessary_columns is a new function, so I'm comfortable with its semantics deviating from ak.necessary_columns.

It would be possible to restore the old behavior, but it will require a reasonable amount of work on the uproot side (I am guessing).

ak.necessary_columns is currently an alias for this new function, but for uproot this wouldn't be a non-breaking change; if previously it would report ROOT_KEY.record.foo.bar, now it would report ROOT_KEY. Is this OK? I don't see it being that useful if it breaks existing users.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I suppose for my most typical use case I never see keys with ROOT_KEY.record.foo.bar formatting, it's really only ever ROOT_KEY and any structure is up in the form being remapped to. So they appear the same to me.

print(
dak.necessary_columns(
dak.report_necessary_columns(
resosf.getScaleFactor(
JetEta=jets.eta,
)
Expand Down