Skip to content

Commit

Permalink
Only skip optional filters on certain exceptions. (#7)
Browse files Browse the repository at this point in the history
We currently skip filters that are not required when they raise an
exception during instantiation.  This sometimes masks underlying issues
with the configuration or data - this changes the behavior to only skip
filters if certain exceptions are raised.
  • Loading branch information
matz-e authored Sep 23, 2024
1 parent cd45fc5 commit 1ef42db
Show file tree
Hide file tree
Showing 24 changed files with 118 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ extend-exclude = 'deps\/.*$'
line_length = 100

[tool.cibuildwheel]
skip = ["cp3{6,7,8}-*", "pp*", "*-win32", "*-manylinux_i686", "*-musllinux_i686", "*-musllinux_x86_64", "*-musllinux_aarch64"]
skip = ["cp3{6,7,8,13}-*", "pp*", "*-win32", "*-manylinux_i686", "*-musllinux_i686", "*-musllinux_x86_64", "*-musllinux_aarch64"]
# MPI needed for testing with mpi4py
before-all = "yum install -y openmpi3-devel java-11-openjdk"
environment = { MPICC="/usr/lib64/openmpi3/bin/mpicc" }
Expand Down
3 changes: 2 additions & 1 deletion src/functionalizer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from pathlib import Path

import pyarrow.parquet as pq
import sparkmanager as sm
from fz_td_recipe import Recipe
from pyspark.sql import functions as F

import sparkmanager as sm

from . import utils
from .circuit import Circuit
from .definitions import CheckpointPhases, SortBy
Expand Down
2 changes: 1 addition & 1 deletion src/functionalizer/filters/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Module with filters to process edge data."""

from .definitions import DatasetOperation, load # NOQA
from .definitions import DatasetOperation, FilterInitializationError, load # NOQA
from .helpers import enable_debug # NOQA

from . import helpers # NOQA
7 changes: 5 additions & 2 deletions src/functionalizer/filters/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from pathlib import Path

import sparkmanager as sm

from functionalizer.circuit import Circuit
from functionalizer.utils import get_logger
from functionalizer.utils.checkpointing import checkpoint_resume
Expand All @@ -38,6 +37,10 @@ def load(*dirnames: str) -> None:
importlib.import_module(modulename)


class FilterInitializationError(RuntimeError):
"""Error to be raised when filters should be skipped."""


# ---------------------------------------------------
# Dataset operations
# ---------------------------------------------------
Expand Down Expand Up @@ -108,7 +111,7 @@ def initialize(mcs, names, *args):
)
try:
filters.append(fcls(*args))
except Exception as e:
except FilterInitializationError as e:
if fcls._required:
logger.fatal("Could not instantiate %s", fcls.__name__)
raise
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Filter that matches distributions of synapses."""

import sparkmanager as sm
from pyspark.sql import functions as F

import sparkmanager as sm
from functionalizer.circuit import touches_per_pathway
from functionalizer.definitions import CheckpointPhases
from functionalizer.filters import DatasetOperation, helpers
Expand Down
9 changes: 6 additions & 3 deletions src/functionalizer/filters/implementations/spine_length.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from operator import attrgetter

import pandas as pd
import sparkmanager as sm
from pyspark.sql import functions as F

from functionalizer.filters import DatasetOperation
import sparkmanager as sm
from functionalizer.filters import DatasetOperation, FilterInitializationError
from functionalizer.utils import get_logger

from . import add_bin_column, add_random_column
Expand All @@ -29,9 +29,12 @@ def __init__(self, recipe, source, target):
recipe to obtain the desired distribution of spine lengths to match.
"""
super().__init__(recipe, source, target)
self.seed = recipe.seeds.synapseSeed
self.seed = recipe.get("seed")
logger.info("Using seed %d for spine length adjustment", self.seed)

if not recipe.get("spine_lengths"):
raise FilterInitializationError("'synapse_reposition' not in recipe")

self.binnings = sorted(recipe.spine_lengths, key=attrgetter("length"))

def apply(self, circuit):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pandas as pd
from pyspark.sql import functions as F

from functionalizer.filters import DatasetOperation
from functionalizer.filters import DatasetOperation, FilterInitializationError


class SpineMorphologies(DatasetOperation):
Expand Down Expand Up @@ -53,6 +53,10 @@ class SpineMorphologies(DatasetOperation):
def __init__(self, recipe, source, target):
"""Initializes the filter using the morphology database."""
super().__init__(recipe, source, target)

if not target.spine_morphology_path:
raise FilterInitializationError("target nodes do not define 'spine_morphologies_dir'")

self._morphologies, self._filter = _create_spine_morphology_udf(
target.spine_morphology_path
)
Expand Down Expand Up @@ -105,6 +109,7 @@ def _read_spine_morphology_attributes(spine_morpho_path: Path):
Returns a dataframe with spine morphology properties.
"""
files = sorted(spine_morpho_path.glob("*.h5"))
assert len(files) > 0, "no spine morphologies present"
ids = np.ndarray((0,), dtype=int)
lengths = np.ndarray((0,), dtype=float)
morphologies = np.ndarray((0,), dtype=int)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Filters to add properties to synapses."""

import sparkmanager as sm
from pyspark.sql import functions as F
from pyspark.sql import types as T

import sparkmanager as sm
from functionalizer.filters import DatasetOperation
from functionalizer.utils import get_logger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import numpy as np
import pandas as pd
import sparkmanager as sm

from functionalizer.filters import DatasetOperation
import sparkmanager as sm
from functionalizer.filters import DatasetOperation, FilterInitializationError


class SynapseReposition(DatasetOperation):
Expand All @@ -20,6 +20,8 @@ class SynapseReposition(DatasetOperation):
def __init__(self, recipe, source, target):
"""Initialize the filter, extracting the reposition part of the recipe."""
super().__init__(recipe, source, target)
if not recipe.get("synapse_reposition"):
raise FilterInitializationError("'synapse_reposition' not in recipe")
self.columns, self.reposition = recipe.as_matrix("synapse_reposition")
self.unset_value = len(recipe.get("synapse_reposition"))

Expand Down
2 changes: 1 addition & 1 deletion src/functionalizer/filters/implementations/touch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import numpy as np
import pandas as pd
import sparkmanager as sm
from pyspark.sql import functions as F

import sparkmanager as sm
from functionalizer.filters import DatasetOperation
from functionalizer.utils import get_logger

Expand Down
2 changes: 1 addition & 1 deletion src/functionalizer/io/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

import pandas as pd
import pyarrow.parquet as pq
import sparkmanager as sm
from packaging.version import VERSION_PATTERN, Version
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

import sparkmanager as sm
from functionalizer import schema
from functionalizer.schema import OUTPUT_MAPPING
from functionalizer.utils import get_logger
Expand Down
3 changes: 2 additions & 1 deletion src/functionalizer/utils/checkpointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from functools import wraps
from inspect import signature

import sparkmanager as sm
from pyspark.sql.column import _to_seq

import sparkmanager as sm

from . import get_logger
from .filesystem import exists, isdir, size

Expand Down
3 changes: 2 additions & 1 deletion src/functionalizer/utils/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from contextlib import contextmanager

import sparkmanager as sm
from pyspark.sql import functions as F

import sparkmanager as sm

from . import get_logger

logger = get_logger(__name__)
Expand Down
31 changes: 31 additions & 0 deletions tests/circuit_1000n/circuit_config_invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"manifest": {
"$BASE_DIR": "./",
"$COMPONENT_DIR": "$BASE_DIR",
"$NETWORK_DIR": "$BASE_DIR"
},
"components": {
"biophysical_neuron_models_dir": "no comprendo",
"provenance": {
"bioname_dir": "$COMPONENT_DIR/bioname"
}
},
"networks": {
"nodes": [
{
"nodes_file": "$NETWORK_DIR/nodes.h5",
"nodes_types_file": null,
"populations": {
"All": {
"morphologies_dir": null,
"alternate_morphologies": {
"h5v1": "$BASE_DIR/morphologies/h5"
},
"spine_morphologies_dir": "no comprendo"
}
}
}
],
"edges": []
}
}
18 changes: 15 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path

import pytest

from functionalizer import filters
from functionalizer.core import Functionalizer
from functionalizer.definitions import RunningMode as RM
Expand All @@ -29,6 +30,16 @@
[str(DATADIR / "touches" / "*.parquet")],
)

DEFAULT_ARGS = {
"recipe_file": DATADIR / "recipe.json",
"circuit_config": CIRCUIT_CONFIG,
"source": None,
"source_nodeset": None,
"target": None,
"target_nodeset": None,
"edges": [str(DATADIR / "touches" / "*.parquet")],
}

filters.load()


Expand All @@ -49,10 +60,11 @@ def circuit_config():
return CIRCUIT_CONFIG


@pytest.fixture(scope="session", name="fz")
def fz_fixture(tmp_path_factory):
@pytest.fixture(scope="class", name="fz", params=[{}])
def fz_fixture(request, tmp_path_factory):
tmpdir = tmp_path_factory.mktemp("filters")
return create_functionalizer(tmpdir, RM.FUNCTIONAL.value).init_data(*ARGS)
kwargs = DEFAULT_ARGS | request.param
return create_functionalizer(tmpdir, RM.FUNCTIONAL.value).init_data(**kwargs)


@pytest.fixture(scope="session", name="gj")
Expand Down
1 change: 1 addition & 0 deletions tests/test_data_input_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
from conftest import DATADIR, create_functionalizer

from functionalizer.io import NodeData


Expand Down
1 change: 1 addition & 0 deletions tests/test_data_input_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy
import pandas as pd
import pytest

import sparkmanager as sm
from functionalizer.io.circuit import BRANCH_COLUMNS, EdgeData
from functionalizer.utils.conf import Configuration
Expand Down
8 changes: 4 additions & 4 deletions tests/test_data_input_sonata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import h5py
import numpy
import pytest
from conftest import DATADIR, DEFAULT_ARGS, create_functionalizer

import sparkmanager as sm
from conftest import ARGS, DATADIR, create_functionalizer
from functionalizer.io.circuit import BRANCH_COLUMNS, EdgeData
from functionalizer.utils.conf import Configuration

Expand Down Expand Up @@ -54,9 +55,8 @@ def test_branch_shift(edges_w_branch_type):
@pytest.mark.slow
def test_sonata_properties(tmp_path_factory):
tmpdir = tmp_path_factory.mktemp("sonata_properties")
fz = create_functionalizer(tmpdir, ["SynapseProperties"]).init_data(
*ARGS[:-1], edges=(os.path.join(DATADIR, "edges.h5"), "default")
)
kwargs = DEFAULT_ARGS | {"edges": (os.path.join(DATADIR, "edges.h5"), "default")}
fz = create_functionalizer(tmpdir, ["SynapseProperties"]).init_data(**kwargs)
fz.process_filters()

assert "delay" in fz.circuit.df.columns
Expand Down
30 changes: 24 additions & 6 deletions tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import pandas as pd
import pyspark.sql.functions as F
import pytest
from conftest import DATADIR, DEFAULT_ARGS, create_functionalizer

import sparkmanager as sm
from conftest import ARGS, DATADIR, create_functionalizer
from functionalizer.utils.spark import cache_broadcast_single_part

NUM_AFTER_DISTANCE = 226301
Expand Down Expand Up @@ -33,9 +34,8 @@ def layer_counts(circuit):
return dict(zip(res["mtype"], res["count"]))

tmpdir = tmp_path_factory.mktemp("fixed_probabilities")
fz = create_functionalizer(tmpdir, ["ReduceAndCut"]).init_data(
DATADIR / "recipe_fixed.json", *ARGS[1:]
)
kwargs = DEFAULT_ARGS | {"recipe_file": DATADIR / "recipe_fixed.json"}
fz = create_functionalizer(tmpdir, ["ReduceAndCut"]).init_data(**kwargs)

before = layer_counts(fz.circuit)
fz.process_filters()
Expand All @@ -48,6 +48,24 @@ def layer_counts(circuit):
assert "L6" not in after


class TestFilterInitialization:
"""Test initialization of optional filters"""

def test_spine_morphos(self, fz):
fz.process_filters(filters=["SpineMorphologies"])


class TestBogusFilterInitialization:
"""Test initialization of optional filters"""

@pytest.mark.parametrize(
"fz", [{"circuit_config": DATADIR / "circuit_config_invalid.json"}], indirect=True
)
def test_spine_morphos(self, fz):
with pytest.raises(AssertionError):
fz.process_filters(filters=["SpineMorphologies"])


@pytest.mark.slow
class TestFilters(object):
"""Sequential tests of filters."""
Expand All @@ -71,7 +89,7 @@ def test_reduce_and_cut(self, fz):
def test_resume(self, fz, tmp_path_factory):
"""Make sure that resuming "works" """
tmpdir = tmp_path_factory.mktemp("filters")
fz2 = create_functionalizer(tmpdir).init_data(*ARGS)
fz2 = create_functionalizer(tmpdir).init_data(**DEFAULT_ARGS)
fz2.process_filters()
original = fz.circuit.df.count()
count = fz2.circuit.df.count()
Expand All @@ -80,7 +98,7 @@ def test_resume(self, fz, tmp_path_factory):
def test_overwrite(self, fz, tmp_path_factory):
"""Test that overwriting checkpointed data works"""
tmpdir = tmp_path_factory.mktemp("filters")
fz2 = create_functionalizer(tmpdir).init_data(*ARGS)
fz2 = create_functionalizer(tmpdir).init_data(**DEFAULT_ARGS)
fz2.process_filters(overwrite=True)
original = fz.circuit.df.count()
count = fz2.circuit.df.count()
Expand Down
3 changes: 2 additions & 1 deletion tests/test_gap_junctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

import numpy as np
import pytest
from functionalizer.filters import DatasetOperation
from pyspark.sql import functions as F

from functionalizer.filters import DatasetOperation

# (src, dst), num_connections
DENDRO_DATA = [
((987, 990), 10), # 6 with exact or abs() == 1 match
Expand Down
Loading

0 comments on commit 1ef42db

Please sign in to comment.