Skip to content

Commit

Permalink
add support for Rust crates/Cargo package index
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan6419846 committed Mar 28, 2024
1 parent ac89cdb commit 283db0b
Show file tree
Hide file tree
Showing 21 changed files with 1,030 additions and 31 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ jobs:
python -m pip install .[dev,extended_rpm]
- name: test
run:
python -m unittest discover --verbose --start-directory tests/
coverage run --branch -m unittest discover --verbose --start-directory tests/
- name: coverage
run:
coverage report
- name: lint
run:
flake8
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Development version

* Add handling for Rust crates:
  * Automatically download the packages referenced in a given `Cargo.lock` file.
  * Parse the metadata of `Cargo.toml` files.

# Version 0.9.0 - 2024-03-23

* Add support for `.egg-info` files for retrieving Python metadata.
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ I wrote this tool to simplify the initial scanning steps for third-party package
* Look into font files to easily analyze their metadata.
* Look into RPM file metadata.
* Look into Python package metadata.
* Look into Rust crate metadata.
* Recursively look into nested archives, for example by unpacking the actual upstream source code archives inside RPM (source) files.
* Download the package versions declared inside a `Cargo.lock` file.
* Make everything available from the terminal.

## Installation
Expand Down
29 changes: 29 additions & 0 deletions license_tools/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,37 @@ def main() -> None:
help="Retrieve Python package metadata.",
)

parser.add_argument(
"--cargo-lock-download",
action="store_true",
required=False,
default=False,
help="Instead of analyzing the files, download the packages for a Cargo.lock file.",
)
parser.add_argument(
"--cargo-lock",
type=str,
required=False,
default=None,
help="Path to the Cargo.lock file to use with `--cargo-lock-download`."
)
parser.add_argument(
"--target-directory",
type=str,
required=False,
default=None,
help="Path to write the Cargo crate files to when using the `--cargo-lock-download` option."
)

arguments = parser.parse_args()

if arguments.cargo_lock_download:
from license_tools.tools import cargo_tools
return cargo_tools.download_from_lock_file(
lock_path=arguments.cargo_lock,
target_directory=arguments.target_directory
)

retrieval.run(
directory=arguments.directory,
file_path=arguments.file,
Expand Down
15 changes: 15 additions & 0 deletions license_tools/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (c) stefan6419846. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.

from __future__ import annotations

try:
from importlib.metadata import version as _version
except ImportError:
from importlib_metadata import version as _version


# Version of the `license_tools` distribution, looked up through `importlib.metadata.version`
# (or the `importlib_metadata` fallback imported above).
VERSION: str = _version("license_tools")

# Remove the lookup helper again so that it is not left behind in the module namespace.
del _version
5 changes: 4 additions & 1 deletion license_tools/retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import scancode_config # type: ignore[import-untyped]
from joblib import Parallel, delayed # type: ignore[import-untyped]

from license_tools.tools import font_tools, linking_tools, pip_tools, scancode_tools
from license_tools.tools import cargo_tools, font_tools, linking_tools, pip_tools, scancode_tools
from license_tools.tools.scancode_tools import FileResults, Licenses, PackageResults
from license_tools.utils import archive_utils
from license_tools.utils.path_utils import TemporaryDirectoryWithFixedName
Expand Down Expand Up @@ -182,6 +182,9 @@ def run_on_file(
FileResults,
_run_on_archive_file(path=path, short_path=short_path, default_to_none=False)
)
if path.name.startswith("Cargo.toml"):
print(short_path)
print(cargo_tools.check_metadata(path=path) + "\n")

retrieval_kwargs = RetrievalFlags.to_kwargs(flags=retrieval_flags)

Expand Down
122 changes: 122 additions & 0 deletions license_tools/tools/cargo_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright (c) stefan6419846. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.

"""
Tools related to Cargo/Rust.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Generator

import tomli

from license_tools.utils import download_utils, rendering_utils
from license_tools.utils.download_utils import Download


# Mapping from manifest keys to the human-readable labels used when rendering the metadata.
# It is handed to `rendering_utils.render_dictionary` as `verbose_names_mapping`.
# Key reference: https://doc.rust-lang.org/cargo/reference/manifest.html
_VERBOSE_NAMES = {
    "name": "Name",
    "version": "Version",
    "authors": "Authors",
    "description": "Description",
    "readme": "README",
    "homepage": "Homepage",
    "repository": "Repository",
    "license": "License",
    "license-file": "License File",
    "keywords": "Keywords",
    "categories": "Categories",
}


def read_toml(path: Path) -> dict[str, Any]:
    """
    Read the given TOML file.

    The file is always decoded as UTF-8, which is the encoding mandated by the TOML
    specification, instead of relying on the locale-dependent default of `Path.read_text()`.

    :param path: The file to read.
    :return: The parsed file content.
    """
    return tomli.loads(path.read_text(encoding="utf-8"))


def analyze_metadata(path: Path | str) -> dict[str, str | list[str]] | None:
    """
    Analyze the Rust package metadata for the given directory.

    If the given path is not a `Cargo.toml` file itself, the manifest is searched directly
    inside the given path. If it is not there and the path has exactly one entry, the manifest
    is expected inside that single entry.

    :param path: The directory/file to analyze. Should either be a directory or `Cargo.toml` file.
    :return: The `package` table of the manifest or `None` if the manifest has no such table.
    :raises ValueError: If no manifest can be determined unambiguously for the given path.
    """
    path = Path(path)
    if path.name != "Cargo.toml":
        direct_manifest = path / "Cargo.toml"
        if direct_manifest.exists():
            path = direct_manifest
        else:
            # List the entries only once, so that the decision and the selected entry are
            # based on the same directory state.
            entries = list(path.glob("*"))
            if len(entries) != 1:
                raise ValueError(f"No clear Cargo.toml in {path}.")
            path = entries[0] / "Cargo.toml"
    manifest = read_toml(path)
    return manifest.get("package")


def check_metadata(path: Path | str) -> str:
    """
    Render the relevant metadata fields of the given package.

    :param path: The package path, forwarded to `analyze_metadata`.
    :return: The rendered dictionary-like representation of the relevant fields. Empty string
             if no (or empty) package metadata is available.
    """
    package_metadata = analyze_metadata(path)
    if package_metadata:
        return rendering_utils.render_dictionary(
            dictionary=package_metadata,
            verbose_names_mapping=_VERBOSE_NAMES,
            multi_value_keys={"authors", "categories", "keywords"},
        )
    return ""


@dataclass
class PackageVersion:
    """
    Name, version and checksum of one package entry, as read from a lock file.
    """

    name: str
    version: str
    # Passed on as the expected SHA-256 value of the download.
    checksum: str

    def to_download(self) -> Download:
        """
        Build the download description for this package version.

        :return: The download pointing to the crates.io download endpoint for this version.
        """
        crate_url = f"https://crates.io/api/v1/crates/{self.name}/{self.version}/download"
        crate_filename = f"{self.name}_{self.version}.crate"
        return Download(url=crate_url, filename=crate_filename, sha256=self.checksum)


def get_package_versions(lock_path: Path | str) -> Generator[PackageVersion, None, None]:
    """
    Yield the crates.io registry packages declared in the given lock file.

    Entries whose `source` is not the crates.io index are reported on standard output and
    skipped.

    :param lock_path: The lock file to read.
    :return: The packages retrieved from lock file.
    """
    lock_data = read_toml(Path(lock_path))
    for entry in lock_data["package"]:
        if entry.get("source") == "registry+https://github.com/rust-lang/crates.io-index":
            yield PackageVersion(name=entry["name"], version=entry["version"], checksum=entry["checksum"])
        else:
            print("Skipping", entry)


def download_from_lock_file(lock_path: Path | str, target_directory: Path | str) -> None:
    """
    Download the packages from the given lock file.

    :param lock_path: The lock file to read.
    :param target_directory: The directory to write the packages to. It is created (including
                             missing parent directories) if it does not exist yet.
    """
    target_directory = Path(target_directory)
    # Creating the directory unconditionally avoids the race between an existence check and
    # the actual creation and supports nested target paths as well.
    target_directory.mkdir(parents=True, exist_ok=True)

    downloads = [package.to_download() for package in get_package_versions(lock_path)]
    download_utils.download_one_file_per_second(downloads=downloads, directory=target_directory)
23 changes: 6 additions & 17 deletions license_tools/tools/pip_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from __future__ import annotations

from pathlib import Path

from license_tools.utils import rendering_utils

try:
from importlib.metadata import Distribution, PathDistribution
except ImportError:
Expand Down Expand Up @@ -57,20 +60,6 @@ def check_metadata(path: Path | str) -> str:
:return: The rendered dictionary-like representation of the relevant fields.
"""
metadata = analyze_metadata(path)
maximum_length = max(map(len, _VERBOSE_NAMES.values()))
rendered = []
for key, value in metadata.items():
if key not in _VERBOSE_NAMES:
continue
if key in {"licensefile", "license_classifier", "requires"} and isinstance(value, (list, set)):
if len(value) == 1:
value = value.pop()
rendered.append(f"{_VERBOSE_NAMES.get(key):>{maximum_length}}: {value}")
elif not value:
rendered.append(f"{_VERBOSE_NAMES.get(key):>{maximum_length}}:")
else:
value = "\n" + "\n".join(map(lambda x: " " * maximum_length + f" * {x}", sorted(value)))
rendered.append(f"{_VERBOSE_NAMES.get(key):>{maximum_length}}:{value}")
else:
rendered.append(f"{_VERBOSE_NAMES.get(key):>{maximum_length}}: {value}")
return "\n".join(rendered)
return rendering_utils.render_dictionary(
dictionary=metadata, verbose_names_mapping=_VERBOSE_NAMES, multi_value_keys={"licensefile", "license_classifier", "requires"}
)
97 changes: 97 additions & 0 deletions license_tools/utils/download_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright (c) stefan6419846. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.

"""
Download handling.
"""

from __future__ import annotations

import hashlib
import logging
import time
from dataclasses import dataclass
from pathlib import Path

import requests

from license_tools.constants import VERSION


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Only the logger itself is used below, so the module name is removed from the namespace again.
del logging


# Value of the `User-Agent` header set by `get_session()`: project URL plus installed version.
USER_AGENT = f"https://github.com/stefan6419846/license_tools version {VERSION}"


class ChecksumError(ValueError):
    """
    Raised when the checksum of some data does not match the expected one.
    """


@dataclass
class Download:
    """
    Description of one file to download.
    """

    # Location to fetch the file from.
    url: str
    # Name of the file inside the download directory.
    filename: str
    # Expected SHA-256 hex digest; `None` disables the verification.
    sha256: str | None = None

    def verify_checksum(self, data: bytes) -> None:
        """
        Check if the checksum of the given data matches the expected one.

        :param data: The data to calculate the checksum for.
        :raises ChecksumError: If an expected checksum is set and the data does not match it.
        """
        expected = self.sha256
        if expected is None:
            # Nothing to compare against.
            return
        digest = hashlib.sha256(data).hexdigest()
        if digest == expected:
            return
        raise ChecksumError(f'Checksum mismatch: Got {digest}, expected {expected}!')


class DownloadError(ValueError):
    """
    Raised when the response for a download request is not okay.
    """


def get_session() -> requests.Session:
    """
    Create a new session which sends our own `User-Agent` header.

    :return: The session which identifies us against the server.
    """
    identified_session = requests.Session()
    identified_session.headers["User-Agent"] = USER_AGENT
    return identified_session


def download_file(
        download: Download, directory: Path, session: requests.Session | None = None, timeout: float | None = 60.0
) -> None:
    """
    Download the given file.

    The file is only written if the response is okay and the checksum verification of the
    download did not fail.

    :param download: Download to perform.
    :param directory: Directory to download to.
    :param session: Session to use. If omitted, a temporary session is created and closed again
                    after the request.
    :param timeout: Timeout in seconds for the request. Without a timeout, an unresponsive
                    server could block forever. Use `None` to disable it.
    :raises DownloadError: If the response is not okay.
    :raises ChecksumError: If the downloaded data does not match the expected checksum.
    """
    owns_session = session is None
    if session is None:
        session = get_session()
    target_path = directory / download.filename
    logger.info("Downloading %s to %s ...", download.url, target_path)
    try:
        response = session.get(download.url, timeout=timeout)
    finally:
        # Only close sessions created here; sessions passed in belong to the caller.
        if owns_session:
            session.close()
    if not response.ok:
        raise DownloadError(f"Download not okay? {download.url} {response}")
    download.verify_checksum(response.content)
    target_path.write_bytes(response.content)


def download_one_file_per_second(downloads: list[Download], directory: Path) -> None:
    """
    Download the given files with not more than one request per second. This conforms to
    https://crates.io/data-access#api accordingly.

    All downloads share one session, which is closed again when done or on errors.

    :param downloads: List of downloads to perform.
    :param directory: Directory to download to.
    """
    session = get_session()
    try:
        for index, download in enumerate(downloads):
            if index > 0:
                # Wait between two requests only; there is nothing to wait for after the last one.
                time.sleep(1)
            download_file(download=download, directory=directory, session=session)
    finally:
        session.close()
Loading

0 comments on commit 283db0b

Please sign in to comment.