ISVZ with a new schema #252

Open
wants to merge 3 commits into main
116 changes: 49 additions & 67 deletions data/zakazky/main.py
@@ -4,17 +4,18 @@
import csv
import datetime as dt
import gzip
import io
import json
import shutil
import os
import re
import ssl
from contextlib import contextmanager
from datetime import datetime
from urllib.request import Request, urlopen

from lxml.etree import iterparse

# ISVZ does not have trustworthy certificates
# TODO(PR): verify
ssl._create_default_https_context = ssl._create_unverified_context
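
The TODO above flags the global monkey-patch of ssl._create_default_https_context. As a point of comparison only (not part of this PR), a minimal sketch of a narrower approach would build an unverified context once and pass it to the individual requests, so certificate verification stays on for everything else; the helper name below is hypothetical.

import ssl
from urllib.request import Request, urlopen

# Hypothetical helper, not in the PR: limit the relaxed verification to a
# single request instead of patching the process-wide default context.
_ISVZ_CTX = ssl._create_unverified_context()

def open_isvz_url(url: str):
    req = Request(url, headers={"User-Agent": "python-urllib"})
    return urlopen(req, context=_ISVZ_CTX)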


@@ -70,78 +71,59 @@ def read_url(url):
root_url = "https://isvz.nipez.cz/sites/default/files/content/opendata-predchozi/"
url_sources = {
"zzvz": (
root_url + "ODZZVZ/{}.xml",
root_url + "ODZZVZSS/{year}_{table}.csv",
list(range(2016, 2022 + 1)),
),
"vvz": (
root_url + "ODVVZ/{}.xml",
list(range(2006, 2016 + 1)),
),
"etrziste": (
root_url + "ODET/{}.xml",
list(range(2012, 2017 + 1)),
),
    # TODO(PR): this one is new for us (and conversely we do not have vvz/etrziste)
# "zzvzmo": (
# root_url + "ODZZVZMOSS/{}_{}.csv",
# list(range(2016, 2022 + 1)),
# {"VerejnaZakazka": "vz", "CastiVerejneZakazky": "casti_vz"},
# ),
    # TODO(PR): this is not in the data
# "vvz": (
# root_url + "ODVVZ/{}.xml",
# list(range(2006, 2016 + 1)),
# ),
# "etrziste": (
# root_url + "ODET/{}.xml",
# list(range(2012, 2017 + 1)),
# ),
}
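
For illustration, the new-style template carries both a {year} and a {table} placeholder, while the old XML templates took only the year; main() below fills them with base_url.format(year=year, table=table). The table name in this sketch is borrowed from the commented-out zzvzmo mapping and is only an assumed example for zzvz.

# Assumed example of how the new template gets expanded:
base_url, years = url_sources["zzvz"]
example_url = base_url.format(year=years[-1], table="VerejnaZakazka")
# -> .../ODZZVZSS/2022_VerejnaZakazka.csv (the table name is a guess)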


def main(outdir: str, partial: bool = False):
cdir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(cdir, "mapping.json"), encoding="utf-8") as f:
allmaps = json.load(f)

assert list(allmaps.keys()) == ["etrziste", "vvz", "zzvz"]

for ds, mapping in allmaps.items():
filehandles, csvwriters = {}, {}

for v in mapping.values():
full_ds = f"{ds}_{v['table']}"
tfn = os.path.join(outdir, f"{full_ds}.csv")
filehandles[full_ds] = open(tfn, "w", encoding="utf8")
csvwriters[full_ds] = csv.DictWriter(
filehandles[full_ds],
fieldnames=v["header"],
lineterminator="\n",
)
csvwriters[full_ds].writeheader()

base_url, years = url_sources[ds]

for year in years:
if partial and year != years[-1]:
continue
print(ds, year)
url = base_url.format(year)
with read_url(url) as resp:
for action, element in iterparse(resp):
assert action == "end"
if element.tag not in mapping:
continue
mp = mapping[element.tag]
full_ds = f"{ds}_{mp['table']}"

row = {
el.tag: el.text.strip() if el.text else None
for el in element.getchildren()
}

for k, v in row.items():
if k in mp.get("dates", []):
row[k] = fix_date(v)
if v and k in mp.get("numeric", []):
row[k] = v.replace(",", ".")
if "ICO" in k:
ico = fix_ico(v)
if ico is None and v is not None:
print("nevalidni ico", v, f"({full_ds}, {url})")
row[k] = ico

csvwriters[full_ds].writerow(row)

element.clear()

for fh in filehandles.values():
fh.close()
with open(os.path.join(cdir, "mapping.json")) as f:
mapping = json.load(f)

for dataset, mp in mapping.items():
base_url, years = url_sources[dataset]
for table, props in mp.items():
for year in years:
print(dataset, table, year)

if partial and year != years[-1]:
continue
tdir = os.path.join(outdir, f"{dataset}_{props['table']}")
os.makedirs(tdir, exist_ok=True)
tfn = os.path.join(tdir, f"{year}.csv")
url = base_url.format(year=year, table=table)
with read_url(url) as resp, open(tfn, "wt", encoding="utf-8") as fw:
r = io.TextIOWrapper(resp, encoding="utf-8")
cr = csv.DictReader(r)
# TODO: key order is not guaranteed (it is an implementation detail) - probably better to rework this away from the dict
cw = csv.DictWriter(fw, lineterminator="\n", fieldnames=props["header"].values())
# cw = csv.writer(fw)
cw.writeheader()
for row in cr:
nrow = {v: row[k] for k, v in props["header"].items()}
for k, v in nrow.items():
if "ico" not in k:
continue
nrow[k] = fix_ico(v)

cw.writerow(nrow)


if __name__ == "__main__":
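
The contents of mapping.json are not shown in this diff, but the new main() pins down its shape: top-level keys are dataset names matching url_sources, second-level keys are the source table names substituted into the URL, and each entry carries a "table" output name plus a "header" dict mapping source column names to output column names (the output columns are the dict's values). A hypothetical fragment consistent with that usage, with invented column names:

# Hypothetical mapping.json content expressed as a Python literal; the real
# keys and column names are not part of this diff.
example_mapping = {
    "zzvz": {
        "VerejnaZakazka": {                      # -> {table} in the URL
            "table": "vz",                       # -> output dir zzvz_vz/
            "header": {
                "EvidencniCisloVZ": "ev_cislo",  # source column -> output column
                "NazevVZ": "nazev",
                "ZadavatelICO": "zadavatel_ico", # "ico" in the name triggers fix_ico()
            },
        },
    },
}

# With this fragment, main(outdir) would write <outdir>/zzvz_vz/<year>.csv
# with the header ["ev_cislo", "nazev", "zadavatel_ico"].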