Skip to content

Commit

Permalink
Add type hinting
Browse files Browse the repository at this point in the history
  • Loading branch information
saratomaz committed Jan 24, 2025
1 parent 5b14214 commit 1457302
Showing 1 changed file with 62 additions and 50 deletions.
112 changes: 62 additions & 50 deletions sync_tests/utils/node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import collections
import datetime
import json
import logging
import os
Expand All @@ -7,9 +8,11 @@
import subprocess
import time
import typing as tp
import datetime
import urllib.request
from collections import OrderedDict

from pathlib import Path
from typing import Any

from sync_tests.utils import blockfrost
from sync_tests.utils import db_sync
Expand All @@ -24,29 +27,32 @@
NODE_LOG_FILE = ROOT_TEST_PATH / f"cardano-node/node_{ENVIRONMENT}_logfile.log"


def get_testnet_value(env: str):
def get_testnet_value(env: str) -> str:
"""Returns the appropriate testnet magic value for the specified environment."""
if env == 'mainnet':
return '--mainnet'
elif env == 'testnet':
return '--testnet-magic 1097911063'
elif env == 'staging':
return '--testnet-magic 633343913'
elif env == 'shelley-qa':
return '--testnet-magic 3'
elif env == 'preview':
return '--testnet-magic 2'
elif env == 'preprod':
return '--testnet-magic 1'
if env == "mainnet":
magic_value = "--mainnet"
elif env == "testnet":
magic_value = "--testnet-magic 1097911063"
elif env == "staging":
magic_value = "--testnet-magic 633343913"
elif env == "shelley-qa":
magic_value = "--testnet-magic 3"
elif env == "preview":
magic_value = "--testnet-magic 2"
elif env == "preprod":
magic_value = "--testnet-magic 1"
else:
return None

raise ValueError(
f"Invalid environment: {env}. "
f"Must be one of: 'mainnet', 'testnet', 'staging', 'shelley-qa', "
f"'preview', or 'preprod'."
)

return magic_value


def calculate_current_slot(env):
"""
Calculate the current slot number based on the environment's predefined Byron and Shelley
start times and the current UTC time.
"""
def calculate_current_slot(env: str) -> int:
"""Calculate the current slot number based on the environment's predefined."""
current_time = datetime.datetime.now(datetime.UTC)

start_times = {
Expand All @@ -67,28 +73,24 @@ def calculate_current_slot(env):
return last_slot_no


def get_no_of_slots_in_era(env, era_name, no_of_epochs_in_era):
"""Calculate the total number of slots in an era.
Based on the environment, era name, and the number of epochs in the era.
"""
def get_no_of_slots_in_era(env: str, era_name: str, no_of_epochs_in_era: int) -> int:
"""Calculate the total number of slots in an era."""
slot_length_secs = 1
epoch_length_slots = 432000

if era_name.lower() == 'byron':
if era_name.lower() == "byron":
slot_length_secs = 20
if env == 'shelley-qa':
if env == "shelley-qa":
epoch_length_slots = 7200
if env == 'preview':
if env == "preview":
epoch_length_slots = 86400

epoch_length_secs = int(epoch_length_slots / slot_length_secs)
return int(epoch_length_secs * no_of_epochs_in_era)


def get_current_tip(env:str):
def get_current_tip(env:str) -> dict:
"""Retrieves the current tip of the Cardano node."""
current_directory = os.getcwd()
os.chdir(ROOT_TEST_PATH / "cardano-node")
cmd = "./_cardano-cli latest query tip " + get_testnet_value(env)

Expand All @@ -104,27 +106,34 @@ def get_current_tip(env:str):
output_json = json.loads(output)
tip_info["epoch"] = int(output_json.get("epoch", 0))
tip_info["block"] = int(output_json.get("block", 0))
tip_info["hash_value"] = output_json.get("hash", "")
tip_info["slot"] = int(output_json.get("slot", 0))
tip_info["era"] = output_json.get("era", "").lower()
tip_info["sync_progress"] = int(float(
output_json.get("syncProgress", 0.0))) if "syncProgress" in output_json else None
tip_info["hash_value"] = output_json.get("hash", "")
tip_info["slot"] = int(output_json.get("slot", 0))
tip_info["era"] = output_json.get("era", "").lower()
tip_info["sync_progress"] = (
int(float(output_json.get("syncProgress", 0.0)))
if "syncProgress" in output_json
else None
)

return tip_info
break # Exit the loop once the tip_info is successfully populated
except subprocess.CalledProcessError as e:
logging.error(f" === Waiting 60s before retrying to get the tip again - {i}")
logging.error(
f" !!!ERROR: command {e.cmd} return with error (code {e.returncode}): {' '.join(str(e.output).split())}")
logging.error(f" !!!ERROR: command {e.cmd} return with error (code {e.returncode}): "
f"{' '.join(str(e.output).split())}")
if "Invalid argument" in str(e.output):
db_sync.emergency_upload_artifacts(env)
exit(1)
pass
time.sleep(60)
db_sync.emergency_upload_artifacts(env)
exit(1)
else:
# If the loop completes without success, upload artifacts and exit
db_sync.emergency_upload_artifacts(env)
exit(1)

return tip_info


def wait_for_node_to_start(env: str):
def wait_for_node_to_start(env: str) -> int:
"""Waits for Cardano node to start."""
# When starting from clean state it might take ~30 secs for the cli to work.
# When starting from existing state it might take >10 mins for the cli to work (opening db and
Expand All @@ -138,7 +147,7 @@ def wait_for_node_to_start(env: str):
return start_time_seconds


def start_node(env: str, node_start_arguments: tp.Optional[tp.List[str]] = None):
def start_node(env: str, node_start_arguments: tp.Optional[tp.List[str]] = None) -> int:
"""Starts the Cardano node in the current working directory."""
os.chdir(ROOT_TEST_PATH / "cardano-node")
current_directory = os.getcwd()
Expand Down Expand Up @@ -182,16 +191,16 @@ def start_node(env: str, node_start_arguments: tp.Optional[tp.List[str]] = None)

logging.info(f"DB folder was created after {counter} seconds")
secs_to_start = wait_for_node_to_start(env)
return secs_to_start
except subprocess.CalledProcessError as e:
raise RuntimeError(
"command '{}' return with error (code {}): {}".format(
e.cmd, e.returncode, " ".join(str(e.output).split())
)
)
return secs_to_start


def get_cli_version():
def get_cli_version() -> tp.Tuple[str, str]:
"""Get cardano-cli version"""
try:
cmd = "./_cardano-cli --version"
Expand All @@ -202,16 +211,19 @@ def get_cli_version():
)
cardano_cli_version = output.split("git rev ")[0].strip()
cardano_cli_git_rev = output.split("git rev ")[1].strip()
return str(cardano_cli_version), str(cardano_cli_git_rev)
except subprocess.CalledProcessError as e:
raise RuntimeError(
"command '{}' return with error (code {}): {}".format(
e.cmd, e.returncode, " ".join(str(e.output).split())
)
)

return str(cardano_cli_version), str(cardano_cli_git_rev)


def wait_for_node_to_sync(env, sync_percentage = 99.9):
def wait_for_node_to_sync(
env: str, sync_percentage: float = 99.9
) -> tuple[int, int, str, OrderedDict, OrderedDict]:
"""Waits for the Cardano node to sync."""
era_details_dict = collections.OrderedDict()
epoch_details_dict = collections.OrderedDict()
Expand Down Expand Up @@ -315,7 +327,7 @@ def wait_for_node_to_sync(env, sync_percentage = 99.9):
return sync_duration, last_slot_no, latest_chunk_no, era_details_dict, epoch_details_dict


def copy_node_executables(build_method: str = "nix"):
def copy_node_executables(build_method: str = "nix") -> None:
"""Copies the Cardano node executables built with the specified build method."""
current_directory = os.getcwd()
os.chdir(ROOT_TEST_PATH)
Expand Down Expand Up @@ -357,7 +369,7 @@ def copy_node_executables(build_method: str = "nix"):
raise RuntimeError(f"!!! ERROR - could not copy the Cardano node executables: {e}")


def get_node_config_files(env: str, node_topology_type: tp.Optional[str] = ""):
def get_node_config_files(env: str, node_topology_type: tp.Optional[str] = "") -> None:
"""Downloads Cardano node configuration files for the specified environment."""
base_url = "https://book.play.dev.cardano.org/environments/"
filenames = [
Expand Down Expand Up @@ -400,4 +412,4 @@ def disable_p2p_node_config() -> None:
with open("config.json", "w") as new_file:
json.dump(config, new_file, indent=4)

logging.info("Configuration updated successfully.")
logging.info("Configuration updated successfully.")

0 comments on commit 1457302

Please sign in to comment.