diff --git a/README.md b/README.md index 82fffa9c..583dbaad 100644 --- a/README.md +++ b/README.md @@ -1283,25 +1283,10 @@ If you have a clear false negative, are explicitly testing 'edge', inconsistent checker is in your way, you can set the `SCENARIO_SKIP_CONSISTENCY_CHECKS` envvar and skip it altogether. Hopefully you don't need that. -# Snapshot +# Jhack integrations -Scenario comes with a cli tool called `snapshot`. Assuming you've pip-installed `ops-scenario`, you should be able to -reach the entry point by typing `scenario snapshot` in a shell so long as the install dir is in your `PATH`. +Up until `v5.6.0`, `scenario` shipped with a cli tool called `snapshot`, used to interact with a live charm's state. +The functionality [has been moved over to `jhack`](https://github.com/PietroPasotti/jhack/pull/111), +to allow us to keep working on it independently, and to streamline +the profile of `scenario` itself as it becomes more broadly adopted and ready for widespread usage. -Snapshot's purpose is to gather the `State` data structure from a real, live charm running in some cloud your local juju -client has access to. This is handy in case: - -- you want to write a test about the state the charm you're developing is currently in -- your charm is bork or in some inconsistent state, and you want to write a test to check the charm will handle it - correctly the next time around (aka regression testing) -- you are new to Scenario and want to quickly get started with a real-life example. - -Suppose you have a Juju model with a `prometheus-k8s` unit deployed as `prometheus-k8s/0`. If you type -`scenario snapshot prometheus-k8s/0`, you will get a printout of the State object. Pipe that out into some file, import -all you need from `scenario`, and you have a working `State` that you can `Context.run` events with. - -You can also pass a `--format` flag to obtain instead: - -- a jsonified `State` data structure, for portability -- a full-fledged pytest test case (with imports and all), where you only have to fill in the charm type and the event - that you wish to trigger. diff --git a/pyproject.toml b/pyproject.toml index 9b96ffd4..3d7b0289 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta" [project] name = "ops-scenario" -version = "5.6.2" +version = "5.7" authors = [ { name = "Pietro Pasotti", email = "pietro.pasotti@canonical.com" } @@ -39,8 +39,6 @@ classifiers = [ "Homepage" = "https://github.com/canonical/ops-scenario" "Bug Tracker" = "https://github.com/canonical/ops-scenario/issues" -[project.scripts] -scenario = "scenario.scripts.main:main" [tool.setuptools.package-dir] scenario = "scenario" diff --git a/scenario/scripts/errors.py b/scenario/scripts/errors.py deleted file mode 100644 index f713ef60..00000000 --- a/scenario/scripts/errors.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -class SnapshotError(RuntimeError): - """Base class for errors raised by snapshot.""" - - -class InvalidTargetUnitName(SnapshotError): - """Raised if the unit name passed to snapshot is invalid.""" - - -class InvalidTargetModelName(SnapshotError): - """Raised if the model name passed to snapshot is invalid.""" - - -class StateApplyError(SnapshotError): - """Raised when the state-apply juju command fails.""" diff --git a/scenario/scripts/logger.py b/scenario/scripts/logger.py deleted file mode 100644 index 98cadfb1..00000000 --- a/scenario/scripts/logger.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import logging -import os - -logger = logging.getLogger(__file__) - - -def setup_logging(verbosity: int): - base_loglevel = int(os.getenv("LOGLEVEL", 30)) - verbosity = min(verbosity, 2) - loglevel = base_loglevel - (verbosity * 10) - logging.basicConfig(format="%(message)s") - logging.getLogger().setLevel(logging.WARNING) - logger.setLevel(loglevel) diff --git a/scenario/scripts/main.py b/scenario/scripts/main.py deleted file mode 100644 index ebee6084..00000000 --- a/scenario/scripts/main.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -from importlib import metadata -from importlib.metadata import PackageNotFoundError -from pathlib import Path - -import typer - -from scenario.scripts import logger -from scenario.scripts.snapshot import snapshot -from scenario.scripts.state_apply import state_apply - - -def _version(): - """Print the scenario version and exit.""" - try: - print(metadata.version("ops-scenario")) - return - except PackageNotFoundError: - pass - - pyproject_toml = Path(__file__).parent.parent.parent / "pyproject.toml" - - if not pyproject_toml.exists(): - print("") - return - - for line in pyproject_toml.read_text().split("\n"): - if line.startswith("version"): - print(line.split("=")[1].strip("\"' ")) - return - - -def main(): - app = typer.Typer( - name="scenario", - help="Scenario utilities. " - "For docs, issues and feature requests, visit " - "the github repo --> https://github.com/canonical/ops-scenario", - no_args_is_help=True, - rich_markup_mode="markdown", - ) - - app.command(name="version")(_version) - app.command(name="snapshot", no_args_is_help=True)(snapshot) - app.command(name="state-apply", no_args_is_help=True)(state_apply) - - @app.callback() - def setup_logging(verbose: int = typer.Option(0, "-v", count=True)): - logger.setup_logging(verbose) - - app() - - -if __name__ == "__main__": - main() diff --git a/scenario/scripts/snapshot.py b/scenario/scripts/snapshot.py deleted file mode 100644 index f2c678b6..00000000 --- a/scenario/scripts/snapshot.py +++ /dev/null @@ -1,1001 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import datetime -import json -import os -import re -import shlex -import sys -import tempfile -from dataclasses import asdict, dataclass -from enum import Enum -from itertools import chain -from pathlib import Path -from subprocess import run -from typing import Any, BinaryIO, Dict, Iterable, List, Optional, TextIO, Tuple, Union - -import ops.pebble -import typer -import yaml -from ops.storage import SQLiteStorage - -from scenario.runtime import UnitStateDB -from scenario.scripts.errors import InvalidTargetModelName, InvalidTargetUnitName -from scenario.scripts.logger import logger as root_scripts_logger -from scenario.scripts.utils import JujuUnitName -from scenario.state import ( - Address, - BindAddress, - BindFailedError, - Container, - Event, - Model, - Mount, - Network, - Port, - Relation, - Secret, - State, - _EntityStatus, -) - -logger = root_scripts_logger.getChild(__file__) - -JUJU_RELATION_KEYS = frozenset({"egress-subnets", "ingress-address", "private-address"}) -JUJU_CONFIG_KEYS = frozenset({}) - -SNAPSHOT_OUTPUT_DIR = (Path(os.getcwd()).parent / "snapshot_storage").absolute() -CHARM_SUBCLASS_REGEX = re.compile(r"class (\D+)\(CharmBase\):") - - -def _try_format(string: str): - try: - import black - - try: - return black.format_str(string, mode=black.Mode()) - except black.parsing.InvalidInput as e: - logger.error(f"error parsing {string}: {e}") - return string - except ModuleNotFoundError: - logger.warning("install black for formatting") - return string - - -def format_state(state: State): - """Stringify this State as nicely as possible.""" - return _try_format(repr(state)) - - -PYTEST_TEST_TEMPLATE = """ -from scenario import * -from charm import {ct} - -def test_case(): - # Arrange: prepare the state - state = {state} - - #Act: trigger an event on the state - ctx = Context( - {ct}, - juju_version="{jv}") - - out = ctx.run( - {en} - state, - ) - - # Assert: verify that the output state is the way you want it to be - # TODO: add assertions -""" - - -def format_test_case( - state: State, - charm_type_name: str = None, - event_name: str = None, - juju_version: str = None, -): - """Format this State as a pytest test case.""" - ct = charm_type_name or "CHARM_TYPE, # TODO: replace with charm type name" - en = "EVENT_NAME, # TODO: replace with event name" - if event_name: - try: - en = Event(event_name).bind(state) - except BindFailedError: - logger.error( - f"Failed to bind {event_name} to {state}; leaving placeholder instead", - ) - - jv = juju_version or "3.0, # TODO: check juju version is correct" - state_fmt = repr(state) - return _try_format( - PYTEST_TEST_TEMPLATE.format(state=state_fmt, ct=ct, en=en, jv=jv), - ) - - -def _juju_run(cmd: str, model=None) -> Dict[str, Any]: - """Execute juju {command} in a given model.""" - _model = f" -m {model}" if model else "" - cmd = f"juju {cmd}{_model} --format json" - raw = run(shlex.split(cmd), capture_output=True, text=True).stdout - return json.loads(raw) - - -def _juju_ssh(target: JujuUnitName, cmd: str, model: Optional[str] = None) -> str: - _model = f" -m {model}" if model else "" - command = f"juju ssh{_model} {target.unit_name} {cmd}" - raw = run(shlex.split(command), capture_output=True, text=True).stdout - return raw - - -def _juju_exec(target: JujuUnitName, model: Optional[str], cmd: str) -> str: - """Execute a juju command. - - Notes: - Visit the Juju documentation to view all possible Juju commands: - https://juju.is/docs/olm/juju-cli-commands - """ - _model = f" -m {model}" if model else "" - _target = f" -u {target}" if target else "" - return run( - shlex.split(f"juju exec{_model}{_target} -- {cmd}"), - capture_output=True, - text=True, - ).stdout - - -def get_leader(target: JujuUnitName, model: Optional[str]): - # could also get it from _juju_run('status')... - logger.info("getting leader...") - return _juju_exec(target, model, "is-leader") == "True" - - -def get_network(target: JujuUnitName, model: Optional[str], endpoint: str) -> Network: - """Get the Network data structure for this endpoint.""" - raw = _juju_exec(target, model, f"network-get {endpoint}") - json_data = yaml.safe_load(raw) - - bind_addresses = [] - for raw_bind in json_data["bind-addresses"]: - addresses = [] - for raw_adds in raw_bind["addresses"]: - addresses.append( - Address( - hostname=raw_adds["hostname"], - value=raw_adds["value"], - cidr=raw_adds["cidr"], - address=raw_adds.get("address", ""), - ), - ) - - bind_addresses.append( - BindAddress( - interface_name=raw_bind.get("interface-name", ""), - addresses=addresses, - ), - ) - return Network( - name=endpoint, - bind_addresses=bind_addresses, - egress_subnets=json_data.get("egress-subnets", None), - ingress_addresses=json_data.get("ingress-addresses", None), - ) - - -def get_secrets( - target: JujuUnitName, # noqa: U100 - model: Optional[str], # noqa: U100 - metadata: Dict, # noqa: U100 - relations: Tuple[str, ...] = (), # noqa: U100 -) -> List[Secret]: - """Get Secret list from the charm.""" - logger.warning("Secrets snapshotting not implemented yet. Also, are you *sure*?") - return [] - - -def get_networks( - target: JujuUnitName, - model: Optional[str], - metadata: Dict, - include_dead: bool = False, - relations: Tuple[str, ...] = (), -) -> List[Network]: - """Get all Networks from this unit.""" - logger.info("getting networks...") - networks = [get_network(target, model, "juju-info")] - - endpoints = relations # only alive relations - if include_dead: - endpoints = chain( - metadata.get("provides", ()), - metadata.get("requires", ()), - metadata.get("peers", ()), - ) - - for endpoint in endpoints: - logger.debug(f" getting network for endpoint {endpoint!r}") - networks.append(get_network(target, model, endpoint)) - return networks - - -def get_metadata(target: JujuUnitName, model: Model): - """Get metadata.yaml from this target.""" - logger.info("fetching metadata...") - - meta_path = target.remote_charm_root / "metadata.yaml" - - raw_meta = _juju_ssh( - target, - f"cat {meta_path}", - model=model.name, - ) - return yaml.safe_load(raw_meta) - - -class RemotePebbleClient: - """Clever little class that wraps calls to a remote pebble client.""" - - def __init__( - self, - container: str, - target: JujuUnitName, - model: Optional[str] = None, - ): - self.socket_path = f"/charm/containers/{container}/pebble.socket" - self.container = container - self.target = target - self.model = model - - def _run(self, cmd: str) -> str: - _model = f" -m {self.model}" if self.model else "" - command = ( - f"juju ssh{_model} --container {self.container} {self.target.unit_name} " - f"/charm/bin/pebble {cmd}" - ) - proc = run(shlex.split(command), capture_output=True, text=True) - if proc.returncode == 0: - return proc.stdout - raise RuntimeError( - f"error wrapping pebble call with {command}: " - f"process exited with {proc.returncode}; " - f"stdout = {proc.stdout}; " - f"stderr = {proc.stderr}", - ) - - def can_connect(self) -> bool: - try: - version = self.get_system_info() - except Exception: - return False - return bool(version) - - def get_system_info(self): - return self._run("version") - - def get_plan(self) -> dict: - plan_raw = self._run("plan") - return yaml.safe_load(plan_raw) - - def pull( - self, - path: str, # noqa: U100 - *, - encoding: Optional[str] = "utf-8", # noqa: U100 - ) -> Union[BinaryIO, TextIO]: - raise NotImplementedError() - - def list_files( - self, - path: str, # noqa: U100 - *, - pattern: Optional[str] = None, # noqa: U100 - itself: bool = False, # noqa: U100 - ) -> List[ops.pebble.FileInfo]: - raise NotImplementedError() - - def get_checks( - self, - level: Optional[ops.pebble.CheckLevel] = None, - names: Optional[Iterable[str]] = None, - ) -> List[ops.pebble.CheckInfo]: - _level = f" --level={level}" if level else "" - _names = (" " + " ".join(names)) if names else "" - out = self._run(f"checks{_level}{_names}") - if out == "Plan has no health checks.": - return [] - raise NotImplementedError() - - -def fetch_file( - target: JujuUnitName, - remote_path: Union[Path, str], - container_name: str, - local_path: Union[Path, str], - model: Optional[str] = None, -) -> None: - """Download a file from a live unit to a local path.""" - model_arg = f" -m {model}" if model else "" - scp_cmd = ( - f"juju scp --container {container_name}{model_arg} " - f"{target.unit_name}:{remote_path} {local_path}" - ) - run(shlex.split(scp_cmd)) - - -def get_mounts( - target: JujuUnitName, - model: Optional[str], - container_name: str, - container_meta: Dict, - fetch_files: Optional[List[Path]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -) -> Dict[str, Mount]: - """Get named Mounts from a container's metadata, and download specified files from the unit.""" - mount_meta = container_meta.get("mounts") - - if fetch_files and not mount_meta: - logger.error( - f"No mounts defined for container {container_name} in metadata.yaml. " - f"Cannot fetch files {fetch_files} for this container.", - ) - return {} - - mount_spec = {} - for mt in mount_meta or (): - if name := mt.get("storage"): - mount_spec[name] = mt["location"] - else: - logger.error(f"unknown mount type: {mt}") - - mounts = {} - for remote_path in fetch_files or (): - found = None - for mn, mt in mount_spec.items(): - if str(remote_path).startswith(mt): - found = mn, mt - - if not found: - logger.error( - "could not find mount corresponding to requested remote_path " - f"{remote_path}: skipping...", - ) - continue - - mount_name, src = found - mount = mounts.get(mount_name) - if not mount: - # create the mount obj and tempdir - location = tempfile.TemporaryDirectory(prefix=str(temp_dir_base_path)).name - mount = Mount(src=src, location=location) - mounts[mount_name] = mount - - # populate the local tempdir - filepath = Path(mount.location).joinpath(*remote_path.parts[1:]) - os.makedirs(os.path.dirname(filepath), exist_ok=True) - try: - fetch_file( - target, - container_name=container_name, - model=model, - remote_path=remote_path, - local_path=filepath, - ) - - except RuntimeError as e: - logger.error(e) - - return mounts - - -def get_container( - target: JujuUnitName, - model: Optional[str], - container_name: str, - container_meta: Dict, - fetch_files: Optional[List[Path]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -) -> Container: - """Get container data structure from the target.""" - remote_client = RemotePebbleClient(container_name, target, model) - plan = remote_client.get_plan() - - return Container( - name=container_name, - _base_plan=plan, - can_connect=remote_client.can_connect(), - mounts=get_mounts( - target, - model, - container_name, - container_meta, - fetch_files, - temp_dir_base_path=temp_dir_base_path, - ), - ) - - -def get_containers( - target: JujuUnitName, - model: Optional[str], - metadata: Optional[Dict], - fetch_files: Dict[str, List[Path]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -) -> List[Container]: - """Get all containers from this unit.""" - fetch_files = fetch_files or {} - logger.info("getting containers...") - - if not metadata: - logger.warning("no metadata: unable to get containers") - return [] - - containers = [] - for container_name, container_meta in metadata.get("containers", {}).items(): - container = get_container( - target, - model, - container_name, - container_meta, - fetch_files=fetch_files.get(container_name), - temp_dir_base_path=temp_dir_base_path, - ) - containers.append(container) - return containers - - -def get_juju_status(model: Optional[str]) -> Dict: - """Return juju status as json.""" - logger.info("getting status...") - return _juju_run("status --relations", model=model) - - -@dataclass -class Status: - app: _EntityStatus - unit: _EntityStatus - workload_version: str - - -def get_status(juju_status: Dict, target: JujuUnitName) -> Status: - """Parse `juju status` to get the Status data structure and some relation information.""" - app = juju_status["applications"][target.app_name] - - app_status_raw = app["application-status"] - app_status = app_status_raw["current"], app_status_raw.get("message", "") - - unit_status_raw = app["units"][target]["workload-status"] - unit_status = unit_status_raw["current"], unit_status_raw.get("message", "") - - workload_version = app.get("version", "") - return Status( - app=_EntityStatus(*app_status), - unit=_EntityStatus(*unit_status), - workload_version=workload_version, - ) - - -def get_endpoints(juju_status: Dict, target: JujuUnitName) -> Tuple[str, ...]: - """Parse `juju status` to get the relation names owned by the target.""" - app = juju_status["applications"][target.app_name] - relations_raw = app.get("relations", None) - if not relations_raw: - return () - relations = tuple(relations_raw.keys()) - return relations - - -def get_opened_ports( - target: JujuUnitName, - model: Optional[str], -) -> List[Port]: - """Get opened ports list from target.""" - logger.info("getting opened ports...") - - opened_ports_raw = _juju_exec( - target, - model, - "opened-ports --format json", - ) - ports = [] - - for raw_port in json.loads(opened_ports_raw): - _port_n, _proto = raw_port.split("/") - ports.append(Port(_proto, int(_port_n))) - - return ports - - -def get_config( - target: JujuUnitName, - model: Optional[str], -) -> Dict[str, Union[str, int, float, bool]]: - """Get config dict from target.""" - - logger.info("getting config...") - json_data = _juju_run(f"config {target.app_name}", model=model) - - # dispatch table for builtin config options - converters = { - "string": str, - "int": int, - "integer": int, # fixme: which one is it? - "number": float, - "boolean": lambda x: x == "true", - "attrs": lambda x: x, # fixme: wot? - } - - cfg = {} - for name, option in json_data.get("settings", ()).items(): - if value := option.get("value"): - try: - converter = converters[option["type"]] - except KeyError: - raise ValueError(f'unrecognized type {option["type"]}') - cfg[name] = converter(value) - - else: - logger.debug(f"skipped {name}: no value.") - - return cfg - - -def _get_interface_from_metadata(endpoint: str, metadata: Dict) -> Optional[str]: - """Get the name of the interface used by endpoint.""" - for role in ["provides", "requires"]: - for ep, ep_meta in metadata.get(role, {}).items(): - if ep == endpoint: - return ep_meta["interface"] - - logger.error(f"No interface for endpoint {endpoint} found in charm metadata.") - return None - - -def get_relations( - target: JujuUnitName, - model: Optional[str], - metadata: Dict, - include_juju_relation_data=False, -) -> List[Relation]: - """Get the list of relations active for this target.""" - logger.info("getting relations...") - - try: - json_data = _juju_run(f"show-unit {target}", model=model) - except json.JSONDecodeError as e: - raise InvalidTargetUnitName(target) from e - - def _clean(relation_data: dict): - if include_juju_relation_data: - return relation_data - else: - for key in JUJU_RELATION_KEYS: - del relation_data[key] - return relation_data - - relations = [] - for raw_relation in json_data[target].get("relation-info", ()): - logger.debug( - f" getting relation data for endpoint {raw_relation.get('endpoint')!r}", - ) - related_units = raw_relation.get("related-units") - if not related_units: - continue - # related-units: - # owner/0: - # in-scope: true - # data: - # egress-subnets: 10.152.183.130/32 - # ingress-address: 10.152.183.130 - # private-address: 10.152.183.130 - - relation_id = raw_relation["relation-id"] - - local_unit_data_raw = _juju_exec( - target, - model, - f"relation-get -r {relation_id} - {target} --format json", - ) - local_unit_data = json.loads(local_unit_data_raw) - local_app_data_raw = _juju_exec( - target, - model, - f"relation-get -r {relation_id} - {target} --format json --app", - ) - local_app_data = json.loads(local_app_data_raw) - - some_remote_unit_id = JujuUnitName(next(iter(related_units))) - - # fixme: at the moment the juju CLI offers no way to see what type of relation this is; - # if it's a peer relation or a subordinate, we should use the corresponding - # scenario.state types instead of a regular Relation. - - relations.append( - Relation( - endpoint=raw_relation["endpoint"], - interface=_get_interface_from_metadata( - raw_relation["endpoint"], - metadata, - ), - relation_id=relation_id, - remote_app_data=raw_relation["application-data"], - remote_app_name=some_remote_unit_id.app_name, - remote_units_data={ - JujuUnitName(tgt).unit_id: _clean(val["data"]) - for tgt, val in related_units.items() - }, - local_app_data=local_app_data, - local_unit_data=_clean(local_unit_data), - ), - ) - return relations - - -def get_model(name: str = None) -> Model: - """Get the Model data structure.""" - logger.info("getting model...") - - json_data = _juju_run("models") - model_name = name or json_data["current-model"] - try: - model_info = next( - filter(lambda m: m["short-name"] == model_name, json_data["models"]), - ) - except StopIteration as e: - raise InvalidTargetModelName(name) from e - - model_uuid = model_info["model-uuid"] - model_type = model_info["type"] - - return Model(name=model_name, uuid=model_uuid, type=model_type) - - -def try_guess_charm_type_name() -> Optional[str]: - """If we are running this from a charm project root, get the charm type name from charm.py.""" - try: - charm_path = Path(os.getcwd()) / "src" / "charm.py" - if charm_path.exists(): - source = charm_path.read_text() - charms = CHARM_SUBCLASS_REGEX.findall(source) - if len(charms) < 1: - raise RuntimeError(f"Not enough charms at {charm_path}.") - elif len(charms) > 1: - raise RuntimeError(f"Too many charms at {charm_path}.") - return charms[0] - except Exception as e: - logger.warning(f"unable to guess charm type: {e}") - return None - - -class FormatOption( - str, - Enum, -): # Enum for typer support, str for native comparison and ==. - """Output formatting options for snapshot.""" - - state = "state" # the default: will print the python repr of the State dataclass. - json = "json" - pytest = "pytest" - - -def get_juju_version(juju_status: Dict) -> str: - """Get juju agent version from juju status output.""" - return juju_status["model"]["version"] - - -def get_charm_version(target: JujuUnitName, juju_status: Dict) -> str: - """Get charm version info from juju status output.""" - app_info = juju_status["applications"][target.app_name] - channel = app_info.get("charm-channel", "") - charm_name = app_info.get("charm-name", "n/a") - workload_version = app_info.get("version", "n/a") - charm_rev = app_info.get("charm-rev", "n/a") - charm_origin = app_info.get("charm-origin", "n/a") - return ( - f"charm {charm_name!r} ({channel}/{charm_rev}); " - f"origin := {charm_origin}; app version := {workload_version}." - ) - - -class RemoteUnitStateDB(UnitStateDB): - """Represents a remote unit's state db.""" - - def __init__(self, model: Optional[str], target: JujuUnitName): - self._tempfile = tempfile.NamedTemporaryFile() - super().__init__(self._tempfile.name) - - self._model = model - self._target = target - - def _fetch_state(self): - fetch_file( - self._target, - remote_path=self._target.remote_charm_root / ".unit-state.db", - container_name="charm", - local_path=self._state_file, - model=self._model, - ) - - @property - def _has_state(self): - """Whether the state file exists.""" - return self._state_file.exists() and self._state_file.read_bytes() - - def _open_db(self) -> SQLiteStorage: - if not self._has_state: - self._fetch_state() - return super()._open_db() - - -def _snapshot( - target: str, - model: Optional[str] = None, - pprint: bool = True, - include: Optional[str] = None, - include_juju_relation_data=False, - include_dead_relation_networks=False, - format: FormatOption = "state", - event_name: Optional[str] = None, - fetch_files: Optional[Dict[str, List[Path]]] = None, - temp_dir_base_path: Path = SNAPSHOT_OUTPUT_DIR, -): - """see snapshot's docstring""" - - try: - target = JujuUnitName(target) - except InvalidTargetUnitName: - logger.critical( - f"invalid target: {target!r} is not a valid unit name. Should be formatted like so:" - f"`foo/1`, or `database/0`, or `myapp-foo-bar/42`.", - ) - sys.exit(1) - - logger.info(f'beginning snapshot of {target} in model {model or ""}...') - - def if_include(key, fn, default): - if include is None or key in include: - return fn() - return default - - try: - state_model = get_model(model) - except InvalidTargetModelName: - logger.critical(f"unable to get Model from name {model}.", exc_info=True) - sys.exit(1) - - # todo: what about controller? - model = state_model.name - - metadata = get_metadata(target, state_model) - if not metadata: - logger.critical(f"could not fetch metadata from {target}.") - sys.exit(1) - - try: - unit_state_db = RemoteUnitStateDB(model, target) - juju_status = get_juju_status(model) - endpoints = get_endpoints(juju_status, target) - status = get_status(juju_status, target=target) - - state = State( - leader=get_leader(target, model), - unit_status=status.unit, - app_status=status.app, - workload_version=status.workload_version, - model=state_model, - config=if_include("c", lambda: get_config(target, model), {}), - opened_ports=if_include( - "p", - lambda: get_opened_ports(target, model), - [], - ), - relations=if_include( - "r", - lambda: get_relations( - target, - model, - metadata=metadata, - include_juju_relation_data=include_juju_relation_data, - ), - [], - ), - containers=if_include( - "k", - lambda: get_containers( - target, - model, - metadata, - fetch_files=fetch_files, - temp_dir_base_path=temp_dir_base_path, - ), - [], - ), - networks=if_include( - "n", - lambda: get_networks( - target, - model, - metadata, - include_dead=include_dead_relation_networks, - relations=endpoints, - ), - [], - ), - secrets=if_include( - "S", - lambda: get_secrets( - target, - model, - metadata, - relations=endpoints, - ), - [], - ), - deferred=if_include( - "d", - unit_state_db.get_deferred_events, - [], - ), - stored_state=if_include( - "t", - unit_state_db.get_stored_state, - [], - ), - ) - - # todo: these errors should surface earlier. - except InvalidTargetUnitName: - _model = f"model {model}" or "the current model" - logger.critical(f"invalid target: {target!r} not found in {_model}") - sys.exit(1) - except InvalidTargetModelName: - logger.critical(f"invalid model: {model!r} not found.") - sys.exit(1) - - logger.info("snapshot done.") - - if pprint: - charm_version = get_charm_version(target, juju_status) - juju_version = get_juju_version(juju_status) - if format == FormatOption.pytest: - charm_type_name = try_guess_charm_type_name() - txt = format_test_case( - state, - event_name=event_name, - charm_type_name=charm_type_name, - juju_version=juju_version, - ) - elif format == FormatOption.state: - txt = format_state(state) - elif format == FormatOption.json: - txt = json.dumps(asdict(state), indent=2) - else: - raise ValueError(f"unknown format {format}") - - # json does not support comments, so it would be invalid output. - if format != FormatOption.json: - # print out some metadata - controller_timestamp = juju_status["controller"]["timestamp"] - local_timestamp = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S") - print( - f"# Generated by scenario.snapshot. \n" - f"# Snapshot of {state_model.name}:{target.unit_name} at {local_timestamp}. \n" - f"# Controller timestamp := {controller_timestamp}. \n" - f"# Juju version := {juju_version} \n" - f"# Charm fingerprint := {charm_version} \n", - ) - - print(txt) - - return state - - -def snapshot( - target: str = typer.Argument(..., help="Target unit."), - model: Optional[str] = typer.Option( - None, - "-m", - "--model", - help="Which model to look at.", - ), - format: FormatOption = typer.Option( - "state", - "-f", - "--format", - help="How to format the output. " - "``state``: Outputs a black-formatted repr() of the State object (if black is installed! " - "else it will be ugly but valid python code). All you need to do then is import the " - "necessary objects from scenario.state, and you should have a valid State object. " - "``json``: Outputs a Jsonified State object. Perfect for storage. " - "``pytest``: Outputs a full-blown pytest scenario test based on this State. " - "Pipe it to a file and fill in the blanks.", - ), - event_name: str = typer.Option( - None, - "--event_name", - "-e", - help="Event to include in the generate test file; only applicable " - "if the output format is 'pytest'.", - ), - include: str = typer.Option( - "rckndtp", - "--include", - "-i", - help="What data to include in the state. " - "``r``: relation, ``c``: config, ``k``: containers, " - "``n``: networks, ``S``: secrets(!), ``p``: opened ports, " - "``d``: deferred events, ``t``: stored state.", - ), - include_dead_relation_networks: bool = typer.Option( - False, - "--include-dead-relation-networks", - help="Whether to gather networks of inactive relation endpoints.", - is_flag=True, - ), - include_juju_relation_data: bool = typer.Option( - False, - "--include-juju-relation-data", - help="Whether to include in the relation data the default juju keys (egress-subnets," - "ingress-address, private-address).", - is_flag=True, - ), - fetch: Path = typer.Option( - None, - "--fetch", - help="Path to a local file containing a json spec of files to be fetched from the unit. " - "For k8s units, it's supposed to be a {container_name: List[Path]} mapping listing " - "the files that need to be fetched from the existing containers.", - ), - # TODO: generalize "fetch" to allow passing '.' for the 'charm' container or 'the machine'. - output_dir: Path = typer.Option( - SNAPSHOT_OUTPUT_DIR, - "--output-dir", - help="Directory in which to store any files fetched as part of the state. In the case " - "of k8s charms, this might mean files obtained through Mounts,", - ), -) -> State: - """Gather and output the State of a remote target unit. - - If black is available, the output will be piped through it for formatting. - - Usage: snapshot myapp/0 > ./tests/scenario/case1.py - """ - - fetch_files = json.loads(fetch.read_text()) if fetch else None - - return _snapshot( - target=target, - model=model, - format=format, - event_name=event_name, - include=include, - include_juju_relation_data=include_juju_relation_data, - include_dead_relation_networks=include_dead_relation_networks, - temp_dir_base_path=output_dir, - fetch_files=fetch_files, - ) - - -# for the benefit of script usage -_snapshot.__doc__ = snapshot.__doc__ - -if __name__ == "__main__": - # print(_snapshot("zookeeper/0", model="foo", format=FormatOption.pytest)) - - print( - _snapshot( - "traefik/0", - format=FormatOption.state, - include="r", - # fetch_files={ - # "traefik": [ - # Path("/opt/traefik/juju/certificates.yaml"), - # Path("/opt/traefik/juju/certificate.cert"), - # Path("/opt/traefik/juju/certificate.key"), - # Path("/etc/traefik/traefik.yaml"), - # ] - # }, - ), - ) diff --git a/scenario/scripts/state_apply.py b/scenario/scripts/state_apply.py deleted file mode 100644 index f864b141..00000000 --- a/scenario/scripts/state_apply.py +++ /dev/null @@ -1,256 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. - -import json -import logging -import os -import sys -from pathlib import Path -from subprocess import CalledProcessError, run -from typing import Dict, Iterable, List, Optional - -import typer - -from scenario.scripts.errors import InvalidTargetUnitName, StateApplyError -from scenario.scripts.utils import JujuUnitName -from scenario.state import ( - Container, - DeferredEvent, - Port, - Relation, - Secret, - State, - StoredState, - _EntityStatus, -) - -SNAPSHOT_DATA_DIR = (Path(os.getcwd()).parent / "snapshot_storage").absolute() - -logger = logging.getLogger("snapshot") - - -def set_relations(relations: Iterable[Relation]) -> List[str]: # noqa: U100 - logger.info("preparing relations...") - logger.warning("set_relations not implemented yet") - return [] - - -def set_status( - unit_status: _EntityStatus, - app_status: _EntityStatus, - app_version: str, -) -> List[str]: - logger.info("preparing status...") - cmds = [] - - cmds.append(f"status-set {unit_status.name} {unit_status.message}") - cmds.append(f"status-set --application {app_status.name} {app_status.message}") - cmds.append(f"application-version-set {app_version}") - - return cmds - - -def set_config(config: Dict[str, str]) -> List[str]: # noqa: U100 - logger.info("preparing config...") - logger.warning("set_config not implemented yet") - return [] - - -def set_opened_ports(opened_ports: List[Port]) -> List[str]: - logger.info("preparing opened ports...") - # fixme: this will only open new ports, it will not close all already-open ports. - - cmds = [] - - for port in opened_ports: - cmds.append(f"open-port {port.port}/{port.protocol}") - - return cmds - - -def set_containers(containers: Iterable[Container]) -> List[str]: # noqa: U100 - logger.info("preparing containers...") - logger.warning("set_containers not implemented yet") - return [] - - -def set_secrets(secrets: Iterable[Secret]) -> List[str]: # noqa: U100 - logger.info("preparing secrets...") - logger.warning("set_secrets not implemented yet") - return [] - - -def set_deferred_events( - deferred_events: Iterable[DeferredEvent], # noqa: U100 -) -> List[str]: - logger.info("preparing deferred_events...") - logger.warning("set_deferred_events not implemented yet") - return [] - - -def set_stored_state(stored_state: Iterable[StoredState]) -> List[str]: # noqa: U100 - logger.info("preparing stored_state...") - logger.warning("set_stored_state not implemented yet") - return [] - - -def exec_in_unit(target: JujuUnitName, model: str, cmds: List[str]): - logger.info("Running juju exec...") - - _model = f" -m {model}" if model else "" - cmd_fmt = "; ".join(cmds) - try: - run(f'juju exec -u {target}{_model} -- "{cmd_fmt}"') - except CalledProcessError as e: - raise StateApplyError( - f"Failed to apply state: process exited with {e.returncode}; " - f"stdout = {e.stdout}; " - f"stderr = {e.stderr}.", - ) - - -def run_commands(cmds: List[str]): - logger.info("Applying remaining state...") - for cmd in cmds: - try: - run(cmd) - except CalledProcessError as e: - # todo: should we log and continue instead? - raise StateApplyError( - f"Failed to apply state: process exited with {e.returncode}; " - f"stdout = {e.stdout}; " - f"stderr = {e.stderr}.", - ) - - -def _state_apply( - target: str, - state: State, - model: Optional[str] = None, - include: str = None, - include_juju_relation_data=False, # noqa: U100 - push_files: Dict[str, List[Path]] = None, # noqa: U100 - snapshot_data_dir: Path = SNAPSHOT_DATA_DIR, # noqa: U100 -): - """see state_apply's docstring""" - logger.info("Starting state-apply...") - - try: - target = JujuUnitName(target) - except InvalidTargetUnitName: - logger.critical( - f"invalid target: {target!r} is not a valid unit name. Should be formatted like so:" - f"`foo/1`, or `database/0`, or `myapp-foo-bar/42`.", - ) - sys.exit(1) - - logger.info(f'beginning snapshot of {target} in model {model or ""}...') - - def if_include(key, fn): - if include is None or key in include: - return fn() - return [] - - j_exec_cmds: List[str] = [] - - j_exec_cmds += if_include( - "s", - lambda: set_status(state.unit_status, state.app_status, state.workload_version), - ) - j_exec_cmds += if_include("p", lambda: set_opened_ports(state.opened_ports)) - j_exec_cmds += if_include("r", lambda: set_relations(state.relations)) - j_exec_cmds += if_include("S", lambda: set_secrets(state.secrets)) - - cmds: List[str] = [] - - # todo: config is a bit special because it's not owned by the unit but by the cloud admin. - # should it be included in state-apply? - # if_include("c", lambda: set_config(state.config)) - cmds += if_include("k", lambda: set_containers(state.containers)) - cmds += if_include("d", lambda: set_deferred_events(state.deferred)) - cmds += if_include("t", lambda: set_stored_state(state.stored_state)) - - # we gather juju-exec commands to run them all at once in the unit. - exec_in_unit(target, model, j_exec_cmds) - # non-juju-exec commands are ran one by one, individually - run_commands(cmds) - - logger.info("Done!") - - -def state_apply( - target: str = typer.Argument(..., help="Target unit."), - state: Path = typer.Argument( - ..., - help="Source State to apply. Json file containing a State data structure; " - "the same you would obtain by running snapshot.", - ), - model: Optional[str] = typer.Option( - None, - "-m", - "--model", - help="Which model to look at.", - ), - include: str = typer.Option( - "scrkSdt", - "--include", - "-i", - help="What parts of the state to apply. Defaults to: all of them. " - "``r``: relation, ``c``: config, ``k``: containers, " - "``s``: status, ``S``: secrets(!), " - "``d``: deferred events, ``t``: stored state.", - ), - include_juju_relation_data: bool = typer.Option( - False, - "--include-juju-relation-data", - help="Whether to include in the relation data the default juju keys (egress-subnets," - "ingress-address, private-address).", - is_flag=True, - ), - push_files: Path = typer.Option( - None, - "--push-files", - help="Path to a local file containing a json spec of files to be fetched from the unit. " - "For k8s units, it's supposed to be a {container_name: List[Path]} mapping listing " - "the files that need to be pushed to the each container.", - ), - # TODO: generalize "push_files" to allow passing '.' for the 'charm' container or 'the machine'. - data_dir: Path = typer.Option( - SNAPSHOT_DATA_DIR, - "--data-dir", - help="Directory in which to any files associated with the state are stored. In the case " - "of k8s charms, this might mean files obtained through Mounts,", - ), -): - """Apply a State to a remote target unit. - - If black is available, the output will be piped through it for formatting. - - Usage: state-apply myapp/0 > ./tests/scenario/case1.py - """ - push_files_ = json.loads(push_files.read_text()) if push_files else None - state_json = json.loads(state.read_text()) - - # TODO: state_json to State - raise NotImplementedError("WIP: implement State.from_json") - state_: State = State.from_json(state_json) - - return _state_apply( - target=target, - state=state_, - model=model, - include=include, - include_juju_relation_data=include_juju_relation_data, - snapshot_data_dir=data_dir, - push_files=push_files_, - ) - - -# for the benefit of scripted usage -_state_apply.__doc__ = state_apply.__doc__ - -if __name__ == "__main__": - from scenario import State - - _state_apply("zookeeper/0", model="foo", state=State()) diff --git a/scenario/scripts/utils.py b/scenario/scripts/utils.py deleted file mode 100644 index de9dc01e..00000000 --- a/scenario/scripts/utils.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -from pathlib import Path - -from scenario.scripts.errors import InvalidTargetUnitName - - -class JujuUnitName(str): - """This class represents the name of a juju unit that can be snapshotted.""" - - def __init__(self, unit_name: str): - super().__init__() - app_name, _, unit_id = unit_name.rpartition("/") - if not app_name or not unit_id: - raise InvalidTargetUnitName(f"invalid unit name: {unit_name!r}") - self.unit_name = unit_name - self.app_name = app_name - self.unit_id = int(unit_id) - self.normalized = f"{app_name}-{unit_id}" - self.remote_charm_root = Path( - f"/var/lib/juju/agents/unit-{self.normalized}/charm", - )