PEP585: Prefer list, dict, set over List, Dict, Set (#68)
hf-kklein authored May 4, 2023
1 parent 5b67a48 commit e1a6740
Showing 12 changed files with 47 additions and 54 deletions.
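PEP 585 (Python 3.9+) made the built-in collection types generic, so annotations no longer need the typing.List, typing.Dict and typing.Set aliases. A minimal before/after sketch of the pattern this commit applies everywhere (the function below is illustrative, not taken from the repository):

# before: typing alias, deprecated since Python 3.9
from typing import List

def upper_names_before(names: List[str]) -> List[str]:
    return [name.upper() for name in names]

# after: built-in generic, no typing import required
def upper_names_after(names: list[str]) -> list[str]:
    return [name.upper() for name in names]

assert upper_names_before(["a"]) == upper_names_after(["a"]) == ["A"]

Both spellings have identical runtime behavior; the change is purely about using the non-deprecated notation.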
4 changes: 2 additions & 2 deletions src/bomf/__init__.py
@@ -2,7 +2,7 @@
BOMF stands for BO4E Migration Framework.
"""
from abc import ABC
-from typing import Generic, List
+from typing import Generic

import attrs

@@ -37,7 +37,7 @@ class MigrationStrategy(ABC, Generic[IntermediateDataSet, TargetDataModel]):
The target loader moves the target entities into the actual target system.
"""

-async def migrate(self) -> List[LoadingSummary]:
+async def migrate(self) -> list[LoadingSummary]:
"""
run the entire migration flow from source to target which includes:
1. create bo4e data source using the source_data_set_to_bo4e_mapper
12 changes: 6 additions & 6 deletions src/bomf/filter/__init__.py
@@ -7,7 +7,7 @@
import asyncio
import logging
from abc import ABC, abstractmethod
-from typing import Awaitable, Callable, Generic, List, Set, TypeVar
+from typing import Awaitable, Callable, Generic, TypeVar

Candidate = TypeVar("Candidate") #: an arbitrary but fixed type on which the filter operates

@@ -34,11 +34,11 @@ async def predicate(self, candidate: Candidate) -> bool:
"""
raise NotImplementedError("The inheriting class has to implement this method")

-async def apply(self, candidates: List[Candidate]) -> List[Candidate]:
+async def apply(self, candidates: list[Candidate]) -> list[Candidate]:
"""
apply this filter on the candidates
"""
-tasks: List[Awaitable[bool]] = [self.predicate(c) for c in candidates]
+tasks: list[Awaitable[bool]] = [self.predicate(c) for c in candidates]
self._logger.info("%s created %i predicate tasks; Awaiting them all", str(self), len(tasks))
predicate_results = await asyncio.gather(*tasks)
self._logger.info("%s awaited %i tasks", str(self), len(tasks))
@@ -75,7 +75,7 @@ def __init__(self, base_filter: Filter[Aggregate]):
self._base_filter = base_filter

@abstractmethod
-async def aggregate(self, candidates: List[Candidate]) -> List[Aggregate]:
+async def aggregate(self, candidates: list[Candidate]) -> list[Aggregate]:
"""
Create aggregates which are then passed to the base filter that works on the aggregate.
The method is async so that you can do complex (and e.g. network based) aggregations.
@@ -89,7 +89,7 @@ def disaggregate(self, aggregate: Aggregate) -> Candidate:
"""
raise NotImplementedError("The inheriting class has to implement this method")

-async def apply(self, candidates: List[Candidate]) -> List[Candidate]:
+async def apply(self, candidates: list[Candidate]) -> list[Candidate]:
"""
If aggregate and disaggregate and the base_filter are properly setup, then apply will filter the list of
candidates based on the aggregate base filter.
@@ -109,7 +109,7 @@ class HardcodedFilter(Filter[Candidate], ABC, Generic[Candidate, CandidateProper
a hardcoded filter filters on a hardcoded list of allowed/blocked values (formerly known as white- and blacklist)
"""

-def __init__(self, criteria_selector: Callable[[Candidate], CandidateProperty], values: Set[CandidateProperty]):
+def __init__(self, criteria_selector: Callable[[Candidate], CandidateProperty], values: set[CandidateProperty]):
"""
instantiate by providing a criteria selector that returns a property on which we can filter and a set of values.
Whether the values are used as allowed or not allowed (block) depends on the inheriting class
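A minimal sketch of the new list-based signatures in src/bomf/filter in action, assuming a concrete Filter only needs to implement predicate (the class and values below are hypothetical, not part of the repository):

import asyncio

from bomf.filter import Filter

class _MinLengthFilter(Filter[str]):
    """keeps only candidates that are at least 3 characters long (hypothetical example)"""

    async def predicate(self, candidate: str) -> bool:
        return len(candidate) >= 3

async def main() -> None:
    # apply creates one predicate task per candidate and awaits them via asyncio.gather
    survivors: list[str] = await _MinLengthFilter().apply(["a", "abc", "abcd"])
    assert survivors == ["abc", "abcd"]

asyncio.run(main())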
4 changes: 2 additions & 2 deletions src/bomf/filter/sourcedataproviderfilter.py
@@ -2,7 +2,7 @@
Source Data Provider Filters combine the features of a filter with the features of a Source Data Provider.
"""

-from typing import Callable, Generic, List, Literal, Optional, overload
+from typing import Callable, Generic, Literal, Optional, overload

from bomf import KeyTyp, SourceDataProvider
from bomf.filter import Candidate, Filter
@@ -77,7 +77,7 @@ async def apply(
However, in general, you have to specify how the data can be indexed using a key_selector which is not None.
If you provide both a JsonFileSourceDataProvider AND a key_selector, the explicit key_selector will be used.
"""
-survivors: List[Candidate] = await self._filter.apply(await source_data_provider.get_data())
+survivors: list[Candidate] = await self._filter.apply(await source_data_provider.get_data())
key_selector_to_be_used: Callable[[Candidate], KeyTyp]
if key_selector is not None:
key_selector_to_be_used = key_selector
14 changes: 7 additions & 7 deletions src/bomf/loader/entityloader.py
@@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
-from typing import Awaitable, Callable, Generic, List, Optional, TypeVar
+from typing import Awaitable, Callable, Generic, Optional, TypeVar

import attrs
from pydantic import BaseModel # pylint:disable=no-name-in-module
@@ -131,12 +131,12 @@ async def load(self, entity: _TargetEntity) -> LoadingSummary:
loading_error=None,
)

-async def load_entities(self, entities: List[_TargetEntity]) -> List[LoadingSummary]:
+async def load_entities(self, entities: list[_TargetEntity]) -> list[LoadingSummary]:
"""
load all the given entities into the target system
"""
# here we could use some error handling in the future
-tasks: List[Awaitable[LoadingSummary]] = [self.load(entity) for entity in entities]
+tasks: list[Awaitable[LoadingSummary]] = [self.load(entity) for entity in entities]
result = await asyncio.gather(*tasks)
return result

@@ -149,17 +149,17 @@ class JsonFileEntityLoader(EntityLoader[_TargetEntity], Generic[_TargetEntity]):
async def verify(self, entity: _TargetEntity, id_in_target_system: Optional[str] = None) -> bool:
return True

-def __init__(self, file_path: Path, list_encoder: Callable[[List[_TargetEntity]], List[dict]]):
+def __init__(self, file_path: Path, list_encoder: Callable[[list[_TargetEntity]], list[dict]]):
"""provide a path to a json file (will be created if not exists and overwritten if exists)"""
self._file_path = file_path
self._list_encoder = list_encoder
-self._entities: List[_TargetEntity] = []
+self._entities: list[_TargetEntity] = []

async def load_entity(self, entity: _TargetEntity) -> Optional[EntityLoadingResult]:
self._entities.append(entity)
return None

-async def load_entities(self, entities: List[_TargetEntity]) -> List[LoadingSummary]:
+async def load_entities(self, entities: list[_TargetEntity]) -> list[LoadingSummary]:
base_result = await super().load_entities(entities)
dict_list = self._list_encoder(self._entities)
with open(self._file_path, "w+", encoding="utf-8") as outfile:
@@ -174,7 +174,7 @@ async def load_entities(self, entities: List[_TargetEntity]) -> List[LoadingSumm
class _ListOfPydanticModels(BaseModel, Generic[_PydanticTargetModel]):
# https://stackoverflow.com/a/58641115/10009545
# for the instantiation see the serialization unit test
-__root__: List[_PydanticTargetModel]
+__root__: list[_PydanticTargetModel]


class PydanticJsonFileEntityLoader(JsonFileEntityLoader[_PydanticTargetModel], Generic[_PydanticTargetModel]):
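The __root__ attribute above is pydantic v1's custom root type pattern (replaced by RootModel in pydantic v2): it lets a model serialize to and from a bare JSON list. A standalone sketch with a hypothetical model, not tied to this repository:

from pydantic import BaseModel

class _Person(BaseModel):
    name: str

class _PersonList(BaseModel):
    __root__: list[_Person]

people = _PersonList(__root__=[_Person(name="Alice"), _Person(name="Bob")])
print(people.json())  # [{"name": "Alice"}, {"name": "Bob"}]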
6 changes: 3 additions & 3 deletions src/bomf/mapper/__init__.py
@@ -3,7 +3,7 @@
"""
import asyncio
from abc import ABC, abstractmethod
-from typing import Generic, List, TypeVar
+from typing import Generic, TypeVar

from bomf.model import Bo4eDataSet

@@ -29,7 +29,7 @@ class SourceToBo4eDataSetMapper(ABC, Generic[IntermediateDataSet]):
# the only thing it has to provide is a method to create_data_sets (in bo4e).
# we don't care from where it gets them in the first place

-async def create_data_sets(self) -> List[IntermediateDataSet]:
+async def create_data_sets(self) -> list[IntermediateDataSet]:
"""
apply the mapping to all the provided source data sets.
@@ -49,7 +49,7 @@ async def create_target_model(self, dataset: IntermediateDataSet) -> TargetDataM
maps the given source data model into an intermediate data set
"""

-async def create_target_models(self, datasets: List[IntermediateDataSet]) -> List[TargetDataModel]:
+async def create_target_models(self, datasets: list[IntermediateDataSet]) -> list[TargetDataModel]:
"""
apply the mapping to all the provided dataset
"""
16 changes: 8 additions & 8 deletions src/bomf/provider/__init__.py
@@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from itertools import groupby
from pathlib import Path
-from typing import Callable, Generic, List, Mapping, Optional, Protocol, TypeVar, Union
+from typing import Callable, Generic, Mapping, TypeVar, Union

SourceDataModel = TypeVar("SourceDataModel")
"""
@@ -27,7 +27,7 @@ class SourceDataProvider(ABC, Generic[SourceDataModel, KeyTyp]):
"""

@abstractmethod
-async def get_data(self) -> List[SourceDataModel]:
+async def get_data(self) -> list[SourceDataModel]:
"""
Returns all available entities from the source data model.
They will be filtered in a SourceDataModel Filter ("Preselect")
@@ -46,11 +46,11 @@ class ListBasedSourceDataProvider(SourceDataProvider[SourceDataModel, KeyTyp]):
A source data provider that is instantiated with a list of source data models
"""

-def __init__(self, source_data_models: List[SourceDataModel], key_selector: Callable[[SourceDataModel], KeyTyp]):
+def __init__(self, source_data_models: list[SourceDataModel], key_selector: Callable[[SourceDataModel], KeyTyp]):
"""
instantiate it by providing a list of source data models
"""
-self._models: List[SourceDataModel] = source_data_models
+self._models: list[SourceDataModel] = source_data_models
logger = logging.getLogger(self.__module__)
for key, key_models in groupby(source_data_models, key=key_selector):
affected_entries_count = len(list(key_models))
@@ -69,7 +69,7 @@ def __init__(self, source_data_models: List[SourceDataModel], key_selector: Call
async def get_entry(self, key: KeyTyp) -> SourceDataModel:
return self._models_dict[key]

-async def get_data(self) -> List[SourceDataModel]:
+async def get_data(self) -> list[SourceDataModel]:
return self._models


@@ -81,7 +81,7 @@ class JsonFileSourceDataProvider(SourceDataProvider[SourceDataModel, KeyTyp], Ge
def __init__(
self,
json_file_path: Path,
-data_selector: Callable[[Union[dict, list]], List[SourceDataModel]],
+data_selector: Callable[[Union[dict, list]], list[SourceDataModel]],
key_selector: Callable[[SourceDataModel], KeyTyp],
encoding="utf-8",
):
@@ -91,13 +91,13 @@ def __init__(
"""
with open(json_file_path, "r", encoding=encoding) as json_file:
raw_data = json.load(json_file)
-self._source_data_models: List[SourceDataModel] = data_selector(raw_data)
+self._source_data_models: list[SourceDataModel] = data_selector(raw_data)
self._key_to_data_model_mapping: Mapping[KeyTyp, SourceDataModel] = {
key_selector(sdm): sdm for sdm in self._source_data_models
}
self.key_selector = key_selector

-async def get_data(self) -> List[SourceDataModel]:
+async def get_data(self) -> list[SourceDataModel]:
return self._source_data_models

async def get_entry(self, key: KeyTyp) -> SourceDataModel:
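A hedged usage sketch of JsonFileSourceDataProvider with the new built-in generics; the file name, JSON shape and selectors below are assumptions made for illustration:

import asyncio
import json
from pathlib import Path

from bomf.provider import JsonFileSourceDataProvider

# hypothetical source file: {"customers": [{"id": "1"}, {"id": "2"}]}
Path("customers.json").write_text(json.dumps({"customers": [{"id": "1"}, {"id": "2"}]}), encoding="utf-8")

provider: JsonFileSourceDataProvider[dict, str] = JsonFileSourceDataProvider(
    json_file_path=Path("customers.json"),
    data_selector=lambda raw: raw["customers"],  # pick the model list out of the raw JSON
    key_selector=lambda model: model["id"],  # index each model by its "id"
)

async def main() -> None:
    all_models: list[dict] = await provider.get_data()
    assert await provider.get_entry("2") == {"id": "2"}

asyncio.run(main())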
1 change: 0 additions & 1 deletion src/bomf/validation/core/__init__.py
@@ -11,4 +11,3 @@
)
from bomf.validation.core.utils import optional_field, required_field
from bomf.validation.core.validator import MappedValidator, Parameter, Parameters, Validator
-from bomf.validation.path_map import PathMappedValidator
17 changes: 8 additions & 9 deletions unittests/test_filter.py
@@ -1,7 +1,6 @@
import dataclasses
import logging
from itertools import groupby
-from typing import List

import pytest # type:ignore[import]

@@ -26,7 +25,7 @@ class TestFilter:
),
],
)
-async def test_filter(self, filter_under_test: Filter, candidates: List[dict], survivors: List[dict], caplog):
+async def test_filter(self, filter_under_test: Filter, candidates: list[dict], survivors: list[dict], caplog):
caplog.set_level(logging.DEBUG, logger=self.__module__)
actual = await filter_under_test.apply(candidates)
assert actual == survivors
@@ -61,8 +60,8 @@ async def predicate(self, candidate: _MyAggregate) -> bool:
base_filter = _BaseFilter()
super(_BarFilter, self).__init__(base_filter)

-async def aggregate(self, candidates: List[_MyCandidate]) -> List[_MyAggregate]:
-result: List[_MyAggregate] = []
+async def aggregate(self, candidates: list[_MyCandidate]) -> list[_MyAggregate]:
+result: list[_MyAggregate] = []
for group_key, group in groupby(sorted(candidates, key=lambda c: c.string), lambda c: c.string):
group_items = list(group)
max_number_in_group = max(group_item.number for group_item in group_items)
@@ -93,7 +92,7 @@ class TestAggregateFilter:
],
)
async def test_aggregate_filter(
-self, filter_under_test: AggregateFilter, candidates: List[dict], survivors: List[dict], caplog
+self, filter_under_test: AggregateFilter, candidates: list[dict], survivors: list[dict], caplog
):
caplog.set_level(logging.DEBUG, logger=self.__module__)
actual = await filter_under_test.apply(candidates)
@@ -105,14 +104,14 @@ async def test_aggregate_filter(
class TestBlockAndAllowlistFilter:
async def test_allowlist_filter(self):
allowlist = {"A", "B", "C"}
-candidates: List[dict[str, str]] = [{"foo": "A"}, {"foo": "B"}, {"foo": "Z"}]
+candidates: list[dict[str, str]] = [{"foo": "A"}, {"foo": "B"}, {"foo": "Z"}]
allowlist_filter: AllowlistFilter[dict[str, str], str] = AllowlistFilter(lambda c: c["foo"], allowlist)
actual = await allowlist_filter.apply(candidates)
assert actual == [{"foo": "A"}, {"foo": "B"}]

async def test_blocklist_filter(self):
blocklist = {"A", "B", "C"}
-candidates: List[dict[str, str]] = [{"foo": "A"}, {"foo": "B"}, {"foo": "Z"}]
+candidates: list[dict[str, str]] = [{"foo": "A"}, {"foo": "B"}, {"foo": "Z"}]
blocklist_filter: BlocklistFilter[dict[str, str], str] = BlocklistFilter(lambda c: c["foo"], blocklist)
actual = await blocklist_filter.apply(candidates)
assert actual == [{"foo": "Z"}]
@@ -137,8 +136,8 @@ class TestSourceDataProviderFilter:
async def test_source_data_provider_filter(
self,
candidate_filter: Filter[_MyCandidate],
-candidates: List[_MyCandidate],
-survivors: List[_MyCandidate],
+candidates: list[_MyCandidate],
+survivors: list[_MyCandidate],
caplog,
):
my_provider: ListBasedSourceDataProvider[_MyCandidate, int] = ListBasedSourceDataProvider(
6 changes: 3 additions & 3 deletions unittests/test_mapper.py
@@ -1,4 +1,4 @@
-from typing import List, Optional, Type
+from typing import Optional, Type

import attrs
import pytest # type:ignore[import]
@@ -38,7 +38,7 @@ def get_business_object(self, bo_type: Type[Bo4eTyp], specification: Optional[st


class _DictToMaLoMeLoMapper(SourceToBo4eDataSetMapper):
-async def create_data_sets(self) -> List[_MaLoAndMeLo]:
+async def create_data_sets(self) -> list[_MaLoAndMeLo]:
return [
_MaLoAndMeLo(
melo=Messlokation.construct(messlokations_id=source["meloId"]),
@@ -49,7 +49,7 @@ async def create_data_sets(self) -> List[_MaLoAndMeLo]:


class _MaLoMeLoToListMapper(Bo4eDataSetToTargetMapper):
-async def create_target_model(self, dataset: _MaLoAndMeLo) -> List[str]:
+async def create_target_model(self, dataset: _MaLoAndMeLo) -> list[str]:
return [
dataset.get_business_object(Marktlokation).marktlokations_id,
dataset.get_business_object(Messlokation).messlokations_id,
17 changes: 7 additions & 10 deletions unittests/test_migration.py
@@ -1,10 +1,7 @@
"""
Tests the overall data flow using bomf.
"""
-import asyncio
-from typing import Dict, List, Optional

-import attrs
+from typing import Optional

from bomf import (
Bo4eDataSetToTargetMapper,
@@ -22,13 +19,13 @@
from bomf.validation.core import SyncValidatorFunction
from bomf.validation.path_map import PathMappedValidator

-_MySourceDataModel = Dict[str, str]
+_MySourceDataModel = dict[str, str]
_MyKeyTyp = str
-_MyTargetDataModel = List[str]
+_MyTargetDataModel = list[str]


class _MyIntermediateDataModel(Bo4eDataSet):
-data: Dict[str, str]
+data: dict[str, str]

def get_id(self) -> str:
return "12345"
@@ -38,7 +35,7 @@ class _MySourceDataProvider(SourceDataProvider[_MySourceDataModel, _MyKeyTyp]):
async def get_entry(self, key: KeyTyp) -> _MySourceDataModel:
raise NotImplementedError("Not relevant for the test")

-async def get_data(self) -> List[_MySourceDataModel]:
+async def get_data(self) -> list[_MySourceDataModel]:
return [
{"foo": "bar"},
{"FOO": "BAR"},
@@ -54,11 +51,11 @@ async def predicate(self, candidate: _MySourceDataModel) -> bool:


class _MyToBo4eMapper(SourceToBo4eDataSetMapper[_MyIntermediateDataModel]):
-def __init__(self, what_ever_you_like: List[_MySourceDataModel]):
+def __init__(self, what_ever_you_like: list[_MySourceDataModel]):
# what_ever_you_like is a placeholder for all the relation magic that may happen
self._source_models = what_ever_you_like

-async def create_data_sets(self) -> List[_MyIntermediateDataModel]:
+async def create_data_sets(self) -> list[_MyIntermediateDataModel]:
return [_MyIntermediateDataModel(data=source) for source in self._source_models]


3 changes: 1 addition & 2 deletions unittests/test_source_data_provider.py
@@ -1,6 +1,5 @@
import logging
from pathlib import Path
-from typing import List

import pytest # type:ignore[import]

@@ -15,7 +14,7 @@ class LegacyDataSystemDataProvider(SourceDataProvider):
async def get_entry(self, key: KeyTyp) -> str:
raise NotImplementedError("Not relevant for this test")

-async def get_data(self) -> List[str]:
+async def get_data(self) -> list[str]:
return ["foo", "bar", "baz"]

