Refactoring of the observation generation workflow (#15)
Changes:
* Rename observation generation CLI command to mkobs
* Add support for parsing additional fields
* Add more testing and minimal set of benchmarks

Bugfixes:
* Fix bug that led to sync not properly resuming file transfers when files
were already downloaded
hellais authored Nov 10, 2022
1 parent 7e474f9 commit 8c07302
Showing 15 changed files with 626 additions and 377 deletions.
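
The refactor moves the download logic out of the CLI and into oonidata.dataclient, so the sync command now just forwards its arguments to sync_measurements. Below is a minimal sketch, not part of the commit, of calling that entry point directly from Python; it assumes network access to the OONI S3 bucket, and the probe_cc, test_name and date values are illustrative only.

from datetime import date
from pathlib import Path

from oonidata.dataclient import sync_measurements

# Downloads the matching raw measurements into
# <output_dir>/<testname>/<YYYY-MM-DD>/<YYYYMMDDHH>_<probe_cc>_<testname>.jsonl.gz
sync_measurements(
    output_dir=Path("measurements"),
    probe_cc=["IT"],
    test_name=["web_connectivity"],
    start_day=date(2022, 11, 1),
    end_day=date(2022, 11, 2),
    max_string_size=None,  # optionally trim long strings inside each measurement
)
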
4 changes: 1 addition & 3 deletions .github/workflows/tests.yml
@@ -5,7 +5,7 @@ on:
- main
pull_request:
branches:
- '**'
- '*'
jobs:
Tests:
name: ${{ matrix.os }} / ${{ matrix.python-version }}
@@ -40,14 +40,12 @@ jobs:

- name: Set up venv cache
uses: actions/cache@v3
id: cache
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.full-python-version.outputs.version }}-${{ hashFiles('**/poetry.lock') }}

- name: Set up test data cache
uses: actions/cache@v3
id: cache
with:
path: tests/data/
key: tests-data-${{ hashFiles('tests/conftest.py') }}
1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@ __pycache__
/.coverage*
/coverage.xml
/tests/data/datadir/*
/tests/data/raw_measurements/*
/tests/data/measurements/*
/dist
/datadir
138 changes: 11 additions & 127 deletions oonidata/cli/command.py
@@ -2,25 +2,18 @@
import gzip
import logging
import multiprocessing
from collections import defaultdict
from functools import partial
from pathlib import Path
from typing import List, Optional
from datetime import date, timedelta, datetime
from typing import List, Optional

import orjson
import click
from tqdm.contrib.logging import tqdm_logging_redirect

from oonidata import __version__
from oonidata.dataclient import (
FileEntry,
get_file_entries,
iter_measurements,
date_interval,
sync_measurements,
)
from oonidata.datautils import trim_measurement
from oonidata.db.connections import CSVConnection, ClickhouseConnection
from oonidata.fingerprintdb import FingerprintDB
from oonidata.netinfo import NetinfoDB
@@ -80,61 +73,6 @@ def cli():
pass


def make_filename(max_string_size: Optional[int], fe: FileEntry) -> str:
flags = ""
if max_string_size:
flags = f"_max{max_string_size}"
ts = fe.timestamp.strftime("%Y%m%d%H")
filename = f"{ts}_{fe.probe_cc}_{fe.testname}{flags}.jsonl.gz"
return filename


def download_file_entry_list(
fe_list: List[FileEntry],
output_dir: Path,
probe_cc: List[str],
test_name: List[str],
start_day: date,
end_day: date,
max_string_size: Optional[int],
):
"""
Download a list of file entries to the output dir.
It assumes that the list of file entries in the list are all pertinent to
the same testname, probe_cc, hour tuple
"""
total_fe_size = 0
output_dir = (
output_dir / fe_list[0].testname / fe_list[0].timestamp.strftime("%Y-%m-%d")
)
output_dir.mkdir(parents=True, exist_ok=True)

output_path = output_dir / make_filename(max_string_size, fe_list[0])

with gzip.open(output_path.with_suffix(".tmp"), "wb") as out_file:
for fe in fe_list:
assert fe.testname == fe_list[0].testname
assert fe.timestamp == fe_list[0].timestamp
assert fe.probe_cc == fe_list[0].probe_cc
total_fe_size += fe.size

for msmt_dict in iter_measurements(
start_day=start_day,
end_day=end_day,
probe_cc=probe_cc,
test_name=test_name,
file_entries=fe_list,
):
if max_string_size:
msmt_dict = trim_measurement(msmt_dict, max_string_size)
out_file.write(orjson.dumps(msmt_dict))
out_file.write(b"\n")

output_path.with_suffix(".tmp").rename(output_path)
return total_fe_size


@cli.command()
@click.option("--output-dir", type=Path, required=True)
@probe_cc_option
@@ -161,68 +99,14 @@ def sync(
if test_name:
click.echo(f"test_name: {','.join(test_name)}")

with tqdm_logging_redirect(unit_scale=True) as pbar:

def cb_list_fe(p):
if p.current_prefix_idx == 0:
pbar.total = p.total_prefixes
pbar.update(0)
pbar.set_description("starting prefix listing")
return
pbar.update(1)
pbar.set_description(
f"listed {p.total_file_entries} files in {p.current_prefix_idx}/{p.total_prefixes} prefixes"
)

all_file_entries = get_file_entries(
start_day=start_day,
end_day=end_day,
test_name=test_name,
probe_cc=probe_cc,
from_cans=True,
progress_callback=cb_list_fe,
)

total_fe_size = 0
to_download_fe = defaultdict(list)
for fe in all_file_entries:
if (
output_dir
/ fe.testname
/ fe.timestamp.strftime("%Y%-m-%d")
/ make_filename(max_string_size, fe)
).exists():
continue

ts = fe.timestamp.strftime("%Y%m%d%H")
# We group based on this key, so each process is writing to the same file.
# Each raw folder can have multiple files on a given hour
key = f"{ts}-{fe.testname}-{fe.probe_cc}"
to_download_fe[key].append(fe)
total_fe_size += fe.size

pbar.unit = "B"
pbar.reset(total=total_fe_size)
pbar.set_description("downloading files")
download_count = 0
with multiprocessing.Pool() as pool:
for fe_size in pool.imap_unordered(
partial(
download_file_entry_list,
output_dir=output_dir,
probe_cc=probe_cc,
test_name=test_name,
start_day=start_day,
end_day=end_day,
max_string_size=max_string_size,
),
to_download_fe.values(),
):
download_count += 1
pbar.update(fe_size)
pbar.set_description(
f"downloaded {download_count}/{len(to_download_fe)}"
)
sync_measurements(
output_dir=output_dir,
probe_cc=probe_cc,
test_name=test_name,
start_day=start_day,
end_day=end_day,
max_string_size=max_string_size,
)


def processing_worker(
@@ -283,7 +167,7 @@ def processing_worker(
)
@click.option("--start-at-idx", type=int, default=0)
@click.option("--fast-fail", default=False)
def process(
def mkobs(
probe_cc: List[str],
test_name: List[str],
start_day: date,
@@ -296,7 +180,7 @@
fast_fail: bool,
):
"""
Process OONI measurements to clickhouse or csv
Make observations for OONI measurements and write them into clickhouse or a CSV file
"""
FingerprintDB(datadir=data_dir, download=True)
NetinfoDB(datadir=data_dir, download=True)
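
The command module now mostly wires up the CLI; the observation generation command is registered under its new name mkobs. A short sketch, not from the repository's test suite, that only checks the renamed command is reachable via click's test runner (the full option set comes from the decorators elided in the diff above):

from click.testing import CliRunner

from oonidata.cli.command import cli

runner = CliRunner()
result = runner.invoke(cli, ["mkobs", "--help"])
assert result.exit_code == 0  # the command is registered under its new name
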
143 changes: 135 additions & 8 deletions oonidata/dataclient.py
@@ -6,8 +6,12 @@
import lz4.frame
import shutil
import orjson
import multiprocessing
import multiprocessing.pool
from pathlib import Path
from datetime import date, timedelta, datetime
from collections import defaultdict
from functools import partial

from dataclasses import dataclass

@@ -18,6 +22,9 @@
from botocore import UNSIGNED as botoSigUNSIGNED
from botocore.config import Config as botoConfig

from tqdm.contrib.logging import tqdm_logging_redirect

from oonidata.datautils import trim_measurement
from oonidata.normalize import iter_yaml_msmt_normalized
from oonidata.dataformat import trivial_id

@@ -262,14 +269,6 @@ def matches_filter(

return True

def log_download(self) -> None:
s = self.size / 1024 / 1024
d = "M"
if s < 1:
s = self.size / 1024
d = "K"
print(f"Downloading can {self.s3path} size {s:.1f} {d}B")

def stream_measurements(self):
body = s3.get_object(Bucket=self.bucket_name, Key=self.s3path)["Body"]
log.debug(f"streaming file {self}")
@@ -608,3 +607,131 @@ def iter_measurements(
total_file_entries=len(file_entries),
)
)


def make_filename(max_string_size: Optional[int], fe: FileEntry) -> str:
flags = ""
if max_string_size:
flags = f"_max{max_string_size}"
ts = fe.timestamp.strftime("%Y%m%d%H")
filename = f"{ts}_{fe.probe_cc}_{fe.testname}{flags}.jsonl.gz"
return filename


def download_file_entry_list(
fe_list: List[FileEntry],
output_dir: Path,
probe_cc: List[str],
test_name: List[str],
start_day: date,
end_day: date,
max_string_size: Optional[int],
):
"""
Download a list of file entries to the output dir.
It assumes that all file entries in the list pertain to the same
(testname, probe_cc, hour) tuple.
"""
total_fe_size = 0
output_dir = (
output_dir / fe_list[0].testname / fe_list[0].timestamp.strftime("%Y-%m-%d")
)
output_dir.mkdir(parents=True, exist_ok=True)

output_path = output_dir / make_filename(max_string_size, fe_list[0])

with gzip.open(output_path.with_suffix(".tmp"), "wb") as out_file:
for fe in fe_list:
assert fe.testname == fe_list[0].testname
assert fe.timestamp == fe_list[0].timestamp
assert fe.probe_cc == fe_list[0].probe_cc
total_fe_size += fe.size

for msmt_dict in iter_measurements(
start_day=start_day,
end_day=end_day,
probe_cc=probe_cc,
test_name=test_name,
file_entries=fe_list,
):
if max_string_size:
msmt_dict = trim_measurement(msmt_dict, max_string_size)
out_file.write(orjson.dumps(msmt_dict))
out_file.write(b"\n")

output_path.with_suffix(".tmp").rename(output_path)
return total_fe_size


def sync_measurements(
output_dir: Path,
probe_cc: List[str],
test_name: List[str],
start_day: date,
end_day: date,
max_string_size: Optional[int] = None,
):
with tqdm_logging_redirect(unit_scale=True) as pbar:

def cb_list_fe(p):
if p.current_prefix_idx == 0:
pbar.total = p.total_prefixes
pbar.update(0)
pbar.set_description("starting prefix listing")
return
pbar.update(1)
pbar.set_description(
f"listed {p.total_file_entries} files in {p.current_prefix_idx}/{p.total_prefixes} prefixes"
)

all_file_entries = get_file_entries(
start_day=start_day,
end_day=end_day,
test_name=test_name,
probe_cc=probe_cc,
from_cans=True,
progress_callback=cb_list_fe,
)

total_fe_size = 0
to_download_fe = defaultdict(list)
for fe in all_file_entries:
dst_path = (
output_dir
/ fe.testname
/ fe.timestamp.strftime("%Y-%m-%d")
/ make_filename(max_string_size, fe)
)
if dst_path.exists():
continue

ts = fe.timestamp.strftime("%Y%m%d%H")
# We group based on this key, so each process is writing to the same file.
# Each raw folder can have multiple files on a given hour
key = f"{ts}-{fe.testname}-{fe.probe_cc}"
to_download_fe[key].append(fe)
total_fe_size += fe.size

pbar.unit = "B"
pbar.reset(total=total_fe_size)
pbar.set_description("downloading files")
download_count = 0
with multiprocessing.Pool() as pool:
for fe_size in pool.imap_unordered(
partial(
download_file_entry_list,
output_dir=output_dir,
probe_cc=probe_cc,
test_name=test_name,
start_day=start_day,
end_day=end_day,
max_string_size=max_string_size,
),
to_download_fe.values(),
):
download_count += 1
pbar.update(fe_size)
pbar.set_description(
f"downloaded {download_count}/{len(to_download_fe)}"
)
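
The resume fix boils down to the date directory used by the existence check now matching the one used when writing: the old sync command built the path with "%Y%-m-%d" while download_file_entry_list writes under "%Y-%m-%d", so the check never found the files and everything was downloaded again. A short illustration of the mismatch, not part of the commit ("%-m" is a platform-dependent glibc extension, so the buggy output may differ elsewhere):

from datetime import datetime

ts = datetime(2022, 11, 10, 14)
print(ts.strftime("%Y-%m-%d"))   # 2022-11-10 -> directory the files are written to
print(ts.strftime("%Y%-m-%d"))   # e.g. 202211-10 on glibc -> directory the old check looked in
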