diff --git a/pyproject.toml b/pyproject.toml
index 9f6d047..9ecc122 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -76,7 +76,7 @@ extend-exclude = 'deps\/.*$'
 line_length = 100
 
 [tool.cibuildwheel]
-skip = ["cp3{6,7,8}-*", "pp*", "*-win32", "*-manylinux_i686", "*-musllinux_i686", "*-musllinux_x86_64", "*-musllinux_aarch64"]
+skip = ["cp3{6,7,8,13}-*", "pp*", "*-win32", "*-manylinux_i686", "*-musllinux_i686", "*-musllinux_x86_64", "*-musllinux_aarch64"]
 # MPI needed for testing with mpi4py
 before-all = "yum install -y openmpi3-devel java-11-openjdk"
 environment = { MPICC="/usr/lib64/openmpi3/bin/mpicc" }
diff --git a/src/functionalizer/core.py b/src/functionalizer/core.py
index d83ae5f..2652d99 100644
--- a/src/functionalizer/core.py
+++ b/src/functionalizer/core.py
@@ -5,10 +5,11 @@
 from pathlib import Path
 
 import pyarrow.parquet as pq
-import sparkmanager as sm
 from fz_td_recipe import Recipe
 from pyspark.sql import functions as F
 
+import sparkmanager as sm
+
 from . import utils
 from .circuit import Circuit
 from .definitions import CheckpointPhases, SortBy
diff --git a/src/functionalizer/filters/__init__.py b/src/functionalizer/filters/__init__.py
index 2d7b636..f0bc625 100644
--- a/src/functionalizer/filters/__init__.py
+++ b/src/functionalizer/filters/__init__.py
@@ -1,6 +1,6 @@
 """Module with filters to process edge data."""
 
-from .definitions import DatasetOperation, load  # NOQA
+from .definitions import DatasetOperation, FilterInitializationError, load  # NOQA
 from .helpers import enable_debug  # NOQA
 from . import helpers  # NOQA
 
diff --git a/src/functionalizer/filters/definitions.py b/src/functionalizer/filters/definitions.py
index c3bab10..0381978 100644
--- a/src/functionalizer/filters/definitions.py
+++ b/src/functionalizer/filters/definitions.py
@@ -11,7 +11,6 @@
 from pathlib import Path
 
 import sparkmanager as sm
-
 from functionalizer.circuit import Circuit
 from functionalizer.utils import get_logger
 from functionalizer.utils.checkpointing import checkpoint_resume
@@ -38,6 +37,10 @@ def load(*dirnames: str) -> None:
         importlib.import_module(modulename)
 
 
+class FilterInitializationError(RuntimeError):
+    """Error to be raised when filters should be skipped."""
+
+
 # ---------------------------------------------------
 # Dataset operations
 # ---------------------------------------------------
@@ -108,7 +111,7 @@ def initialize(mcs, names, *args):
             )
             try:
                 filters.append(fcls(*args))
-            except Exception as e:
+            except FilterInitializationError as e:
                 if fcls._required:
                     logger.fatal("Could not instantiate %s", fcls.__name__)
                     raise
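The narrowed `except` in `filters/definitions.py` is the core of this change: a filter's `__init__` may now opt out cleanly by raising `FilterInitializationError`, while any other exception propagates instead of being silently swallowed, and required filters still abort the run. A minimal sketch of an optional filter using the new hook — the `MyOptionalFilter` class and the `some_optional_key` recipe key are hypothetical, for illustration only:

```python
from functionalizer.filters import DatasetOperation, FilterInitializationError


class MyOptionalFilter(DatasetOperation):
    """Hypothetical optional filter, skipped when its recipe section is absent."""

    def __init__(self, recipe, source, target):
        super().__init__(recipe, source, target)
        # Raising FilterInitializationError tells the filter machinery to skip
        # this (non-required) filter instead of aborting the whole run.
        if not recipe.get("some_optional_key"):
            raise FilterInitializationError("'some_optional_key' not in recipe")
        self.settings = recipe.get("some_optional_key")

    def apply(self, circuit):
        # A real filter would transform and return circuit.df here.
        return circuit.df
```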
diff --git a/src/functionalizer/filters/implementations/reduce_and_cut.py b/src/functionalizer/filters/implementations/reduce_and_cut.py
index f1faf42..6f09336 100644
--- a/src/functionalizer/filters/implementations/reduce_and_cut.py
+++ b/src/functionalizer/filters/implementations/reduce_and_cut.py
@@ -1,8 +1,8 @@
 """Filter that matches distributions of synapses."""
 
-import sparkmanager as sm
 from pyspark.sql import functions as F
 
+import sparkmanager as sm
 from functionalizer.circuit import touches_per_pathway
 from functionalizer.definitions import CheckpointPhases
 from functionalizer.filters import DatasetOperation, helpers
diff --git a/src/functionalizer/filters/implementations/spine_length.py b/src/functionalizer/filters/implementations/spine_length.py
index 116bb68..8f10190 100644
--- a/src/functionalizer/filters/implementations/spine_length.py
+++ b/src/functionalizer/filters/implementations/spine_length.py
@@ -3,10 +3,10 @@
 from operator import attrgetter
 
 import pandas as pd
-import sparkmanager as sm
 from pyspark.sql import functions as F
 
-from functionalizer.filters import DatasetOperation
+import sparkmanager as sm
+from functionalizer.filters import DatasetOperation, FilterInitializationError
 from functionalizer.utils import get_logger
 
 from . import add_bin_column, add_random_column
@@ -29,9 +29,12 @@ def __init__(self, recipe, source, target):
         recipe to obtain the desired distribution of spine lengths to match.
         """
         super().__init__(recipe, source, target)
-        self.seed = recipe.seeds.synapseSeed
+        self.seed = recipe.get("seed")
         logger.info("Using seed %d for spine length adjustment", self.seed)
 
+        if not recipe.get("spine_lengths"):
+            raise FilterInitializationError("'spine_lengths' not in recipe")
+
         self.binnings = sorted(recipe.spine_lengths, key=attrgetter("length"))
 
     def apply(self, circuit):
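For context on the kept `binnings` line: `operator.attrgetter` sorts the recipe's spine-length entries by their `length` attribute. A self-contained illustration — the `Bin` namedtuple is a stand-in, not the actual class used by `fz_td_recipe`:

```python
from collections import namedtuple
from operator import attrgetter

# Stand-in record type; the real recipe entries only need a .length attribute.
Bin = namedtuple("Bin", ["length", "fraction"])

entries = [Bin(2.5, 0.1), Bin(0.5, 0.6), Bin(1.5, 0.3)]
binnings = sorted(entries, key=attrgetter("length"))

assert [b.length for b in binnings] == [0.5, 1.5, 2.5]
```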
""" files = sorted(spine_morpho_path.glob("*.h5")) + assert len(files) > 0, "no spine morphologies present" ids = np.ndarray((0,), dtype=int) lengths = np.ndarray((0,), dtype=float) morphologies = np.ndarray((0,), dtype=int) diff --git a/src/functionalizer/filters/implementations/synapse_properties.py b/src/functionalizer/filters/implementations/synapse_properties.py index fedee9e..38d55f6 100644 --- a/src/functionalizer/filters/implementations/synapse_properties.py +++ b/src/functionalizer/filters/implementations/synapse_properties.py @@ -1,9 +1,9 @@ """Filters to add properties to synapses.""" -import sparkmanager as sm from pyspark.sql import functions as F from pyspark.sql import types as T +import sparkmanager as sm from functionalizer.filters import DatasetOperation from functionalizer.utils import get_logger diff --git a/src/functionalizer/filters/implementations/synapse_reposition.py b/src/functionalizer/filters/implementations/synapse_reposition.py index 4d09656..c68e15b 100644 --- a/src/functionalizer/filters/implementations/synapse_reposition.py +++ b/src/functionalizer/filters/implementations/synapse_reposition.py @@ -2,9 +2,9 @@ import numpy as np import pandas as pd -import sparkmanager as sm -from functionalizer.filters import DatasetOperation +import sparkmanager as sm +from functionalizer.filters import DatasetOperation, FilterInitializationError class SynapseReposition(DatasetOperation): @@ -20,6 +20,8 @@ class SynapseReposition(DatasetOperation): def __init__(self, recipe, source, target): """Initialize the filter, extracting the reposition part of the recipe.""" super().__init__(recipe, source, target) + if not recipe.get("synapse_reposition"): + raise FilterInitializationError("'synapse_reposition' not in recipe") self.columns, self.reposition = recipe.as_matrix("synapse_reposition") self.unset_value = len(recipe.get("synapse_reposition")) diff --git a/src/functionalizer/filters/implementations/touch.py b/src/functionalizer/filters/implementations/touch.py index e226880..79934a5 100644 --- a/src/functionalizer/filters/implementations/touch.py +++ b/src/functionalizer/filters/implementations/touch.py @@ -2,9 +2,9 @@ import numpy as np import pandas as pd -import sparkmanager as sm from pyspark.sql import functions as F +import sparkmanager as sm from functionalizer.filters import DatasetOperation from functionalizer.utils import get_logger diff --git a/src/functionalizer/io/circuit.py b/src/functionalizer/io/circuit.py index 7e972f0..50b7a43 100644 --- a/src/functionalizer/io/circuit.py +++ b/src/functionalizer/io/circuit.py @@ -9,11 +9,11 @@ import pandas as pd import pyarrow.parquet as pq -import sparkmanager as sm from packaging.version import VERSION_PATTERN, Version from pyspark.sql import DataFrame from pyspark.sql import functions as F +import sparkmanager as sm from functionalizer import schema from functionalizer.schema import OUTPUT_MAPPING from functionalizer.utils import get_logger diff --git a/src/functionalizer/utils/checkpointing.py b/src/functionalizer/utils/checkpointing.py index 9e73bf7..a9b196e 100644 --- a/src/functionalizer/utils/checkpointing.py +++ b/src/functionalizer/utils/checkpointing.py @@ -4,9 +4,10 @@ from functools import wraps from inspect import signature -import sparkmanager as sm from pyspark.sql.column import _to_seq +import sparkmanager as sm + from . 
diff --git a/src/functionalizer/filters/implementations/synapse_properties.py b/src/functionalizer/filters/implementations/synapse_properties.py
index fedee9e..38d55f6 100644
--- a/src/functionalizer/filters/implementations/synapse_properties.py
+++ b/src/functionalizer/filters/implementations/synapse_properties.py
@@ -1,9 +1,9 @@
 """Filters to add properties to synapses."""
 
-import sparkmanager as sm
 from pyspark.sql import functions as F
 from pyspark.sql import types as T
 
+import sparkmanager as sm
 from functionalizer.filters import DatasetOperation
 from functionalizer.utils import get_logger
 
diff --git a/src/functionalizer/filters/implementations/synapse_reposition.py b/src/functionalizer/filters/implementations/synapse_reposition.py
index 4d09656..c68e15b 100644
--- a/src/functionalizer/filters/implementations/synapse_reposition.py
+++ b/src/functionalizer/filters/implementations/synapse_reposition.py
@@ -2,9 +2,9 @@
 
 import numpy as np
 import pandas as pd
-import sparkmanager as sm
 
-from functionalizer.filters import DatasetOperation
+import sparkmanager as sm
+from functionalizer.filters import DatasetOperation, FilterInitializationError
 
 
 class SynapseReposition(DatasetOperation):
@@ -20,6 +20,8 @@ class SynapseReposition(DatasetOperation):
     def __init__(self, recipe, source, target):
         """Initialize the filter, extracting the reposition part of the recipe."""
         super().__init__(recipe, source, target)
+        if not recipe.get("synapse_reposition"):
+            raise FilterInitializationError("'synapse_reposition' not in recipe")
         self.columns, self.reposition = recipe.as_matrix("synapse_reposition")
         self.unset_value = len(recipe.get("synapse_reposition"))
 
diff --git a/src/functionalizer/filters/implementations/touch.py b/src/functionalizer/filters/implementations/touch.py
index e226880..79934a5 100644
--- a/src/functionalizer/filters/implementations/touch.py
+++ b/src/functionalizer/filters/implementations/touch.py
@@ -2,9 +2,9 @@
 
 import numpy as np
 import pandas as pd
-import sparkmanager as sm
 from pyspark.sql import functions as F
 
+import sparkmanager as sm
 from functionalizer.filters import DatasetOperation
 from functionalizer.utils import get_logger
 
diff --git a/src/functionalizer/io/circuit.py b/src/functionalizer/io/circuit.py
index 7e972f0..50b7a43 100644
--- a/src/functionalizer/io/circuit.py
+++ b/src/functionalizer/io/circuit.py
@@ -9,11 +9,11 @@
 
 import pandas as pd
 import pyarrow.parquet as pq
-import sparkmanager as sm
 from packaging.version import VERSION_PATTERN, Version
 from pyspark.sql import DataFrame
 from pyspark.sql import functions as F
 
+import sparkmanager as sm
 from functionalizer import schema
 from functionalizer.schema import OUTPUT_MAPPING
 from functionalizer.utils import get_logger
diff --git a/src/functionalizer/utils/checkpointing.py b/src/functionalizer/utils/checkpointing.py
index 9e73bf7..a9b196e 100644
--- a/src/functionalizer/utils/checkpointing.py
+++ b/src/functionalizer/utils/checkpointing.py
@@ -4,9 +4,10 @@
 from functools import wraps
 from inspect import signature
 
-import sparkmanager as sm
 from pyspark.sql.column import _to_seq
 
+import sparkmanager as sm
+
 from . import get_logger
 from .filesystem import exists, isdir, size
diff --git a/src/functionalizer/utils/spark.py b/src/functionalizer/utils/spark.py
index 8054f26..4277d5f 100644
--- a/src/functionalizer/utils/spark.py
+++ b/src/functionalizer/utils/spark.py
@@ -2,9 +2,10 @@
 
 from contextlib import contextmanager
 
-import sparkmanager as sm
 from pyspark.sql import functions as F
 
+import sparkmanager as sm
+
 from . import get_logger
 
 logger = get_logger(__name__)
diff --git a/tests/circuit_1000n/circuit_config_invalid.json b/tests/circuit_1000n/circuit_config_invalid.json
new file mode 100644
index 0000000..ca7a636
--- /dev/null
+++ b/tests/circuit_1000n/circuit_config_invalid.json
@@ -0,0 +1,31 @@
+{
+  "manifest": {
+    "$BASE_DIR": "./",
+    "$COMPONENT_DIR": "$BASE_DIR",
+    "$NETWORK_DIR": "$BASE_DIR"
+  },
+  "components": {
+    "biophysical_neuron_models_dir": "no comprendo",
+    "provenance": {
+      "bioname_dir": "$COMPONENT_DIR/bioname"
+    }
+  },
+  "networks": {
+    "nodes": [
+      {
+        "nodes_file": "$NETWORK_DIR/nodes.h5",
+        "nodes_types_file": null,
+        "populations": {
+          "All": {
+            "morphologies_dir": null,
+            "alternate_morphologies": {
+              "h5v1": "$BASE_DIR/morphologies/h5"
+            },
+            "spine_morphologies_dir": "no comprendo"
+          }
+        }
+      }
+    ],
+    "edges": []
+  }
+}
diff --git a/tests/conftest.py b/tests/conftest.py
index 1df8e9f..8db64fa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -8,6 +8,7 @@
 from pathlib import Path
 
 import pytest
+
 from functionalizer import filters
 from functionalizer.core import Functionalizer
 from functionalizer.definitions import RunningMode as RM
@@ -29,6 +30,16 @@
     [str(DATADIR / "touches" / "*.parquet")],
 )
 
+DEFAULT_ARGS = {
+    "recipe_file": DATADIR / "recipe.json",
+    "circuit_config": CIRCUIT_CONFIG,
+    "source": None,
+    "source_nodeset": None,
+    "target": None,
+    "target_nodeset": None,
+    "edges": [str(DATADIR / "touches" / "*.parquet")],
+}
+
 filters.load()
 
 
@@ -49,10 +60,11 @@ def circuit_config():
     return CIRCUIT_CONFIG
 
 
-@pytest.fixture(scope="session", name="fz")
-def fz_fixture(tmp_path_factory):
+@pytest.fixture(scope="class", name="fz", params=[{}])
+def fz_fixture(request, tmp_path_factory):
     tmpdir = tmp_path_factory.mktemp("filters")
-    return create_functionalizer(tmpdir, RM.FUNCTIONAL.value).init_data(*ARGS)
+    kwargs = DEFAULT_ARGS | request.param
+    return create_functionalizer(tmpdir, RM.FUNCTIONAL.value).init_data(**kwargs)
 
 
 @pytest.fixture(scope="session", name="gj")
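Replacing the positional `ARGS` tuple with the `DEFAULT_ARGS` dict is what lets individual tests override a single argument: the dict-union operator (`|`, Python 3.9+, consistent with the cp3{6,7,8} wheels already being skipped in `pyproject.toml`) merges per-test overrides over the defaults. A small demonstration of the merge semantics with placeholder values:

```python
# Placeholder defaults standing in for conftest.DEFAULT_ARGS.
DEFAULT_ARGS = {"recipe_file": "recipe.json", "edges": "touches/*.parquet"}

# request.param defaults to {} for unparametrized tests, leaving the
# defaults untouched; a parametrized test supplies only its overrides.
kwargs = DEFAULT_ARGS | {"recipe_file": "recipe_fixed.json"}

assert kwargs == {"recipe_file": "recipe_fixed.json", "edges": "touches/*.parquet"}
```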
diff --git a/tests/test_data_input_nodes.py b/tests/test_data_input_nodes.py
index 5870e8e..f69a613 100644
--- a/tests/test_data_input_nodes.py
+++ b/tests/test_data_input_nodes.py
@@ -4,6 +4,7 @@
 
 import pytest
 from conftest import DATADIR, create_functionalizer
+
 from functionalizer.io import NodeData
 
diff --git a/tests/test_data_input_parquet.py b/tests/test_data_input_parquet.py
index fb2bf70..14ae744 100644
--- a/tests/test_data_input_parquet.py
+++ b/tests/test_data_input_parquet.py
@@ -3,6 +3,7 @@
 import numpy
 import pandas as pd
 import pytest
+
 import sparkmanager as sm
 from functionalizer.io.circuit import BRANCH_COLUMNS, EdgeData
 from functionalizer.utils.conf import Configuration
diff --git a/tests/test_data_input_sonata.py b/tests/test_data_input_sonata.py
index 0f0e234..0cfcf2d 100644
--- a/tests/test_data_input_sonata.py
+++ b/tests/test_data_input_sonata.py
@@ -5,8 +5,9 @@
 import h5py
 import numpy
 import pytest
+from conftest import DATADIR, DEFAULT_ARGS, create_functionalizer
+
 import sparkmanager as sm
-from conftest import ARGS, DATADIR, create_functionalizer
 from functionalizer.io.circuit import BRANCH_COLUMNS, EdgeData
 from functionalizer.utils.conf import Configuration
 
@@ -54,9 +55,8 @@ def test_branch_shift(edges_w_branch_type):
 @pytest.mark.slow
 def test_sonata_properties(tmp_path_factory):
     tmpdir = tmp_path_factory.mktemp("sonata_properties")
-    fz = create_functionalizer(tmpdir, ["SynapseProperties"]).init_data(
-        *ARGS[:-1], edges=(os.path.join(DATADIR, "edges.h5"), "default")
-    )
+    kwargs = DEFAULT_ARGS | {"edges": (os.path.join(DATADIR, "edges.h5"), "default")}
+    fz = create_functionalizer(tmpdir, ["SynapseProperties"]).init_data(**kwargs)
     fz.process_filters()
 
     assert "delay" in fz.circuit.df.columns
diff --git a/tests/test_filters.py b/tests/test_filters.py
index 0ed533f..a805b4b 100644
--- a/tests/test_filters.py
+++ b/tests/test_filters.py
@@ -3,8 +3,9 @@
 import pandas as pd
 import pyspark.sql.functions as F
 import pytest
+from conftest import DATADIR, DEFAULT_ARGS, create_functionalizer
+
 import sparkmanager as sm
-from conftest import ARGS, DATADIR, create_functionalizer
 from functionalizer.utils.spark import cache_broadcast_single_part
 
 NUM_AFTER_DISTANCE = 226301
@@ -33,9 +34,8 @@ def layer_counts(circuit):
         return dict(zip(res["mtype"], res["count"]))
 
     tmpdir = tmp_path_factory.mktemp("fixed_probabilities")
-    fz = create_functionalizer(tmpdir, ["ReduceAndCut"]).init_data(
-        DATADIR / "recipe_fixed.json", *ARGS[1:]
-    )
+    kwargs = DEFAULT_ARGS | {"recipe_file": DATADIR / "recipe_fixed.json"}
+    fz = create_functionalizer(tmpdir, ["ReduceAndCut"]).init_data(**kwargs)
     before = layer_counts(fz.circuit)
 
     fz.process_filters()
@@ -48,6 +48,24 @@
     assert "L6" not in after
 
 
+class TestFilterInitialization:
+    """Test initialization of optional filters"""
+
+    def test_spine_morphos(self, fz):
+        fz.process_filters(filters=["SpineMorphologies"])
+
+
+class TestBogusFilterInitialization:
+    """Test initialization of optional filters with an invalid circuit config"""
+
+    @pytest.mark.parametrize(
+        "fz", [{"circuit_config": DATADIR / "circuit_config_invalid.json"}], indirect=True
+    )
+    def test_spine_morphos(self, fz):
+        with pytest.raises(AssertionError):
+            fz.process_filters(filters=["SpineMorphologies"])
+
+
 @pytest.mark.slow
 class TestFilters(object):
     """Sequential tests of filters."""
@@ -71,7 +89,7 @@
     def test_resume(self, fz, tmp_path_factory):
         """Make sure that resuming "works" """
         tmpdir = tmp_path_factory.mktemp("filters")
-        fz2 = create_functionalizer(tmpdir).init_data(*ARGS)
+        fz2 = create_functionalizer(tmpdir).init_data(**DEFAULT_ARGS)
         fz2.process_filters()
         original = fz.circuit.df.count()
         count = fz2.circuit.df.count()
@@ -80,7 +98,7 @@
     def test_overwrite(self, fz, tmp_path_factory):
         """Test that overwriting checkpointed data works"""
         tmpdir = tmp_path_factory.mktemp("filters")
-        fz2 = create_functionalizer(tmpdir).init_data(*ARGS)
+        fz2 = create_functionalizer(tmpdir).init_data(**DEFAULT_ARGS)
         fz2.process_filters(overwrite=True)
         original = fz.circuit.df.count()
         count = fz2.circuit.df.count()
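`TestBogusFilterInitialization` uses `indirect=True` so the parameter is routed into the `fz` fixture (where it is merged over `DEFAULT_ARGS`) rather than passed to the test function itself. A stripped-down sketch of the pattern, independent of functionalizer — fixture and test names are invented:

```python
import pytest


@pytest.fixture(params=[{}])
def config(request):
    # request.param is supplied by indirect parametrization ({} by default)
    # and merged over the defaults, mirroring the fz fixture in conftest.py.
    return {"circuit_config": "valid.json"} | request.param


class TestIndirect:
    @pytest.mark.parametrize("config", [{"circuit_config": "invalid.json"}], indirect=True)
    def test_override(self, config):
        assert config["circuit_config"] == "invalid.json"
```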
diff --git a/tests/test_gap_junctions.py b/tests/test_gap_junctions.py
index 19858ad..b4c4fdc 100644
--- a/tests/test_gap_junctions.py
+++ b/tests/test_gap_junctions.py
@@ -4,9 +4,10 @@
 
 import numpy as np
 import pytest
-from functionalizer.filters import DatasetOperation
 from pyspark.sql import functions as F
 
+from functionalizer.filters import DatasetOperation
+
 # (src, dst), num_connections
 DENDRO_DATA = [
     ((987, 990), 10),  # 6 with exact or abs() == 1 match
diff --git a/tests/test_morpho_shift.py b/tests/test_morpho_shift.py
index 8584a65..a293fd4 100644
--- a/tests/test_morpho_shift.py
+++ b/tests/test_morpho_shift.py
@@ -4,11 +4,12 @@
 
 from unittest.mock import MagicMock
 
 import pytest
+from fz_td_recipe import Recipe
+
 import sparkmanager as sm
 from functionalizer.circuit import Circuit
 from functionalizer.schema import LEGACY_MAPPING
 from functionalizer.utils.conf import Configuration
-from fz_td_recipe import Recipe
 
 
 class MockLoader:
diff --git a/tests/test_morphologies.py b/tests/test_morphologies.py
index eb86130..e364299 100644
--- a/tests/test_morphologies.py
+++ b/tests/test_morphologies.py
@@ -4,6 +4,7 @@
 
 import numpy
 import pytest
+
 from functionalizer.io.morphologies import MorphologyDB
 
 BRANCHES = [
diff --git a/tests/test_property_generation.py b/tests/test_property_generation.py
index b8c5bde..f35af91 100644
--- a/tests/test_property_generation.py
+++ b/tests/test_property_generation.py
@@ -3,11 +3,12 @@
 
 from pathlib import Path
 
 import pytest
-import sparkmanager as sm
 from conftest import CIRCUIT_CONFIG, DATADIR
-from functionalizer.filters import DatasetOperation
 from fz_td_recipe import Recipe
 
+import sparkmanager as sm
+from functionalizer.filters import DatasetOperation
+
 
 @pytest.mark.slow
 def test_property_assignment(fz):
diff --git a/tests/test_spine_morphology.py b/tests/test_spine_morphology.py
index 6791045..33532c6 100644
--- a/tests/test_spine_morphology.py
+++ b/tests/test_spine_morphology.py
@@ -4,6 +4,7 @@
 
 import numpy.testing as npt
 import pandas as pd
 import pytest
+
 from functionalizer.filters.implementations.spine_morphologies import (
     _create_spine_morphology_udf,
     _read_spine_morphology_attributes,