diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..b302db8 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,38 @@ +name: Release + +on: + release: + types: [ released ] + +env: + MODULE_NAME: deep-pipe + +jobs: + release: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v2 + with: + python-version: 3.9 + + - id: get_version + name: Get the release version + uses: battila7/get-version-action@v2 + + - name: Check the version and build the package + run: | + RELEASE=${{ steps.get_version.outputs.version-without-v }} + VERSION=$(python -c "from pathlib import Path; import runpy; folder, = {d.parent for d in Path().resolve().glob('*/__init__.py')}; print(runpy.run_path(folder / '__init__.py')['__version__'])") + MATCH=$(pip index versions $MODULE_NAME | grep "Available versions:" | grep $VERSION) || echo + echo $MATCH + if [ "$GITHUB_BASE_REF" = "master" ] && [ "$MATCH" != "" ]; then echo "Version $VERSION already present" && exit 1; fi + if [ "$VERSION" != "$RELEASE" ]; then echo "$VERSION vs $RELEASE" && exit 1; fi + python setup.py sdist + + - name: Publish to PyPi + uses: pypa/gh-action-pypi-publish@master + with: + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/dpipe/__init__.py b/dpipe/__init__.py index df9144c..10939f0 100644 --- a/dpipe/__init__.py +++ b/dpipe/__init__.py @@ -1 +1 @@ -__version__ = '0.1.1' +__version__ = '0.1.2' diff --git a/dpipe/commands.py b/dpipe/commands.py index 961bc8e..9fca475 100644 --- a/dpipe/commands.py +++ b/dpipe/commands.py @@ -10,7 +10,6 @@ from tqdm import tqdm from .io import save_json, PathLike, load as _load, save as _save -from dpipe.itertools import collect def populate(path: PathLike, func: Callable, *args, **kwargs): @@ -78,7 +77,6 @@ def transform(input_path, output_path, transform_fn): np.save(os.path.join(output_path, f), transform_fn(np.load(os.path.join(input_path, f)))) -@collect def load_from_folder(path: PathLike, loader=_load, ext='.npy'): """Yields (id, object) pairs loaded from ``path``.""" for file in sorted(Path(path).iterdir()): diff --git a/dpipe/im/grid.py b/dpipe/im/grid.py index 31d9999..c0bb349 100644 --- a/dpipe/im/grid.py +++ b/dpipe/im/grid.py @@ -2,7 +2,7 @@ Function for working with patches from tensors. See the :doc:`tutorials/patches` tutorial for more details. """ -from typing import Iterable, Type, Tuple +from typing import Iterable, Type, Tuple, Callable import numpy as np @@ -49,7 +49,7 @@ def get_boxes(shape: AxesLike, box_size: AxesLike, stride: AxesLike, axis: AxesL def divide(x: np.ndarray, patch_size: AxesLike, stride: AxesLike, axis: AxesLike = None, - valid: bool = False) -> Iterable[np.ndarray]: + valid: bool = False, get_boxes: Callable = get_boxes) -> Iterable[np.ndarray]: """ A convolution-like approach to generating patches from a tensor. @@ -63,6 +63,8 @@ def divide(x: np.ndarray, patch_size: AxesLike, stride: AxesLike, axis: AxesLike the stride (step-size) of the slice. valid whether patches of size smaller than ``patch_size`` should be left out. + get_boxes + function that yields boxes, for signature see ``get_boxes`` References ---------- @@ -101,7 +103,8 @@ def build(self): def combine(patches: Iterable[np.ndarray], output_shape: AxesLike, stride: AxesLike, - axis: AxesLike = None, valid: bool = False, combiner: Type[PatchCombiner] = Average) -> np.ndarray: + axis: AxesLike = None, valid: bool = False, + combiner: Type[PatchCombiner] = Average, get_boxes: Callable = get_boxes) -> np.ndarray: """ Build a tensor of shape ``output_shape`` from ``patches`` obtained in a convolution-like approach with corresponding parameters. diff --git a/dpipe/im/metrics.py b/dpipe/im/metrics.py index 31f3df0..e253949 100644 --- a/dpipe/im/metrics.py +++ b/dpipe/im/metrics.py @@ -52,7 +52,9 @@ def sensitivity(y_true, y_pred): @add_check_bool @add_check_shapes def specificity(y_true, y_pred): - return fraction(np.sum(y_pred & y_true), np.sum(y_pred), empty_val=0) + tn = np.sum((~y_true) & (~y_pred)) + fp = np.sum(y_pred & (~y_true)) + return fraction(tn, tn + fp, empty_val=0) @add_check_bool diff --git a/dpipe/layout/scripts.py b/dpipe/layout/scripts.py index a3cf3f0..8bf52b1 100644 --- a/dpipe/layout/scripts.py +++ b/dpipe/layout/scripts.py @@ -1,7 +1,7 @@ import argparse import lazycon - +from pathlib import Path def build(): parser = argparse.ArgumentParser('Build an experiment layout from the provided config.', add_help=False) @@ -20,10 +20,12 @@ def run(): parser = argparse.ArgumentParser('Run an experiment based on the provided config.', add_help=False) parser.add_argument('config') args = parser.parse_known_args()[0] + config_path = Path(args.config).absolute() - layout = lazycon.load(args.config).layout + layout = lazycon.load(config_path).layout layout.run_parser(parser) parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this message and exit') args = parser.parse_args() - layout.run(**vars(args)) + folds = args.folds + layout.run(config=config_path, folds=folds) diff --git a/dpipe/predict/shape.py b/dpipe/predict/shape.py index b0e89a9..5e2d4bd 100644 --- a/dpipe/predict/shape.py +++ b/dpipe/predict/shape.py @@ -4,7 +4,7 @@ import numpy as np from ..im.axes import broadcast_to_axis, AxesLike, AxesParams, axis_from_dim, resolve_deprecation -from ..im.grid import divide, combine, PatchCombiner, Average +from ..im.grid import divide, combine, get_boxes, PatchCombiner, Average from ..itertools import extract, pmap from ..im.shape_ops import pad_to_shape, crop_to_shape, pad_to_divisible from ..im.shape_utils import prepend_dims, extract_dims @@ -81,7 +81,7 @@ def wrapper(x, *args, **kwargs): def patches_grid(patch_size: AxesLike, stride: AxesLike, axis: AxesLike = None, padding_values: Union[AxesParams, Callable] = 0, ratio: AxesParams = 0.5, - combiner: Type[PatchCombiner] = Average): + combiner: Type[PatchCombiner] = Average, get_boxes: Callable = get_boxes): """ Divide an incoming array into patches of corresponding ``patch_size`` and ``stride`` and then combine the predicted patches by aggregating the overlapping regions using the ``combiner`` - Average by default. @@ -107,8 +107,15 @@ def wrapper(x, *args, **kwargs): new_shape = padded_shape + (local_stride - padded_shape + local_size) % local_stride x = pad_to_shape(x, new_shape, input_axis, padding_values, ratio) - patches = pmap(predict, divide(x, local_size, local_stride, input_axis), *args, **kwargs) - prediction = combine(patches, extract(x.shape, input_axis), local_stride, axis, combiner=combiner) + patches = pmap( + predict, + divide(x, local_size, local_stride, input_axis, get_boxes=get_boxes), + *args, **kwargs + ) + prediction = combine( + patches, extract(x.shape, input_axis), local_stride, axis, + combiner=combiner, get_boxes=get_boxes + ) if valid: prediction = crop_to_shape(prediction, shape, axis, ratio) diff --git a/dpipe/py.typed b/dpipe/py.typed new file mode 100644 index 0000000..9b5e79e --- /dev/null +++ b/dpipe/py.typed @@ -0,0 +1,4 @@ +This marker file declares that the package supports type checking. +For details, you can refer to: +- PEP561: https://www.python.org/dev/peps/pep-0561/ +- mypy docs: https://mypy.readthedocs.io/en/stable/installed_packages.html diff --git a/dpipe/tests/test_gradient_accumulation.py b/dpipe/tests/test_gradient_accumulation.py new file mode 100644 index 0000000..63de95f --- /dev/null +++ b/dpipe/tests/test_gradient_accumulation.py @@ -0,0 +1,76 @@ +import pytest + +import torch +import numpy as np +from torch import nn +from dpipe.torch import train_step +from dpipe.train import train + + +@pytest.mark.parametrize('batch_size', [4, 16, 64]) +def test_train(batch_size): + net1 = nn.Sequential( + nn.Conv2d(3, 4, kernel_size=3, padding=1), + nn.LayerNorm([28, 28]), + nn.GELU(), + nn.Conv2d(4, 8, kernel_size=3, padding=1), + nn.LayerNorm([28, 28]), + nn.GELU(), + nn.Conv2d(8, 16, kernel_size=3, padding=1), + nn.LayerNorm([28, 28]), + nn.GELU(), + ) + net2 = nn.Sequential( + nn.Conv2d(3, 4, kernel_size=3, padding=1), + nn.LayerNorm([28, 28]), + nn.GELU(), + nn.Conv2d(4, 8, kernel_size=3, padding=1), + nn.LayerNorm([28, 28]), + nn.GELU(), + nn.Conv2d(8, 16, kernel_size=3, padding=1), + nn.LayerNorm([28, 28]), + nn.GELU(), + ) + net2.load_state_dict(net1.state_dict()) + + opt1 = torch.optim.SGD(net1.parameters(), lr=3e-4) + opt2 = torch.optim.SGD(net2.parameters(), lr=3e-4) + + n_epochs = 10 + n_batches = 10 + + data = np.random.randn(n_batches, batch_size, 3, 28, 28).astype(np.float32) + + def batch_iter1(): + for x in data: + yield x, 0 + + def batch_iter2(): + for x in data: + for batch_el in x: + yield batch_el[None], 0 + + def criterion(x, y): + return x.mean() + + train( + train_step, + batch_iter1, + n_epochs=n_epochs, + architecture=net1, + optimizer=opt1, + criterion=criterion, + ) + + train( + train_step, + batch_iter2, + n_epochs=n_epochs, + architecture=net2, + optimizer=opt2, + criterion=criterion, + gradient_accumulation_steps=batch_size, + ) + + for param1, param2 in zip(net1.parameters(), net2.parameters()): + assert torch.allclose(param1, param2) diff --git a/dpipe/torch/functional.py b/dpipe/torch/functional.py index fad7fc0..359ea6d 100644 --- a/dpipe/torch/functional.py +++ b/dpipe/torch/functional.py @@ -1,3 +1,4 @@ +import warnings from typing import Union, Callable import numpy as np @@ -8,6 +9,8 @@ __all__ = [ 'focal_loss_with_logits', 'linear_focal_loss_with_logits', 'weighted_cross_entropy_with_logits', + 'tversky_loss', 'focal_tversky_loss', 'tversky_loss_with_logits', 'focal_tversky_loss_with_logits', + 'dice_loss', 'dice_loss_with_logits', 'masked_loss', 'moveaxis', 'softmax', ] @@ -141,7 +144,7 @@ def weighted_cross_entropy_with_logits(logit: torch.Tensor, target: torch.Tensor return loss -def dice_loss(pred: torch.Tensor, target: torch.Tensor): +def dice_loss(pred: torch.Tensor, target: torch.Tensor, epsilon=1e-7): """ References ---------- @@ -152,22 +155,72 @@ def dice_loss(pred: torch.Tensor, target: torch.Tensor): sum_dims = list(range(1, target.dim())) - dice = 2 * torch.sum(pred * target, dim=sum_dims) / torch.sum(pred ** 2 + target ** 2, dim=sum_dims) + dice = 2 * torch.sum(pred * target, dim=sum_dims) / (torch.sum(pred ** 2 + target ** 2, dim=sum_dims) + epsilon) loss = 1 - dice return loss.mean() -def dice_loss_with_logits(logit: torch.Tensor, target: torch.Tensor): +def tversky_loss(pred: torch.Tensor, target: torch.Tensor, alpha=0.5, epsilon=1e-7, + reduce: Union[Callable, None] = torch.mean): + """ + References + ---------- + `Tversky Loss https://arxiv.org/abs/1706.05721`_ + """ + if not (target.size() == pred.size()): + raise ValueError("Target size ({}) must be the same as logit size ({})".format(target.size(), pred.size())) + + if alpha < 0 or alpha > 1: + raise ValueError("Invalid alpha value, expected to be in (0, 1) interval") + + sum_dims = list(range(1, target.dim())) + beta = 1 - alpha + + intersection = pred*target + fps, fns = pred*(1-target), (1-pred)*target + + numerator = torch.sum(intersection, dim=sum_dims) + denumenator = torch.sum(intersection, dim=sum_dims) + alpha*torch.sum(fps, dim=sum_dims) + beta*torch.sum(fns, dim=sum_dims) + tversky = numerator / (denumenator + epsilon) + loss = 1 - tversky + + if reduce is not None: + loss = reduce(loss) + return loss + + +def focal_tversky_loss(pred: torch.Tensor, target: torch.Tensor, gamma=4/3, alpha=0.5, epsilon=1e-7): + """ + References + ---------- + `Focal Tversky Loss https://arxiv.org/abs/1810.07842`_ """ - References - ---------- - `Dice Loss `_ - """ + if gamma <= 1: + warnings.warn("Gamma is <=1, to focus on less accurate predictions choose gamma > 1.") + tl = tversky_loss(pred, target, alpha, epsilon, reduce=None) + + return torch.pow(tl, 1/gamma).mean() + + +def loss_with_logits(criterion: Callable, logit: torch.Tensor, target: torch.Tensor, **kwargs): if not (target.size() == logit.size()): raise ValueError("Target size ({}) must be the same as logit size ({})".format(target.size(), logit.size())) pred = torch.sigmoid(logit) - return dice_loss(pred, target) + + return criterion(pred, target, **kwargs) + + +def dice_loss_with_logits(logit: torch.Tensor, target: torch.Tensor): + return loss_with_logits(dice_loss, logit, target) + + +def tversky_loss_with_logits(logit: torch.Tensor, target: torch.Tensor, alpha=0.5): + return loss_with_logits(tversky_loss, logit, target, alpha=alpha) + + +def focal_tversky_loss_with_logits(logit: torch.Tensor, target: torch.Tensor, gamma, alpha=0.5): + return loss_with_logits(focal_tversky_loss, logit, target, gamma=gamma, alpha=alpha) def masked_loss(mask: torch.Tensor, criterion: Callable, prediction: torch.Tensor, target: torch.Tensor, **kwargs): diff --git a/dpipe/torch/model.py b/dpipe/torch/model.py index 6c8f627..9642fbd 100644 --- a/dpipe/torch/model.py +++ b/dpipe/torch/model.py @@ -1,4 +1,4 @@ -from typing import Callable, Union, Sequence +from typing import Callable, Optional, Union, Sequence import numpy as np import torch @@ -12,13 +12,20 @@ __all__ = 'optimizer_step', 'train_step', 'inference_step', 'multi_inference_step' -def optimizer_step(optimizer: Optimizer, loss: torch.Tensor, scaler: torch.cuda.amp.GradScaler = None, - clip_grad: float = None, **params) -> torch.Tensor: +def optimizer_step( + optimizer: Optimizer, + loss: torch.Tensor, + scaler: Optional[torch.cuda.amp.GradScaler] = None, + clip_grad: Optional[float] = None, + accumulate: bool = False, + **params, +) -> torch.Tensor: """ - Performs the backward pass with respect to ``loss``, as well as a gradient step. - If a ``scaler`` is passed - it is used to perform the gradient step (automatic mixed precission support). - If a ``clip_grad`` is passed - gradient will be clipped by this value considered as maximum l2 norm + Performs the backward pass with respect to ``loss``, as well as a gradient step or gradient accumlation. + If a ``scaler`` is passed - it is used to perform the gradient step (automatic mixed precision support). + If a ``clip_grad`` is passed - gradient will be clipped by this value considered as maximum l2 norm. + ``accumulate`` indicates whether to perform gradient step or just accumulate gradients. ``params`` is used to change the optimizer's parameters. Examples @@ -27,39 +34,58 @@ def optimizer_step(optimizer: Optimizer, loss: torch.Tensor, scaler: torch.cuda. >>> optimizer_step(optimizer, loss) # perform a gradient step >>> optimizer_step(optimizer, loss, lr=1e-3) # set lr to 1e-3 and perform a gradient step >>> optimizer_step(optimizer, loss, betas=(0, 0)) # set betas to 0 and perform a gradient step + >>> optimizer_step(optimizer, loss, accumulate=True) # perform a gradient accumulation Notes ----- The incoming ``optimizer``'s parameters are not restored to their original values. """ set_params(optimizer, **params) - optimizer.zero_grad() + if scaler is not None: # autocast is not recommended during backward with torch.cuda.amp.autocast(False): scaler.scale(loss).backward() - if clip_grad is not None: - scaler.unscale_(optimizer) - clip_grad_norm_(get_parameters(optimizer), clip_grad) + if not accumulate: + if clip_grad is not None: + scaler.unscale_(optimizer) + assert not isinstance(clip_grad, bool), "Use of boolean clip_grad value (e.g. False) can lead to " \ + "unexpected behaviour. " + clip_grad_norm_(get_parameters(optimizer), clip_grad) - scaler.step(optimizer) - scaler.update() + scaler.step(optimizer) + scaler.update() else: loss.backward() - if clip_grad is not None: - clip_grad_norm_(get_parameters(optimizer), clip_grad) + if not accumulate: + if clip_grad is not None: + clip_grad_norm_(get_parameters(optimizer), clip_grad) + + optimizer.step() - optimizer.step() + if not accumulate: + optimizer.zero_grad(set_to_none=True) return loss -def train_step(*inputs: np.ndarray, architecture: Module, criterion: Callable, optimizer: Optimizer, n_targets: int = 1, - loss_key: str = None, scaler: torch.cuda.amp.GradScaler = None, clip_grad: float = None, **optimizer_params) -> np.ndarray: +def train_step( + *inputs: np.ndarray, + architecture: Module, + criterion: Callable, + optimizer: Optimizer, + n_targets: int = 1, + loss_key: Optional[str] = None, + scaler: Optional[torch.cuda.amp.GradScaler] = None, + clip_grad: Optional[float] = None, + accumulate: bool = False, + gradient_accumulation_steps: int = 1, + **optimizer_params, +) -> np.ndarray: """ - Performs a forward-backward pass, and make a gradient step, according to the given ``inputs``. + Performs a forward-backward pass, and make a gradient step or accumulation, according to the given ``inputs``. Parameters ---------- @@ -77,12 +103,15 @@ def train_step(*inputs: np.ndarray, architecture: Module, criterion: Callable, o loss_key in case ``criterion`` returns a dictionary of scalars, indicates which key should be used for gradient computation. - optimizer_params - additional parameters that will override the optimizer's current parameters (e.g. lr). scaler a gradient scaler used to operate in automatic mixed precision mode. clip_grad - maximum l2 norm of the gradient to clip it by + maximum l2 norm of the gradient to clip it by. + accumulate + whether to accumulate gradients or perform optimizer step. + gradient_accumulation_steps + optimizer_params + additional parameters that will override the optimizer's current parameters (e.g. lr). Notes ----- @@ -107,10 +136,26 @@ def train_step(*inputs: np.ndarray, architecture: Module, criterion: Callable, o loss = criterion(architecture(*inputs), *targets) if loss_key is not None: - optimizer_step(optimizer, loss[loss_key], scaler=scaler, clip_grad=clip_grad, **optimizer_params) + optimizer_step( + optimizer, + loss[loss_key] / gradient_accumulation_steps, + scaler=scaler, + clip_grad=clip_grad, + accumulate=accumulate, + **optimizer_params, + ) + return dmap(to_np, loss) - optimizer_step(optimizer, loss, scaler=scaler, clip_grad=clip_grad, **optimizer_params) + optimizer_step( + optimizer, + loss / gradient_accumulation_steps, + scaler=scaler, + clip_grad=clip_grad, + accumulate=accumulate, + **optimizer_params, + ) + return to_np(loss) diff --git a/dpipe/torch/utils.py b/dpipe/torch/utils.py index f189c1a..ea2891a 100644 --- a/dpipe/torch/utils.py +++ b/dpipe/torch/utils.py @@ -4,8 +4,9 @@ import numpy as np import torch from torch import nn -from torch.nn.parameter import Parameter from torch.optim import Optimizer +from torch.nn.parameter import Parameter +from torch.nn.modules.batchnorm import _BatchNorm from dpipe.io import PathLike from dpipe.itertools import squeeze_first, collect @@ -14,7 +15,7 @@ 'load_model_state', 'save_model_state', 'get_device', 'to_device', 'is_on_cuda', 'to_cuda', 'to_var', 'sequence_to_var', 'to_np', 'sequence_to_np', - 'set_params', 'set_lr', 'get_parameters', + 'set_params', 'set_lr', 'get_parameters', 'has_batchnorm', 'order_to_mode', ] @@ -22,25 +23,26 @@ ArrayLike = Union[np.ndarray, Iterable, int, float] -def load_model_state(module: nn.Module, path: PathLike, modify_state_fn: Callable = None) -> nn.Module: +def load_model_state(module: nn.Module, path: PathLike, modify_state_fn: Callable = None, strict: bool = True): """ Updates the ``module``'s state dict by the one located at ``path``. Parameters ---------- - module - path - modify_state_fn: Callable(current_state, loaded_state) + module: nn.Module + path: PathLike + modify_state_fn: Callable(current_state, state_to_load) if not ``None``, two arguments will be passed to the function: current state of the model and the state loaded from the path. This function should modify states as needed and return the final state to load. For example, it could help you to transfer weights from similar but not completely equal architecture. + strict: bool """ state_to_load = torch.load(path, map_location=get_device(module)) if modify_state_fn is not None: current_state = module.state_dict() state_to_load = modify_state_fn(current_state, state_to_load) - module.load_state_dict(state_to_load) + module.load_state_dict(state_to_load, strict=strict) def save_model_state(module: nn.Module, path: PathLike): @@ -216,3 +218,12 @@ def get_parameters(optimizer: Optimizer) -> Iterator[Parameter]: for group in optimizer.param_groups: for param in group['params']: yield param + + +def has_batchnorm(architecture: nn.Module) -> bool: + """Check whether ``architecture`` has BatchNorm module""" + for module in architecture.modules(): + if isinstance(module, _BatchNorm): + return True + + return False diff --git a/dpipe/train/base.py b/dpipe/train/base.py index 4bd3250..aad1d28 100644 --- a/dpipe/train/base.py +++ b/dpipe/train/base.py @@ -1,11 +1,14 @@ import contextlib -from typing import Callable +from typing import Callable, Optional +from warnings import warn import numpy as np from .checkpoint import Checkpoints from .policy import Policy, ValuePolicy, EarlyStopping from .logging import Logger +from ..torch.utils import has_batchnorm + __all__ = 'train', @@ -32,8 +35,16 @@ def _build_context_manager(o): yield o -def train(train_step: Callable, batch_iter: Callable, n_epochs: int = np.inf, logger: Logger = None, - checkpoints: Checkpoints = None, validate: Callable = None, **kwargs): +def train( + train_step: Callable, + batch_iter: Callable, + n_epochs: int = np.inf, + logger: Optional[Logger] = None, + checkpoints: Optional[Checkpoints] = None, + validate: Optional[Callable] = None, + gradient_accumulation_steps: int = 1, + **kwargs, +) -> None: """ Performs a series of train and validation steps. @@ -49,6 +60,7 @@ def train(train_step: Callable, batch_iter: Callable, n_epochs: int = np.inf, lo checkpoints: Checkpoints, None, optional validate: Callable, None, optional a function to calculate metrics on the validation set. + gradient_accumulation_steps: int kwargs additional keyword arguments passed to ``train_step``. For instances of `ValuePolicy` their `value` attribute is passed. @@ -77,6 +89,14 @@ def broadcast_event(method, *args, **kw): scalars = {name: value for name, value in kwargs.items() if not isinstance(value, Policy)} policies = {name: value for name, value in kwargs.items() if isinstance(value, Policy)} + assert isinstance(gradient_accumulation_steps, int) + + if gradient_accumulation_steps > 1 and has_batchnorm(kwargs['architecture']): + warn( + "Be careful! Implemented gradient accumulation is naive and doesn't take into account specifity of " + "BatchNorm you are using." + ) + with batch_iter as iterator: try: while epoch < n_epochs: @@ -85,7 +105,15 @@ def broadcast_event(method, *args, **kw): train_losses = [] for idx, inputs in enumerate(iterator()): broadcast_event(Policy.train_step_started, epoch, idx) - train_losses.append(train_step(*inputs, **scalars, **get_policy_values())) + train_losses.append( + train_step( + *inputs, + accumulate=(idx + 1) % gradient_accumulation_steps != 0, + gradient_accumulation_steps=gradient_accumulation_steps, + **scalars, + **get_policy_values(), + ) + ) broadcast_event(Policy.train_step_finished, epoch, idx, train_losses[-1]) logger.train(train_losses, epoch) @@ -99,6 +127,7 @@ def broadcast_event(method, *args, **kw): broadcast_event(Policy.epoch_finished, epoch, train_losses, metrics=metrics, policies=get_policy_values()) + checkpoints.save(epoch, train_losses, metrics) epoch += 1 diff --git a/dpipe/train/checkpoint.py b/dpipe/train/checkpoint.py index 1940f0b..cbef846 100644 --- a/dpipe/train/checkpoint.py +++ b/dpipe/train/checkpoint.py @@ -1,18 +1,16 @@ -import shutil import pickle +import shutil from pathlib import Path -from typing import Dict, Any, Union, Iterable, Sequence +from typing import Any, Dict, Iterable, Sequence, Union -import numpy as np import torch - -from dpipe.io import PathLike from dpipe.im.utils import composition +from dpipe.io import PathLike __all__ = 'Checkpoints', 'CheckpointManager' -def save_pickle(o, path): +def save_pickle(o, path: PathLike): if hasattr(o, '__getstate__'): state = o.__getstate__() else: @@ -22,7 +20,7 @@ def save_pickle(o, path): pickle.dump(state, file) -def load_pickle(o, path): +def load_pickle(o, path: PathLike): with open(path, 'rb') as file: state = pickle.load(file) @@ -33,11 +31,11 @@ def load_pickle(o, path): setattr(o, key, value) -def save_torch(o, path): +def save_torch(o, path: PathLike): torch.save(o.state_dict(), path) -def load_torch(o, path): +def load_torch(o, path: PathLike): o.load_state_dict(torch.load(path)) @@ -57,13 +55,13 @@ class Checkpoints: By default only the latest checkpoint is saved. """ - def __init__(self, base_path: PathLike, objects: Union[Iterable, Dict[PathLike, Any]], frequency: int = np.inf): + def __init__(self, base_path: PathLike, objects: Union[Iterable, Dict[PathLike, Any]], frequency: int = None): self.base_path: Path = Path(base_path) self._checkpoint_prefix = 'checkpoint_' if not isinstance(objects, dict): objects = self._generate_unique_names(objects) self.objects = objects or {} - self.frequency = frequency + self.frequency = frequency or float('inf') @staticmethod @composition(dict) @@ -82,10 +80,10 @@ def _generate_unique_names(objects): names.add(name) yield name, o - def _get_checkpoint_folder(self, iteration): + def _get_checkpoint_folder(self, iteration: int): return self.base_path / f'{self._checkpoint_prefix}{iteration}' - def _clear_checkpoint(self, iteration): + def _clear_checkpoint(self, iteration: int): if (iteration + 1) % self.frequency != 0: shutil.rmtree(self._get_checkpoint_folder(iteration)) @@ -101,7 +99,7 @@ def _dispatch_loader(o): return load_torch return load_pickle - def _save_to(self, folder): + def _save_to(self, folder: Path): for path, o in self.objects.items(): save = self._dispatch_saver(o) save(o, folder / path) @@ -122,9 +120,9 @@ def restore(self) -> int: max_iteration = -1 for file in self.base_path.iterdir(): - file = file.name - if file.startswith(self._checkpoint_prefix): - max_iteration = max(max_iteration, int(file[len(self._checkpoint_prefix):])) + filename = file.name + if filename.startswith(self._checkpoint_prefix): + max_iteration = max(max_iteration, int(filename[len(self._checkpoint_prefix):])) # no backups found if max_iteration < 0: diff --git a/dpipe/train/logging.py b/dpipe/train/logging.py index 45fbf06..818549c 100644 --- a/dpipe/train/logging.py +++ b/dpipe/train/logging.py @@ -1,15 +1,16 @@ import os +import warnings from collections import defaultdict from functools import partial from pathlib import Path -from typing import Sequence, Union +from typing import Any, Callable, Dict, Optional, Sequence, Union import numpy as np - +import wandb from dpipe.commands import load_from_folder -from dpipe.io import PathLike from dpipe.im.utils import zip_equal -import wandb +from dpipe.io import PathLike +from wandb.sdk.wandb_run import Run as wandbRun __all__ = 'Logger', 'ConsoleLogger', 'TBLogger', 'NamedTBLogger', 'WANDBLogger' @@ -27,7 +28,7 @@ def log_scalar_or_vector(logger, tag, value: np.ndarray, step): logger.log_scalar(tag, value, step) -def make_log_vector(logger, tag: str, first_step: int = 0) -> callable: +def make_log_vector(logger, tag: str, first_step: int = 0) -> Callable: def log(tag, value, step): log_vector(logger, tag, value, step) @@ -147,105 +148,194 @@ def train(self, train_losses, step): class WANDBLogger(Logger): - def __init__(self, project, run_name=None, *, - group=None, entity='neuro-ml', config=None, model=None, criterion=None, dir=None, resume="auto"): + def __init__( + self, + project: Optional[str], + run_name: Optional[str] = None, + *, + group: Optional[str] = None, + entity: str = 'neuro-ml', + config: Union[Dict, str, None] = None, + dir: Optional[str] = None, + resume: str = 'auto', + **watch_kwargs: Any, + ) -> None: + """A logger that writes to a wandb run. + + Call `wandb login` before first usage. """ - A logger that writes to a wandb run. - Call wandb.login() before usage. - """ - self._experiment = wandb.init( - entity=entity, - project=project, - resume=resume, - group=group, - dir=dir + settings = [None, wandb.Settings(start_method='fork'), wandb.Settings(start_method='thread')] + exp = None + for i, s in enumerate(settings): + try: + exp = wandb.init( + entity=entity, project=project, resume=resume, group=group, dir=dir, + settings=s + ) + break + except wandb.errors.UsageError: + warnings.warn(f"Couldn't init wandb with setting {i}, trying another one.") + continue + + assert isinstance(exp, wandbRun), 'Failed to register launch with wandb' + + current_fold_root = Path(exp.dir).parent.parent.parent + experiment_root = current_fold_root.parent + + # find out if the experiment is cut into several folds + cut_into_folds = ( + len( + [ + p + for p in experiment_root.glob('*') + if p.name.startswith('experiment_') and p.is_dir() + ] + ) + > 1 ) + + current_experiment_number = ( + str(int(current_fold_root.name.replace('experiment_', ''))) + if cut_into_folds + else 0 + ) + if run_name is not None: - self._experiment.name = run_name # can be changed manually + exp.name = run_name # can be changed manually else: - self._experiment.name = Path(self._experiment.dir).parent.parent.parent.parent.name - print(str(Path(self._experiment.dir).parent.parent.parent.parent)) - print(self._experiment.save(str(Path(self._experiment.dir).parent.parent.parent.parent / 'resources.config'), policy='now')) - + name = experiment_root.name + if cut_into_folds: + name = f'{name}-{current_experiment_number}' + exp.name = name + artifact = wandb.Artifact('model', type='config') + + try: + artifact.add_file( + str(experiment_root / 'resources.config'), f'{exp.name}/config.txt' + ) + # all json files of the current fold are added as artifacts + for json in current_fold_root.glob('*.json'): + artifact.add_file(str(json), f'{exp.name}/{json.name}') + except ValueError: + warnings.warn("It's likely you don't run a usual experiment, some artifacts were not found") + + self._experiment = exp + + wandb.log_artifact(artifact) + + self.update_config(dict(experiment=experiment_root.name)) + if cut_into_folds: + self.update_config(dict(fold=current_experiment_number)) if config is not None: - self.config(config) - - if model is not None: - self.watch(model, criterion) - - def value(self, name: str, value, step: int=None): - self._experiment.log({name: value, 'step': step}) - - def train(self, train_losses: Sequence[Union[dict, float]], step): - if train_losses and isinstance(train_losses[0], dict): + self.update_config(config) + + if watch_kwargs: + self.watch(**watch_kwargs) + + def __del__(self): + wandb.finish() + + @property + def experiment(self) -> wandbRun: + return self._experiment + + def value(self, name: str, value: Any, step: Optional[int] = None) -> None: + self._experiment.log({name: value, 'epoch': step}) + + def train( + self, train_losses: Union[Sequence[Dict], Sequence[float], Sequence[tuple], Sequence[np.ndarray]], step: int + ) -> None: + if not train_losses: + return None + train_losses_types = {type(tl) for tl in train_losses} + assert len(train_losses_types) == 1, 'Inconsistent train_losses' + t = train_losses_types.pop() + if issubclass(t, dict): for name, values in group_dicts(train_losses).items(): self.value(f'train/loss/{name}', np.mean(values), step) - else: + elif issubclass(t, (float, tuple, np.ndarray)): self.value('train/loss', np.mean(train_losses), step) + else: + msg = f'The elements of the train_losses are expected to be of dict, float, tuple or numpy array type, but the elements are of {t.__name__} type' + raise NotImplementedError(msg) - def watch(self, model, criterion=None): - self._experiment.watch(model, criterion=criterion) + def watch(self, **kwargs) -> None: + self.experiment.watch(**kwargs) - def config(self, config_args): - self._experiment.config.update(config_args, allow_val_change=True) + def update_config(self, config_args) -> None: + self.experiment.config.update(config_args, allow_val_change=True) - def agg_metrics(self, agg_metrics: Union[dict, str, Path], section=''): - """ - Log final metrics calculated in the end of experiment to summary table. + def agg_metrics( + self, agg_metrics: Union[dict, str, Path], section: str = '' + ) -> None: + """Log final metrics calculated in the end of experiment to summary table. Idea is to use these values for preparing leaderboard. agg_metrics: dictionary with name of metric as a key and with its value """ if isinstance(agg_metrics, str) or isinstance(agg_metrics, Path): - agg_metrics = {k if not section else f'{section}/{k}': v - for k, v in load_from_folder(agg_metrics, ext='.json')} + agg_metrics = { + k if not section else f'{section}/{k}': v + for k, v in load_from_folder(agg_metrics, ext='.json') + } elif section: - agg_metrics = {f'{section}/{k}': v - for k, v in agg_metrics.items()} + agg_metrics = {f'{section}/{k}': v for k, v in agg_metrics.items()} for k, v in agg_metrics.items(): - self._experiment.summary[k] = v - #self._experiment.summary.update() + self.experiment.summary[k] = v + # self.experiment.summary.update() - def ind_metrics(self, ind_metrics, step: int = 0, section: str = None): - """ - Save individual metrics to a table to see bad cases + def ind_metrics(self, ind_metrics: Any, step: int = 0, section: Optional[str] = None) -> None: + """Save individual metrics to a table to see bad cases ind_metrics: DataFrame step: int section: str, defines some metrics' grouping """ - from wandb import Table import pandas as pd + from wandb import Table + if isinstance(ind_metrics, str) or isinstance(ind_metrics, Path): ind_metrics = pd.DataFrame.from_dict( - {k: v for k, v in load_from_folder(ind_metrics, ext='.json')}).reset_index() + {k: v for k, v in load_from_folder(ind_metrics, ext='.json')} + ).reset_index().round(2) table = Table(dataframe=ind_metrics) - name = "Individual Metrics" if section is None else f"{section}/Individual Metrics" - self._experiment.log({name: table, 'step': step}) - - def image(self, name: str, *values, step: int, section: str = None, - masks_keys: tuple = ('predictions', 'ground_truth')): - """ - Method that logs images (set by values), - each value is a dict with fields,preds,target and optinally caption defined + name = ( + 'Individual Metrics' if section is None else f'{section}/Individual Metrics' + ) + self.experiment.log({name: table}) + + def image( + self, + name: str, + *values, + step: int, + section: Optional[str] = None, + masks_keys: tuple = ('predictions', 'ground_truth'), + ) -> None: + """Method that logs images (set by values), + each value is a dict with fields, preds, target and optinally caption defined Special policy that works as callback """ from wandb import Image - name = name if section is None else f"{section}/{name}" - self._experiment.log( + name = name if section is None else f'{section}/{name}' + self.experiment.log( { - name: [Image( - value['image'], - masks={k: {'mask_data': value[k]} for k in masks_keys}, - caption=value.get('caption', None) - ) for value in values], - 'step': step - }) - - def log_info(self, name: str, wandb_converter, *infos, section: str = None): - name = name if section is None else f"{section}/{name}" - self._experiment.log({name: [wandb_converter(info) for info in infos]}) + name: [ + Image( + value['image'], + masks={k: {'mask_data': value[k]} for k in masks_keys}, + caption=value.get('caption', None), + ) + for value in values + ], + }, + step=step, + ) + + def log_info(self, name: str, wandb_converter, *infos, section: Optional[str] = None, step: Optional[int] = None) -> None: + name = name if section is None else f'{section}/{name}' + self.experiment.log({name: [wandb_converter(info) for info in infos]}) diff --git a/dpipe/train/policy.py b/dpipe/train/policy.py index 5a0a05d..8c25734 100644 --- a/dpipe/train/policy.py +++ b/dpipe/train/policy.py @@ -57,6 +57,7 @@ class ValuePolicy(Policy, metaclass=ABCAttributesMeta): def __init__(self, initial): super().__init__() + # TODO: turn self.value into a property self.value = initial @@ -68,6 +69,8 @@ class DecreasingOnPlateau(ValuePolicy): """ Policy that traces average train loss and if it didn't decrease according to ``atol`` or ``rtol`` for ``patience`` epochs, multiply `value` by ``multiplier``. + ``atol`` :- absolute tolerance for detecting change in training loss value. + ``rtol`` :- relative tolerance for detecting change in training loss value. """ def __init__(self, *, initial: float, multiplier: float, patience: int, rtol, atol): @@ -81,19 +84,18 @@ def __init__(self, *, initial: float, multiplier: float, patience: int, rtol, at self.margin_loss = np.inf def get_margin_loss(self, loss): - return max([loss * (1 - self.rtol), loss - self.atol]) + return max(loss - self.atol, loss * (1 - self.rtol)) - def epoch_finished(self, epoch: int, train_losses: Sequence, **kwargs): + def epoch_finished(self, epoch: int, train_losses: Sequence, **kwargs) -> None: loss = np.mean(train_losses) if loss < self.margin_loss: self.margin_loss = self.get_margin_loss(loss) self.epochs_waited = 0 - else: - self.epochs_waited += 1 - - if self.epochs_waited > self.patience: - self.value *= self.lr_dec_mul - self.epochs_waited = 0 + return + self.epochs_waited += 1 + if self.epochs_waited > self.patience: + self.value *= self.lr_dec_mul + self.epochs_waited = 0 class Exponential(ValuePolicy): @@ -102,18 +104,21 @@ class Exponential(ValuePolicy): If ``floordiv`` is False - the `value` will be changed continuously. """ - def __init__(self, initial: float, multiplier: float, step_length: int = 1, floordiv: bool = True): + def __init__(self, initial: float, multiplier: float, step_length: int = 1, floordiv: bool = True, + min_value: float = -np.inf, max_value: float = np.inf): super().__init__(initial) self.multiplier = multiplier self.initial = initial self.step_length = step_length self.floordiv = floordiv + self.min_value = min_value + self.max_value = max_value def epoch_started(self, epoch: int): power = epoch / self.step_length if self.floordiv: power = np.floor(power) - self.value = self.initial * self.multiplier ** power + self.value = np.clip(self.initial * self.multiplier ** power, self.min_value, self.max_value) class Schedule(ValuePolicy):