Skip to content

Commit

Permalink
fix(*): typing, linting and moved configs to own model
Browse files Browse the repository at this point in the history
  • Loading branch information
rwxd committed Jan 13, 2023
1 parent b89721c commit 8d5acfc
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 119 deletions.
29 changes: 26 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,31 @@ PROJECT_NAME := "ipams"
help:
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

setup: ## Setup required things
all: setup run ## run everything

@python3 -m pip install -r requirements.txt
test: pre-commit-all unit integration ## Run all tests

@python3 -m pip install -r requirements-dev.txt
setup: ## install required modules
python -m pip install -U -r requirements.txt
python -m pip install -U -r requirements-dev.txt
pre-commit install
pre-commit install-hooks

unit: ## run unit tests
python -m pytest -vvl tests/unit/ --showlocals

integration: ## run integration tests
python -m pytest -vvl --setup-show -vvl tests/integration/ --showlocals

run: ## run project
python -m $(PROJECT_NAME)

clean: ## clean cache and temp dirs
rm -rf ./.mypy_cache ./.pytest_cache
rm -f .coverage

pre-commit-all: ## run pre-commit on all files
pre-commit run --all-files

pre-commit: ## run pre-commit
pre-commit run
2 changes: 1 addition & 1 deletion ipams/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ def main() -> int:
return 0


if __name__ == "__main__":
if __name__ == '__main__':
raise SystemExit(main())
42 changes: 23 additions & 19 deletions ipams/cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import typer
import importlib.metadata
from ipaddress import ip_address, ip_network
from pathlib import Path
from ipams.config import parse_config

import typer
from rich.console import Console
import importlib.metadata

from ipams.config import parse_config
from ipams.netbox import NetBoxConnector
from ipams.phpipam import PhpIpamConnector

app = typer.Typer()
console = Console()
Expand All @@ -13,29 +17,29 @@

@app.command()
def ip(
ip: str = typer.Argument(..., help="IP address"),
ip: str = typer.Argument(..., help='IP address'),
config: Path = typer.Option(
default_config_path, "--config", "-c", help="Path to config file"
default_config_path, '--config', '-c', help='Path to config file'
),
):
converted = ip_address(ip)
parsed_config = parse_config(config)
for nb in parsed_config.netboxes:
table = nb.query_ip(converted)
table = NetBoxConnector(nb).query_ip(converted)
if len(table.rows) > 0:
console.print(table)

for phpipam in parsed_config.phpipams:
table = phpipam.query_ip(converted)
table = PhpIpamConnector(phpipam).query_ip(converted)
if len(table.rows) > 0:
console.print(table)


@app.command()
def host(
query: str = typer.Argument(..., help="Host name or ip address"),
query: str = typer.Argument(..., help='Host name or ip address'),
config: Path = typer.Option(
default_config_path, "--config", "-c", help="Path to config file"
default_config_path, '--config', '-c', help='Path to config file'
),
):
parsed_config = parse_config(config)
Expand All @@ -47,26 +51,26 @@ def host(

for nb in parsed_config.netboxes:
if ip:
table = nb.query_host_by_ip(ip)
table = NetBoxConnector(nb).query_host_by_ip(ip)
else:
table = nb.query_host_by_name(query)
table = NetBoxConnector(nb).query_host_by_name(query)
if len(table.rows) > 0:
console.print(table)

for phpipam in parsed_config.phpipams:
if ip:
table = phpipam.query_host_by_ip(ip)
table = PhpIpamConnector(phpipam).query_host_by_ip(ip)
else:
table = phpipam.query_host_by_name(query)
table = PhpIpamConnector(phpipam).query_host_by_name(query)
if len(table.rows) > 0:
console.print(table)


@app.command()
def network(
query: str = typer.Argument(..., help="Network name or address"),
query: str = typer.Argument(..., help='Network name or address'),
config: Path = typer.Option(
default_config_path, "--config", "-c", help="Path to config file"
default_config_path, '--config', '-c', help='Path to config file'
),
):
parsed_config = parse_config(config)
Expand All @@ -78,17 +82,17 @@ def network(

for nb in parsed_config.netboxes:
if subnet:
table = nb.query_network_by_address(subnet)
table = NetBoxConnector(nb).query_network_by_address(subnet)
else:
table = nb.query_network_by_string(query)
table = NetBoxConnector(nb).query_network_by_string(query)
if len(table.rows) > 0:
console.print(table)

for phpipam in parsed_config.phpipams:
if subnet:
table = phpipam.query_network_by_address(subnet)
table = PhpIpamConnector(phpipam).query_network_by_address(subnet)
else:
table = phpipam.query_network_by_string(query)
table = PhpIpamConnector(phpipam).query_network_by_string(query)
if len(table.rows) > 0:
console.print(table)

Expand Down
16 changes: 9 additions & 7 deletions ipams/config.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
from pathlib import Path
from pydantic import BaseModel

import yaml
from ipams.netbox import NetBoxConnector
from ipams.phpipam import PhpIpamConnector
from pydantic import BaseModel

from ipams.netbox import NetBoxConfig
from ipams.phpipam import PhpIpamConfig


class Config(BaseModel):
netboxes: list[NetBoxConnector] = []
phpipams: list[PhpIpamConnector] = []
netboxes: list[NetBoxConfig] = []
phpipams: list[PhpIpamConfig] = []


def parse_config(config: Path) -> Config:
'''Supports json or yaml config files'''
if not config.exists():
raise FileNotFoundError(f"Config file {config} does not exist")
if config.suffix in [".yaml", ".yml"]:
raise FileNotFoundError(f'Config file {config} does not exist')
if config.suffix in ['.yaml', '.yml']:
with open(config, 'r') as f:
return Config.parse_obj(yaml.safe_load(f))
return Config.parse_file(config)
44 changes: 26 additions & 18 deletions ipams/netbox.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
from pynetbox.core.api import Api
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Union
from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network
from ipams.query import IPQuery

from pydantic import BaseModel
from pynetbox.core.api import Api

from ipams.output import NetBoxHostTable, NetBoxIPTable, NetBoxNetworkTable


class NetBoxConnector(BaseModel):
class NetBoxConfig(BaseModel):
name: str
url: str
token: str
threading: bool = True
verify_ssl: bool = True

@property
def conn(self) -> Api:
return Api(self.url, token=self.token, threading=True)

class NetBoxConnector:
def __init__(self, config: NetBoxConfig):
self.name = config.name
self.url = config.url
self.conn = Api(
config.url,
token=config.token,
threading=config.threading,
)

def query_ip(self, ip: Union[IPv4Address, IPv6Address]) -> NetBoxIPTable:
"""Query NetBox for IP address"""
results = NetBoxIPTable(self.name)
for q_ip in self.conn.ipam.ip_addresses.filter(q=str(ip)):
results.add_row(
vrf=q_ip.vrf.name if q_ip.vrf else "",
tenant=q_ip.tenant.name if q_ip.tenant else "",
vrf=q_ip.vrf.name if q_ip.vrf else '',
tenant=q_ip.tenant.name if q_ip.tenant else '',
address=str(q_ip.address),
description=str(q_ip.description),
link=f"{self.url.rstrip('/')}/ipam/ip-addresses/{q_ip.id}/",
Expand All @@ -39,8 +47,8 @@ def query_host_by_ip(self, ip: Union[IPv4Address, IPv6Address]) -> NetBoxHostTab
device = self.conn.dcim.devices.get(id=q_ip.assigned_object.device.id)
if device:
results.add_row(
tenant=device.tenant.name if device.tenant else "",
site=device.site.name if device.site else "",
tenant=device.tenant.name if device.tenant else '',
site=device.site.name if device.site else '',
device=device.name,
address=str(q_ip.address),
link=f"{self.url.rstrip('/')}/dcim/devices/{device.id}/",
Expand All @@ -51,10 +59,10 @@ def query_host_by_name(self, name: str) -> NetBoxHostTable:
results = NetBoxHostTable(self.name)
for device in self.conn.dcim.devices.filter(q=name):
results.add_row(
tenant=device.tenant.name if device.tenant else "",
site=device.site.name if device.site else "",
tenant=device.tenant.name if device.tenant else '',
site=device.site.name if device.site else '',
device=device.name,
address=str(device.primary_ip.address if device.primary_ip4 else ""),
address=str(device.primary_ip.address if device.primary_ip4 else ''),
link=f"{self.url.rstrip('/')}/dcim/devices/{device.id}/",
)
return results
Expand All @@ -66,8 +74,8 @@ def query_network_by_address(
for q_network in self.conn.ipam.prefixes.filter(q=str(network)):
results.add_row(
network=str(q_network.prefix),
vrf=q_network.vrf.name if q_network.vrf else "",
tenant=q_network.tenant.name if q_network.tenant else "",
vrf=q_network.vrf.name if q_network.vrf else '',
tenant=q_network.tenant.name if q_network.tenant else '',
description=str(q_network.description),
link=f"{self.url.rstrip('/')}/ipam/prefixes/{q_network.id}/",
)
Expand All @@ -78,8 +86,8 @@ def query_network_by_string(self, query: str) -> NetBoxNetworkTable:
for q_network in self.conn.ipam.prefixes.filter(q=query):
results.add_row(
network=str(q_network.prefix),
vrf=q_network.vrf.name if q_network.vrf else "",
tenant=q_network.tenant.name if q_network.tenant else "",
vrf=q_network.vrf.name if q_network.vrf else '',
tenant=q_network.tenant.name if q_network.tenant else '',
description=str(q_network.description),
link=f"{self.url.rstrip('/')}/ipam/prefixes/{q_network.id}/",
)
Expand Down
Loading

0 comments on commit 8d5acfc

Please sign in to comment.