diff --git a/get_oss_cad_suite.py b/get_oss_cad_suite.py new file mode 100644 index 00000000..320578df --- /dev/null +++ b/get_oss_cad_suite.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 + +# pip install progressbar +import progressbar + +import json +import pprint +import urllib.request +import platform +from pathlib import Path +from shutil import unpack_archive, rmtree + +system = platform.system().lower() +arch = platform.machine().lower() + +if arch == "x86_64" or arch == "amd64": + arch = "x64" +elif arch == "aarch64": + arch = "arm64" + +# print("platform=", platform.platform()) +print(system, arch) + +repo = "YosysHQ/oss-cad-suite-build" + +_json = json.loads( + urllib.request.urlopen( + urllib.request.Request( + f"https://api.github.com/repos/{repo}/releases/latest", + headers={"Accept": "application/vnd.github.v3+json"}, + ) + ).read() +) + +assets = _json["assets"] + +asset_ext = "tgz" if system in ["linux", "darwin"] else "zip" + +assets = [ + asset + for asset in assets + if asset["name"].endswith(asset_ext) and f"{system}-{arch}" in asset["name"] +] + +if not assets: + print(f"No suitable asset found for {system}-{arch}.") + exit(1) + +asset = assets[0] +# pprint.pprint(asset) +name = asset["name"] +size = asset["size"] +updated_at = asset["updated_at"] +url = asset["browser_download_url"] + + +class MyProgressBar: + def __init__(self): + self.pbar = None + + def __call__(self, block_num, block_size, total_size): + if not self.pbar: + self.pbar = progressbar.ProgressBar(maxval=total_size) + self.pbar.start() + + downloaded = block_num * block_size + if downloaded < total_size: + self.pbar.update(downloaded) + else: + self.pbar.finish() + + +archive_file = Path(name) +if archive_file.exists() and archive_file.stat().st_size == size: + print(f"using previously downloaded {archive_file}...") +else: + print(f"Downloading {name} ({size} Bytes) from {url}...") + urllib.request.urlretrieve(url, name, MyProgressBar()) + + +target_dir = Path.home() / ".xeda" / "tools" + +# content of the archive +target_subdir = target_dir / "oss-cad-suite" + +if target_dir.exists(): + if target_subdir.exists(): + print(f"Removing existing installation at {target_subdir}...") + rmtree(target_subdir) +else: + target_dir.mkdir(parents=True) + +print(f"Unpacking {name} to {target_subdir}...") +unpack_archive(name, target_dir) + +assert target_subdir.exists() and target_subdir.is_dir() +bin_dir = target_subdir / "bin" +assert bin_dir.exists() and bin_dir.is_dir() +print(f"Installation complete. Add {bin_dir} to your PATH.") diff --git a/pyproject.toml b/pyproject.toml index c930c08c..35181126 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,7 +62,7 @@ dependencies = [ "pyvcd ~= 0.3", "GitPython >= 3.1.30", "fabric >= 3.1.0", - "execnet >= 2.0.2", + "execnet >= 2.1.1", "devtools >= 0.11", ] diff --git a/src/xeda/cli.py b/src/xeda/cli.py index 673f5cff..2b5e8fe5 100644 --- a/src/xeda/cli.py +++ b/src/xeda/cli.py @@ -74,9 +74,11 @@ def setup_logger(log_level, detailed_logs, log_to_file: Optional[Path] = None): logging.getLogger().setLevel(log_level) coloredlogs.install( level=log_level, - fmt="[%(name)s] %(asctime)s %(levelname)s %(message)s" - if detailed_logs - else "%(levelname)s %(message)s", + fmt=( + "[%(name)s] %(asctime)s %(levelname)s %(message)s" + if detailed_logs + else "%(levelname)s %(message)s" + ), logger=log.root, ) if detailed_logs: @@ -107,6 +109,18 @@ def cli(ctx: click.Context, **kwargs): metavar="FLOW_NAME", type=click.Choice(all_flow_names), ) +@click.argument( + "design_file", + metavar="DESIGN", + type=click.Path( + file_okay=True, + dir_okay=False, + readable=True, + path_type=Path, + ), + default=None, + required=False, +) @click.option( "--xeda-run-dir", type=click.Path( @@ -154,12 +168,6 @@ def cli(ctx: click.Context, **kwargs): ), help="Path to Xeda project file.", ) -@click.option( - "--design-name", - # cls=ClickMutex, - # mutually_exclusive_with=["design_file"], - help="Specify design.name in case multiple designs are available in a xedaproject.", -) @click.option( "--design", "--design-file", @@ -177,6 +185,12 @@ def cli(ctx: click.Context, **kwargs): mutually_exclusive_with=["design_name"], help="Path to Xeda design file containing the description of a single design.", ) +@click.option( + "--design-name", + cls=ClickMutex, + mutually_exclusive_with=["design_file"], + help="Specify design.name in case multiple designs are available in a xedaproject.", +) @click.option( "--design-overrides", metavar="KEY=VALUE...", @@ -201,7 +215,7 @@ def cli(ctx: click.Context, **kwargs): default=tuple(), help="""Override setting values for the executed flow. Separate multiple KEY=VALUE overrides with commas. KEY can be a hierarchical name using dot notation. Example: --settings clock_period=2.345 impl.strategy=Debug - """ + """, # examples: # FIXME move to docs # - xeda vivado_sim --flow-settings stop_time=100us # - xeda vivado_synth --flow-settings impl.strategy=Debug --flow-settings clock_period=2.345 @@ -233,8 +247,9 @@ def cli(ctx: click.Context, **kwargs): def run( ctx: click.Context, flow: str, - cached_dependencies: bool, - flow_settings: Union[None, str, Iterable[str]], + design_file: Optional[str] = None, + cached_dependencies: bool = True, + flow_settings: Union[None, str, Iterable[str]] = None, incremental: bool = True, clean: bool = False, xeda_run_dir: Optional[Path] = None, @@ -268,6 +283,11 @@ def run( else: flow_settings = [] + if not design and design_file: + design = design_file + if not design: + sys.exit("No design file specified!") + if remote: from .flow_runner.remote import RemoteRunner @@ -674,7 +694,7 @@ def completion(_ctx: click.Context, stdout, shell=None): completion_class = get_completion_class(shell) if completion_class: complete = completion_class( - cli=cli, ctx_args={}, prog_name=__package__, complete_var="source_xeda" + cli=cli, ctx_args={}, prog_name=__package__ or "xeda", complete_var="source_xeda" ) print(complete.source()) else: diff --git a/src/xeda/design.py b/src/xeda/design.py index 7c9ebd73..ca7aaea7 100644 --- a/src/xeda/design.py +++ b/src/xeda/design.py @@ -316,6 +316,11 @@ def src_with_type(src, stc_type): class Clock(XedaBaseModel): port: str + name: Optional[str] = None + + @validator("name", pre=True, always=True) + def _name_validate(cls, value, values) -> str: + return value or values.get("port", None) class Generator(XedaBaseModel): @@ -669,22 +674,40 @@ def _authors_from_str(cls, value): return [value] return value - def __init__( - self, - design_root: Union[None, str, os.PathLike] = None, - **data: Any, - ) -> None: + @classmethod + def process_compatibility(cls, data: Dict[str, Any]) -> Dict[str, Any]: + if "rtl" not in data: + clocks = data.pop("clocks", None) + if clocks is None: + clock = data.pop("clock", None) + if clock: + clocks = list(clock) if isinstance(clock, (list, tuple)) else [clock] + if ( + clocks + and isinstance(clocks, (list, tuple)) + and all(isinstance(c, str) for c in clocks) + ): + clocks = {c: {"port": c} for c in clocks} + data["rtl"] = { + "generator": data.pop("generator", None), + "sources": data.pop("sources", []), + "parameters": data.pop("parameters", []), + "defines": data.pop("defines", []), + "top": data.pop("top", None), + "clocks": clocks, + } + return data + + @classmethod + def process_generation(cls, data: Dict[str, Any]): + design_root = data.get("design_root") if not design_root: - design_root = data.pop("design_root", Path.cwd()) - assert design_root - if not isinstance(design_root, Path): + design_root = Path.cwd() + else: design_root = Path(design_root) - design_root = design_root.resolve() - if not data.get("design_root"): - data["design_root"] = design_root - with WorkingDirectory(design_root): - generator = data.get("rtl", {}).get("generator", None) - if generator: + generator = data.get("rtl", {}).get("generator", None) + if generator: + with WorkingDirectory(design_root): log.info("Running generator: %s", generator) if isinstance(generator, str): os.system(generator) # nosec S605 @@ -709,6 +732,28 @@ def __init__( # raise FileNotFoundError(gen_script) # args.insert(0, sys.executable) subprocess.run(args, check=True, cwd=design_root) + + @classmethod + def process_dict(cls, data: Dict[str, Any]) -> Dict[str, Any]: + data = cls.process_compatibility(data) + cls.process_generation(data) + return data + + def __init__( + self, + design_root: Union[None, str, os.PathLike] = None, + **data: Any, + ) -> None: + if not design_root: + design_root = data.pop("design_root", Path.cwd()) + assert design_root + if not isinstance(design_root, Path): + design_root = Path(design_root) + design_root = design_root.resolve() + if not data.get("design_root"): + data["design_root"] = design_root + data = Design.process_dict(data) + with WorkingDirectory(design_root): try: super().__init__(**data) except ValidationError as e: @@ -784,13 +829,6 @@ def from_toml( allow_extra: bool = False, remove_extra: Optional[List[str]] = None, ) -> DesignType: - if overrides is None: - overrides = {} - if remove_extra is None: - remove_extra = [] - if not isinstance(design_file, Path): - design_file = Path(design_file) - assert design_file.suffix == ".toml" return cls.from_file( design_file, design_root=design_root, @@ -827,7 +865,7 @@ def from_file( if "name" not in design_dict: design_name = design_file.stem design_name = removesuffix(design_name, ".xeda") - log.warning( + log.debug( "'design.name' not specified! Inferring design name: `%s` from design file name.", design_name, ) @@ -889,10 +927,13 @@ def rtl_hash(self) -> str: @cached_property def tb_hash(self) -> str: hashes = list(sorted(src.content_hash for src in self.tb.sources)) - param_strs = [f"{p}={v}" for p, v in self.tb.parameters.items()] + param_strs = [ + f"{p}={v}" for p, v in self.tb.parameters.items() # pylint: disable=no-member + ] r = bytes(", ".join(hashes + param_strs), "utf-8") return hashlib.sha3_256(r).hexdigest() + # pylint: disable=arguments-differ def dict(self): return super().dict( exclude_unset=True, diff --git a/src/xeda/flow/synth.py b/src/xeda/flow/synth.py index 346ffea4..e5d387e5 100644 --- a/src/xeda/flow/synth.py +++ b/src/xeda/flow/synth.py @@ -10,7 +10,7 @@ from ..dataclass import Field, XedaBaseModel, root_validator, validator from ..design import Design from ..units import convert_unit -from ..utils import first_value +from ..utils import first_value, first_key from .flow import Flow, FlowSettingsError from .fpga import FPGA @@ -99,7 +99,7 @@ def root_validate_phys_clock(cls, values: Dict[str, Any]) -> Dict[str, Any]: else: raise ValueError("Neither freq or period were specified") if not values.get("name"): - values["name"] = "main_clock" + values["name"] = "" return values @@ -140,22 +140,34 @@ def _synthflow_settings_root_validator(cls, values): clock_period value takes priority for that particular value and overrides that clock's period """ clocks = values.get("clocks") + # main_clock_name = "main_clock" + clock = values.pop("clock", None) clock_period = values.get("clock_period") - main_clock_name = "main_clock" - if clocks is None and "clock" in values: - clocks = {main_clock_name: values.pop("clock")} - if clocks and (len(clocks) == 1 or main_clock_name in clocks): - if main_clock_name in clocks: - main_clock = clocks[main_clock_name] - else: - main_clock = list(clocks.values())[0] - main_clock_name = list(clocks.keys())[0] - if isinstance(main_clock, PhysicalClock): - main_clock = dict(main_clock) - if clock_period: - log.debug("Setting main_clock period to %s", clock_period) - main_clock["period"] = clock_period - clocks[main_clock_name] = PhysicalClock(**main_clock) + if (not clocks) and (clock or clock_period): + if not clock: + clock = {"period": clock_period} + if not isinstance(clock, PhysicalClock): + assert isinstance( + clock, dict + ), "clock should be a dictionary or PhysicalClock instance" + if clock_period: # overrides the period value + clock["period"] = clock_period + clock = PhysicalClock(**clock) + # if not clock.name: + # clock.name = main_clock_name + clocks = {clock.name: clock} + # if clocks and (len(clocks) == 1 or main_clock_name in clocks): + # if main_clock_name in clocks: + # main_clock = clocks[main_clock_name] + # else: + # main_clock = list(clocks.values())[0] + # main_clock_name = list(clocks.keys())[0] + # if isinstance(main_clock, PhysicalClock): + # main_clock = dict(main_clock) + # if clock_period: + # log.debug("Setting main_clock period to %s", clock_period) + # main_clock["period"] = clock_period + # clocks[main_clock_name] = PhysicalClock(**main_clock) if clocks: values["clocks"] = clocks return values @@ -165,6 +177,20 @@ def main_clock(self) -> Optional[PhysicalClock]: return self.clocks.get("main_clock") or first_value(self.clocks) def __init__(self, flow_settings: Settings, design: Design, run_path: Path): + # shorthand for single clock specification + if len(flow_settings.clocks) == 1 and len(design.rtl.clocks) == 1: + clock_name = first_key(flow_settings.clocks) + assert clock_name is not None + clock_obj = flow_settings.clocks.pop(clock_name) + design_clock = first_value(design.rtl.clocks) + assert design_clock is not None + if not clock_obj.port: + clock_obj.port = design_clock.port + if not clock_obj.name: + clock_obj.name = design_clock.name + clock_name = clock_obj.name + assert clock_name is not None + flow_settings.clocks[clock_name] = clock_obj for clock_name, physical_clock in flow_settings.clocks.items(): if not physical_clock.port: if clock_name not in design.rtl.clocks: @@ -188,7 +214,7 @@ def __init__(self, flow_settings: Settings, design: Design, run_path: Path): physical_clock.port = design.rtl.clocks[clock_name].port flow_settings.clocks[clock_name] = physical_clock for clock_name, clock in design.rtl.clocks.items(): - if clock_name not in flow_settings.clocks: + if clock.port not in (c.port for c in flow_settings.clocks.values()): log.critical( "No clock period or frequency was specified for clock: %s (design clock port: '%s')", clock_name, diff --git a/src/xeda/flow_runner/default_runner.py b/src/xeda/flow_runner/default_runner.py index 8e5057c1..c5bcc9fd 100644 --- a/src/xeda/flow_runner/default_runner.py +++ b/src/xeda/flow_runner/default_runner.py @@ -57,6 +57,7 @@ "FlowRunner", "DefaultRunner", "print_results", + "add_file_logger", ] log = logging.getLogger(__name__) @@ -692,11 +693,13 @@ class DefaultRunner(FlowRunner): """Executes a flow and its dependencies and then reports selected results""" -def add_file_logger(logdir: Path, timestamp: Union[None, str, datetime] = None): +def add_file_logger(logdir: Union[Path, str], timestamp: Union[None, str, datetime] = None): if timestamp is None: timestamp = datetime.now() if not isinstance(timestamp, str): timestamp = timestamp.strftime("%Y-%m-%d-%H%M%S%f")[:-3] + if not isinstance(logdir, Path): + logdir = Path(logdir) logdir.mkdir(exist_ok=True, parents=True) logfile = logdir / f"xeda_{timestamp}.log" log.info("Logging to %s", logfile) diff --git a/src/xeda/flow_runner/remote.py b/src/xeda/flow_runner/remote.py index 0e3f2ca1..1f4e95bd 100644 --- a/src/xeda/flow_runner/remote.py +++ b/src/xeda/flow_runner/remote.py @@ -1,11 +1,17 @@ +from datetime import datetime import json import logging import os import tempfile +from turtle import st import zipfile from pathlib import Path from typing import Any, Dict, Optional, Tuple, Union +import execnet +from fabric import Connection +from fabric.transfer import Transfer + from ..design import Design from ..utils import backup_existing, dump_json, settings_to_dict from ..version import __version__ @@ -22,8 +28,6 @@ def send_design(design: Design, conn, remote_path: str) -> Tuple[str, str]: root_path = design.root_path - from fabric import Connection - assert isinstance(conn, Connection) with tempfile.TemporaryDirectory() as tmpdirname: @@ -31,24 +35,37 @@ def send_design(design: Design, conn, remote_path: str) -> Tuple[str, str]: zip_file = temp_dir / f"{design.name}.zip" log.info("Preparing design archive: %s", zip_file) new_design: Dict[str, Any] = {**design.dict(), "design_root": None} - rtl: Dict[str, Any] = {} # new_design.get("rtl", {}) - tb: Dict[str, Any] = {} # new_design.get("tb", {}) + rtl: Dict[str, Any] = {} + tb: Dict[str, Any] = {} remote_sources_path = Path("sources") rtl["sources"] = [ str(remote_sources_path / src.path.relative_to(root_path)) for src in design.rtl.sources ] + rtl["defines"] = design.rtl.defines + rtl["attributes"] = design.rtl.attributes + rtl["parameters"] = design.rtl.parameters rtl["top"] = design.rtl.top rtl["clocks"] = list(map(lambda kv: (kv[0], kv[1].dict()), design.rtl.clocks.items())) + # FIXME add src type/attributes tb["sources"] = [ str(remote_sources_path / src.file.relative_to(root_path)) for src in design.tb.sources ] tb["top"] = design.tb.top + tb["cocotb"] = design.tb.cocotb + if design.tb.uut: + tb["uut"] = design.tb.uut + if design.tb.parameters: + tb["parameters"] = design.tb.parameters + if design.tb.defines: + tb["defines"] = design.tb.defines new_design["rtl"] = rtl new_design["tb"] = tb new_design["flow"] = design.flow design_file = temp_dir / f"{design.name}.xeda.json" + print(f"Design: {new_design}") with open(design_file, "w") as f: json.dump(new_design, f) + print(f"Design file: {design_file}") with zipfile.ZipFile(zip_file, mode="w") as archive: for src in design.sources_of_type("*", rtl=True, tb=True): file_path = src.path @@ -65,14 +82,15 @@ def send_design(design: Design, conn, remote_path: str) -> Tuple[str, str]: return zip_file.name, design_file.name -def remote_runner(channel, remote_path, zip_file, flow, design_file, flow_settings={}, env=None): +def remote_runner(channel, remote_path, zip_file, flow, design_file, flow_settings, env=None): + # pylint: disable=import-outside-toplevel,reimported,redefined-outer-name import os import zipfile import json + from datetime import datetime + from pathlib import Path - from xeda.flow_runner import DefaultRunner # pyright: ignore reportMissingImports - - print(f"changing directory to {remote_path}") + from xeda.flow_runner import DefaultRunner os.chdir(remote_path) if env: @@ -84,7 +102,8 @@ def remote_runner(channel, remote_path, zip_file, flow, design_file, flow_settin with zipfile.ZipFile(zip_file, mode="r") as archive: archive.extractall(path=remote_path) - xeda_run_dir = "remote_run" + # xeda_run_dir = Path("remote_run").joinpath(datetime.now().strftime("%y%m%d%H%M%S%f")) + xeda_run_dir = str(Path.cwd()) launcher = DefaultRunner( xeda_run_dir, cached_dependencies=True, @@ -92,23 +111,18 @@ def remote_runner(channel, remote_path, zip_file, flow, design_file, flow_settin clean=True, incremental=False, post_cleanup=False, - # post_cleanup_purge = True, + display_results=False, ) - f = launcher.run( flow, design=design_file, + design_allow_extra=True, flow_settings=flow_settings, ) - - results = ( - "{success: false}" - if f is None - else json.dumps( - f.results.to_dict(), - default=str, - indent=1, - ) + results = json.dumps( + {"success": False} if f is None else f.results.to_dict(), + default=str, + indent=1, ) channel.send(results) @@ -126,6 +140,15 @@ def get_login_env(conn) -> Dict[str, str]: return {kv[0]: kv[1] for kv in lines_split if len(kv) == 2} +class RemoteLogger: + + def cb(self, data): + if data is None: + log.info("Remote channel closed.") + return + print(data) + + class RemoteRunner(FlowLauncher): class Settings(FlowLauncher.Settings): clean: bool = True @@ -137,19 +160,17 @@ def run_remote( host: str, user: Optional[str] = None, port: Optional[int] = None, - flow_settings=[], + flow_settings=None, ): - flow_settings = settings_to_dict(flow_settings) # imports deferred due to "import imp" deprecation warnings from 'fabric' - import execnet - from fabric import Connection - from fabric.transfer import Transfer + + flow_settings = settings_to_dict(flow_settings or []) host_split = host.split(":") if port is None and len(host_split) == 2 and host_split[1].isnumeric(): host = host_split[0] port = int(host_split[1]) - if not isinstance(design, Design): + if isinstance(design, (str, Path)): design = Design.from_file(design) design_hash = semantic_hash( dict( @@ -164,13 +185,17 @@ def run_remote( log.info("logging in...") remote_env = get_login_env(conn) remote_env_path = remote_env.get("PATH", "") - remote_home = remote_env.get("HOME") - log.info("remote PATH=%s HOME=%s", remote_env_path, remote_home) - - remote_xeda = f"{remote_home}/.xeda" - if not Transfer(conn).is_remote_dir(remote_xeda): - conn.sftp().mkdir(remote_xeda) - remote_path = f"{remote_xeda}/{design.name}_{design_hash[:DIR_NAME_HASH_LEN]}" + remote_home = remote_env.get("HOME", ".") + log.info("Remote PATH=%s HOME=%s", remote_env_path, remote_home) + + remote_xeda = Path(remote_home) / ".xeda" + if not Transfer(conn).is_remote_dir(str(remote_xeda)): + conn.sftp().mkdir(str(remote_xeda)) + remote_xeda_run = remote_xeda / "remote_run" + if not Transfer(conn).is_remote_dir(str(remote_xeda_run)): + conn.sftp().mkdir(str(remote_xeda_run)) + # use a timestamped subdirectory to avoid any race conditions and also have the chronology clear + remote_path = str(remote_xeda_run / datetime.now().strftime("%y%m%d%H%M%S%f")) if not Transfer(conn).is_remote_dir(remote_path): conn.sftp().mkdir(remote_path) assert Transfer(conn).is_remote_dir(remote_path) @@ -194,9 +219,13 @@ def run_remote( python_exec = "python3" - gw = execnet.makegateway( - f"ssh={ssh_opt}//chdir={remote_path}//env:PATH={remote_env_path}//python={python_exec}" - ) + spec = { + "ssh": ssh_opt, + "chdir": remote_path, + "env:PATH": remote_env_path, + "python": python_exec, + } + gw = execnet.makegateway("//".join([f"{k}={v}" for k, v in spec.items()])) channel = gw.remote_exec( """ import sys, os @@ -205,9 +234,7 @@ def run_remote( ) platform, version_info, _ = channel.receive() version_info_str = ".".join(str(v) for v in version_info) - log.info( - f"Hostname:{host} Platform:{platform} Python:{version_info_str} PATH={remote_env_path}" - ) + log.info("Remote host:%s (%s python:%s)", host, platform, version_info_str) PY_MIN_VERSION = (3, 8, 0) assert version_info[0] == PY_MIN_VERSION[0] and ( version_info[1] > PY_MIN_VERSION[1] @@ -238,6 +265,19 @@ def run_remote( ) dump_json(all_settings, settings_json, backup=self.settings.backups) results = None + + receiver = RemoteLogger() + + outchan = gw.remote_exec( + """ + import sys + outchan = channel.gateway.newchannel() + sys.stderr = sys.stdout = outchan.makefile("w") + channel.send(outchan) + """ + ).receive() + outchan.setcallback(receiver.cb, endmarker=None) + try: results_channel = gw.remote_exec( remote_runner, @@ -247,11 +287,11 @@ def run_remote( design_file=design_file, flow_settings=flow_settings, ) - if not results_channel.isclosed(): results_str = results_channel.receive() if results_str: results = json.loads(results_str) + results_channel.waitclose() except execnet.gateway_base.RemoteError as e: log.critical("Remote exception: %s", e.formatted)