diff --git a/README.md b/README.md index 55b0b16..c12bd3f 100644 --- a/README.md +++ b/README.md @@ -20,48 +20,55 @@ Alternatively, you can always install it with `pip` as a python module: ## 1. Create an Event -ctfcli turns the current folder into a CTF event git repo. It asks for the base url of the CTFd instance you're working with and an access token. +Ctfcli turns the current folder into a CTF event git repo. +It asks for the base url of the CTFd instance you're working with and an access token. ``` ❯ ctf init Please enter CTFd instance URL: https://demo.ctfd.io Please enter CTFd Admin Access Token: d41d8cd98f00b204e9800998ecf8427e -Do you want to continue with https://demo.ctfd.io and d41d8cd98f00b204e9800998ecf8427e [y/N]: y +Do you want to continue with https://demo.ctfd.io and d41d8cd98f00b204e9800998ecf8427e [Y/n]: y Initialized empty Git repository in /Users/user/Downloads/event/.git/ ``` -This will create the `.ctf` folder with the `config` file that will specify the URL, access token, and keep a record of all the challenges dedicated for this event. +This will create the `.ctf` folder with the `config` file that will specify the URL, access token, and keep a record of +all the challenges dedicated for this event. ## 2. Add challenges -Events are made up of challenges. Challenges can be made from a subdirectory or pulled from another repository. Remote challenges are pulled into the event repo and a reference is kept in the `.ctf/config` file. +Events are made up of challenges. +Challenges can be made from a subdirectory or pulled from another repository. +GIT-enabled challenges are pulled into the event repo, and a reference is kept in the `.ctf/config` file. ``` ❯ ctf challenge add [REPO | FOLDER] ``` +##### Local folder: ``` ❯ ctf challenge add crypto/stuff ``` +##### GIT repository: ``` ❯ ctf challenge add https://github.com/challenge.git -challenge Cloning into 'challenge'... -remote: Enumerating objects: 624, done. -remote: Counting objects: 100% (624/624), done. -remote: Compressing objects: 100% (540/540), done. -remote: Total 624 (delta 109), reused 335 (delta 45), pack-reused 0 -Receiving objects: 100% (624/624), 6.49 MiB | 21.31 MiB/s, done. -Resolving deltas: 100% (109/109), done. +[...] +``` + +##### GIT repository to a specific subfolder: +``` +❯ ctf challenge add https://github.com/challenge.git crypto +Cloning into 'crypto/challenge'... +[...] ``` ## 3. Install challenges -Installing a challenge will automatically create the challenge in your CTFd instance using the API. +Installing a challenge will create the challenge in your CTFd instance using the API. ``` -❯ ctf challenge install [challenge.yml | DIRECTORY] +❯ ctf challenge install [challenge] ``` ``` @@ -72,12 +79,13 @@ Installing buffer_overflow Success! ``` -## 4. Update challenges +## 4. Sync challenges -Syncing a challenge will automatically update the challenge in your CTFd instance using the API. Any changes made in the `challenge.yml` file will be reflected in your instance. +Syncing a challenge will update the challenge in your CTFd instance using the API. +Any changes made in the `challenge.yml` file will be reflected in your instance. ``` -❯ ctf challenge sync [challenge.yml | DIRECTORY] +❯ ctf challenge sync [challenge] ``` ``` @@ -88,6 +96,70 @@ Syncing buffer_overflow Success! ``` +## 5. Deploy services + +Deploying a challenge will automatically create the challenge service (by default in your CTFd instance). 
+You can also use a different deployment handler to deploy the service via SSH to your own server,
+or to a separate Docker registry.
+
+The challenge will also be automatically installed or synced.
+The obtained connection info will be added to your `challenge.yml` file.
+```
+❯ ctf challenge deploy [challenge]
+```
+
+```
+❯ ctf challenge deploy web-1
+Deploying challenge service 'web-1' (web-1/challenge.yml) with CloudDeploymentHandler ...
+Challenge service deployed at: https://web-1-example-instance.chals.io
+Updating challenge 'web-1'
+Success!
+```
+
+## 6. Verify challenges
+
+Verifying a challenge will check if the local version of the challenge is the same as the one installed in your CTFd instance.
+
+```
+❯ ctf challenge verify [challenge]
+```
+
+```
+❯ ctf challenge verify buffer_overflow
+Verifying challenges [------------------------------------] 0%
+Verifying challenges [####################################] 100%
+Success! All challenges verified!
+Challenges in sync:
+ - buffer_overflow
+```
+
+## 7. Mirror changes
+
+Mirroring a challenge is the reverse operation to syncing.
+It will update the local version of the challenge with details of the one installed in your CTFd instance.
+It will also issue a warning if you have any remote challenges that are not tracked locally.
+
+```
+❯ ctf challenge mirror [challenge]
+```
+
+```
+❯ ctf challenge mirror buffer_overflow
+Mirroring challenges [------------------------------------] 0%
+Mirroring challenges [####################################] 100%
+Success! All challenges mirrored!
+```
+
+## Operations on all challenges
+
+You can perform operations on all challenges defined in your config by simply omitting the challenge parameter.
+
+- `ctf challenge install`
+- `ctf challenge sync`
+- `ctf challenge deploy`
+- `ctf challenge verify`
+- `ctf challenge mirror`
+
 # Challenge Templates
 
 `ctfcli` contains pre-made challenge templates to make it faster to create CTF challenges with safe defaults.
@@ -126,6 +198,6 @@ The specification format has already been tested and used with CTFd in productio
 
 # Plugins
 
-`ctfcli` plugins are essentially additions to to the command line interface via dynamic class modifications. See the [plugin documentation page](docs/plugins.md) for a simple example.
+`ctfcli` plugins are essentially additions to the command line interface via dynamic class modifications. See the [plugin documentation page](docs/plugins.md) for a simple example.
 
 *`ctfcli` is an alpha project! The plugin interface is likely to change!*
diff --git a/ctfcli/cli/challenges.py b/ctfcli/cli/challenges.py
index f8d226e..a97229c 100644
--- a/ctfcli/cli/challenges.py
+++ b/ctfcli/cli/challenges.py
@@ -693,7 +693,7 @@ def deploy(
         elif deployment_result.connection_info:
             challenge["connection_info"] = deployment_result.connection_info
 
-        # Finally if no connection_info was provided in the challenge and the
+        # Finally, if no connection_info was provided in the challenge and the
         # deployment didn't result in one either, just ensure it's not present
         else:
             challenge["connection_info"] = None
@@ -714,6 +714,8 @@ def deploy(
                 f"Challenge service deployed at: {challenge['connection_info']}",
                 fg="green",
             )
+
+            challenge.save()  # Save the challenge with the new connection_info
         else:
             click.secho(
                 "Could not resolve a connection_info for the deployed service.\nIf your DeploymentHandler "
@@ -793,8 +795,8 @@ def lint(
         click.secho("Success! 
Lint didn't find any issues!", fg="green") return 0 - def healthcheck(self, challenge: str = None): - log.debug(f"lint: (challenge={challenge})") + def healthcheck(self, challenge: str = None) -> int: + log.debug(f"healthcheck: (challenge={challenge})") config = Config() challenge_path = Path.cwd() @@ -861,3 +863,193 @@ def healthcheck(self, challenge: str = None): click.secho("Success! Challenge passed the healthcheck.", fg="green") return 0 + + def mirror( + self, + challenge: str = None, + files_directory: str = "dist", + skip_verify: bool = False, + ignore: Union[str, Tuple[str]] = (), + ) -> int: + config = Config() + challenge_keys = [challenge] + + # Get all local challenges if not specifying a challenge + if challenge is None: + challenge_keys = config.challenges.keys() + + # Check if there are attributes to be ignored, and if there's only one cast it to a tuple + if isinstance(ignore, str): + ignore = (ignore,) + + # Load local challenges + local_challenges, failed_mirrors = [], [] + for challenge_key in challenge_keys: + challenge_path = config.project_path / Path(challenge_key) + + if not challenge_path.name.endswith(".yml"): + challenge_path = challenge_path / "challenge.yml" + + try: + local_challenges.append(Challenge(challenge_path)) + + except ChallengeException as e: + click.secho(str(e), fg="red") + failed_mirrors.append(challenge_key) + continue + + remote_challenges = Challenge.load_installed_challenges() + + if len(challenge_keys) > 1: + # When mirroring all challenges - issue a warning if there are extra challenges on the remote + # that do not have a local version + local_challenge_names = [c["name"] for c in local_challenges] + + for remote_challenge in remote_challenges: + if remote_challenge["name"] not in local_challenge_names: + click.secho( + f"Found challenge '{remote_challenge['name']}' in CTFd, but not in .ctf/config\n" + "Mirroring does not create new local challenges\n" + "Please add the local challenge if you wish to manage it with ctfcli\n", + fg="yellow", + ) + + with click.progressbar(local_challenges, label="Mirroring challenges") as challenges: + for challenge in challenges: + try: + if not skip_verify and challenge.verify(ignore=ignore): + click.secho( + f"Challenge '{challenge['name']}' is already in sync. Skipping mirroring.", + fg="blue", + ) + else: + # if skip_verify is True or challenge.verify(ignore=ignore) is False + challenge.mirror(files_directory_name=files_directory, ignore=ignore) + + except ChallengeException as e: + click.secho(str(e), fg="red") + failed_mirrors.append(challenge["name"]) + + if len(failed_mirrors) == 0: + click.secho("Success! 
All challenges mirrored!", fg="green") + return 0 + + click.secho("Mirror failed for:", fg="red") + for challenge in failed_mirrors: + click.echo(f" - {challenge}") + + return 1 + + def verify(self, challenge: str = None, ignore: Tuple[str] = ()) -> int: + config = Config() + challenge_keys = [challenge] + + # Get all local challenges if not specifying a challenge + if challenge is None: + challenge_keys = config.challenges.keys() + + # Check if there are attributes to be ignored, and if there's only one cast it to a tuple + if isinstance(ignore, str): + ignore = (ignore,) + + # Load local challenges + local_challenges, failed_verifications = [], [] + for challenge_key in challenge_keys: + challenge_path = config.project_path / Path(challenge_key) + + if not challenge_path.name.endswith(".yml"): + challenge_path = challenge_path / "challenge.yml" + + try: + local_challenges.append(Challenge(challenge_path)) + + except ChallengeException as e: + click.secho(str(e), fg="red") + failed_verifications.append(challenge_key) + continue + + remote_challenges = Challenge.load_installed_challenges() + + if len(challenge_keys) > 1: + # When verifying all challenges - issue a warning if there are extra challenges on the remote + # that do not have a local version + local_challenge_names = [c["name"] for c in local_challenges] + + for remote_challenge in remote_challenges: + if remote_challenge["name"] not in local_challenge_names: + click.secho( + f"Found challenge '{remote_challenge['name']}' in CTFd, but not in .ctf/config\n" + "Please add the local challenge if you wish to manage it with ctfcli\n", + fg="yellow", + ) + + challenges_in_sync, challenges_out_of_sync = [], [] + with click.progressbar(local_challenges, label="Verifying challenges") as challenges: + for challenge in challenges: + try: + if not challenge.verify(ignore=ignore): + challenges_out_of_sync.append(challenge["name"]) + else: + challenges_in_sync.append(challenge["name"]) + + except ChallengeException as e: + click.secho(str(e), fg="red") + failed_verifications.append(challenge["name"]) + + if len(failed_verifications) == 0: + click.secho("Success! All challenges verified!", fg="green") + + if len(challenges_in_sync) > 0: + click.secho("Challenges in sync:", fg="green") + for challenge in challenges_in_sync: + click.echo(f" - {challenge}") + + if len(challenges_out_of_sync) > 0: + click.secho("Challenges out of sync:", fg="yellow") + for challenge in challenges_out_of_sync: + click.echo(f" - {challenge}") + + if len(challenges_out_of_sync) > 1: + return 2 + + return 1 + + click.secho("Verification failed for:", fg="red") + for challenge in failed_verifications: + click.echo(f" - {challenge}") + + return 1 + + def format(self, challenge: str = None) -> int: + config = Config() + challenge_keys = [challenge] + + # Get all local challenges if not specifying a challenge + if challenge is None: + challenge_keys = config.challenges.keys() + + failed_formats = [] + for challenge_key in challenge_keys: + challenge_path = config.project_path / Path(challenge_key) + + if not challenge_path.name.endswith(".yml"): + challenge_path = challenge_path / "challenge.yml" + + try: + # load the challenge and save it without changes + Challenge(challenge_path).save() + + except ChallengeException as e: + click.secho(str(e), fg="red") + failed_formats.append(challenge_key) + continue + + if len(failed_formats) == 0: + click.secho("Success! 
All challenges formatted!", fg="green") + return 0 + + click.secho("Format failed for:", fg="red") + for challenge in failed_formats: + click.echo(f" - {challenge}") + + return 1 diff --git a/ctfcli/core/challenge.py b/ctfcli/core/challenge.py index 71bbfbb..ef5a0f0 100644 --- a/ctfcli/core/challenge.py +++ b/ctfcli/core/challenge.py @@ -1,7 +1,8 @@ +import re import subprocess from os import PathLike from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union import click import yaml @@ -18,7 +19,39 @@ from ctfcli.utils.tools import strings +def str_presenter(dumper, data): + if len(data.splitlines()) > 1 or "\n" in data: + text_list = [line.rstrip() for line in data.splitlines()] + fixed_data = "\n".join(text_list) + return dumper.represent_scalar("tag:yaml.org,2002:str", fixed_data, style="|") + elif len(data) > 80: + return dumper.represent_scalar("tag:yaml.org,2002:str", data.rstrip(), style=">") + + return dumper.represent_scalar("tag:yaml.org,2002:str", data) + + +yaml.add_representer(str, str_presenter) +yaml.representer.SafeRepresenter.add_representer(str, str_presenter) + + class Challenge(dict): + key_order = [ + # fmt: off + "name", "author", "category", "description", "value", + "type", "extra", "image", "protocol", "host", + "connection_info", "healthcheck", "attempts", "flags", + "files", "topics", "tags", "files", "hints", + "requirements", "state", "version", + # fmt: on + ] + + keys_with_newline = [ + # fmt: off + "extra", "image", "attempts", "flags", "topics", "tags", + "files", "hints", "requirements", "state", "version", + # fmt: on + ] + @staticmethod def load_installed_challenge(challenge_id) -> Optional[Dict]: api = API() @@ -47,6 +80,32 @@ def load_installed_challenges() -> List: return installed_challenges + @staticmethod + def is_default_challenge_property(key: str, value: Any) -> bool: + if key == "connection_info" and value is None: + return True + + if key == "attempts" and value == 0: + return True + + if key == "state" and value == "visible": + return True + + if key == "type" and value == "standard": + return True + + if key in ["tags", "hints", "topics", "requirements", "files"] and value == []: + return True + + return False + + @property + def api(self): + if not self._api: + self._api = API() + + return self._api + # __init__ expects an absolute path to challenge_yml, or a relative one from the cwd # it does not join that path with the project_path def __init__(self, challenge_yml: Union[str, PathLike], overrides=None): @@ -74,27 +133,41 @@ def __init__(self, challenge_yml: Union[str, PathLike], overrides=None): challenge_data = {**challenge_definition, **overrides} super(Challenge, self).__init__(challenge_data) - # challenge id is unknown before sync or creation + # Challenge id is unknown before loading the remote challenge self.challenge_id = None - # API Session is not generated until it's necessary, but should be reused later - self.api = None + # API is not initialized before running an API-related operation, but should be reused later + self._api = None - # Set Image to None if challenge does not provide one + # Set Image to None if the challenge does not provide one self.image = None - # get name and build path for the image if challenge provides one + # Get name and a build path for the image if the challenge provides one if self.get("image"): self.image = Image(slugify(self["name"]), self.challenge_directory / self["image"]) + def _load_challenge_id(self): + 
remote_challenges = self.load_installed_challenges() + if not remote_challenges: + raise RemoteChallengeNotFound("Could not load any remote challenges") + + # get challenge id from the remote + for inspected_challenge in remote_challenges: + if inspected_challenge["name"] == self["name"]: + self.challenge_id = inspected_challenge["id"] + break + + # return if we failed to determine the challenge id (failed to find the challenge) + if self.challenge_id is None: + raise RemoteChallengeNotFound(f"Could not load remote challenge with name '{self['name']}'") + def _validate_files(self): # if the challenge defines files, make sure they exist before making any changes to the challenge for challenge_file in self["files"]: if not (self.challenge_directory / challenge_file).exists(): raise InvalidChallengeFile(f"File {challenge_file} could not be loaded") - def _get_initial_challenge_payload(self, ignore=()) -> Dict: - # alias self as challenge for accessing internal dict data + def _get_initial_challenge_payload(self, ignore: Tuple[str] = ()) -> Dict: challenge = self challenge_payload = { "name": self["name"], @@ -105,7 +178,7 @@ def _get_initial_challenge_payload(self, ignore=()) -> Dict: "state": "hidden", } - # Some challenge types (e.g. dynamic) override value. + # Some challenge types (e.g., dynamic) override value. # We can't send it to CTFd because we don't know the current value if challenge.get("value", None) is not None: # if value is an int as string, cast it @@ -200,7 +273,7 @@ def _create_files(self): new_files.append(("file", open(self.challenge_directory / challenge_file, mode="rb"))) files_payload = {"challenge_id": self.challenge_id, "type": "challenge"} - # Specifically use data= here instead of json= to send multipart/form-data + # Specifically use data= here to send multipart/form-data r = self.api.post("/api/v1/files", files=new_files, data=files_payload) r.raise_for_status() @@ -264,8 +337,7 @@ def _set_required_challenges(self): r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=requirements_payload) r.raise_for_status() - def sync(self, ignore=()) -> None: - # alias self as challenge for accessing internal dict data + def sync(self, ignore: Tuple[str] = ()) -> None: challenge = self if "name" in ignore: @@ -282,22 +354,8 @@ def sync(self, ignore=()) -> None: self._validate_files() challenge_payload = self._get_initial_challenge_payload(ignore=ignore) - remote_challenges = self.load_installed_challenges() - - if not remote_challenges: - raise RemoteChallengeNotFound("Could not load any remote challenges") - - # get challenge id from the remote - for inspected_challenge in remote_challenges: - if inspected_challenge["name"] == challenge["name"]: - self.challenge_id = inspected_challenge["id"] - break - - # return if we failed to determine the challenge id (failed to find the challenge) - if self.challenge_id is None: - raise RemoteChallengeNotFound(f"Could not load remote challenge with name '{challenge['name']}'") - # remote challenge should exist now + self._load_challenge_id() remote_challenge = self.load_installed_challenge(self.challenge_id) # if value, category, type or description are ignored, revert them to the remote state in the initial payload @@ -306,9 +364,6 @@ def sync(self, ignore=()) -> None: if p in ignore: challenge_payload[p] = remote_challenge[p] - if not self.api: - self.api = API() - # Update simple properties r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=challenge_payload) r.raise_for_status() @@ -354,6 +409,7 @@ def 
sync(self, ignore=()) -> None: if "state" not in ignore: if challenge.get("state", "visible") == "visible": make_challenge_visible = True + # 2. State is ignored, but regardless of the local value, the remote state was visible else: if remote_challenge.get("state") == "visible": @@ -363,8 +419,7 @@ def sync(self, ignore=()) -> None: r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json={"state": "visible"}) r.raise_for_status() - def create(self, ignore=()) -> None: - # alias self as challenge for accessing internal dict data + def create(self, ignore: Tuple[str] = ()) -> None: challenge = self for attr in ["name", "value"]: @@ -386,17 +441,15 @@ def create(self, ignore=()) -> None: challenge_payload = self._get_initial_challenge_payload(ignore=ignore) - # in the case of create value and type can't be ignored: - # value is required (unless the challenge is a dynamic value challenge), and type will default to standard + # in the case of creation, value and type can't be ignored: + # value is required (unless the challenge is a dynamic value challenge), + # and the type will default to standard # if category or description are ignored, set them to an empty string reset_properties_if_ignored = ["category", "description"] for p in reset_properties_if_ignored: if p in ignore: challenge_payload[p] = "" - if not self.api: - self.api = API() - r = self.api.post("/api/v1/challenges", json=challenge_payload) r.raise_for_status() @@ -439,7 +492,7 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: # Check if required fields are present for field in ["name", "author", "category", "description", "value"]: - # value is allowed to be none, if the challenge type is dynamic + # value is allowed to be none if the challenge type is dynamic if field == "value" and challenge.get("type") == "dynamic": continue @@ -480,7 +533,7 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: else: click.secho("Skipping Hadolint", fg="yellow") - # Check that all files exists + # Check that all files exist challenge_files = challenge.get("files", []) for challenge_file in challenge_files: challenge_file_path = self.challenge_directory / challenge_file @@ -496,7 +549,7 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: challenge_file_path = self.challenge_directory / challenge_file if not challenge_file_path.exists(): - # the check for files present is above, this is only to look for flags in files that we do have + # The check for files present is above; this is only to look for flags in files that we do have continue for s in strings(challenge_file_path): @@ -508,3 +561,217 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: raise LintException(issues=issues) return True + + # Compare challenge requirements, will resolve all IDs to names + def _compare_challenge_requirements(self, r1: List[Union[str, int]], r2: List[Union[str, int]]) -> bool: + remote_challenges = self.load_installed_challenges() + + def normalize_requirements(requirements): + normalized = [] + for r in requirements: + if type(r) == int: + for remote_challenge in remote_challenges: + if remote_challenge["id"] == r: + normalized.append(remote_challenge["name"]) + break + else: + normalized.append(r) + + return normalized + + return normalize_requirements(r1) == normalize_requirements(r2) + + # Normalize challenge data from the API response to match challenge.yml + # It will remove any extra fields from the remote, as well as expand external references + # that have to be fetched 
separately (e.g., files, flags, hints, etc.) + # Note: files won't be included for two reasons: + # 1. To avoid downloading them unnecessarily, e.g., when they are ignored + # 2. Because it's dependent on the implementation whether to save them (mirror) or just compare (verify) + def _normalize_challenge(self, challenge_data: Dict[str, Any]): + challenge = {} + + copy_keys = ["name", "category", "value", "type", "state", "connection_info"] + for key in copy_keys: + if key in challenge_data: + challenge[key] = challenge_data[key] + + challenge["description"] = challenge_data["description"].strip().replace("\r\n", "\n").replace("\t", "") + challenge["attempts"] = challenge_data["max_attempts"] + + for key in ["initial", "decay", "minimum"]: + if key in challenge_data: + if "extra" not in challenge: + challenge["extra"] = {} + + challenge["extra"][key] = challenge_data[key] + + # Add flags + r = self.api.get(f"/api/v1/challenges/{self.challenge_id}/flags") + r.raise_for_status() + flags = r.json()["data"] + challenge["flags"] = [ + f["content"] + if f["type"] == "static" and (f["data"] is None or f["data"] == "") + else {"content": f["content"].strip().replace("\r\n", "\n"), "type": f["type"], "data": f["data"]} + for f in flags + ] + + # Add tags + r = self.api.get(f"/api/v1/challenges/{self.challenge_id}/tags") + r.raise_for_status() + tags = r.json()["data"] + challenge["tags"] = [t["value"] for t in tags] + + # Add hints + r = self.api.get(f"/api/v1/challenges/{self.challenge_id}/hints") + r.raise_for_status() + hints = r.json()["data"] + # skipping pre-requisites for hints because they are not supported in ctfcli + challenge["hints"] = [ + {"content": h["content"], "cost": h["cost"]} if h["cost"] > 0 else h["content"] for h in hints + ] + + # Add topics + r = self.api.get(f"/api/v1/challenges/{self.challenge_id}/topics") + r.raise_for_status() + topics = r.json()["data"] + challenge["topics"] = [t["value"] for t in topics] + + # Add requirements + r = self.api.get(f"/api/v1/challenges/{self.challenge_id}/requirements") + r.raise_for_status() + requirements = (r.json().get("data") or {}).get("prerequisites", []) + if len(requirements) > 0: + # Prefer challenge names over IDs + r = self.api.get("/api/v1/challenges") + r.raise_for_status() + challenges = r.json()["data"] + challenge["requirements"] = [c["name"] for c in challenges if c["id"] in requirements] + + return challenge + + def mirror(self, files_directory_name: str = "dist", ignore: Tuple[str] = ()) -> None: + self._load_challenge_id() + remote_challenge = self.load_installed_challenge(self.challenge_id) + challenge = self._normalize_challenge(remote_challenge) + + # Add files which are not handled in _normalize_challenge + if "files" not in ignore: + local_files = {Path(f).name: f for f in challenge.get("files", [])} + + # Update files + for remote_file in remote_challenge["files"]: + # Get base file name + remote_file_name = remote_file.split("/")[-1].split("?token=")[0] + + # The file is only present on the remote - we have to download it, and assume a path + if remote_file_name not in local_files: + r = self.api.get(remote_file) + r.raise_for_status() + + # Ensure the directory for the challenge files exists + challenge_files_directory = self.challenge_directory / files_directory_name + challenge_files_directory.mkdir(parents=True, exist_ok=True) + + (challenge_files_directory / remote_file_name).write_bytes(r.content) + if "files" not in challenge: + challenge["files"] = [] + + 
challenge["files"].append(f"{files_directory_name}/{remote_file_name}") + + # The file is already present in the challenge.yml - we know the desired path + else: + r = self.api.get(remote_file) + r.raise_for_status() + (self.challenge_directory / local_files[remote_file_name]).write_bytes(r.content) + + # Soft-Delete files that are not present on the remote + # Remove them from challenge.yml but do not delete them from disk + remote_file_names = [f.split("/")[-1].split("?token=")[0] for f in remote_challenge["files"]] + challenge["files"] = [f for f in challenge.get("files", []) if Path(f).name in remote_file_names] + + for key in challenge.keys(): + if key not in ignore: + self[key] = challenge[key] + + self.save() + + def verify(self, ignore: Tuple[str] = ()) -> bool: + self._load_challenge_id() + challenge = self + remote_challenge = self.load_installed_challenge(self.challenge_id) + normalized_challenge = self._normalize_challenge(remote_challenge) + + for key in normalized_challenge: + if key in ignore: + continue + + # If challenge.yml doesn't have some property from the remote + # Check if it's a default value that can be omitted + if key not in challenge: + if self.is_default_challenge_property(key, normalized_challenge[key]): + continue + + return False + + if challenge[key] != normalized_challenge[key]: + if key == "requirements": + if self._compare_challenge_requirements(challenge[key], normalized_challenge[key]): + continue + + return False + + # Handle a special case for files, unless they are ignored + if "files" not in ignore: + local_files = {Path(f).name: f for f in challenge.get("files", [])} + remote_files = {f.split("/")[-1].split("?token=")[0]: f for f in remote_challenge["files"]} + + # Check if there are no extra local files + for local_file in local_files: + if local_file not in remote_files: + return False + + # Check if all remote files are present locally + for remote_file in remote_files: + if remote_file not in local_files: + return False + + # Check if the remote files are the same as local + r = self.api.get(remote_files[remote_file]) + r.raise_for_status() + remote_file_contents = r.content + local_file_contents = (self.challenge_directory / local_files[remote_file]).read_bytes() + + if remote_file_contents != local_file_contents: + return False + + return True + + def save(self): + challenge_dict = dict(self) + + # sort the challenge dict by the key order defined from the spec + # also strip any default values + sorted_challenge_dict = { + k: challenge_dict[k] + for k in self.key_order + if k in challenge_dict and not self.is_default_challenge_property(k, challenge_dict[k]) + } + + # if there are any additional keys append them at the end + unknown_keys = set(challenge_dict) - set(self.key_order) + for k in unknown_keys: + sorted_challenge_dict[k] = challenge_dict[k] + + try: + challenge_yml = yaml.safe_dump(sorted_challenge_dict, sort_keys=False, allow_unicode=True) + + # attempt to pretty print the yaml (add an extra newline between selected top-level keys) + pattern = "|".join(r"^" + re.escape(key) + r":" for key in self.keys_with_newline) + pretty_challenge_yml = re.sub(pattern, r"\n\g<0>", challenge_yml, flags=re.MULTILINE) + + with open(self.challenge_file_path, "w") as challenge_file: + challenge_file.write(pretty_challenge_yml) + + except Exception as e: + raise InvalidChallengeFile(f"Challenge file could not be saved:\n{e}") diff --git a/tests/core/test_challenge.py b/tests/core/test_challenge.py index 3b2ef8c..184437a 100644 --- 
a/tests/core/test_challenge.py +++ b/tests/core/test_challenge.py @@ -1,7 +1,11 @@ +import re import unittest from pathlib import Path +from typing import List from unittest import mock -from unittest.mock import ANY, MagicMock, call +from unittest.mock import ANY, MagicMock, call, mock_open + +import yaml from ctfcli.core.challenge import Challenge from ctfcli.core.exceptions import ( @@ -1425,3 +1429,389 @@ def test_looks_for_flags_in_dist_files(self, *args, **kwargs): } self.assertDictEqual(expected_lint_issues, e.exception.issues) + + +class TestVerifyMirrorChallenge(unittest.TestCase): + installed_challenges = [ + { + "id": 1, + "type": "standard", + "name": "First Test Challenge", + "value": 150, + "solves": 0, + "solved_by_me": False, + "category": "test", + "tags": [], + "template": "view.html", + "script": "view.js", + }, + { + "id": 2, + "type": "standard", + "name": "Other Test Challenge", + "value": 200, + "solves": 0, + "solved_by_me": False, + "category": "test", + "tags": [], + "template": "view.html", + "script": "view.js", + }, + { + "id": 3, + "type": "standard", + "name": "Test Challenge", + "value": 150, + "solves": 0, + "solved_by_me": False, + "category": "Test", + "tags": [], + "template": "view.html", + "script": "view.js", + }, + ] + + minimal_challenge = BASE_DIR / "fixtures" / "challenges" / "test-challenge-minimal" / "challenge.yml" + full_challenge = BASE_DIR / "fixtures" / "challenges" / "test-challenge-full" / "challenge.yml" + + def mock_get(self, *args, **kwargs): + path = args[0] + + if path == "/api/v1/challenges?view=admin" or path == "/api/v1/challenges": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": self.installed_challenges} + return mock_response + + if path == "/api/v1/challenges/3": + mock_response = MagicMock() + mock_response.json.return_value = { + "success": True, + "data": { + "id": 3, + "name": "Test Challenge", + "value": 150, + "description": "Test Description", + "connection_info": "https://example.com", + "next_id": None, + "category": "Test", + "state": "visible", + "max_attempts": 5, + "type": "standard", + "files": [ + "/files/6cccd16e23d7a7dd13f2ec4368be682b/test.png?token=jwt", + "/files/543543fd1697214513f241241212efaa/test.pdf?token=jwt", + ], + "tags": ["tag-1", "tag-2"], + "hints": [{"id": 1, "cost": 0}, {"id": 2, "cost": 100}], + "type_data": { + "id": "standard", + "name": "standard", + "templates": { + "create": "/plugins/challenges/assets/create.html", + "update": "/plugins/challenges/assets/update.html", + "view": "/plugins/challenges/assets/view.html", + }, + "scripts": { + "create": "/plugins/challenges/assets/create.js", + "update": "/plugins/challenges/assets/update.js", + "view": "/plugins/challenges/assets/view.js", + }, + }, + "solves": 0, + "solved_by_me": False, + "attempts": 0, + "view": "html view", + }, + } + return mock_response + + if path == "/api/v1/challenges/3/flags": + mock_response = MagicMock() + mock_response.json.return_value = { + "success": True, + "data": [ + { + "type": "static", + "challenge": 1, + "id": 1, + "content": "flag{test-flag}", + "data": None, + "challenge_id": 1, + }, + { + "type": "static", + "challenge": 1, + "id": 2, + "content": "flag{test-static}", + "data": "case_insensitive", + "challenge_id": 1, + }, + { + "type": "regex", + "challenge": 1, + "id": 3, + "content": "flag{test-regex-.*}", + "data": "case_insensitive", + "challenge_id": 1, + }, + ], + } + + return mock_response + + if path == "/api/v1/challenges/3/tags": + 
mock_response = MagicMock() + mock_response.json.return_value = { + "success": True, + "data": [ + { + "id": 1, + "challenge_id": 1, + "value": "tag-1", + }, + { + "id": 2, + "challenge_id": 1, + "value": "tag-2", + }, + ], + } + + return mock_response + + if path == "/api/v1/challenges/3/topics": + mock_response = MagicMock() + mock_response.json.return_value = { + "success": True, + "data": [ + { + "id": 1, + "challenge_id": 1, + "value": "topic-1", + "topic_id": 1, + }, + { + "id": 2, + "challenge_id": 1, + "value": "topic-2", + "topic_id": 2, + }, + ], + } + + return mock_response + + if path == "/api/v1/challenges/3/hints": + mock_response = MagicMock() + mock_response.json.return_value = { + "success": True, + "data": [ + { + "type": "standard", + "challenge": 1, + "id": 1, + "content": "free hint", + "cost": 0, + "challenge_id": 1, + "requirements": {"prerequisites": []}, + }, + { + "type": "standard", + "challenge": 1, + "id": 2, + "content": "paid hint", + "cost": 100, + "challenge_id": 1, + "requirements": {"prerequisites": []}, + }, + ], + } + + return mock_response + + if path == "/api/v1/challenges/3/requirements": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": {"prerequisites": [1, 2]}} + + return mock_response + + return MagicMock() + + maxDiff = 1000 + + @mock.patch("ctfcli.core.challenge.API") + def test_normalize_fetches_and_normalizes_challenge(self, mock_api_constructor: MagicMock): + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = self.mock_get + + # does not matter in this test + challenge = Challenge(self.full_challenge) + challenge.challenge_id = 3 + + mock_challenge_data = { + "name": "Test Challenge", + "category": "Test", + "description": "Test Description", + "value": 150, + "max_attempts": 5, + "type": "standard", + "connection_info": "https://example.com", + "state": "hidden", + "initial": 100, + "decay": 10, + "minimum": 10, + # not including flags, tags, topics, hints, requirements as they are fetched separately (see mock_get) + # so, there's no need to place them in the mock data + } + + normalized_data = challenge._normalize_challenge(mock_challenge_data) + self.assertDictEqual( + { + "name": "Test Challenge", + "category": "Test", + "value": 150, + "type": "standard", + "state": "hidden", + "connection_info": "https://example.com", + "description": "Test Description", + "attempts": 5, + "flags": [ + "flag{test-flag}", + {"content": "flag{test-static}", "type": "static", "data": "case_insensitive"}, + {"content": "flag{test-regex-.*}", "type": "regex", "data": "case_insensitive"}, + ], + "tags": ["tag-1", "tag-2"], + "hints": ["free hint", {"content": "paid hint", "cost": 100}], + "topics": ["topic-1", "topic-2"], + "requirements": ["First Test Challenge", "Other Test Challenge"], + "extra": { + "initial": 100, + "decay": 10, + "minimum": 10, + }, + }, + normalized_data, + ) + + @mock.patch("ctfcli.core.challenge.API") + def test_verify_checks_if_challenge_is_the_same(self, mock_api_constructor: MagicMock): + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = self.mock_get + + challenge = Challenge(self.full_challenge) + + # pop keys with default values to see if they are ignored + for p in ["type", "state"]: + challenge.pop(p) + + challenge.challenge_id = 3 + self.assertTrue(challenge.verify(ignore=["files"])) + + @mock.patch("ctfcli.core.challenge.API") + def test_verify_checks_if_challenge_differs(self, mock_api_constructor: MagicMock): + 
mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = self.mock_get + + challenge = Challenge(self.full_challenge, {"value": 200}) + challenge.challenge_id = 3 + self.assertFalse(challenge.verify(ignore=["files"])) + + @mock.patch("ctfcli.core.challenge.API") + def test_mirror_challenge(self, mock_api_constructor: MagicMock): + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = self.mock_get + + challenge = Challenge( + self.full_challenge, + { + "value": 200, + "description": "other description", + "connection_info": "https://other.example.com", + "flags": ["flag{other-flag}", "other-flag"], + "topics": ["other-topic-1", "other-topic-2"], + "tags": ["other-tag-1", "other-tag-2"], + "hints": ["other-free hint", {"content": "other-paid hint", "cost": 100}], + "requirements": ["Other Test Challenge"], + }, + ) + challenge.challenge_id = 3 + + with mock.patch("builtins.open", new_callable=mock_open()) as mock_open_file: + challenge.mirror(ignore=["files"]) + dumped_data = mock_open_file.return_value.__enter__().write.call_args_list[0].args[0] + + # adjust requirements for the test only, because they can be referenced as an ID and name, + # and ctfcli will update them to use the name + expected_challenge = Challenge( + self.full_challenge, + {"requirements": ["First Test Challenge", "Other Test Challenge"]}, + ) + + # pop keys with default values as they should not be in the loaded data + for k in ["state", "type"]: + expected_challenge.pop(k) + + loaded_data = yaml.safe_load(dumped_data) + self.assertDictEqual(expected_challenge, loaded_data) + + +class TestSaveChallenge(unittest.TestCase): + full_challenge = BASE_DIR / "fixtures" / "challenges" / "test-challenge-full" / "challenge.yml" + + def test_saved_content_is_valid(self): + challenge = Challenge(self.full_challenge) + + with mock.patch("builtins.open", new_callable=mock_open()) as mock_open_file: + challenge.save() + dumped_data = mock_open_file.return_value.__enter__().write.call_args_list[0].args[0] + + loaded_data = yaml.safe_load(dumped_data) + + # pop keys with default values as they should not be in the loaded data + for k in ["state", "type"]: + challenge.pop(k) + + self.assertDictEqual(challenge, loaded_data) + + def test_key_order_is_preserved(self): + challenge = Challenge(self.full_challenge) + + with mock.patch("builtins.open", new_callable=mock_open()) as mock_open_file: + challenge.save() + dumped_data = mock_open_file.return_value.__enter__().write.call_args_list[0].args[0] + + def check_order(yml: str, order: List[str]): + indices = {} + for key in order: + match = re.search(r"\b" + re.escape(key) + r"\b", yml) + + if match: + indices[key] = match.start() + else: + continue + + sorted_indices = sorted(indices.values()) + if sorted_indices == list(indices.values()): + return True + else: + return False + + key_order = challenge.key_order.copy() + for k in ["state", "type"]: + key_order.remove(k) + + self.assertTrue(check_order(dumped_data, key_order)) + + def test_additional_keys_are_appended(self): + challenge = Challenge(self.full_challenge, {"new-property": "some-value"}) + + with mock.patch("builtins.open", new_callable=mock_open()) as mock_open_file: + challenge.save() + dumped_data = mock_open_file.return_value.__enter__().write.call_args_list[0].args[0] + + # pop keys with default values as they should not be in the loaded data + for k in ["state", "type"]: + challenge.pop(k) + + loaded_data = yaml.safe_load(dumped_data) + 
self.assertDictEqual(challenge, loaded_data)
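
For authors who want to script the verify/mirror workflow outside the CLI, the snippet below is a minimal sketch built on the `Challenge` methods added in this change. It assumes an event repository already initialized with `ctf init` (so the API client can pick up the CTFd URL and token from `.ctf/config`), and `crypto/stuff/challenge.yml` is only a placeholder path; both methods raise `RemoteChallengeNotFound` if the challenge has not been installed in the instance yet.

```python
from ctfcli.core.challenge import Challenge

# Placeholder path - any challenge.yml tracked by the event repo works here
challenge = Challenge("crypto/stuff/challenge.yml")

# verify() compares the local challenge.yml with the installed CTFd challenge;
# passing "files" in ignore skips downloading and comparing the remote files
if not challenge.verify(ignore=("files",)):
    # mirror() overwrites the local definition with the remote state and calls save(),
    # which rewrites challenge.yml using the key order defined on the Challenge class
    challenge.mirror(files_directory_name="dist", ignore=("files",))
```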