From 7b4113664212a4eff06c359b3f4fb87bf5dbd74a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 7 Nov 2022 14:03:06 +0100 Subject: [PATCH] Blocking events Changes: * Refactors verdicts into the experiment_results and blocking_events model for the websites experiment * Add version information to CLI tool * Add support for older python versions (>=3.7) * Add support for running tests under CI --- .github/workflows/tests.yml | 56 ++ .gitignore | 10 +- Readme.md | 104 ++- oonidata/__init__.py | 6 + oonidata/blockingevents.py | 305 ------- oonidata/cli/process.py | 20 - oonidata/dataformat.py | 26 +- oonidata/db/create_tables.py | 42 +- oonidata/experiments/__init__.py | 0 oonidata/experiments/control.py | 283 +++++++ oonidata/experiments/experiment_result.py | 105 +++ oonidata/experiments/signal.py | 202 +++++ oonidata/experiments/websites.py | 658 +++++++++++++++ oonidata/fingerprintdb.py | 4 +- oonidata/main.py | 8 +- oonidata/observations.py | 14 +- oonidata/processing.py | 204 ----- oonidata/verdicts.py | 986 ---------------------- poetry.lock | 134 +-- pyproject.toml | 37 +- tests/test_blockingevents.py | 116 --- tests/test_experiments.py | 283 +++++++ tests/test_verdicts.py | 162 ---- 23 files changed, 1827 insertions(+), 1938 deletions(-) create mode 100644 .github/workflows/tests.yml delete mode 100644 oonidata/blockingevents.py create mode 100644 oonidata/experiments/__init__.py create mode 100644 oonidata/experiments/control.py create mode 100644 oonidata/experiments/experiment_result.py create mode 100644 oonidata/experiments/signal.py create mode 100644 oonidata/experiments/websites.py delete mode 100644 oonidata/verdicts.py delete mode 100644 tests/test_blockingevents.py create mode 100644 tests/test_experiments.py delete mode 100644 tests/test_verdicts.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..4ecdd9f3 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,56 @@ +name: Tests +on: + push: + branches: + - main + pull_request: + branches: + - '**' +jobs: + Tests: + name: ${{ matrix.os }} / ${{ matrix.python-version }} + runs-on: ${{ matrix.os }}-latest + strategy: + matrix: + os: [Ubuntu, MacOS] + python-version: [3.7, 3.8, 3.9, "3.10"] + defaults: + run: + shell: bash + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Get full Python version + id: full-python-version + run: | + echo ::set-output name=version::$(python -c "import sys; print('-'.join(str(v) for v in sys.version_info))") + + - name: Install poetry + run: | + curl -fsS https://install.python-poetry.org | python - --preview -y + + - name: Add poetry to PATH + run: echo "$HOME/.local/bin" >> $GITHUB_PATH + + - name: Set up cache + uses: actions/cache@v3 + id: cache + with: + path: .venv + key: venv-${{ runner.os }}-${{ steps.full-python-version.outputs.version }}-${{ hashFiles('**/poetry.lock') }} + + - name: Install dependencies + run: poetry install + + - name: Run all tests + run: poetry run pytest --cov=./ --cov-report=xml -q tests + + - name: Upload coverage to codecov + uses: codecov/codecov-action@v3 + diff --git a/.gitignore b/.gitignore index e5a62ac4..a9f56875 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,9 @@ __pycache__ -/.coverage -/output -/asdata -/geoip +/.coverage* +/coverage.xml /tests/data/datadir/* /tests/data/measurements/* +/dist +/datadir +/output 
+/attic
diff --git a/Readme.md b/Readme.md
index a4a91c63..14454e3d 100644
--- a/Readme.md
+++ b/Readme.md
@@ -49,35 +49,97 @@ graph TD
     MsmtProcessor --> HTTPObservations
 ```
+
 The `measurement_processor` stage can be run either in a streaming fashion as
 measurements are uploaded to the collector or in batch mode by reprocessing
 existing raw measurements.
 
-### Verdict generation
+```mermaid
+graph LR
+    P((Probe)) --> M{{Measurement}}
+    BE --> P
+    M --> PL[(Analysis)]
+    PL --> O{{Observations}}
+    O --> PL
+    PL --> BE{{ExperimentResult}}
+    BE --> E((Explorer))
+    O --> E
+```
+
+### ExperimentResult generation
 
-A verdict is the result of interpreting one or more network observations
-collected within a particular testing session. For example, a verdict could
-conceptually look like "we think there's interference with TLS handshakes
-to 8.8.4.4:443 using dns.google as the SNI in country ZZ and AS0".
+The data model of experiment results and blocking events looks as follows:
+```mermaid
+classDiagram
+    direction RL
+
+    ExperimentResult --* WebsiteExperimentResult
+    ExperimentResult --* WhatsAppExperimentResult
+
+    ExperimentResult : +String measurement_uid
+    ExperimentResult : +datetime timestamp
+    ExperimentResult : +int probe_asn
+    ExperimentResult : +String probe_cc
+    ExperimentResult : +String network_type
+    ExperimentResult : +struct resolver
+    ExperimentResult : +List[str] observation_ids
+    ExperimentResult : +List[BlockingEvent] blocking_events
+    ExperimentResult : +float ok_confidence
+
+    ExperimentResult : +bool anomaly
+    ExperimentResult : +bool confirmed
+
+    class WebsiteExperimentResult {
+      +String domain_name
+      +String website_name
+    }
 
-An important component to verdict generation is having some form of baseline to
-establish some ground truth. This is necessary in order to establish if the
-network condition we are seeing is a result of the target being offline vs it
-being the result of blocking.
+ class WhatsAppExperimentResult { + +float web_ok_confidence + +String web_blocking_detail + + +float registration_ok_confidence + +String registration_blocking_detail + + +float endpoints_ok_confidence + +String endpoints_blocking_detail + } + + class BlockingEvent { + blocking_type: +BlockingType + blocking_subject: +String + blocking_detail: +String + blocking_meta: +json + confidence: +float + } + + class BlockingType { + <> + OK + BLOCKED + NATIONAL_BLOCK + ISP_BLOCK + LOCAL_BLOCK + SERVER_SIDE_BLOCK + DOWN + THROTTLING + } +``` -The data flow of the verdict generation pipeline looks as follows: ```mermaid -graph TD - IPInfoDB[(IPInfoDB)] --> VerdictGenerator - FingerprintDB[(FingerprintDB)] --> VerdictGenerator - - Observations --> GrouperTimeTarget[/"GROUP BY time_interval, target"/] - GrouperTimeTarget --> BaselineGenerator{{"baseline_generator()"}} - GrouperTimeTarget --> GrouperSession[/"GROUP BY session_id"/] - BaselineGenerator --> Baselines - Baselines --> VerdictGenerator - GrouperSession --> VerdictGenerator{{"verdict_generator()"}} - VerdictGenerator --> Verdicts +graph + M{{Measurement}} --> OGEN[[observationGen]] + OGEN --> |many| O{{Observations}} + O --> CGEN[[controlGen]] + O --> ODB[(ObservationDB)] + ODB --> CGEN + CGEN --> |many| CTRL{{Controls}} + CTRL --> A[[Analysis]] + FDB[(FingerprintDB)] --> A + NDB[(NetInfoDB)] --> A + O --> A + A --> |one| ER{{ExperimentResult}} + ER --> |many| BE{{BlockingEvents}} ``` Some precautions need to be taken when running the `verdict_generator()` in diff --git a/oonidata/__init__.py b/oonidata/__init__.py index e69de29b..8c1f5735 100644 --- a/oonidata/__init__.py +++ b/oonidata/__init__.py @@ -0,0 +1,6 @@ +try: + import importlib.metadata as importlib_metadata +except ModuleNotFoundError: + import importlib_metadata + +__version__ = importlib_metadata.version(__name__) diff --git a/oonidata/blockingevents.py b/oonidata/blockingevents.py deleted file mode 100644 index 9547094c..00000000 --- a/oonidata/blockingevents.py +++ /dev/null @@ -1,305 +0,0 @@ -import logging -from typing import List, Optional, NamedTuple -from enum import Enum -from datetime import datetime -from dataclasses import dataclass -from oonidata.dataformat import SIGNAL_PEM_STORE -from oonidata.fingerprintdb import FingerprintDB -from oonidata.netinfo import NetinfoDB -from oonidata.datautils import ( - TLSCertStore, - InvalidCertificateChain, -) - -from oonidata.observations import ( - DNSObservation, - HTTPObservation, - NettestObservation, - Observation, - TCPObservation, - TLSObservation, -) - -log = logging.getLogger("oonidata.events") - - -class OutcomeType(Enum): - # k: everything is OK - OK = "k" - # b: blocking is happening with an unknown scope - BLOCKED = "b" - # n: national level blocking - NATIONAL_BLOCK = "n" - # i: isp level blocking - ISP_BLOCK = "i" - # l: local blocking (school, office, home network) - LOCAL_BLOCK = "l" - # s: server-side blocking - SERVER_SIDE_BLOCK = "s" - # d: the subject is down - DOWN = "d" - # t: this is a signal indicating some form of network throttling - THROTTLING = "t" - - -def fp_scope_to_outcome(scope: Optional[str]) -> OutcomeType: - # "nat" national level blockpage - # "isp" ISP level blockpage - # "prod" text pattern related to a middlebox product - # "inst" text pattern related to a voluntary instition blockpage (school, office) - # "vbw" vague blocking word - # "fp" fingerprint for false positives - if scope == "nat": - return OutcomeType.NATIONAL_BLOCK - elif scope == "isp": - return OutcomeType.ISP_BLOCK - 
elif scope == "inst": - return OutcomeType.LOCAL_BLOCK - elif scope == "fp": - return OutcomeType.SERVER_SIDE_BLOCK - return OutcomeType.BLOCKED - - -class Outcome(NamedTuple): - outcome_type: OutcomeType - outcome_subject: str - outcome_detail: str - outcome_meta: dict - confidence: float - - -@dataclass -class BlockingEvent: - measurement_uid: str - report_id: str - input: str - timestamp: datetime - - probe_asn: int - probe_cc: str - - probe_as_org_name: str - probe_as_cc: str - - network_type: str - - resolver_ip: Optional[str] - resolver_asn: Optional[int] - resolver_as_org_name: Optional[str] - resolver_as_cc: Optional[str] - resolver_cc: Optional[str] - - observation_ids: List[str] - outcomes: List[Outcome] - ok_confidence: float - - anomaly: bool - confirmed: bool - - -@dataclass -class WebsiteBlockingEvent(BlockingEvent): - domain_name: str - website_name: str - - -class SignalBlockingEvent(BlockingEvent): - pass - - -def make_base_event_meta(obs: Observation) -> dict: - return dict( - measurement_uid=obs.measurement_uid, - report_id=obs.report_id, - input=obs.input, - timestamp=obs.timestamp, - probe_asn=obs.probe_asn, - probe_cc=obs.probe_cc, - probe_as_org_name=obs.probe_as_org_name, - probe_as_cc=obs.probe_as_cc, - network_type=obs.network_type, - resolver_ip=obs.resolver_ip, - resolver_asn=obs.resolver_asn, - resolver_as_org_name=obs.resolver_as_org_name, - resolver_as_cc=obs.resolver_as_cc, - resolver_cc=obs.resolver_cc, - ) - - -def make_signal_blocking_event( - nt_obs: NettestObservation, - dns_o_list: List[DNSObservation], - tcp_o_list: List[TCPObservation], - tls_o_list: List[TLSObservation], - http_o_list: List[HTTPObservation], - fingerprintdb: FingerprintDB, - netinfodb: NetinfoDB, -) -> SignalBlockingEvent: - # We got nothin', we can't do nothin' - assert len(dns_o_list) > 0, "empty DNS observation list" - - confirmed = False - anomaly = False - - signal_is_down = False - - outcomes = [] - observation_ids = [] - inconsistent_dns_answers = set() - - for dns_o in dns_o_list: - observation_ids.append(dns_o.observation_id) - if dns_o.domain_name == "uptime.signal.org": - # This DNS query is used by signal to figure out if some of it's - # services are down. - # see: https://github.com/signalapp/Signal-Android/blob/c4bc2162f23e0fd6bc25941af8fb7454d91a4a35/app/src/main/java/org/thoughtcrime/securesms/jobs/ServiceOutageDetectionJob.java#L25 - # TODO: should we do something in the case in which we can't tell - # because DNS blocking is going on (ex. in Iran)? - if dns_o.answer == "127.0.0.2": - signal_is_down = True - continue - - if dns_o.failure: - anomaly = True - outcomes.append( - Outcome( - outcome_type=OutcomeType.BLOCKED, - outcome_subject=f"{dns_o.domain_name}", - outcome_detail=f"dns.{dns_o.failure}", - outcome_meta={}, - confidence=0.8, - ) - ) - continue - - if not dns_o.is_tls_consistent: - # We don't set the anomaly flag, because this logic is very - # susceptible to false positives - # anomaly = True - outcome_meta = {"why": "tls_inconsistent", "ip": dns_o.answer} - outcome_type = OutcomeType.BLOCKED - fp = fingerprintdb.match_dns(dns_o.answer) - if fp: - outcome_type = fp_scope_to_outcome(fp.scope) - if outcome_type != OutcomeType.SERVER_SIDE_BLOCK: - confirmed = True - anomaly = True - outcome_meta["fp_name"] = fp.name - - confidence = 0.3 - # Having a TLS inconsistency is a much stronger indication than not - # knowing. 
- if dns_o.is_tls_consistent == False: - inconsistent_dns_answers.add(dns_o.answer) - # Only set the anomaly here - anomaly = True - confidence = 0.8 - - outcomes.append( - Outcome( - outcome_type=outcome_type, - outcome_detail="dns.inconsistent", - outcome_subject=f"{dns_o.domain_name}", - outcome_meta=outcome_meta, - confidence=confidence, - ) - ) - continue - - if signal_is_down: - # The service is down. No point in going on with the analysis - # It's still possible for the service to be down, yet there to be DNS - # level interference, so we still count the other outcomes if they were - # present. - outcomes.append( - Outcome( - outcome_type=OutcomeType.DOWN, - outcome_subject="Signal Messenger", - outcome_detail="down", - outcome_meta={}, - confidence=0.9, - ) - ) - return SignalBlockingEvent( - outcomes=outcomes, - observation_ids=observation_ids, - anomaly=anomaly, - confirmed=confirmed, - **make_base_event_meta(nt_obs), - ) - - for tcp_o in tcp_o_list: - observation_ids.append(tcp_o.observation_id) - if tcp_o.failure and tcp_o.ip not in inconsistent_dns_answers: - anomaly = True - outcomes.append( - Outcome( - outcome_type=OutcomeType.BLOCKED, - outcome_subject=f"{tcp_o.ip}:{tcp_o.port}", - outcome_detail=f"tcp.{tcp_o.failure}", - outcome_meta={}, - confidence=0.7, - ) - ) - - cert_store = TLSCertStore(pem_cert_store=SIGNAL_PEM_STORE) - for tls_o in tls_o_list: - observation_ids.append(tls_o.observation_id) - # We skip analyzing TLS handshakes that are the result of an - # inconsistent DNS resolution. - if tls_o.ip in inconsistent_dns_answers: - continue - - if tls_o.failure and not tls_o.failure.startswith("ssl_"): - anomaly = True - outcomes.append( - Outcome( - outcome_type=OutcomeType.BLOCKED, - outcome_subject=f"{tls_o.server_name}", - outcome_detail=f"tls.{tls_o.failure}", - outcome_meta={}, - confidence=0.7, - ) - ) - - if tls_o.peer_certificates: - try: - _, _ = cert_store.validate_cert_chain( - tls_o.timestamp, tls_o.peer_certificates - ) - # The server_name is listed in the SAN only for the older certs. - # Since we are pinning to only the two known signal CAs it's - # probably safe to just ignore. 
- except InvalidCertificateChain as exc: - anomaly = True - outcomes.append( - Outcome( - outcome_type=OutcomeType.BLOCKED, - outcome_subject=f"{tls_o.server_name}", - outcome_detail=f"tls.ssl_invalid_certificate", - outcome_meta={"cert_error": str(exc)}, - confidence=0.9, - ) - ) - - # This is an upper bound, which means we might be over-estimating blocking - ok_confidence = 1 - max(map(lambda o: o.confidence, outcomes), default=0) - outcomes.append( - Outcome( - outcome_type=OutcomeType.OK, - outcome_subject=f"Signal Messenger", - outcome_detail=f"ok", - outcome_meta={}, - confidence=ok_confidence, - ) - ) - - # TODO: add support for validating if the HTTP responses are also consistent - return SignalBlockingEvent( - outcomes=outcomes, - observation_ids=observation_ids, - anomaly=anomaly, - confirmed=confirmed, - ok_confidence=ok_confidence, - **make_base_event_meta(nt_obs), - ) diff --git a/oonidata/cli/process.py b/oonidata/cli/process.py index a41f0ef2..579752d4 100644 --- a/oonidata/cli/process.py +++ b/oonidata/cli/process.py @@ -5,9 +5,7 @@ from oonidata.fingerprintdb import FingerprintDB from oonidata.netinfo import NetinfoDB from oonidata.processing import ( - generate_website_verdicts, process_day, - write_verdicts_to_db, ) @@ -25,27 +23,10 @@ def worker(day_queue, args): else: raise Exception("Missing --csv-dir or --clickhouse") - skip_verdicts = args.skip_verdicts - if not isinstance(db, ClickhouseConnection): - skip_verdicts = True - while True: day = day_queue.get(block=True) if day == None: break - - if isinstance(db, ClickhouseConnection) and args.only_verdicts: - write_verdicts_to_db( - db, - generate_website_verdicts( - args.day, - db, - fingerprintdb, - netinfodb, - ), - ) - continue - process_day( db, fingerprintdb, @@ -54,7 +35,6 @@ def worker(day_queue, args): test_name=args.test_name, probe_cc=args.probe_cc, start_at_idx=args.start_at_idx, - skip_verdicts=skip_verdicts, fast_fail=args.fast_fail, ) diff --git a/oonidata/dataformat.py b/oonidata/dataformat.py index c7377f67..92457c56 100644 --- a/oonidata/dataformat.py +++ b/oonidata/dataformat.py @@ -14,7 +14,7 @@ from base64 import b64decode from datetime import datetime -from typing import Optional, Tuple, Union, List, Union, Mapping +from typing import Optional, Tuple, Union, List, Union, Dict from dataclasses import dataclass @@ -57,7 +57,7 @@ def guess_decode(s: bytes) -> str: return s.decode("ascii", "ignore") -def maybe_binary_data_to_str(mbd: Union[MaybeBinaryData, dict]) -> str: +def maybe_binary_data_to_str(mbd: Union[MaybeBinaryData, Dict]) -> str: if isinstance(mbd, BinaryData): return guess_decode(b64decode(mbd.data)) elif isinstance(mbd, dict): @@ -68,7 +68,7 @@ def maybe_binary_data_to_str(mbd: Union[MaybeBinaryData, dict]) -> str: raise Exception(f"Invalid type {type(mbd)} {mbd}") -def maybe_binary_data_to_bytes(mbd: Union[MaybeBinaryData, dict]) -> bytes: +def maybe_binary_data_to_bytes(mbd: Union[MaybeBinaryData, Dict]) -> bytes: if isinstance(mbd, BinaryData): return b64decode(mbd.data) elif isinstance(mbd, dict): @@ -79,7 +79,7 @@ def maybe_binary_data_to_bytes(mbd: Union[MaybeBinaryData, dict]) -> bytes: raise Exception(f"Invalid type {type(mbd)} {mbd}") -def trivial_id(raw: bytes, msm: dict) -> str: +def trivial_id(raw: bytes, msm: Dict) -> str: """Generate a trivial id of the measurement to allow upsert if needed This is used for legacy (before measurement_uid) measurements - Deterministic / stateless with no DB interaction @@ -108,7 +108,7 @@ class BaseTestKeys(BaseModel): @dataclass class 
BaseMeasurement(BaseModel): - annotations: Mapping[str, str] + annotations: Dict[str, str] input: Union[str, List[str], None] report_id: str @@ -135,7 +135,7 @@ class BaseMeasurement(BaseModel): probe_network_name: Optional[str] = None - test_helpers: Optional[dict] = None + test_helpers: Optional[Dict] = None data_format_version: Optional[str] = None measurement_uid: Optional[str] = None @@ -159,7 +159,7 @@ class TorInfo(BaseModel): class HTTPBase(BaseModel): body: MaybeBinaryData = None body_is_truncated: Optional[bool] = None - headers: Optional[Mapping[str, str]] = None + headers: Optional[Dict[str, str]] = None headers_list: Optional[HeadersList] = None _body_bytes = None @@ -191,13 +191,13 @@ def body_bytes(self) -> Optional[bytes]: return self._body_bytes @property - def headers_str(self) -> Optional[Mapping[str, str]]: + def headers_str(self) -> Optional[Dict[str, str]]: if not self.headers_list_str: return None return {k: v for k, v in self.headers_list_str} @property - def headers_bytes(self) -> Optional[Mapping[str, bytes]]: + def headers_bytes(self) -> Optional[Dict[str, bytes]]: if not self.headers_list_bytes: return None return {k: v for k, v in self.headers_list_bytes} @@ -350,7 +350,7 @@ class WebConnectivityControlHTTPRequest(BaseModel): body_length: Optional[int] = None failure: Failure = None title: Optional[str] = None - headers: Optional[Mapping[str, str]] = None + headers: Optional[Dict[str, str]] = None status_code: Optional[int] = None @@ -368,7 +368,7 @@ class WebConnectivityControlTCPConnectStatus(BaseModel): @dataclass class WebConnectivityControl(BaseModel): - tcp_connect: Optional[Mapping[str, WebConnectivityControlTCPConnectStatus]] = None + tcp_connect: Optional[Dict[str, WebConnectivityControlTCPConnectStatus]] = None http_request: Optional[WebConnectivityControlHTTPRequest] = None dns: Optional[WebConnectivityControlDNS] = None @@ -539,7 +539,7 @@ class URLGetterTestKeys(BaseTestKeys): @dataclass class DNSCheckTestKeys(BaseTestKeys): - lookups: Optional[Mapping[str, URLGetterTestKeys]] = None + lookups: Optional[Dict[str, URLGetterTestKeys]] = None bootstrap: Optional[URLGetterTestKeys] = None bootstrap_failure: Optional[str] = None @@ -565,7 +565,7 @@ class TorTestTarget(BaseModel): @dataclass class TorTestKeys(BaseModel): - targets: Mapping[str, TorTestTarget] + targets: Dict[str, TorTestTarget] @dataclass diff --git a/oonidata/db/create_tables.py b/oonidata/db/create_tables.py index 89a9a59b..933a9b32 100644 --- a/oonidata/db/create_tables.py +++ b/oonidata/db/create_tables.py @@ -1,6 +1,6 @@ from datetime import datetime -from typing import Optional, Tuple, List, Any, Type, Mapping +from typing import Optional, Tuple, List, Any, Type, Mapping, Dict from dataclasses import fields from oonidata.observations import ( NettestObservation, @@ -10,14 +10,18 @@ TLSObservation, HTTPObservation, ) -from oonidata.verdicts import Outcome, Verdict +from oonidata.experiments.experiment_result import ( + BlockingType, + ExperimentResult, + BlockingEvent, +) def typing_to_clickhouse(t: Any) -> str: if t == str: return "String" - if t == Optional[str] or t == Optional[bytes]: + if t in (Optional[str], Optional[bytes]): return "Nullable(String)" if t == int: @@ -41,21 +45,37 @@ def typing_to_clickhouse(t: Any) -> str: if t == float: return "Float64" + if t == dict: + return "JSON" + if t == Optional[float]: return "Nullable(Float64)" - if t == List[str]: + if t in (List[str], List[bytes]): return "Array(String)" + if t == Optional[List[str]]: return 
"Nullable(Array(String))" if t == Optional[List[Tuple[str, bytes]]]: return "Nullable(Array(Array(String)))" - if t == Outcome: + if t == BlockingType: return "String" - if t == Mapping[str, str]: + if t == List[BlockingEvent]: + columns = [] + for name, type in BlockingEvent.__annotations__.items(): + type_str = typing_to_clickhouse(type) + columns.append(f" {name} {type_str}") + columns_str = ",\n".join(columns) + + s = "Nested (\n" + s += columns_str + s += "\n )" + return s + + if t in (Mapping[str, str], Dict[str, str]): return "Map(String, String)" raise Exception(f"Unhandled type {t}") @@ -82,9 +102,9 @@ def create_query_for_observation(obs_class: Type[Observation]) -> Tuple[str, str ) -def create_query_for_verdict() -> Tuple[str, str]: +def create_query_for_experiment_result() -> Tuple[str, str]: columns = [] - for f in fields(Verdict): + for f in fields(ExperimentResult): type_str = typing_to_clickhouse(f.type) columns.append(f" {f.name} {type_str}") @@ -92,14 +112,14 @@ def create_query_for_verdict() -> Tuple[str, str]: return ( f""" - CREATE TABLE verdict ( + CREATE TABLE experiment_result ( {columns_str} ) ENGINE = ReplacingMergeTree ORDER BY (timestamp, observation_id, measurement_uid) SETTINGS index_granularity = 8192; """, - "verdict", + "experiment_result", ) @@ -110,7 +130,7 @@ def main(): create_query_for_observation(TLSObservation), create_query_for_observation(HTTPObservation), create_query_for_observation(NettestObservation), - create_query_for_verdict(), + create_query_for_experiment_result(), ( """ CREATE TABLE dns_consistency_tls_baseline ( diff --git a/oonidata/experiments/__init__.py b/oonidata/experiments/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oonidata/experiments/control.py b/oonidata/experiments/control.py new file mode 100644 index 00000000..108d3b1d --- /dev/null +++ b/oonidata/experiments/control.py @@ -0,0 +1,283 @@ +from dataclasses import dataclass, field +from datetime import date +import logging + +from typing import Any, Dict, Optional, Tuple, List +from urllib.parse import urlparse +from oonidata.dataformat import WebConnectivity, WebConnectivityControl +from oonidata.datautils import one_day_dict + +from oonidata.db.connections import ClickhouseConnection + +log = logging.getLogger("oonidata.processing") + + +@dataclass +class TCPControl: + address: str + reachable_cc_asn: List[Tuple[str, int]] = field(default_factory=list) + unreachable_cc_asn: List[Tuple[str, int]] = field(default_factory=list) + + +def make_tcp_control_map( + day: date, domain_name: str, db: ClickhouseConnection +) -> Dict[str, TCPControl]: + tcp_control_map = {} + q_params = one_day_dict(day) + q_params["domain_name"] = domain_name + + q = """SELECT probe_cc, probe_asn, ip, port, failure FROM obs_tcp + WHERE domain_name = %(domain_name)s + AND timestamp >= %(start_day)s + AND timestamp <= %(end_day)s + GROUP BY probe_cc, probe_asn, ip, port, failure; + """ + res = db.execute(q, q_params) + if isinstance(res, list) and len(res) > 0: + for probe_cc, probe_asn, ip, port, failure in res: + address = f"{ip}:{port}" + tcp_control_map[address] = tcp_control_map.get(address, TCPControl(address)) + if not failure: + tcp_control_map[address].reachable_cc_asn.append((probe_cc, probe_asn)) + else: + tcp_control_map[address].unreachable_cc_asn.append( + (probe_cc, probe_asn) + ) + return tcp_control_map + + +@dataclass +class HTTPControl: + url: str + failure_cc_asn: List[Tuple[str, int]] = field(default_factory=list) + ok_cc_asn: List[Tuple[str, int]] = 
field(default_factory=list) + + response_body_length: int = 0 + response_body_sha1: str = "" + response_body_title: str = "" + response_body_meta_title: str = "" + + response_status_code: int = 0 + + +def maybe_get_first(l: list, default_value: Any = None) -> Optional[Any]: + try: + return l[0] + except IndexError: + return default_value + + +def make_http_control_map( + day: date, domain_name: str, db: ClickhouseConnection +) -> Dict[str, HTTPControl]: + http_control_map = {} + + q_params = one_day_dict(day) + q_params["domain_name"] = domain_name + + q = """SELECT probe_cc, probe_asn, request_url, failure FROM obs_http + WHERE domain_name = %(domain_name)s + AND timestamp >= %(start_day)s + AND timestamp <= %(end_day)s + GROUP BY probe_cc, probe_asn, request_url, failure; + """ + res = db.execute(q, q_params) + if isinstance(res, list) and len(res) > 0: + for probe_cc, probe_asn, request_url, failure in res: + http_control_map[request_url] = http_control_map.get( + request_url, HTTPControl(request_url) + ) + if not failure: + http_control_map[request_url].ok_cc_asn.append((probe_cc, probe_asn)) + else: + http_control_map[request_url].failure_cc_asn.append( + (probe_cc, probe_asn) + ) + + q = """SELECT request_url, + topK(1)(response_body_sha1), + topK(1)(response_body_length), + topK(1)(response_body_title), + topK(1)(response_body_meta_title), + topK(1)(response_status_code) + FROM obs_http + WHERE failure IS NULL + AND domain_name = %(domain_name)s + AND timestamp >= %(start_day)s + AND timestamp <= %(end_day)s + GROUP BY request_url; + """ + res = db.execute(q, q_params) + if isinstance(res, list) and len(res) > 0: + for ( + request_url, + response_body_sha1, + response_body_length, + response_body_title, + response_body_meta_title, + response_status_code, + ) in res: + http_control_map[request_url] = http_control_map.get( + request_url, HTTPControl(request_url) + ) + http_control_map[request_url].response_body_sha1 = maybe_get_first( + response_body_sha1, "" + ) + http_control_map[request_url].response_body_length = maybe_get_first( + response_body_length, "" + ) + http_control_map[request_url].response_body_title = maybe_get_first( + response_body_title, "" + ) + http_control_map[request_url].response_body_meta_title = maybe_get_first( + response_body_meta_title, "" + ) + http_control_map[request_url].response_status_code = maybe_get_first( + response_status_code, "" + ) + + return http_control_map + + +@dataclass +class DNSControl: + domain: str + nxdomain_cc_asn: List[Tuple[str, int]] = field(default_factory=list) + failure_cc_asn: List[Tuple[str, int]] = field(default_factory=list) + ok_cc_asn: List[Tuple[str, int]] = field(default_factory=list) + tls_consistent_answers: List[str] = field(default_factory=list) + answers_map: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict) + + +def make_dns_control( + day: date, domain_name: str, db: ClickhouseConnection +) -> DNSControl: + dns_baseline = DNSControl(domain_name) + + q_params = one_day_dict(day) + q_params["domain_name"] = domain_name + + q = """SELECT DISTINCT(ip) FROM obs_tls + WHERE is_certificate_valid = 1 + AND domain_name = %(domain_name)s + AND timestamp >= %(start_day)s + AND timestamp <= %(end_day)s; + """ + res = db.execute(q, q_params) + if isinstance(res, list) and len(res) > 0: + dns_baseline.tls_consistent_answers = [row[0] for row in res] + + q = """SELECT probe_cc, probe_asn, failure, answer FROM obs_dns + WHERE domain_name = %(domain_name)s + AND timestamp >= %(start_day)s + AND timestamp <= 
%(end_day)s + GROUP BY probe_cc, probe_asn, failure, answer; + """ + res = db.execute(q, q_params) + if isinstance(res, list) and len(res) > 0: + for probe_cc, probe_asn, failure, ip in res: + if not failure: + dns_baseline.ok_cc_asn.append((probe_cc, probe_asn)) + dns_baseline.answers_map[probe_cc] = dns_baseline.answers_map.get( + probe_cc, [] + ) + if ip: + dns_baseline.answers_map[probe_cc].append((probe_asn, ip)) + else: + log.error( + f"No IP present for {domain_name} {probe_cc} ({probe_asn}) in baseline" + ) + else: + dns_baseline.failure_cc_asn.append((probe_cc, probe_asn)) + if failure == "dns_nxdomain_error": + dns_baseline.nxdomain_cc_asn.append((probe_cc, probe_asn)) + + return dns_baseline + + +def make_dns_control_from_wc( + msmt_input: str, control: WebConnectivityControl +) -> DNSControl: + domain_name = urlparse(msmt_input).hostname + + assert domain_name is not None, "domain_name is None" + + if not control or not control.dns: + return DNSControl(domain=domain_name) + + nxdomain_cc_asn = [] + if control.dns.failure == "dns_nxdomain_error": + nxdomain_cc_asn.append(("ZZ", 0)) + + ok_cc_asn = [] + failure_cc_asn = [] + if control.dns.failure is not None: + failure_cc_asn.append(("ZZ", 0)) + else: + ok_cc_asn.append(("ZZ", 0)) + + answers_map = {} + if control.dns.addrs: + answers_map["ZZ"] = [(0, ip) for ip in control.dns.addrs] + + return DNSControl( + domain=domain_name, + answers_map=answers_map, + ok_cc_asn=ok_cc_asn, + nxdomain_cc_asn=nxdomain_cc_asn, + failure_cc_asn=failure_cc_asn, + ) + + +def make_tcp_control_from_wc( + control: WebConnectivityControl, +) -> Dict[str, TCPControl]: + if not control or not control.tcp_connect: + return {} + + tcp_b_map = {} + for key, status in control.tcp_connect.items(): + if status.failure == None: + tcp_b_map[key] = TCPControl(address=key, reachable_cc_asn=[("ZZ", 0)]) + else: + tcp_b_map[key] = TCPControl(address=key, unreachable_cc_asn=[("ZZ", 0)]) + return tcp_b_map + + +def make_http_control_from_wc( + msmt: WebConnectivity, control: WebConnectivityControl +) -> Dict[str, HTTPControl]: + if not control or not control.http_request: + return {} + + if not msmt.test_keys.requests: + return {} + + http_b_map = {} + # We make the baseline apply to every URL in the response chain, XXX evaluate how much this is a good idea + for http_transaction in msmt.test_keys.requests: + if not http_transaction.request: + continue + + url = http_transaction.request.url + if control.http_request.failure == None: + http_b_map[url] = HTTPControl( + url=url, + response_body_title=control.http_request.title or "", + response_body_length=control.http_request.body_length or 0, + response_status_code=control.http_request.status_code or 0, + response_body_meta_title="", + response_body_sha1="", + ok_cc_asn=[("ZZ", 0)], + ) + else: + http_b_map[url] = HTTPControl( + url=url, + response_body_title="", + response_body_length=0, + response_status_code=0, + response_body_meta_title="", + response_body_sha1="", + failure_cc_asn=[("ZZ", 0)], + ) + return http_b_map diff --git a/oonidata/experiments/experiment_result.py b/oonidata/experiments/experiment_result.py new file mode 100644 index 00000000..fc17b178 --- /dev/null +++ b/oonidata/experiments/experiment_result.py @@ -0,0 +1,105 @@ +import logging +from typing import List, Optional, NamedTuple +from enum import Enum +from datetime import datetime +from dataclasses import dataclass + + +from oonidata.observations import ( + Observation, +) + +log = logging.getLogger("oonidata.events") + + +class 
BlockingType(Enum): + # k: everything is OK + OK = "k" + # b: blocking is happening with an unknown scope + BLOCKED = "b" + # n: national level blocking + NATIONAL_BLOCK = "n" + # i: isp level blocking + ISP_BLOCK = "i" + # l: local blocking (school, office, home network) + LOCAL_BLOCK = "l" + # s: server-side blocking + SERVER_SIDE_BLOCK = "s" + # d: the subject is down + DOWN = "d" + # t: this is a signal indicating some form of network throttling + THROTTLING = "t" + + +def fp_scope_to_outcome(scope: Optional[str]) -> BlockingType: + # "nat" national level blockpage + # "isp" ISP level blockpage + # "prod" text pattern related to a middlebox product + # "inst" text pattern related to a voluntary instition blockpage (school, office) + # "vbw" vague blocking word + # "fp" fingerprint for false positives + if scope == "nat": + return BlockingType.NATIONAL_BLOCK + elif scope == "isp": + return BlockingType.ISP_BLOCK + elif scope == "inst": + return BlockingType.LOCAL_BLOCK + elif scope == "fp": + return BlockingType.SERVER_SIDE_BLOCK + return BlockingType.BLOCKED + + +class BlockingEvent(NamedTuple): + blocking_type: BlockingType + blocking_subject: str + blocking_detail: str + blocking_meta: dict + confidence: float + + +@dataclass +class ExperimentResult: + measurement_uid: str + report_id: str + input: str + timestamp: datetime + + probe_asn: int + probe_cc: str + + probe_as_org_name: str + probe_as_cc: str + + network_type: str + + resolver_ip: Optional[str] + resolver_asn: Optional[int] + resolver_as_org_name: Optional[str] + resolver_as_cc: Optional[str] + resolver_cc: Optional[str] + + observation_ids: List[str] + blocking_events: List[BlockingEvent] + ok_confidence: float + + anomaly: bool + confirmed: bool + + +def make_base_result_meta(obs: Observation) -> dict: + return dict( + measurement_uid=obs.measurement_uid, + report_id=obs.report_id, + input=obs.input, + timestamp=obs.timestamp, + probe_asn=obs.probe_asn, + probe_cc=obs.probe_cc, + probe_as_org_name=obs.probe_as_org_name, + probe_as_cc=obs.probe_as_cc, + network_type=obs.network_type, + resolver_ip=obs.resolver_ip, + resolver_asn=obs.resolver_asn, + resolver_as_org_name=obs.resolver_as_org_name, + resolver_as_cc=obs.resolver_as_cc, + resolver_cc=obs.resolver_cc, + ) diff --git a/oonidata/experiments/signal.py b/oonidata/experiments/signal.py new file mode 100644 index 00000000..e19da28d --- /dev/null +++ b/oonidata/experiments/signal.py @@ -0,0 +1,202 @@ +from typing import List +from oonidata.dataformat import SIGNAL_PEM_STORE +from oonidata.datautils import InvalidCertificateChain, TLSCertStore +from oonidata.experiments.experiment_result import ( + BlockingEvent, + BlockingType, + ExperimentResult, + fp_scope_to_outcome, + make_base_result_meta, +) +from oonidata.fingerprintdb import FingerprintDB +from oonidata.netinfo import NetinfoDB +from oonidata.observations import ( + DNSObservation, + HTTPObservation, + NettestObservation, + TCPObservation, + TLSObservation, +) + + +class SignalExperimentResult(ExperimentResult): + pass + + +def make_signal_experiment_result( + nt_obs: NettestObservation, + dns_o_list: List[DNSObservation], + tcp_o_list: List[TCPObservation], + tls_o_list: List[TLSObservation], + http_o_list: List[HTTPObservation], + fingerprintdb: FingerprintDB, + netinfodb: NetinfoDB, +) -> SignalExperimentResult: + # We got nothin', we can't do nothin' + assert len(dns_o_list) > 0, "empty DNS observation list" + + confirmed = False + anomaly = False + + signal_is_down = False + + blocking_events = [] + 
observation_ids = [] + inconsistent_dns_answers = set() + + for dns_o in dns_o_list: + observation_ids.append(dns_o.observation_id) + if dns_o.domain_name == "uptime.signal.org": + # This DNS query is used by signal to figure out if some of it's + # services are down. + # see: https://github.com/signalapp/Signal-Android/blob/c4bc2162f23e0fd6bc25941af8fb7454d91a4a35/app/src/main/java/org/thoughtcrime/securesms/jobs/ServiceOutageDetectionJob.java#L25 + # TODO: should we do something in the case in which we can't tell + # because DNS blocking is going on (ex. in Iran)? + if dns_o.answer == "127.0.0.2": + signal_is_down = True + continue + + if dns_o.failure: + anomaly = True + blocking_events.append( + BlockingEvent( + blocking_type=BlockingType.BLOCKED, + blocking_subject=f"{dns_o.domain_name}", + blocking_detail=f"dns.{dns_o.failure}", + blocking_meta={}, + confidence=0.8, + ) + ) + continue + + if not dns_o.is_tls_consistent: + # We don't set the anomaly flag, because this logic is very + # susceptible to false positives + # anomaly = True + blocking_meta = {"why": "tls_inconsistent", "ip": dns_o.answer} + blocking_type = BlockingType.BLOCKED + fp = fingerprintdb.match_dns(dns_o.answer) + if fp: + blocking_type = fp_scope_to_outcome(fp.scope) + if blocking_type != BlockingType.SERVER_SIDE_BLOCK: + confirmed = True + anomaly = True + blocking_meta["fp_name"] = fp.name + + confidence = 0.3 + # Having a TLS inconsistency is a much stronger indication than not + # knowing. + if dns_o.is_tls_consistent == False: + inconsistent_dns_answers.add(dns_o.answer) + # Only set the anomaly here + anomaly = True + confidence = 0.8 + + blocking_events.append( + BlockingEvent( + blocking_type=blocking_type, + blocking_detail="dns.inconsistent", + blocking_subject=f"{dns_o.domain_name}", + blocking_meta=blocking_meta, + confidence=confidence, + ) + ) + continue + + if signal_is_down: + # The service is down. No point in going on with the analysis + # It's still possible for the service to be down, yet there to be DNS + # level interference, so we still count the other blocking_events if they were + # present. + blocking_events.append( + BlockingEvent( + blocking_type=BlockingType.DOWN, + blocking_subject="Signal Messenger", + blocking_detail="down", + blocking_meta={}, + confidence=0.9, + ) + ) + return SignalExperimentResult( + blocking_events=blocking_events, + observation_ids=observation_ids, + anomaly=anomaly, + confirmed=confirmed, + **make_base_result_meta(nt_obs), + ) + + for tcp_o in tcp_o_list: + observation_ids.append(tcp_o.observation_id) + if tcp_o.failure and tcp_o.ip not in inconsistent_dns_answers: + anomaly = True + blocking_events.append( + BlockingEvent( + blocking_type=BlockingType.BLOCKED, + blocking_subject=f"{tcp_o.ip}:{tcp_o.port}", + blocking_detail=f"tcp.{tcp_o.failure}", + blocking_meta={}, + confidence=0.7, + ) + ) + + cert_store = TLSCertStore(pem_cert_store=SIGNAL_PEM_STORE) + for tls_o in tls_o_list: + observation_ids.append(tls_o.observation_id) + # We skip analyzing TLS handshakes that are the result of an + # inconsistent DNS resolution. 
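+        # For example, if the resolver handed back a spoofed IP address for
+        # one of the Signal domains, the TLS handshake towards that IP is
+        # expected to fail and the blocking was already accounted for at the
+        # DNS level, so we avoid double counting it here.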
+ if tls_o.ip in inconsistent_dns_answers: + continue + + if tls_o.failure and not tls_o.failure.startswith("ssl_"): + anomaly = True + blocking_events.append( + BlockingEvent( + blocking_type=BlockingType.BLOCKED, + blocking_subject=f"{tls_o.server_name}", + blocking_detail=f"tls.{tls_o.failure}", + blocking_meta={}, + confidence=0.7, + ) + ) + + if tls_o.peer_certificates: + try: + _, _ = cert_store.validate_cert_chain( + tls_o.timestamp, tls_o.peer_certificates + ) + # The server_name is listed in the SAN only for the older certs. + # Since we are pinning to only the two known signal CAs it's + # probably safe to just ignore. + except InvalidCertificateChain as exc: + anomaly = True + blocking_events.append( + BlockingEvent( + blocking_type=BlockingType.BLOCKED, + blocking_subject=f"{tls_o.server_name}", + blocking_detail=f"tls.ssl_invalid_certificate", + blocking_meta={"cert_error": str(exc)}, + confidence=0.9, + ) + ) + + # This is an upper bound, which means we might be over-estimating blocking + ok_confidence = 1 - max(map(lambda o: o.confidence, blocking_events), default=0) + blocking_events.append( + BlockingEvent( + blocking_type=BlockingType.OK, + blocking_subject=f"Signal Messenger", + blocking_detail=f"ok", + blocking_meta={}, + confidence=ok_confidence, + ) + ) + + # TODO: add support for validating if the HTTP responses are also consistent + return SignalExperimentResult( + blocking_events=blocking_events, + observation_ids=observation_ids, + anomaly=anomaly, + confirmed=confirmed, + ok_confidence=ok_confidence, + **make_base_result_meta(nt_obs), + ) diff --git a/oonidata/experiments/websites.py b/oonidata/experiments/websites.py new file mode 100644 index 00000000..9f2b8fc6 --- /dev/null +++ b/oonidata/experiments/websites.py @@ -0,0 +1,658 @@ +from dataclasses import dataclass +import ipaddress +from enum import Enum +from typing import Optional, List, Tuple, Dict +from oonidata.experiments.control import DNSControl, HTTPControl, TCPControl +from oonidata.experiments.experiment_result import ( + BlockingEvent, + BlockingType, + ExperimentResult, + fp_scope_to_outcome, + make_base_result_meta, +) + +from oonidata.fingerprintdb import FingerprintDB +from oonidata.netinfo import NetinfoDB + + +from oonidata.observations import ( + DNSObservation, + HTTPObservation, + NettestObservation, + TCPObservation, + TLSObservation, +) + +import logging + +log = logging.getLogger("oonidata.processing") + + +def is_dns_consistent( + dns_o: DNSObservation, dns_ctrl: DNSControl, netinfodb: NetinfoDB +) -> Tuple[bool, float]: + if not dns_o.answer: + return False, 0 + + try: + ipaddress.ip_address(dns_o.answer) + except ValueError: + # Not an IP, er can't do much to validate it + return False, 0 + + if dns_o.answer in dns_ctrl.tls_consistent_answers: + return True, 1.0 + + baseline_asns = set() + baseline_as_org_names = set() + + for ip in dns_ctrl.tls_consistent_answers: + ip_info = netinfodb.lookup_ip(dns_o.timestamp, ip) + if ip_info: + baseline_asns.add(ip_info.as_info.asn) + baseline_as_org_names.add(ip_info.as_info.as_org_name.lower()) + + if dns_o.answer_asn in baseline_asns: + return True, 0.9 + + # XXX maybe with the org_name we can also do something like levenshtein + # distance to get more similarities + if ( + dns_o.answer_as_org_name + and dns_o.answer_as_org_name.lower() in baseline_as_org_names + ): + return True, 0.9 + + other_answers = dns_ctrl.answers_map.copy() + other_answers.pop(dns_o.probe_cc, None) + other_ips = {} + other_asns = {} + for answer_list in 
other_answers.values():
+        for _, ip in answer_list:
+            if ip is None:
+                log.error(f"Missing ip for {dns_o.domain_name}")
+                continue
+            other_ips[ip] = other_ips.get(ip, 0)
+            other_ips[ip] += 1
+            ip_info = netinfodb.lookup_ip(dns_o.timestamp, ip)
+            if ip_info:
+                asn = ip_info.as_info.asn
+                other_asns[asn] = other_asns.get(asn, 0)
+                other_asns[asn] += 1
+
+    if dns_o.answer in other_ips:
+        x = other_ips[dns_o.answer]
+        # This function was derived by looking for an exponential function in
+        # the form f(x) = c1*a^x + c2 and solving for f(0) = 0 and f(10) = 1,
+        # giving us a function in the form f(x) = (a^x - 1) / (a^10 - 1). We
+        # then choose the magic value of 0.5 by looking for a solution in a
+        # where f(1) ~= 0.5, doing a bit of plots and choosing a curve that
+        # looks reasonably sloped.
+        y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1)
+        return True, min(0.9, 0.8 * y)
+
+    if dns_o.answer_asn in other_asns:
+        x = other_asns[dns_o.answer_asn]
+        y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1)
+        return True, min(0.8, 0.7 * y)
+
+    x = len(baseline_asns)
+    y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1)
+    return False, min(0.9, 0.8 * y)
+
+
+def make_website_tcp_blocking_event(
+    tcp_o: TCPObservation, tcp_b: TCPControl
+) -> BlockingEvent:
+    blocking_type = BlockingType.OK
+    blocking_detail = "ok"
+    blocking_subject = f"{tcp_o.ip}:{tcp_o.port}"
+    blocking_meta = {}
+    confidence = 1.0
+
+    if tcp_o.failure:
+        unreachable_cc_asn = list(tcp_b.unreachable_cc_asn)
+        try:
+            unreachable_cc_asn.remove((tcp_o.probe_cc, tcp_o.probe_asn))
+        except ValueError:
+            log.info(
+                "missing failure in tcp baseline. You are probably using a control derived baseline."
+            )
+
+        reachable_count = len(tcp_b.reachable_cc_asn)
+        unreachable_count = len(unreachable_cc_asn)
+        blocking_meta = {
+            "unreachable_count": unreachable_count,
+            "reachable_count": reachable_count,
+        }
+        if reachable_count > unreachable_count:
+            # We are adding back 1 because we removed it above and it avoids a divide by zero
+            confidence = reachable_count / (reachable_count + unreachable_count + 1)
+            blocking_type = BlockingType.BLOCKED
+        elif unreachable_count > reachable_count:
+            # Most other vantage points fail to reach it too, so the target is
+            # more likely down than blocked
+            confidence = (unreachable_count + 1) / (
+                reachable_count + unreachable_count + 1
+            )
+            blocking_type = BlockingType.DOWN
+
+        # TODO: should we bump up the confidence in the case of connection_reset?
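+        # The confidence expresses how sure we are of the classification
+        # above, based purely on how many other vantage points could reach
+        # (or not reach) the same ip:port pair.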
+ blocking_detail = f"tcp.{tcp_o.failure}" + + return BlockingEvent( + blocking_type=blocking_type, + blocking_subject=blocking_subject, + blocking_detail=blocking_detail, + blocking_meta=blocking_meta, + confidence=confidence, + ) + + +def make_website_dns_blocking_event( + dns_o: DNSObservation, + dns_ctrl: DNSControl, + fingerprintdb: FingerprintDB, + netinfodb: NetinfoDB, +) -> BlockingEvent: + + blocking_subject = dns_o.domain_name + + if dns_o.fingerprint_id: + fp = fingerprintdb.get_fp(dns_o.fingerprint_id) + blocking_type = fp_scope_to_outcome(fp.scope) + confidence = 1.0 + # If we see the fingerprint in an unexpected country we should + # significantly reduce the confidence in the block + if ( + dns_o.probe_cc + and fp.expected_countries + and len(fp.expected_countries) > 0 + and dns_o.probe_cc not in fp.expected_countries + ): + log.debug( + f"Inconsistent probe_cc vs expected_countries {dns_o.probe_cc} != {fp.expected_countries}" + ) + confidence = 0.7 + + return BlockingEvent( + blocking_type=blocking_type, + blocking_subject=blocking_subject, + blocking_detail="dns.inconsistent.blockpage", + blocking_meta={"ip": dns_o.answer}, + confidence=confidence, + ) + + elif dns_o.answer_is_bogon and len(dns_ctrl.tls_consistent_answers) > 0: + return BlockingEvent( + blocking_type=BlockingType.BLOCKED, + blocking_subject=blocking_subject, + blocking_detail="dns.inconsistent.bogon", + blocking_meta={"ip": dns_o.answer}, + confidence=0.9, + ) + + elif dns_o.failure: + failure_cc_asn = list(dns_ctrl.failure_cc_asn) + try: + failure_cc_asn.remove((dns_o.probe_cc, dns_o.probe_asn)) + except ValueError: + log.info( + "missing failure for the probe in the baseline. You are probably using a control derived baseline." + ) + + failure_count = len(failure_cc_asn) + ok_count = len(dns_ctrl.ok_cc_asn) + + if dns_o.failure == "dns_nxdomain_error": + nxdomain_cc_asn = list(dns_ctrl.nxdomain_cc_asn) + try: + nxdomain_cc_asn.remove((dns_o.probe_cc, dns_o.probe_asn)) + except ValueError: + log.info( + "missing nx_domain failure for the probe in the baseline. You are probably using a control derived baseline." 
+                )
+
+            nxdomain_count = len(nxdomain_cc_asn)
+            if ok_count > nxdomain_count:
+                # We give a bit extra weight to an NXDOMAIN compared to other failures
+                confidence = ok_count / (ok_count + nxdomain_count + 1)
+                confidence = min(0.8, confidence * 1.5)
+                blocking_type = BlockingType.BLOCKED
+                blocking_detail = "dns.inconsistent.nxdomain"
+            else:
+                confidence = (nxdomain_count + 1) / (ok_count + nxdomain_count + 1)
+                blocking_type = BlockingType.DOWN
+                blocking_detail = "dns.inconsistent.nxdomain"
+        elif ok_count > failure_count:
+            confidence = ok_count / (ok_count + failure_count + 1)
+            blocking_type = BlockingType.BLOCKED
+            blocking_detail = f"dns.{dns_o.failure}"
+        else:
+            confidence = (failure_count + 1) / (ok_count + failure_count + 1)
+            blocking_type = BlockingType.DOWN
+            blocking_detail = f"dns.{dns_o.failure}"
+
+        return BlockingEvent(
+            blocking_type=blocking_type,
+            blocking_subject=blocking_subject,
+            blocking_detail=blocking_detail,
+            blocking_meta={"ip": dns_o.answer},
+            confidence=confidence,
+        )
+
+    elif dns_o.is_tls_consistent == False:
+        return BlockingEvent(
+            blocking_type=BlockingType.BLOCKED,
+            blocking_subject=blocking_subject,
+            blocking_detail="dns.inconsistent.tls_mismatch",
+            blocking_meta={"ip": dns_o.answer, "why": "tls_inconsistent"},
+            confidence=0.9,
+        )
+
+    elif dns_o.is_tls_consistent == None:
+        # If we are in this case, it means we weren't able to determine the
+        # consistency of the DNS query using TLS. This is the case either
+        # because the tested site doesn't support HTTPS, and therefore we
+        # didn't generate a TLS measurement for it, or because the target IP
+        # isn't listening on HTTPS (which is quite fishy).
+        # In either case we should flag these as being somewhat likely to be
+        # blocked.
+        ip_based_consistency, consistency_confidence = is_dns_consistent(
+            dns_o, dns_ctrl, netinfodb
+        )
+        if ip_based_consistency is False and consistency_confidence > 0:
+            confidence = consistency_confidence
+            blocking_detail = "dns.inconsistent.generic"
+            # If the answer ASN is the same as the probe_asn, it's more likely
+            # to be a blockpage
+            if dns_o.answer_asn == dns_o.probe_asn:
+                blocking_detail = "dns.inconsistent.asn_match"
+                confidence = 0.8
+            # same for the answer_cc
+            elif dns_o.answer_as_cc == dns_o.probe_cc:
+                blocking_detail = "dns.inconsistent.cc_match"
+                confidence = 0.7
+
+            return BlockingEvent(
+                blocking_type=BlockingType.BLOCKED,
+                blocking_subject=blocking_subject,
+                blocking_detail=blocking_detail,
+                blocking_meta={"ip": dns_o.answer, "why": "ip_inconsistent"},
+                confidence=confidence,
+            )
+
+    # No blocking detected
+    return BlockingEvent(
+        blocking_type=BlockingType.OK,
+        blocking_subject=blocking_subject,
+        blocking_detail="ok",
+        blocking_meta={"ip": dns_o.answer},
+        confidence=0.8,
+    )
+
+
+def make_website_tls_blocking_event(
+    tls_o: TLSObservation, prev_be: List[BlockingEvent]
+) -> Optional[BlockingEvent]:
+    blocking_subject = tls_o.domain_name
+
+    if tls_o.is_certificate_valid == False:
+        # We only consider it to be a TLS level blocking event in cases when
+        # there is a certificate mismatch, but there was no DNS inconsistency.
+        # If the DNS was inconsistent, we will just count the DNS blocking event
+        if (
+            len(
+                list(
+                    filter(
+                        lambda v: v.blocking_detail.startswith("dns.")
+                        and v.blocking_meta.get("ip") == tls_o.ip,
+                        prev_be,
+                    )
+                )
+            )
+            > 0
+        ):
+            return
+
+        # TODO: this is wrong. We need to consider the baseline to establish TLS
+        # MITM, because the cert might be invalid also from other locations (eg.
+        # it expired) and not due to censorship.
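+        # Until such a baseline is available we report a MITM with a fixed,
+        # fairly high confidence, since an invalid certificate chain combined
+        # with consistent DNS answers is a strong tampering signal.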
+ return BlockingEvent( + blocking_type=BlockingType.BLOCKED, + blocking_subject=blocking_subject, + blocking_detail="tls.mitm", + blocking_meta={"why": "invalid certificate"}, + confidence=0.8, + ) + + elif tls_o.failure: + if ( + len( + list( + filter( + lambda v: v.blocking_detail.startswith("tcp.") + and v.blocking_subject == f"{tls_o.ip}:443", + prev_be, + ) + ) + ) + > 0 + ): + return + + # We only consider it to be a TLS level verdict if we haven't seen any + # blocks in TCP + blocking_detail = f"tls.{tls_o.failure}" + confidence = 0.5 + + if tls_o.tls_handshake_read_count == 0 and tls_o.tls_handshake_write_count == 1: + # This means we just wrote the TLS ClientHello, let's give it a bit + # more confidence in it being a block + confidence = 0.7 + + if tls_o.failure in ("connection_closed", "connection_reset"): + confidence += 0.2 + + return BlockingEvent( + blocking_type=BlockingType.BLOCKED, + blocking_subject=blocking_subject, + blocking_detail=blocking_detail, + blocking_meta={}, + confidence=confidence, + ) + + return BlockingEvent( + blocking_type=BlockingType.OK, + blocking_subject=blocking_subject, + blocking_detail="ok", + blocking_meta={}, + confidence=1.0, + ) + + +def make_website_http_blocking_event( + http_o: HTTPObservation, + http_ctrl: HTTPControl, + prev_be: List[BlockingEvent], + fingerprintdb: FingerprintDB, +) -> Optional[BlockingEvent]: + blocking_subject = http_o.request_url + + ok_be = BlockingEvent( + blocking_type=BlockingType.OK, + blocking_subject=blocking_subject, + blocking_detail="ok", + blocking_meta={}, + confidence=0.8, + ) + + detail_prefix = "http." + if http_o.request_is_encrypted: + detail_prefix = "https." + + if http_o.failure: + # For HTTP requests we ignore cases in which we detected the blocking + # already to be happening via DNS or TCP. + if not http_o.request_is_encrypted and ( + len( + list( + filter( + lambda v: v.blocking_detail.startswith("dns.") + or ( + v.blocking_detail.startswith("tcp.") + and v.blocking_subject.endswith(":80") + ), + prev_be, + ) + ) + ) + > 0 + ): + return + + # Similarly for HTTPS we ignore cases when the block is done via TLS or TCP + if http_o.request_is_encrypted and ( + len( + list( + filter( + lambda v: v.blocking_detail.startswith("dns.") + or ( + v.blocking_detail.startswith("tcp.") + and v.blocking_subject.endswith(":443") + ) + or v.blocking_detail.startswith("tls."), + prev_be, + ) + ) + ) + > 0 + ): + return + + failure_cc_asn = list(http_ctrl.failure_cc_asn) + try: + failure_cc_asn.remove((http_o.probe_cc, http_o.probe_asn)) + except ValueError: + log.info( + "missing failure in http baseline. 
Either something is wrong or you are using a control derived baseline"
+            )
+
+        failure_count = len(failure_cc_asn)
+        ok_count = len(http_ctrl.ok_cc_asn)
+        if ok_count > failure_count:
+            # We are adding back 1 because we removed it above and it avoids a divide by zero
+            confidence = ok_count / (ok_count + failure_count + 1)
+            blocking_type = BlockingType.BLOCKED
+        else:
+            confidence = (failure_count + 1) / (ok_count + failure_count + 1)
+            blocking_type = BlockingType.DOWN
+        blocking_detail = f"{detail_prefix}{http_o.failure}"
+
+        return BlockingEvent(
+            blocking_type=blocking_type,
+            blocking_subject=blocking_subject,
+            blocking_detail=blocking_detail,
+            blocking_meta={},
+            confidence=confidence,
+        )
+
+    elif http_o.response_matches_blockpage:
+        blocking_type = BlockingType.BLOCKED
+        blocking_meta = {}
+        confidence = 0.7
+        if http_o.request_is_encrypted:
+            # Likely some form of server-side blocking
+            blocking_type = BlockingType.SERVER_SIDE_BLOCK
+            confidence = 0.5
+        elif http_o.fingerprint_country_consistent:
+            confidence = 1
+
+        for fp_name in http_o.response_fingerprints:
+            fp = fingerprintdb.get_fp(fp_name)
+            if fp.scope:
+                blocking_type = fp_scope_to_outcome(fp.scope)
+                blocking_meta = {"fp_name": fp.name, "why": "matched fingerprint"}
+                break
+
+        blocking_detail = f"{detail_prefix}diff"
+        return BlockingEvent(
+            blocking_type=blocking_type,
+            blocking_subject=blocking_subject,
+            blocking_detail=blocking_detail,
+            blocking_meta=blocking_meta,
+            confidence=confidence,
+        )
+
+    elif not http_o.request_is_encrypted:
+        if http_o.response_matches_false_positive:
+            return ok_be
+        if http_o.response_body_title == http_ctrl.response_body_title:
+            return ok_be
+        if http_o.response_body_meta_title == http_ctrl.response_body_meta_title:
+            return ok_be
+        if http_o.response_body_sha1 == http_ctrl.response_body_sha1:
+            return ok_be
+
+        if (
+            http_o.response_body_length
+            and http_ctrl.response_body_length
+            and (
+                (http_o.response_body_length + 1.0)
+                / (http_ctrl.response_body_length + 1.0)
+                < 0.7
+            )
+        ):
+            blocking_detail = f"{detail_prefix}diff.body"
+            return BlockingEvent(
+                blocking_type=BlockingType.BLOCKED,
+                blocking_subject=blocking_subject,
+                blocking_detail=blocking_detail,
+                blocking_meta={"why": "inconsistent body length"},
+                confidence=0.6,
+            )
+
+    return ok_be
+
+
+@dataclass
+class WebsiteExperimentResult(ExperimentResult):
+    domain_name: str
+    website_name: str
+
+
+def make_website_experiment_result(
+    nt_o: NettestObservation,
+    dns_o_list: List[DNSObservation],
+    dns_ctrl: DNSControl,
+    fingerprintdb: FingerprintDB,
+    netinfodb: NetinfoDB,
+    tcp_o_list: List[TCPObservation],
+    tcp_ctrl_map: Dict[str, TCPControl],
+    tls_o_list: List[TLSObservation],
+    http_o_list: List[HTTPObservation],
+    http_ctrl_map: Dict[str, HTTPControl],
+) -> WebsiteExperimentResult:
+    """
+    make_website_experiment_result will build a WebsiteExperimentResult that
+    bundles the many blocking events generated from the observations related
+    to a website measurement.
+
+    We MUST pass in DNS observations, but observations of other types are
+    optional. This is to work around the fact that not every version of OONI
+    Probe was generating all types of observations.
+
+    The order in which we compute the blocking events is important, since the
+    knowledge of some form of blocking is relevant to being able to determine
+    future methods of blocking.
+    Examples of this include:
+    * If you know that DNS is consistent and you see a TLS certificate
+    validation error, you can conclude that it's a MITM
+    * If you see that TCP connect is failing, you will not attribute a failing
+    TLS to a TLS level interference (ex. SNI filtering)
+
+    For some lists of observations we also need to pass in a baseline. The
+    baseline is some ground truth related to the targets being measured that is
+    needed in order to draw some meaningful conclusion about their blocking.
+    We need this so that we are able to exclude instances in which the target is
+    unavailable due to transient network failures.
+
+    It is the job of whoever calls the make_website_experiment_result function
+    to build this baseline by either running queries against the database of
+    observations or using some set of observations that are already in memory.
+
+    The basic idea is that since we are going to be generating blocking events
+    in windows of 24h, we would have to do the baselining only once for the 24h
+    time window given a certain domain.
+    """
+    blocking_events = []
+    observation_ids = []
+
+    domain_name = dns_o_list[0].domain_name
+    dns_blocking_events = []
+    for dns_o in dns_o_list:
+        observation_ids.append(dns_o.observation_id)
+
+        assert (
+            domain_name == dns_o.domain_name
+        ), f"Inconsistent domain_name in dns_o {dns_o.domain_name}"
+        dns_be = make_website_dns_blocking_event(
+            dns_o, dns_ctrl, fingerprintdb, netinfodb
+        )
+        if dns_be.blocking_type != BlockingType.OK:
+            dns_blocking_events.append(dns_be)
+        else:
+            # If a DNS observation comes back as OK, it means that
+            # observation was a sign of everything being OK, hence we should
+            # ignore all the previous DNS blocking events as likely false
+            # positives and just consider no DNS level censorship to be
+            # happening.
+            # TODO: probably we want to just reduce the confidence of the DNS
+            # level blocks in this case by some factor.
+            dns_blocking_events = []
+            break
+
+    for dns_be in dns_blocking_events:
+        blocking_events.append(dns_be)
+
+    if tcp_o_list:
+        for tcp_o in tcp_o_list:
+            observation_ids.append(tcp_o.observation_id)
+            assert (
+                domain_name == tcp_o.domain_name
+            ), f"Inconsistent domain_name in tcp_o {tcp_o.domain_name}"
+            tcp_ctrl = tcp_ctrl_map.get(f"{tcp_o.ip}:{tcp_o.port}")
+            tcp_be = (
+                make_website_tcp_blocking_event(tcp_o, tcp_ctrl) if tcp_ctrl else None
+            )
+            if tcp_be:
+                blocking_events.append(tcp_be)
+
+    if tls_o_list:
+        for tls_o in tls_o_list:
+            observation_ids.append(tls_o.observation_id)
+            assert (
+                domain_name == tls_o.domain_name
+            ), f"Inconsistent domain_name in tls_o {tls_o.domain_name}"
+            tls_be = make_website_tls_blocking_event(tls_o, blocking_events)
+            if tls_be:
+                blocking_events.append(tls_be)
+
+    if http_o_list:
+        for http_o in http_o_list:
+            observation_ids.append(http_o.observation_id)
+            assert (
+                domain_name == http_o.domain_name
+            ), f"Inconsistent domain_name in http_o {http_o.domain_name}"
+            http_ctrl = http_ctrl_map.get(http_o.request_url)
+            http_be = (
+                make_website_http_blocking_event(
+                    http_o, http_ctrl, blocking_events, fingerprintdb
+                )
+                if http_ctrl
+                else None
+            )
+            if http_be:
+                blocking_events.append(http_be)
+
+    # TODO: should we also be including BlockingType.DOWN and SERVER_SIDE_BLOCK here or not?
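+    # ok_confidence is an upper bound on how sure we are that everything is
+    # OK: we take the strongest blocking signal (ignoring OK, DOWN and
+    # SERVER_SIDE_BLOCK events) and subtract its confidence from 1. For
+    # example, if the strongest signal is a dns.inconsistent.blockpage event
+    # with confidence 0.9, ok_confidence comes out as 1 - 0.9 = 0.1.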
+    ok_confidence = 1 - max(
+        map(
+            lambda be: be.confidence,
+            filter(
+                lambda be: be.blocking_type
+                not in (
+                    BlockingType.OK,
+                    BlockingType.DOWN,
+                    BlockingType.SERVER_SIDE_BLOCK,
+                ),
+                blocking_events,
+            ),
+        ),
+        default=0,
+    )
+
+    confirmed = False
+    anomaly = False
+    if ok_confidence == 0:
+        confirmed = True
+    if ok_confidence < 0.6:
+        anomaly = True
+
+    return WebsiteExperimentResult(
+        blocking_events=blocking_events,
+        observation_ids=observation_ids,
+        anomaly=anomaly,
+        confirmed=confirmed,
+        ok_confidence=ok_confidence,
+        **make_base_result_meta(nt_o),
+    )
diff --git a/oonidata/fingerprintdb.py b/oonidata/fingerprintdb.py
index 597aed58..f2395a43 100644
--- a/oonidata/fingerprintdb.py
+++ b/oonidata/fingerprintdb.py
@@ -3,7 +3,7 @@
 from pathlib import Path
 from collections import OrderedDict
 
-from typing import Optional, List, Mapping
+from typing import Optional, List, Dict
 
 from dataclasses import dataclass, field
 
 import requests
@@ -74,7 +74,7 @@ def matches_http(self, http_response: HTTPResponse) -> bool:
         return False
 
 
-def _load_fingerprints(filepath: Path) -> Mapping[str, Fingerprint]:
+def _load_fingerprints(filepath: Path) -> Dict[str, Fingerprint]:
     fingerprints = OrderedDict()
 
     with filepath.open() as in_file:
diff --git a/oonidata/main.py b/oonidata/main.py
index d9dc71ff..3cf52823 100644
--- a/oonidata/main.py
+++ b/oonidata/main.py
@@ -6,6 +6,7 @@
 from datetime import date, timedelta, datetime
 from typing import List
 
+from oonidata import __version__
 from oonidata.cli.sync import run_sync
 from oonidata.cli.process import run_process
 
@@ -63,6 +64,7 @@ def arg_probe_cc(p: argparse.ArgumentParser):
 def main():
     parser = argparse.ArgumentParser("OONI Data tools")
     parser.set_defaults(func=lambda r: parser.print_usage())
+    parser.add_argument("--version", action="store_true")
 
     subparsers = parser.add_subparsers()
 
@@ -111,12 +113,14 @@ def main():
     arg_test_name(parser_process)
     arg_start_day(parser_process)
     arg_end_day(parser_process)
-    parser_process.add_argument("--only-verdicts", action="store_true")
-    parser_process.add_argument("--skip-verdicts", action="store_true")
     parser_process.add_argument("--fast-fail", action="store_true")
     parser_process.set_defaults(func=run_process)
 
     args = parser.parse_args()
+    if args.version:
+        print(__version__)
+        sys.exit(0)
+
     sys.exit(args.func(args))
 
diff --git a/oonidata/observations.py b/oonidata/observations.py
index b41e251f..dc5d9a1f 100644
--- a/oonidata/observations.py
+++ b/oonidata/observations.py
@@ -6,7 +6,7 @@
 from dataclasses import dataclass, field
 from urllib.parse import urlparse, urlsplit
 from datetime import datetime, timedelta
-from typing import Callable, Generator, Optional, List, Tuple, Union, Mapping
+from typing import Callable, Generator, Optional, List, Tuple, Union, Dict
 
 from oonidata.dataformat import SIGNAL_PEM_STORE
 from oonidata.dataformat import (
@@ -179,7 +179,7 @@ class NettestObservation(Observation):
     __table_name__ = "obs_nettest"
 
     test_runtime: float
-    annotations: Mapping[str, str]
+    annotations: Dict[str, str]
 
     @staticmethod
     def from_measurement(
@@ -458,7 +458,7 @@ def from_measurement(
         msmt: BaseMeasurement,
         res: TCPConnect,
         idx: int,
-        ip_to_domain: Mapping[str, str],
+        ip_to_domain: Dict[str, str],
         netinfodb: NetinfoDB,
     ) -> "TCPObservation":
         tcpo = TCPObservation(
@@ -486,7 +486,7 @@ def make_tcp_observations(
     msmt: BaseMeasurement,
     tcp_connect: Optional[List[TCPConnect]],
     netinfodb: NetinfoDB,
-    ip_to_domain: Mapping[str, str] = {},
+    ip_to_domain: Dict[str, str] = {},
     target: str = "",
 ) -> Generator[TCPObservation, None, None]:
     if not tcp_connect:
@@
-575,7 +575,7 @@ def from_measurement( tls_h: TLSHandshake, network_events: Optional[List[NetworkEvent]], idx: int, - ip_to_domain: Mapping[str, str], + ip_to_domain: Dict[str, str], netinfodb: NetinfoDB, cert_store: Optional[TLSCertStore] = None, validate_domain: Callable[[str, str, List[str]], bool] = lambda x, y, z: True, @@ -687,7 +687,7 @@ def make_tls_observations( tls_handshakes: Optional[List[TLSHandshake]], network_events: Optional[List[NetworkEvent]], netinfodb: NetinfoDB, - ip_to_domain: Mapping[str, str] = {}, + ip_to_domain: Dict[str, str] = {}, cert_store: Optional[TLSCertStore] = None, ) -> Generator[TLSObservation, None, None]: if not tls_handshakes: @@ -699,7 +699,7 @@ def make_tls_observations( ) -def make_ip_to_domain(dns_observations: List[DNSObservation]) -> Mapping[str, str]: +def make_ip_to_domain(dns_observations: List[DNSObservation]) -> Dict[str, str]: ip_to_domain = {} for obs in dns_observations: # TODO: do we want to filter out also CNAMEs? diff --git a/oonidata/processing.py b/oonidata/processing.py index 2e665517..9051238c 100644 --- a/oonidata/processing.py +++ b/oonidata/processing.py @@ -16,10 +16,7 @@ from oonidata.observations import ( NettestObservation, DNSObservation, - HTTPObservation, Observation, - TCPObservation, - TLSObservation, make_http_observations, make_dns_observations, make_tcp_observations, @@ -31,14 +28,6 @@ from oonidata.dataformat import BaseMeasurement, WebConnectivity, Tor from oonidata.fingerprintdb import FingerprintDB from oonidata.netinfo import NetinfoDB -from oonidata.verdicts import ( - DNSBaseline, - Verdict, - make_dns_baseline, - make_http_baseline_map, - make_tcp_baseline_map, - make_website_verdicts, -) from oonidata.dataclient import ( MeasurementListProgress, @@ -62,13 +51,6 @@ def make_observation_row(observation: Observation) -> dict: return asdict(observation) -def make_verdict_row(v: Verdict) -> dict: - row = asdict(v) - # XXX come up with a cleaner solution to this - row["outcome"] = row["outcome"].value - return row - - def write_observations_to_db( db: DatabaseConnection, observations: Iterable[Observation] ) -> None: @@ -77,12 +59,6 @@ def write_observations_to_db( db.write_row(obs.__table_name__, row) -def write_verdicts_to_db(db: DatabaseConnection, verdicts: Iterable[Verdict]) -> None: - for v in verdicts: - row = make_verdict_row(v) - db.write_row("verdict", row) - - def default_processor( msmt: BaseMeasurement, db: DatabaseConnection, @@ -297,174 +273,6 @@ def observations_in_session( return observation_list -def websites_observation_group( - day: date, - domain_name: str, - db: ClickhouseConnection, - probe_cc: Optional[str] = None, -) -> Generator[ - Tuple[ - List[DNSObservation], - List[TCPObservation], - List[TLSObservation], - List[HTTPObservation], - ], - None, - None, -]: - for dns_obs_list in dns_observations_by_session(day, domain_name, db, probe_cc): - report_id = dns_obs_list[0].report_id - tcp_o_list = observations_in_session( - day, - domain_name, - TCPObservation, - report_id, - db, - ) - tls_o_list = observations_in_session( - day, - domain_name, - TLSObservation, - report_id, - db, - ) - http_o_list = observations_in_session( - day, - domain_name, - HTTPObservation, - report_id, - db, - ) - - # Drop all verdicts related to this session from the database. - # XXX this should probably be refactored to be closer to the place where - # we do the insert, but will require quite a bit of reorganizing of the - # logic in here. 
- db.execute( - "ALTER TABLE verdict DELETE WHERE report_id = %(report_id)s", - {"report_id": report_id}, - ) - yield dns_obs_list, tcp_o_list, tls_o_list, http_o_list - - -import certifi -import ssl -import socket - - -def is_tls_valid(ip, hostname): - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - context.load_verify_locations(certifi.where()) - - with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock: - sock.settimeout(1) - with context.wrap_socket(sock, server_hostname=hostname) as conn: - try: - conn.connect((ip, 443)) - # TODO: do we care to distinguish these values? - except ssl.SSLCertVerificationError: - return False - except ssl.SSLError: - return False - except socket.timeout: - return False - except socket.error: - return False - except: - return False - return True - - -def get_extra_dns_consistency_tls_baseline( - dns_baseline: DNSBaseline, - dns_o_list: List[DNSObservation], - db: ClickhouseConnection, -) -> List[str]: - domain_name = dns_o_list[0].domain_name - missing_answers = set(map(lambda a: a.answer, dns_o_list)) - for a in list(missing_answers): - if a is None: - missing_answers.discard(a) - continue - - try: - if is_ip_bogon(a): - missing_answers.discard(a) - except ValueError: - missing_answers.discard(a) - - for a in dns_baseline.tls_consistent_answers: - missing_answers.discard(a) - - new_tls_consistent_ips = [] - res = db.execute( - "SELECT DISTINCT ip FROM dns_consistency_tls_baseline WHERE ip IN %(ip_list)s AND domain_name = %(domain_name)s", - {"ip_list": list(missing_answers), "domain_name": domain_name}, - ) - assert isinstance(res, list) - for row in res: - missing_answers.discard(row[0]) - new_tls_consistent_ips.append(row[0]) - - rows_to_write = [] - for ip in missing_answers: - timestamp = datetime.now() - if is_tls_valid(ip, domain_name): - rows_to_write.append((ip, domain_name, timestamp)) - new_tls_consistent_ips.append(ip) - - if len(rows_to_write) > 0: - db.execute( - "INSERT INTO dns_consistency_tls_baseline (ip, domain_name, timestamp) VALUES", - rows_to_write, - ) - return new_tls_consistent_ips - - -def generate_website_verdicts( - day: date, - db: ClickhouseConnection, - fingerprintdb: FingerprintDB, - netinfodb: NetinfoDB, - probe_cc: Optional[str] = None, -): - with logging_redirect_tqdm(): - for domain_name in tqdm(domains_in_a_day(day, db, probe_cc)): - log.debug(f"Generating verdicts for {domain_name}") - dns_baseline = make_dns_baseline(day, domain_name, db) - http_baseline_map = make_http_baseline_map(day, domain_name, db) - tcp_baseline_map = make_tcp_baseline_map(day, domain_name, db) - - for ( - dns_o_list, - tcp_o_list, - tls_o_list, - http_o_list, - ) in websites_observation_group(day, domain_name, db, probe_cc=probe_cc): - extra_tls_consistent_answers = get_extra_dns_consistency_tls_baseline( - dns_baseline, dns_o_list, db - ) - dns_baseline.tls_consistent_answers = list( - set( - list(dns_baseline.tls_consistent_answers) - + extra_tls_consistent_answers - ) - ) - yield from make_website_verdicts( - dns_o_list, - dns_baseline, - fingerprintdb, - netinfodb, - tcp_o_list, - tcp_baseline_map, - tls_o_list, - http_o_list, - http_baseline_map, - ) - - -verdict_generators = [generate_website_verdicts] - nettest_processors = { "web_connectivity": web_connectivity_processor, "dnscheck": dnscheck_processor, @@ -480,7 +288,6 @@ def process_day( test_name=[], probe_cc=[], start_at_idx=0, - skip_verdicts=False, fast_fail=False, ) -> Tuple[float, date]: t0 = time.monotonic() @@ -538,15 +345,4 @@ def progress_callback(p: 
MeasurementListProgress): if fast_fail: raise exc - if isinstance(db, ClickhouseConnection) and not skip_verdicts: - write_verdicts_to_db( - db, - generate_website_verdicts( - day, - db, - fingerprintdb, - netinfodb, - ), - ) - return time.monotonic() - t0, day diff --git a/oonidata/verdicts.py b/oonidata/verdicts.py deleted file mode 100644 index 58b160d5..00000000 --- a/oonidata/verdicts.py +++ /dev/null @@ -1,986 +0,0 @@ -import ipaddress -from dataclasses import dataclass, field -from enum import Enum -from typing import Optional, List, Tuple, Generator, Any, Mapping, Dict -from datetime import datetime, date - -from urllib.parse import urlparse - -from oonidata.fingerprintdb import FingerprintDB -from oonidata.netinfo import NetinfoDB - -from oonidata.datautils import one_day_dict - -from oonidata.observations import ( - DNSObservation, - HTTPObservation, - Observation, - TCPObservation, - TLSObservation, -) -from oonidata.dataformat import WebConnectivityControl, WebConnectivity -from oonidata.db.connections import ClickhouseConnection - -import logging - -log = logging.getLogger("oonidata.processing") - - -class Outcome(Enum): - # k: everything is OK - OK = "k" - # b: blocking is happening with an unknown scope - BLOCKED = "b" - # n: national level blocking - NATIONAL_BLOCK = "n" - # i: isp level blocking - ISP_BLOCK = "i" - # l: local blocking (school, office, home network) - LOCAL_BLOCK = "l" - # s: server-side blocking - SERVER_SIDE_BLOCK = "s" - # d: the subject is down - DOWN = "d" - # t: this is a signal indicating some form of network throttling - THROTTLING = "t" - - -def fp_scope_to_outcome(scope: Optional[str]) -> Outcome: - # "nat" national level blockpage - # "isp" ISP level blockpage - # "prod" text pattern related to a middlebox product - # "inst" text pattern related to a voluntary instition blockpage (school, office) - # "vbw" vague blocking word - # "fp" fingerprint for false positives - if scope == "nat": - return Outcome.NATIONAL_BLOCK - elif scope == "isp": - return Outcome.ISP_BLOCK - elif scope == "inst": - return Outcome.LOCAL_BLOCK - elif scope == "fp": - return Outcome.SERVER_SIDE_BLOCK - return Outcome.BLOCKED - - -@dataclass -class Verdict: - measurement_uid: str - observation_id: str - report_id: str - input: str - timestamp: datetime - - probe_asn: int - probe_cc: str - - probe_as_org_name: str - probe_as_cc: str - - network_type: str - - resolver_ip: Optional[str] - resolver_asn: Optional[int] - resolver_as_org_name: Optional[str] - resolver_as_cc: Optional[str] - resolver_cc: Optional[str] - - confidence: float - - subject: str - subject_category: str - subject_detail: str - - outcome: Outcome - - # This will include a more detailed breakdown of the outcome, for example it - # can be "dns.nxdomain" - outcome_detail: str - - -def make_verdict_from_obs( - obs: Observation, - confidence: float, - subject: str, - subject_category: str, - subject_detail: str, - outcome: Outcome, - outcome_detail: str, -) -> Verdict: - return Verdict( - measurement_uid=obs.measurement_uid, - observation_id=obs.observation_id, - report_id=obs.report_id, - input=obs.input, - timestamp=obs.timestamp, - probe_asn=obs.probe_asn, - probe_cc=obs.probe_cc, - probe_as_org_name=obs.probe_as_org_name, - probe_as_cc=obs.probe_as_cc, - network_type=obs.network_type, - resolver_ip=obs.resolver_ip, - resolver_asn=obs.resolver_asn, - resolver_as_org_name=obs.resolver_as_org_name, - resolver_as_cc=obs.resolver_as_cc, - resolver_cc=obs.resolver_cc, - confidence=confidence, - subject=subject, 
- subject_category=subject_category, - subject_detail=subject_detail, - outcome=outcome, - outcome_detail=outcome_detail, - ) - - -@dataclass -class TCPBaseline: - address: str - reachable_cc_asn: List[Tuple[str, int]] = field(default_factory=list) - unreachable_cc_asn: List[Tuple[str, int]] = field(default_factory=list) - - -def make_tcp_baseline_map( - day: date, domain_name: str, db: ClickhouseConnection -) -> Mapping[str, TCPBaseline]: - tcp_baseline_map = {} - q_params = one_day_dict(day) - q_params["domain_name"] = domain_name - - q = """SELECT probe_cc, probe_asn, ip, port, failure FROM obs_tcp - WHERE domain_name = %(domain_name)s - AND timestamp >= %(start_day)s - AND timestamp <= %(end_day)s - GROUP BY probe_cc, probe_asn, ip, port, failure; - """ - res = db.execute(q, q_params) - if isinstance(res, list) and len(res) > 0: - for probe_cc, probe_asn, ip, port, failure in res: - address = f"{ip}:{port}" - tcp_baseline_map[address] = tcp_baseline_map.get( - address, TCPBaseline(address) - ) - if not failure: - tcp_baseline_map[address].reachable_cc_asn.append((probe_cc, probe_asn)) - else: - tcp_baseline_map[address].unreachable_cc_asn.append( - (probe_cc, probe_asn) - ) - return tcp_baseline_map - - -@dataclass -class HTTPBaseline: - url: str - failure_cc_asn: List[Tuple[str, int]] = field(default_factory=list) - ok_cc_asn: List[Tuple[str, int]] = field(default_factory=list) - - response_body_length: int = 0 - response_body_sha1: str = "" - response_body_title: str = "" - response_body_meta_title: str = "" - - response_status_code: int = 0 - - -def maybe_get_first(l: list, default_value: Any = None) -> Optional[Any]: - try: - return l[0] - except IndexError: - return default_value - - -def make_http_baseline_map( - day: date, domain_name: str, db: ClickhouseConnection -) -> Mapping[str, HTTPBaseline]: - http_baseline_map = {} - - q_params = one_day_dict(day) - q_params["domain_name"] = domain_name - - q = """SELECT probe_cc, probe_asn, request_url, failure FROM obs_http - WHERE domain_name = %(domain_name)s - AND timestamp >= %(start_day)s - AND timestamp <= %(end_day)s - GROUP BY probe_cc, probe_asn, request_url, failure; - """ - res = db.execute(q, q_params) - if isinstance(res, list) and len(res) > 0: - for probe_cc, probe_asn, request_url, failure in res: - http_baseline_map[request_url] = http_baseline_map.get( - request_url, HTTPBaseline(request_url) - ) - if not failure: - http_baseline_map[request_url].ok_cc_asn.append((probe_cc, probe_asn)) - else: - http_baseline_map[request_url].failure_cc_asn.append( - (probe_cc, probe_asn) - ) - - q = """SELECT request_url, - topK(1)(response_body_sha1), - topK(1)(response_body_length), - topK(1)(response_body_title), - topK(1)(response_body_meta_title), - topK(1)(response_status_code) - FROM obs_http - WHERE failure IS NULL - AND domain_name = %(domain_name)s - AND timestamp >= %(start_day)s - AND timestamp <= %(end_day)s - GROUP BY request_url; - """ - res = db.execute(q, q_params) - if isinstance(res, list) and len(res) > 0: - for ( - request_url, - response_body_sha1, - response_body_length, - response_body_title, - response_body_meta_title, - response_status_code, - ) in res: - http_baseline_map[request_url] = http_baseline_map.get( - request_url, HTTPBaseline(request_url) - ) - http_baseline_map[request_url].response_body_sha1 = maybe_get_first( - response_body_sha1, "" - ) - http_baseline_map[request_url].response_body_length = maybe_get_first( - response_body_length, "" - ) - http_baseline_map[request_url].response_body_title = 
maybe_get_first( - response_body_title, "" - ) - http_baseline_map[request_url].response_body_meta_title = maybe_get_first( - response_body_meta_title, "" - ) - http_baseline_map[request_url].response_status_code = maybe_get_first( - response_status_code, "" - ) - - return http_baseline_map - - -@dataclass -class DNSBaseline: - domain: str - nxdomain_cc_asn: List[Tuple[str, int]] = field(default_factory=list) - failure_cc_asn: List[Tuple[str, int]] = field(default_factory=list) - ok_cc_asn: List[Tuple[str, int]] = field(default_factory=list) - tls_consistent_answers: List[str] = field(default_factory=list) - answers_map: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict) - - -def make_dns_baseline( - day: date, domain_name: str, db: ClickhouseConnection -) -> DNSBaseline: - dns_baseline = DNSBaseline(domain_name) - - q_params = one_day_dict(day) - q_params["domain_name"] = domain_name - - q = """SELECT DISTINCT(ip) FROM obs_tls - WHERE is_certificate_valid = 1 - AND domain_name = %(domain_name)s - AND timestamp >= %(start_day)s - AND timestamp <= %(end_day)s; - """ - res = db.execute(q, q_params) - if isinstance(res, list) and len(res) > 0: - dns_baseline.tls_consistent_answers = [row[0] for row in res] - - q = """SELECT probe_cc, probe_asn, failure, answer FROM obs_dns - WHERE domain_name = %(domain_name)s - AND timestamp >= %(start_day)s - AND timestamp <= %(end_day)s - GROUP BY probe_cc, probe_asn, failure, answer; - """ - res = db.execute(q, q_params) - if isinstance(res, list) and len(res) > 0: - for probe_cc, probe_asn, failure, ip in res: - if not failure: - dns_baseline.ok_cc_asn.append((probe_cc, probe_asn)) - dns_baseline.answers_map[probe_cc] = dns_baseline.answers_map.get( - probe_cc, [] - ) - if ip: - dns_baseline.answers_map[probe_cc].append((probe_asn, ip)) - else: - log.error( - f"No IP present for {domain_name} {probe_cc} ({probe_asn}) in baseline" - ) - else: - dns_baseline.failure_cc_asn.append((probe_cc, probe_asn)) - if failure == "dns_nxdomain_error": - dns_baseline.nxdomain_cc_asn.append((probe_cc, probe_asn)) - - return dns_baseline - - -def is_dns_consistent( - dns_o: DNSObservation, dns_b: DNSBaseline, netinfodb: NetinfoDB -) -> Tuple[bool, float]: - if not dns_o.answer: - return False, 0 - - try: - ipaddress.ip_address(dns_o.answer) - except ValueError: - # Not an IP, er can't do much to validate it - return False, 0 - - if dns_o.answer in dns_b.tls_consistent_answers: - return True, 1.0 - - baseline_asns = set() - baseline_as_org_names = set() - - for ip in dns_b.tls_consistent_answers: - ip_info = netinfodb.lookup_ip(dns_o.timestamp, ip) - if ip_info: - baseline_asns.add(ip_info.as_info.asn) - baseline_as_org_names.add(ip_info.as_info.as_org_name.lower()) - - if dns_o.answer_asn in baseline_asns: - return True, 0.9 - - # XXX maybe with the org_name we can also do something like levenshtein - # distance to get more similarities - if ( - dns_o.answer_as_org_name - and dns_o.answer_as_org_name.lower() in baseline_as_org_names - ): - return True, 0.9 - - other_answers = dns_b.answers_map.copy() - other_answers.pop(dns_o.probe_cc, None) - other_ips = {} - other_asns = {} - for answer_list in other_answers.values(): - for _, ip in answer_list: - - other_ips[ip] = other_ips.get(ip, 0) - other_ips[ip] += 1 - if ip is None: - log.error(f"Missing ip for {dns_o.domain_name}") - continue - ip_info = netinfodb.lookup_ip(dns_o.timestamp, ip) - if ip_info: - asn = ip_info.as_info.asn - other_asns[asn] = other_asns.get(ip, 0) - other_asns[asn] += 1 - - if 
dns_o.answer in other_ips: - x = other_ips[dns_o.answer] - # This function was derived by looking for an exponential function in - # the form f(x) = c1*a^x + c2 and solving for f(0) = 0 and f(10) = 1, - # giving us a function in the form f(x) = (a^x - 1) / (a^10 - 1). We - # then choose the magic value of 0.6 by looking for a solution in a - # where f(1) ~= 0.5, doing a bit of plots and choosing a curve that - # looks reasonably sloped. - y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1) - return True, min(0.9, 0.8 * y) - - if dns_o.answer in other_asns: - x = other_asns[dns_o.answer_asn] - y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1) - return True, min(0.8, 0.7 * y) - - x = len(baseline_asns) - y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1) - return False, min(0.9, 0.8 * y) - - -def make_website_tcp_verdicts( - tcp_o: TCPObservation, tcp_b: TCPBaseline -) -> Optional[Verdict]: - outcome = Outcome.OK - confidence = 1 - outcome_detail = "" - - if tcp_o.failure: - unreachable_cc_asn = list(tcp_b.unreachable_cc_asn) - try: - unreachable_cc_asn.remove((tcp_o.probe_cc, tcp_o.probe_asn)) - except ValueError: - log.info( - "missing failure in tcp baseline. You are probably using a control derived baseline." - ) - - reachable_count = len(tcp_b.reachable_cc_asn) - unreachable_count = len(unreachable_cc_asn) - if reachable_count > unreachable_count: - # We are adding back 1 because we removed it above and it avoid a divide by zero - confidence = reachable_count / (reachable_count + unreachable_count + 1) - outcome = Outcome.BLOCKED - elif unreachable_count > reachable_count: - confidence = (unreachable_count + 1) / ( - reachable_count + unreachable_count + 1 - ) - outcome = Outcome.DOWN - - outcome_detail = f"tcp.{tcp_o.failure}" - - if outcome != Outcome.OK: - return make_verdict_from_obs( - tcp_o, - confidence=confidence, - subject=tcp_o.domain_name, - subject_detail=f"{tcp_o.ip}:{tcp_o.port}", - subject_category="website", - outcome=outcome, - outcome_detail=outcome_detail, - ) - - -def make_website_dns_verdict( - dns_o: DNSObservation, - dns_b: DNSBaseline, - fingerprintdb: FingerprintDB, - netinfodb: NetinfoDB, -) -> Optional[Verdict]: - if dns_o.fingerprint_id: - fp = fingerprintdb.get_fp(dns_o.fingerprint_id) - outcome = fp_scope_to_outcome(fp.scope) - confidence = 1.0 - # If we see the fingerprint in an unexpected country we should - # significantly reduce the confidence in the block - if ( - dns_o.probe_cc - and fp.expected_countries - and len(fp.expected_countries) > 0 - and dns_o.probe_cc not in fp.expected_countries - ): - log.debug( - f"Inconsistent probe_cc vs expected_countries {dns_o.probe_cc} != {fp.expected_countries}" - ) - confidence = 0.7 - - outcome_detail = "dns.blockpage" - return make_verdict_from_obs( - dns_o, - confidence=confidence, - subject=dns_o.domain_name, - subject_detail=f"{dns_o.answer}", - subject_category="website", - outcome=Outcome.BLOCKED, - outcome_detail=outcome_detail, - ) - - elif dns_o.answer_is_bogon and len(dns_b.tls_consistent_answers) > 0: - outcome_detail = "dns.bogon" - return make_verdict_from_obs( - dns_o, - confidence=0.9, - subject=dns_o.domain_name, - subject_detail=f"{dns_o.answer}", - subject_category="website", - outcome=Outcome.BLOCKED, - outcome_detail="dns.bogon", - ) - - elif dns_o.failure: - failure_cc_asn = list(dns_b.failure_cc_asn) - try: - failure_cc_asn.remove((dns_o.probe_cc, dns_o.probe_asn)) - except ValueError: - log.info( - "missing failure for the probe in the baseline. You are probably using a control derived baseline." 
- ) - - failure_count = len(failure_cc_asn) - ok_count = len(dns_b.ok_cc_asn) - - if dns_o.failure == "dns_nxdomain_error": - nxdomain_cc_asn = list(dns_b.nxdomain_cc_asn) - try: - nxdomain_cc_asn.remove((dns_o.probe_cc, dns_o.probe_asn)) - except ValueError: - log.info( - "missing nx_domain failure for the probe in the baseline. You are probably using a control derived baseline." - ) - - nxdomain_count = len(nxdomain_cc_asn) - if ok_count > nxdomain_count: - # We give a bit extra weight to an NXDOMAIN compared to other failures - confidence = ok_count / (ok_count + nxdomain_count + 1) - confidence = min(0.8, confidence * 1.5) - outcome = Outcome.BLOCKED - outcome_detail = "dns.nxdomain" - else: - confidence = (nxdomain_count + 1) / (ok_count + nxdomain_count + 1) - outcome = Outcome.DOWN - outcome_detail = "dns.nxdomain" - elif ok_count > failure_count: - confidence = ok_count / (ok_count + failure_count + 1) - outcome = Outcome.BLOCKED - outcome_detail = f"dns.{dns_o.failure}" - else: - confidence = (failure_count + 1) / (ok_count + failure_count + 1) - outcome = Outcome.DOWN - outcome_detail = f"dns.{dns_o.failure}" - return make_verdict_from_obs( - dns_o, - confidence=confidence, - subject=dns_o.domain_name, - subject_detail=f"{dns_o.answer}", - subject_category="website", - outcome=outcome, - outcome_detail=outcome_detail, - ) - - elif dns_o.is_tls_consistent == False: - outcome_detail = "dns.inconsistent.tls_mismatch" - return make_verdict_from_obs( - dns_o, - confidence=0.8, - subject=dns_o.domain_name, - subject_detail=f"{dns_o.answer}", - subject_category="website", - outcome=Outcome.BLOCKED, - outcome_detail=outcome_detail, - ) - - elif dns_o.is_tls_consistent == None: - # If we are in this case, it means we weren't able to determine the - # consistency of the DNS query using TLS. This is the case either - # because the tested site is not in HTTPS and therefore we didn't - # generate a TLS measurement for it or because the target IP isn't - # listening on HTTPS (which is quite fishy). - # In either case we should flag these with being somewhat likely to be - # blocked. - ip_based_consistency, consistency_confidence = is_dns_consistent( - dns_o, dns_b, netinfodb - ) - if ip_based_consistency is False and consistency_confidence > 0: - confidence = consistency_confidence - outcome_detail = "dns.inconsistent.generic" - # If the answer ASN is the same as the probe_asn, it's more likely - # to be a blockpage - if dns_o.answer_asn == dns_o.probe_asn: - outcome_detail = "dns.inconsistent.asn_match" - confidence = 0.8 - # same for the answer_cc - elif dns_o.answer_as_cc == dns_o.probe_cc: - outcome_detail = "dns.inconsistent.cc_match" - confidence = 0.7 - return make_verdict_from_obs( - dns_o, - confidence=confidence, - subject=dns_o.domain_name, - subject_detail=f"{dns_o.answer}", - subject_category="website", - outcome=Outcome.BLOCKED, - outcome_detail=outcome_detail, - ) - # No blocking detected - return None - - -def make_website_tls_verdict( - tls_o: TLSObservation, prev_verdicts: List[Verdict] -) -> Optional[Verdict]: - if tls_o.is_certificate_valid == False: - # We only consider it to be a TLS level verdict in cases when there is a - # certificate mismatch, but there was no DNS inconsistency. - # If the DNS was inconsistent, we will just count the DNS verdict - if ( - len( - list( - filter( - lambda v: v.outcome_detail.startswith("dns.") - and v.subject_detail == tls_o.ip, - prev_verdicts, - ) - ) - ) - > 0 - ): - return - - # TODO: this is wrong. 
We need to consider the baseline to establish TLS - # MITM, because the cert might be invalid also from other location (eg. - # it expired) and not due to censorship. - outcome_detail = "tls.mitm" - return make_verdict_from_obs( - tls_o, - confidence=1, - subject=tls_o.domain_name, - subject_detail=f"{tls_o.ip}:{tls_o.port}", - subject_category="website", - outcome=Outcome.BLOCKED, - outcome_detail=outcome_detail, - ) - elif tls_o.failure: - if ( - len( - list( - filter( - lambda v: v.outcome_detail.startswith("tcp.") - and v.subject_detail == f"{tls_o.ip}:443", - prev_verdicts, - ) - ) - ) - > 0 - ): - return - - # We only consider it to be a TLS level verdict if we haven't seen any - # blocks in TCP - outcome_detail = f"tls.{tls_o.failure}" - confidence = 0.5 - - if tls_o.tls_handshake_read_count == 0 and tls_o.tls_handshake_write_count == 1: - # This means we just wrote the TLS ClientHello, let's give it a bit - # more confidence in it being a block - confidence = 0.7 - - if tls_o.failure in ("connection_closed", "connection_reset"): - confidence += 0.2 - - return make_verdict_from_obs( - tls_o, - confidence=confidence, - subject=tls_o.domain_name, - subject_detail=f"{tls_o.ip}:{tls_o.port}", - subject_category="website", - outcome=Outcome.BLOCKED, - outcome_detail=outcome_detail, - ) - - -def make_website_http_verdict( - http_o: HTTPObservation, - http_b: HTTPBaseline, - prev_verdicts: List[Verdict], - fingerprintdb: FingerprintDB, -) -> Optional[Verdict]: - if http_o.failure: - # For HTTP requests we ignore cases in which we detected the blocking - # already to be happening via DNS or TCP. - if not http_o.request_is_encrypted and ( - len( - list( - filter( - lambda v: v.outcome_detail.startswith("dns.") - or ( - v.outcome_detail.startswith("tcp.") - and v.subject_detail.endswith(":80") - ), - prev_verdicts, - ) - ) - ) - > 0 - ): - return - - # Similarly for HTTPS we ignore cases when the block is done via TLS or TCP - if http_o.request_is_encrypted and ( - len( - list( - filter( - lambda v: v.outcome_detail.startswith("dns.") - or ( - v.outcome_detail.startswith("tcp.") - and v.subject_detail.endswith(":443") - ) - or v.outcome_detail.startswith("tls."), - prev_verdicts, - ) - ) - ) - > 0 - ): - return - - failure_cc_asn = list(http_b.failure_cc_asn) - try: - failure_cc_asn.remove((http_o.probe_cc, http_o.probe_asn)) - except ValueError: - log.info( - "missing failure in http baseline. Either something is wrong or you are using a control derived baseline" - ) - - failure_count = len(failure_cc_asn) - ok_count = len(http_b.ok_cc_asn) - if ok_count > failure_count: - # We are adding back 1 because we removed it above and it avoid a divide by zero - confidence = ok_count / (ok_count + failure_count + 1) - outcome = Outcome.BLOCKED - else: - confidence = (failure_count + 1) / (ok_count + failure_count + 1) - outcome = Outcome.DOWN - - outcome_detail = "http." - if http_o.request_is_encrypted: - outcome_detail = "https." 
- outcome_detail += http_o.failure - return make_verdict_from_obs( - http_o, - confidence=confidence, - subject=http_o.domain_name, - subject_detail="", - subject_category="website", - outcome=outcome, - outcome_detail=outcome_detail, - ) - elif http_o.response_matches_blockpage: - outcome = Outcome.BLOCKED - confidence = 0.7 - if http_o.request_is_encrypted: - confidence = 0 - elif http_o.fingerprint_country_consistent: - confidence = 1 - - for fp_name in http_o.response_fingerprints: - fp = fingerprintdb.get_fp(fp_name) - if fp.scope: - outcome = fp_scope_to_outcome(fp.scope) - break - - return make_verdict_from_obs( - http_o, - confidence=confidence, - subject=http_o.domain_name, - subject_detail="", - subject_category="website", - outcome=outcome, - outcome_detail="http.blockpage", - ) - - elif not http_o.request_is_encrypted: - if http_o.response_matches_false_positive: - return - if http_o.response_body_title == http_b.response_body_title: - return - if http_o.response_body_meta_title == http_b.response_body_meta_title: - return - if http_o.response_body_sha1 == http_b.response_body_sha1: - return - - if ( - http_o.response_body_length - and http_b.response_body_length - and ( - (http_o.response_body_length + 1.0) - / (http_b.response_body_length + 1.0) - < 0.7 - ) - ): - return make_verdict_from_obs( - http_o, - confidence=0.6, - subject=http_o.domain_name, - subject_detail="", - subject_category="website", - outcome=Outcome.BLOCKED, - outcome_detail="http.bodydiff", - ) - - -def make_dns_baseline_from_control( - msmt_input: str, control: WebConnectivityControl -) -> DNSBaseline: - domain_name = urlparse(msmt_input).hostname - - assert domain_name is not None, "domain_name is None" - - if not control or not control.dns: - return DNSBaseline(domain=domain_name) - - nxdomain_cc_asn = [] - if control.dns.failure == "dns_nxdomain_error": - nxdomain_cc_asn.append(("ZZ", 0)) - - ok_cc_asn = [] - failure_cc_asn = [] - if control.dns.failure is not None: - failure_cc_asn.append(("ZZ", 0)) - else: - ok_cc_asn.append(("ZZ", 0)) - - answers_map = {} - if control.dns.addrs: - answers_map["ZZ"] = [(0, ip) for ip in control.dns.addrs] - - return DNSBaseline( - domain=domain_name, - answers_map=answers_map, - ok_cc_asn=ok_cc_asn, - nxdomain_cc_asn=nxdomain_cc_asn, - failure_cc_asn=failure_cc_asn, - ) - - -def make_tcp_baseline_from_control( - control: WebConnectivityControl, -) -> Mapping[str, TCPBaseline]: - if not control or not control.tcp_connect: - return {} - - tcp_b_map = {} - for key, status in control.tcp_connect.items(): - if status.failure == None: - tcp_b_map[key] = TCPBaseline(address=key, reachable_cc_asn=[("ZZ", 0)]) - else: - tcp_b_map[key] = TCPBaseline(address=key, unreachable_cc_asn=[("ZZ", 0)]) - return tcp_b_map - - -def make_http_baseline_from_control( - msmt: WebConnectivity, control: WebConnectivityControl -) -> Mapping[str, HTTPBaseline]: - if not control or not control.http_request: - return {} - - if not msmt.test_keys.requests: - return {} - - http_b_map = {} - # We make the baseline apply to every URL in the response chain, XXX evaluate how much this is a good idea - for http_transaction in msmt.test_keys.requests: - if not http_transaction.request: - continue - - url = http_transaction.request.url - if control.http_request.failure == None: - http_b_map[url] = HTTPBaseline( - url=url, - response_body_title=control.http_request.title or "", - response_body_length=control.http_request.body_length or 0, - response_status_code=control.http_request.status_code or 0, - 
response_body_meta_title="", - response_body_sha1="", - ok_cc_asn=[("ZZ", 0)], - ) - else: - http_b_map[url] = HTTPBaseline( - url=url, - response_body_title="", - response_body_length=0, - response_status_code=0, - response_body_meta_title="", - response_body_sha1="", - failure_cc_asn=[("ZZ", 0)], - ) - return http_b_map - - -def make_website_verdicts( - dns_o_list: List[DNSObservation], - dns_b: DNSBaseline, - fingerprintdb: FingerprintDB, - netinfodb: NetinfoDB, - tcp_o_list: List[TCPObservation], - tcp_b_map: Mapping[str, TCPBaseline], - tls_o_list: List[TLSObservation], - http_o_list: List[HTTPObservation], - http_b_map: Mapping[str, HTTPBaseline], -) -> Generator[Verdict, None, List[str]]: - """ - make_website_verdicts will yield many verdicts given some observations - related to a website measurement. - - We MUST pass in DNS observations, but observations of other types are - optional. This is to workaround the fact that not every version of OONI - Probe was generating all types of observations. - - The order in which we compute the verdicts is important, since the knowledge - of some form of blocking is relevant to being able to determine future - methods of blocking. - Examples of this include: - * If you know that DNS is consistent and you see a TLS certificate - validation error, you can conclude that it's a MITM - * If you see that TCP connect is failing, you will not attribute a failing - TLS to a TLS level interference (ex. SNI filtering) - - For some lists of observations we also need to pass in a baseline. The - baseline is some groundtruth related to the targets being measured that are - needed in order to draw some meaningful conclusion about it's blocking. - We need this so that we are able to exclude instances in which the target is - unavailable due to transient network failures. - - It is the job of who calls the make_website_verdicts function to build this - baseline by either running queries against the database of observations or - using some set of observations that are already in memory. - - The basic idea is that since we are going to be generating verdicts in - windows of 24h, we would have to do the baselining only once for the 24h - time window given a certain domain. - """ - verdicts = [] - - domain_name = dns_o_list[0].domain_name - dns_verdicts = [] - for dns_o in dns_o_list: - assert ( - domain_name == dns_o.domain_name - ), f"Inconsistent domain_name in dns_o {dns_o.domain_name}" - dns_v = make_website_dns_verdict(dns_o, dns_b, fingerprintdb, netinfodb) - if dns_v: - dns_verdicts.append(dns_v) - else: - # If we didn't get a DNS verdict from an observation, it means that - # observation was a sign of everything being OK, hence we should - # ignore all the previous DNS verdicts as likely false positives and - # just consider no DNS level censorship to be happening. 
- dns_verdicts = [] - break - - for dns_v in dns_verdicts: - verdicts.append(dns_v) - yield dns_v - - if tcp_o_list: - for tcp_o in tcp_o_list: - assert ( - domain_name == tcp_o.domain_name - ), f"Inconsistent domain_name in tcp_o {tcp_o.domain_name}" - tcp_b = tcp_b_map.get(f"{tcp_o.ip}:{tcp_o.port}") - tcp_v = make_website_tcp_verdicts(tcp_o, tcp_b) if tcp_b else None - if tcp_v: - verdicts.append(tcp_v) - yield tcp_v - - if tls_o_list: - for tls_o in tls_o_list: - assert ( - domain_name == tls_o.domain_name - ), f"Inconsistent domain_name in tls_o {tls_o.domain_name}" - tls_v = make_website_tls_verdict(tls_o, verdicts) - if tls_v: - verdicts.append(tls_v) - yield tls_v - - if http_o_list: - for http_o in http_o_list: - assert ( - domain_name == http_o.domain_name - ), f"Inconsistent domain_name in http_o {http_o.domain_name}" - http_b = http_b_map.get(http_o.request_url) - http_v = ( - make_website_http_verdict(http_o, http_b, verdicts, fingerprintdb) - if http_b - else None - ) - if http_v: - verdicts.append(http_v) - yield http_v - - if len(verdicts) == 0: - # We didn't generate any verdicts up to now, so it's reasonable to say - # there is no interference happening for the given domain_name - ok_verdict = make_verdict_from_obs( - dns_o_list[0], - confidence=0.9, - subject=domain_name, - subject_detail="", - subject_category="website", - outcome=Outcome.OK, - outcome_detail="all", - ) - yield ok_verdict - verdicts.append(ok_verdict) - return verdicts diff --git a/poetry.lock b/poetry.lock index 607c4b1b..5ee4913d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,11 +1,3 @@ -[[package]] -name = "atomicwrites" -version = "1.4.1" -description = "Atomic file writes." -category = "dev" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" - [[package]] name = "attrs" version = "22.1.0" @@ -33,18 +25,18 @@ tzdata = ["tzdata"] [[package]] name = "black" -version = "22.10.0" +version = "22.3.0" description = "The uncompromising code formatter." category = "dev" optional = false -python-versions = ">=3.7" +python-versions = ">=3.6.2" [package.dependencies] click = ">=8.0.0" mypy-extensions = ">=0.4.3" pathspec = ">=0.9.0" platformdirs = ">=2" -tomli = {version = ">=1.1.0", markers = "python_full_version < \"3.11.0a7\""} +tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} typed-ast = {version = ">=1.4.2", markers = "python_version < \"3.8\" and implementation_name == \"cpython\""} typing-extensions = {version = ">=3.10.0.0", markers = "python_version < \"3.10\""} @@ -56,14 +48,14 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "boto3" -version = "1.25.4" +version = "1.24.96" description = "The AWS SDK for Python" category = "main" optional = false python-versions = ">= 3.7" [package.dependencies] -botocore = ">=1.28.4,<1.29.0" +botocore = ">=1.27.96,<1.28.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.6.0,<0.7.0" @@ -72,7 +64,7 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.28.4" +version = "1.27.96" description = "Low-level, data-driven core of boto 3." category = "main" optional = false @@ -107,11 +99,11 @@ pycparser = "*" [[package]] name = "charset-normalizer" -version = "2.1.1" +version = "2.0.12" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." 
category = "main" optional = false -python-versions = ">=3.6.0" +python-versions = ">=3.5.0" [package.extras] unicode_backport = ["unicodedata2"] @@ -186,6 +178,17 @@ sdist = ["setuptools-rust (>=0.11.4)"] ssh = ["bcrypt (>=3.1.5)"] test = ["pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-subtests", "pytest-xdist", "pretend", "iso8601", "pytz", "hypothesis (>=1.11.4,!=3.79.2)"] +[[package]] +name = "exceptiongroup" +version = "1.0.0" +description = "Backport of PEP 654 (exception groups)" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.extras] +test = ["pytest (>=6)"] + [[package]] name = "idna" version = "3.4" @@ -198,7 +201,7 @@ python-versions = ">=3.5" name = "importlib-metadata" version = "5.0.0" description = "Read metadata from Python packages" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" @@ -256,19 +259,17 @@ tests = ["pytest (!=3.3.0)", "psutil", "pytest-cov"] [[package]] name = "mashumaro" -version = "3.1" +version = "3.0.4" description = "Fast serialization framework on top of dataclasses" category = "main" optional = false python-versions = ">=3.6" [package.dependencies] -typing-extensions = ">=4.0.1" +typing-extensions = "*" [package.extras] msgpack = ["msgpack (>=0.5.6)"] -orjson = ["orjson"] -toml = ["tomli (>=1.1.0)", "tomli-w (>=1.0)"] yaml = ["pyyaml (>=3.13)"] [[package]] @@ -360,14 +361,6 @@ importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] -[[package]] -name = "py" -version = "1.11.0" -description = "library with cross-python path, ini-parsing, io, code, log facilities" -category = "dev" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" - [[package]] name = "pycparser" version = "2.21" @@ -404,25 +397,24 @@ diagrams = ["railroad-diagrams", "jinja2"] [[package]] name = "pytest" -version = "6.2.5" +version = "7.2.0" description = "pytest: simple powerful testing with Python" category = "dev" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" [package.dependencies] -atomicwrites = {version = ">=1.0", markers = "sys_platform == \"win32\""} attrs = ">=19.2.0" colorama = {version = "*", markers = "sys_platform == \"win32\""} +exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} iniconfig = "*" packaging = "*" pluggy = ">=0.12,<2.0" -py = ">=1.8.2" -toml = "*" +tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""} [package.extras] -testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"] +testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "xmlschema"] [[package]] name = "pytest-cov" @@ -452,7 +444,7 @@ six = ">=1.5" [[package]] name = "pytz" -version = "2022.5" +version = "2022.6" description = "World timezone definitions, modern and historical" category = "main" optional = false @@ -480,21 +472,21 @@ python-versions = ">=3.6" [[package]] name = "requests" -version = "2.28.1" +version = "2.27.1" description = "Python HTTP for Humans." 
category = "main" optional = false -python-versions = ">=3.7, <4" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" [package.dependencies] certifi = ">=2017.4.17" -charset-normalizer = ">=2,<3" -idna = ">=2.5,<4" +charset-normalizer = {version = ">=2.0.0,<2.1.0", markers = "python_version >= \"3\""} +idna = {version = ">=2.5,<4", markers = "python_version >= \"3\""} urllib3 = ">=1.21.1,<1.27" [package.extras] -socks = ["PySocks (>=1.5.6,!=1.5.7)"] -use_chardet_on_py3 = ["chardet (>=3.0.2,<6)"] +socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"] +use_chardet_on_py3 = ["chardet (>=3.0.2,<5)"] [[package]] name = "s3transfer" @@ -529,14 +521,6 @@ python-versions = "*" [package.dependencies] tornado = ">=2.0" -[[package]] -name = "toml" -version = "0.10.2" -description = "Python Library for Tom's Obvious, Minimal Language" -category = "dev" -optional = false -python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" - [[package]] name = "tomli" version = "2.0.1" @@ -628,7 +612,7 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] name = "zipp" version = "3.10.0" description = "Backport of pathlib-compatible object wrapper for zip files" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" @@ -639,18 +623,44 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "flake8 [metadata] lock-version = "1.1" python-versions = ">=3.7,<4" -content-hash = "e9001bebff5cbdf76f195e18bd512311a22417e856ac32af960bc8fe340969fb" +content-hash = "1eb9dd569e6a2c0e62f080b5bc2edd204abd71e785d89204e481a2cf3cf120f2" [metadata.files] -atomicwrites = [] attrs = [] "backports.zoneinfo" = [] -black = [] +black = [ + {file = "black-22.3.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:2497f9c2386572e28921fa8bec7be3e51de6801f7459dffd6e62492531c47e09"}, + {file = "black-22.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5795a0375eb87bfe902e80e0c8cfaedf8af4d49694d69161e5bd3206c18618bb"}, + {file = "black-22.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e3556168e2e5c49629f7b0f377070240bd5511e45e25a4497bb0073d9dda776a"}, + {file = "black-22.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:67c8301ec94e3bcc8906740fe071391bce40a862b7be0b86fb5382beefecd968"}, + {file = "black-22.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:fd57160949179ec517d32ac2ac898b5f20d68ed1a9c977346efbac9c2f1e779d"}, + {file = "black-22.3.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:cc1e1de68c8e5444e8f94c3670bb48a2beef0e91dddfd4fcc29595ebd90bb9ce"}, + {file = "black-22.3.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d2fc92002d44746d3e7db7cf9313cf4452f43e9ea77a2c939defce3b10b5c82"}, + {file = "black-22.3.0-cp36-cp36m-win_amd64.whl", hash = "sha256:a6342964b43a99dbc72f72812bf88cad8f0217ae9acb47c0d4f141a6416d2d7b"}, + {file = "black-22.3.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:328efc0cc70ccb23429d6be184a15ce613f676bdfc85e5fe8ea2a9354b4e9015"}, + {file = "black-22.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06f9d8846f2340dfac80ceb20200ea5d1b3f181dd0556b47af4e8e0b24fa0a6b"}, + {file = "black-22.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:ad4efa5fad66b903b4a5f96d91461d90b9507a812b3c5de657d544215bb7877a"}, + {file = "black-22.3.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e8477ec6bbfe0312c128e74644ac8a02ca06bcdb8982d4ee06f209be28cdf163"}, + {file = "black-22.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = 
"sha256:637a4014c63fbf42a692d22b55d8ad6968a946b4a6ebc385c5505d9625b6a464"}, + {file = "black-22.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:863714200ada56cbc366dc9ae5291ceb936573155f8bf8e9de92aef51f3ad0f0"}, + {file = "black-22.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10dbe6e6d2988049b4655b2b739f98785a884d4d6b85bc35133a8fb9a2233176"}, + {file = "black-22.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:cee3e11161dde1b2a33a904b850b0899e0424cc331b7295f2a9698e79f9a69a0"}, + {file = "black-22.3.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5891ef8abc06576985de8fa88e95ab70641de6c1fca97e2a15820a9b69e51b20"}, + {file = "black-22.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:30d78ba6bf080eeaf0b7b875d924b15cd46fec5fd044ddfbad38c8ea9171043a"}, + {file = "black-22.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ee8f1f7228cce7dffc2b464f07ce769f478968bfb3dd1254a4c2eeed84928aad"}, + {file = "black-22.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ee227b696ca60dd1c507be80a6bc849a5a6ab57ac7352aad1ffec9e8b805f21"}, + {file = "black-22.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:9b542ced1ec0ceeff5b37d69838106a6348e60db7b8fdd245294dc1d26136265"}, + {file = "black-22.3.0-py3-none-any.whl", hash = "sha256:bc58025940a896d7e5356952228b68f793cf5fcb342be703c3a2669a1488cb72"}, + {file = "black-22.3.0.tar.gz", hash = "sha256:35020b8886c022ced9282b51b5a875b6d1ab0c387b31a065b84db7c33085ca79"}, +] boto3 = [] botocore = [] certifi = [] cffi = [] -charset-normalizer = [] +charset-normalizer = [ + {file = "charset-normalizer-2.0.12.tar.gz", hash = "sha256:2857e29ff0d34db842cd7ca3230549d1a697f96ee6d3fb071cfa6c7393832597"}, + {file = "charset_normalizer-2.0.12-py3-none-any.whl", hash = "sha256:6881edbebdb17b39b4eaaa821b438bf6eddffb4468cf344f09f89def34a8b1df"}, +] click = [ {file = "click-8.1.3-py3-none-any.whl", hash = "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48"}, {file = "click-8.1.3.tar.gz", hash = "sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e"}, @@ -659,6 +669,7 @@ clickhouse-driver = [] colorama = [] coverage = [] cryptography = [] +exceptiongroup = [] idna = [] importlib-metadata = [] iniconfig = [ @@ -713,10 +724,6 @@ pluggy = [ {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"}, {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"}, ] -py = [ - {file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"}, - {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"}, -] pycparser = [ {file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"}, {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"}, @@ -726,10 +733,7 @@ pyparsing = [ {file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"}, {file = "pyparsing-3.0.9.tar.gz", hash = "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb"}, ] -pytest = [ - {file = "pytest-6.2.5-py3-none-any.whl", hash = "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"}, - {file = "pytest-6.2.5.tar.gz", hash = 
"sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89"}, -] +pytest = [] pytest-cov = [ {file = "pytest-cov-3.0.0.tar.gz", hash = "sha256:e7f0f5b1617d2210a2cabc266dfe2f4c75a8d32fb89eafb7ad9d06f6d076d470"}, {file = "pytest_cov-3.0.0-py3-none-any.whl", hash = "sha256:578d5d15ac4a25e5f961c938b85a05b09fdaae9deef3bb6de9a6e766622ca7a6"}, @@ -791,10 +795,6 @@ snakeviz = [ {file = "snakeviz-2.1.1-py2.py3-none-any.whl", hash = "sha256:931142dc927101c9a4b6e89bc0577ff1a3d1886b483a04e6af70c31d2c3dce19"}, {file = "snakeviz-2.1.1.tar.gz", hash = "sha256:0d96c006304f095cb4b3fb7ed98bb866ca35a7ca4ab9020bbc27d295ee4c94d9"}, ] -toml = [ - {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, - {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, -] tomli = [ {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, diff --git a/pyproject.toml b/pyproject.toml index 601137e5..b00da6f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,30 +1,31 @@ [tool.poetry] name = "oonidata" -version = "0.2.1" +version = "0.2.2" description = "" authors = ["Arturo Filastò "] [tool.poetry.dependencies] python = ">=3.7,<4" -boto3 = "^1.24.2" -PyYAML = "^6.0" -tqdm = "^4.64.0" -lz4 = "^4.0.1" -requests = "^2.27.1" -cryptography = "^38.0.2" -clickhouse-driver = "^0.2.3" -lxml = "^4.9.0" -maxminddb = "^2.2.0" -orjson = "^3.8.0" -mashumaro = "^3.0.4" -pyOpenSSL = "^22.1.0" +boto3 = "~1.24" +PyYAML = "~6.0" +tqdm = "~4.64" +lz4 = "~4.0" +requests = "~2.27" +cryptography = "~38.0" +clickhouse-driver = "~0.2" +lxml = "~4.9" +maxminddb = "~2.2" +orjson = "~3.8" +mashumaro = "~3.0" +pyOpenSSL = "~22.1" +importlib-metadata = {version = ">=1.0", python = "<3.8"} [tool.poetry.dev-dependencies] -pytest = "^6.2" -black = "^22.3.0" -snakeviz = "^2.1.1" -mypy = "^0.961" -pytest-cov = "^3.0.0" +pytest = "~7.2" +pytest-cov = "~3.0.0" +black = "~22.3.0" +snakeviz = "~2.1.1" +mypy = "~0.961" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/tests/test_blockingevents.py b/tests/test_blockingevents.py deleted file mode 100644 index af3825cd..00000000 --- a/tests/test_blockingevents.py +++ /dev/null @@ -1,116 +0,0 @@ -from base64 import b64decode -from datetime import datetime -from oonidata.dataformat import load_measurement, Signal, SIGNAL_PEM_STORE -from oonidata.datautils import validate_cert_chain -from oonidata.blockingevents import make_signal_blocking_event -from oonidata.observations import make_signal_observations, NettestObservation - - -def test_signal(fingerprintdb, netinfodb, measurements): - - signal_old_ca = load_measurement( - msmt_path=measurements["20221016235944.266268_GB_signal_1265ff650ee17b44"] - ) - assert isinstance(signal_old_ca, Signal) - assert signal_old_ca.test_keys.tls_handshakes - - for tls_handshake in signal_old_ca.test_keys.tls_handshakes: - assert tls_handshake.peer_certificates - assert tls_handshake.server_name - certificate_chain = list( - map(lambda c: b64decode(c.data), tls_handshake.peer_certificates) - ) - validate_cert_chain( - datetime(2021, 10, 16), - certificate_chain=certificate_chain, - pem_cert_store=SIGNAL_PEM_STORE, - ) - - signal_new_ca = load_measurement( - msmt_path=measurements["20221020235950.432819_NL_signal_27b05458f186a906"] - ) - assert 
isinstance(signal_new_ca, Signal) - assert signal_new_ca.test_keys.tls_handshakes - - for tls_handshake in signal_new_ca.test_keys.tls_handshakes: - assert tls_handshake.peer_certificates - assert tls_handshake.server_name - certificate_chain = list( - map(lambda c: b64decode(c.data), tls_handshake.peer_certificates) - ) - validate_cert_chain( - datetime(2022, 10, 20), - certificate_chain=certificate_chain, - pem_cert_store=SIGNAL_PEM_STORE, - ) - - nt_obs = NettestObservation.from_measurement(signal_new_ca, netinfodb) - dns_obs, tcp_obs, tls_obs, http_obs = make_signal_observations( - signal_new_ca, fingerprintdb=fingerprintdb, netinfodb=netinfodb - ) - blocking_event = make_signal_blocking_event( - nt_obs=nt_obs, - dns_o_list=dns_obs, - tcp_o_list=tcp_obs, - tls_o_list=tls_obs, - http_o_list=http_obs, - netinfodb=netinfodb, - fingerprintdb=fingerprintdb, - ) - print(blocking_event.outcomes) - assert blocking_event.anomaly == False - assert blocking_event.confirmed == False - - signal_blocked_uz = load_measurement( - msmt_path=measurements["20210926222047.205897_UZ_signal_95fab4a2e669573f"] - ) - assert isinstance(signal_blocked_uz, Signal) - nt_obs = NettestObservation.from_measurement(signal_blocked_uz, netinfodb) - dns_obs, tcp_obs, tls_obs, http_obs = make_signal_observations( - signal_blocked_uz, fingerprintdb=fingerprintdb, netinfodb=netinfodb - ) - blocking_event = make_signal_blocking_event( - nt_obs=nt_obs, - dns_o_list=dns_obs, - tcp_o_list=tcp_obs, - tls_o_list=tls_obs, - http_o_list=http_obs, - netinfodb=netinfodb, - fingerprintdb=fingerprintdb, - ) - assert blocking_event.anomaly == True - assert blocking_event.confirmed == False - tls_outcomes = list( - filter( - lambda outcome: outcome.outcome_detail.startswith("tls."), - blocking_event.outcomes, - ) - ) - assert len(tls_outcomes) > 0 - - signal_blocked_ir = load_measurement( - msmt_path=measurements["20221018174612.488229_IR_signal_f8640b28061bec06"] - ) - assert isinstance(signal_blocked_ir, Signal) - nt_obs = NettestObservation.from_measurement(signal_blocked_ir, netinfodb) - dns_obs, tcp_obs, tls_obs, http_obs = make_signal_observations( - signal_blocked_ir, fingerprintdb=fingerprintdb, netinfodb=netinfodb - ) - blocking_event = make_signal_blocking_event( - nt_obs=nt_obs, - dns_o_list=dns_obs, - tcp_o_list=tcp_obs, - tls_o_list=tls_obs, - http_o_list=http_obs, - netinfodb=netinfodb, - fingerprintdb=fingerprintdb, - ) - assert blocking_event.anomaly == True - dns_outcomes = list( - filter( - lambda outcome: outcome.outcome_detail.startswith("dns."), - blocking_event.outcomes, - ) - ) - assert len(dns_outcomes) > 0 - assert blocking_event.confirmed == True diff --git a/tests/test_experiments.py b/tests/test_experiments.py new file mode 100644 index 00000000..b716fc81 --- /dev/null +++ b/tests/test_experiments.py @@ -0,0 +1,283 @@ +from base64 import b64decode +from datetime import date, datetime +from unittest.mock import MagicMock +from oonidata.dataformat import ( + WebConnectivity, + load_measurement, + Signal, + SIGNAL_PEM_STORE, +) +from oonidata.datautils import validate_cert_chain +from oonidata.experiments.control import ( + make_dns_control, + make_http_control_map, + make_tcp_control_map, +) +from oonidata.experiments.experiment_result import BlockingType +from oonidata.experiments.signal import make_signal_experiment_result +from oonidata.experiments.websites import make_website_dns_blocking_event +from oonidata.observations import ( + make_dns_observations, + make_signal_observations, + NettestObservation, +) 
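+
+# Note: the `fingerprintdb`, `netinfodb` and `measurements` arguments used by
+# the tests below are pytest fixtures; presumably they are provided by the
+# test suite's conftest.py (they are not defined in this patch).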
+
+
+def test_signal(fingerprintdb, netinfodb, measurements):
+
+    signal_old_ca = load_measurement(
+        msmt_path=measurements["20221016235944.266268_GB_signal_1265ff650ee17b44"]
+    )
+    assert isinstance(signal_old_ca, Signal)
+    assert signal_old_ca.test_keys.tls_handshakes
+
+    for tls_handshake in signal_old_ca.test_keys.tls_handshakes:
+        assert tls_handshake.peer_certificates
+        assert tls_handshake.server_name
+        certificate_chain = list(
+            map(lambda c: b64decode(c.data), tls_handshake.peer_certificates)
+        )
+        validate_cert_chain(
+            datetime(2021, 10, 16),
+            certificate_chain=certificate_chain,
+            pem_cert_store=SIGNAL_PEM_STORE,
+        )
+
+    signal_new_ca = load_measurement(
+        msmt_path=measurements["20221020235950.432819_NL_signal_27b05458f186a906"]
+    )
+    assert isinstance(signal_new_ca, Signal)
+    assert signal_new_ca.test_keys.tls_handshakes
+
+    for tls_handshake in signal_new_ca.test_keys.tls_handshakes:
+        assert tls_handshake.peer_certificates
+        assert tls_handshake.server_name
+        certificate_chain = list(
+            map(lambda c: b64decode(c.data), tls_handshake.peer_certificates)
+        )
+        validate_cert_chain(
+            datetime(2022, 10, 20),
+            certificate_chain=certificate_chain,
+            pem_cert_store=SIGNAL_PEM_STORE,
+        )
+
+    nt_obs = NettestObservation.from_measurement(signal_new_ca, netinfodb)
+    dns_obs, tcp_obs, tls_obs, http_obs = make_signal_observations(
+        signal_new_ca, fingerprintdb=fingerprintdb, netinfodb=netinfodb
+    )
+    er = make_signal_experiment_result(
+        nt_obs=nt_obs,
+        dns_o_list=dns_obs,
+        tcp_o_list=tcp_obs,
+        tls_o_list=tls_obs,
+        http_o_list=http_obs,
+        netinfodb=netinfodb,
+        fingerprintdb=fingerprintdb,
+    )
+    assert er.anomaly is False
+    assert er.confirmed is False
+
+    signal_blocked_uz = load_measurement(
+        msmt_path=measurements["20210926222047.205897_UZ_signal_95fab4a2e669573f"]
+    )
+    assert isinstance(signal_blocked_uz, Signal)
+    nt_obs = NettestObservation.from_measurement(signal_blocked_uz, netinfodb)
+    dns_obs, tcp_obs, tls_obs, http_obs = make_signal_observations(
+        signal_blocked_uz, fingerprintdb=fingerprintdb, netinfodb=netinfodb
+    )
+    er = make_signal_experiment_result(
+        nt_obs=nt_obs,
+        dns_o_list=dns_obs,
+        tcp_o_list=tcp_obs,
+        tls_o_list=tls_obs,
+        http_o_list=http_obs,
+        netinfodb=netinfodb,
+        fingerprintdb=fingerprintdb,
+    )
+    assert er.anomaly is True
+    assert er.confirmed is False
+    tls_be = list(
+        filter(
+            lambda be: be.blocking_detail.startswith("tls."),
+            er.blocking_events,
+        )
+    )
+    assert len(tls_be) > 0
+
+    signal_blocked_ir = load_measurement(
+        msmt_path=measurements["20221018174612.488229_IR_signal_f8640b28061bec06"]
+    )
+    assert isinstance(signal_blocked_ir, Signal)
+    nt_obs = NettestObservation.from_measurement(signal_blocked_ir, netinfodb)
+    dns_obs, tcp_obs, tls_obs, http_obs = make_signal_observations(
+        signal_blocked_ir, fingerprintdb=fingerprintdb, netinfodb=netinfodb
+    )
+    er = make_signal_experiment_result(
+        nt_obs=nt_obs,
+        dns_o_list=dns_obs,
+        tcp_o_list=tcp_obs,
+        tls_o_list=tls_obs,
+        http_o_list=http_obs,
+        netinfodb=netinfodb,
+        fingerprintdb=fingerprintdb,
+    )
+    assert er.anomaly is True
+    dns_be = list(
+        filter(
+            lambda be: be.blocking_detail.startswith("dns."),
+            er.blocking_events,
+        )
+    )
+    assert len(dns_be) > 0
+    assert er.confirmed is True
+
+
+def baseline_query_mock(q, q_params):
+    # This pattern of mocking is a bit brittle.
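+    # It dispatches on raw SQL substrings, so rewording any query built in
+    # oonidata.experiments.control makes every `if` below fall through and
+    # the mock silently return None instead of control rows.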
+    # TODO: come up with a better way of mocking these things out
+    if "SELECT DISTINCT(ip) FROM obs_tls" in q:
+        return [["162.159.137.6"], ["162.159.136.6"], ["2606:4700:7::a29f:8906"]]
+    if "SELECT probe_cc, probe_asn, failure, answer FROM obs_dns" in q:
+        return [
+            ["IT", 12345, None, "162.159.137.6"],
+            ["GB", 789, None, "162.159.137.6"],
+            ["FR", 5410, "dns_nxdomain_error", ""],
+        ]
+
+    if "SELECT probe_cc, probe_asn, request_url, failure FROM obs_http" in q:
+        return [
+            ["IT", 12345, "https://thepiratebay.org/", ""],
+            ["FR", 5410, "https://thepiratebay.org/", "dns_nxdomain_error"],
+            ["GB", 789, "https://thepiratebay.org/", ""],
+        ]
+
+    if "response_body_sha1" in q:
+        return [
+            [
+                "http://thepiratebay.org/",
+                ["1965c4952cc8c082a6307ed67061a57aab6632fa"],
+                [134],
+                [""],
+                [""],
+                [301],
+            ],
+            ["http://thepiratebay.org/index.html", [""], [], [""], [""], [301]],
+            [
+                "https://thepiratebay.org/index.html",
+                ["c2062ae3fb19fa0d9657b1827a80e10c937b4691"],
+                [4712],
+                [""],
+                [""],
+                [200],
+            ],
+            [
+                "https://thepiratebay.org/index.html",
+                ["cf7a17ad4d1cb7683a1f8592588e5c7b49629cc3"],
+                [154],
+                [""],
+                [""],
+                [302],
+            ],
+        ]
+
+    if "SELECT probe_cc, probe_asn, ip, port, failure FROM obs_tcp" in q:
+        return [
+            ["IT", 12345, "162.159.137.6", 443, ""],
+            ["FR", 5410, "162.159.137.6", 443, ""],
+            ["GB", 789, "162.159.137.6", 443, ""],
+        ]
+
+
+def make_mock_ctrldb():
+    db = MagicMock()
+    db.execute = MagicMock()
+    db.execute.side_effect = baseline_query_mock
+    return db
+
+
+def test_baselines():
+    day = date(2022, 1, 1)
+    domain_name = "ooni.org"
+    db = make_mock_ctrldb()
+
+    dns_ctrl = make_dns_control(day, domain_name, db)
+    assert len(dns_ctrl.failure_cc_asn) == 1
+    assert len(dns_ctrl.ok_cc_asn) == 2
+    assert "162.159.137.6" in dns_ctrl.tls_consistent_answers
+
+    http_ctrl_map = make_http_control_map(day, domain_name, db)
+    assert len(http_ctrl_map["https://thepiratebay.org/"].failure_cc_asn) == 1
+
+    tcp_ctrl_map = make_tcp_control_map(day, domain_name, db)
+    assert len(tcp_ctrl_map["162.159.137.6:443"].reachable_cc_asn) == 3
+
+
+def test_website_dns_blocking_event(fingerprintdb, netinfodb, measurements):
+    day = date(2022, 1, 1)
+    domain_name = "ooni.org"
+
+    db = make_mock_ctrldb()
+
+    msmt = load_measurement(
+        msmt_path=measurements[
+            "20220627030703.592775_IR_webconnectivity_80e199b3c572f8d3"
+        ]
+    )
+    assert isinstance(msmt, WebConnectivity)
+    dns_ctrl = make_dns_control(day, domain_name, db)
+    for dns_o in make_dns_observations(
+        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
+    ):
+        blocking_event = make_website_dns_blocking_event(
+            dns_o, dns_ctrl, fingerprintdb, netinfodb
+        )
+        assert blocking_event.blocking_type == BlockingType.NATIONAL_BLOCK
+        assert blocking_event.blocking_detail == "dns.inconsistent.blockpage"
+
+    msmt = load_measurement(
+        msmt_path=measurements[
+            "20220627134426.194308_DE_webconnectivity_15675b61ec62e268"
+        ]
+    )
+    assert isinstance(msmt, WebConnectivity)
+    dns_ctrl = make_dns_control(day, domain_name, db)
+    for dns_o in make_dns_observations(
+        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
+    ):
+        blocking_event = make_website_dns_blocking_event(
+            dns_o, dns_ctrl, fingerprintdb, netinfodb
+        )
+        assert blocking_event.blocking_type == BlockingType.BLOCKED
+        assert blocking_event.blocking_detail == "dns.inconsistent.bogon"
+
+    msmt = load_measurement(
+        msmt_path=measurements[
+            "20220627125833.737451_FR_webconnectivity_bca9ad9d3371919a"
+        ]
+    )
+    assert isinstance(msmt, WebConnectivity)
+    dns_ctrl = make_dns_control(day, domain_name, db)
+    for dns_o in make_dns_observations(
+        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
+    ):
+        blocking_event = make_website_dns_blocking_event(
+            dns_o, dns_ctrl, fingerprintdb, netinfodb
+        )
+        assert blocking_event.blocking_type == BlockingType.BLOCKED
+        assert blocking_event.blocking_detail == "dns.inconsistent.nxdomain"
+
+    msmt = load_measurement(
+        msmt_path=measurements[
+            "20220625234824.235023_HU_webconnectivity_3435a5df0e743d39"
+        ]
+    )
+    assert isinstance(msmt, WebConnectivity)
+    dns_ctrl = make_dns_control(day, domain_name, db)
+    for dns_o in make_dns_observations(
+        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
+    ):
+        blocking_event = make_website_dns_blocking_event(
+            dns_o, dns_ctrl, fingerprintdb, netinfodb
+        )
+        assert blocking_event.blocking_type == BlockingType.OK
+        break
diff --git a/tests/test_verdicts.py b/tests/test_verdicts.py
deleted file mode 100644
index 4f3caa8d..00000000
--- a/tests/test_verdicts.py
+++ /dev/null
@@ -1,162 +0,0 @@
-from unittest.mock import MagicMock
-
-from datetime import date
-
-from oonidata.apiclient import get_measurement_dict
-from oonidata.dataformat import load_measurement
-
-from oonidata.observations import make_dns_observations
-from oonidata.verdicts import (
-    Outcome,
-    make_dns_baseline,
-    make_tcp_baseline_map,
-    make_http_baseline_map,
-)
-from oonidata.verdicts import make_website_dns_verdict
-
-
-def baseline_query_mock(q, q_params):
-    # This pattern of mocking is a bit brittle.
-    # TODO: come up with a better way of mocking these things out
-    if "SELECT DISTINCT(ip) FROM obs_tls" in q:
-        return [["162.159.137.6"], ["162.159.136.6"], ["2606:4700:7::a29f:8906"]]
-    if "SELECT probe_cc, probe_asn, failure, answer FROM obs_dns" in q:
-        return [
-            ["IT", 12345, None, "162.159.137.6"],
-            ["GB", 789, None, "162.159.137.6"],
-            ["FR", 5410, "dns_nxdomain_error", ""],
-        ]
-
-    if "SELECT probe_cc, probe_asn, request_url, failure FROM obs_http" in q:
-        return [
-            ["IT", 12345, "https://thepiratebay.org/", ""],
-            ["FR", 5410, "https://thepiratebay.org/", "dns_nxdomain_error"],
-            ["GB", 789, "https://thepiratebay.org/", ""],
-        ]
-
-    if "response_body_sha1" in q:
-        return [
-            [
-                "http://thepiratebay.org/",
-                ["1965c4952cc8c082a6307ed67061a57aab6632fa"],
-                [134],
-                [""],
-                [""],
-                [301],
-            ],
-            ["http://thepiratebay.org/index.html", [""], [], [""], [""], [301]],
-            [
-                "https://thepiratebay.org/index.html",
-                ["c2062ae3fb19fa0d9657b1827a80e10c937b4691"],
-                [4712],
-                [""],
-                [""],
-                [200],
-            ],
-            [
-                "https://thepiratebay.org/index.html",
-                ["cf7a17ad4d1cb7683a1f8592588e5c7b49629cc3"],
-                [154],
-                [""],
-                [""],
-                [302],
-            ],
-        ]
-
-    if "SELECT probe_cc, probe_asn, ip, port, failure FROM obs_tcp" in q:
-        return [
-            ["IT", 12345, "162.159.137.6", 443, ""],
-            ["FR", 5410, "162.159.137.6", 443, ""],
-            ["GB", 789, "162.159.137.6", 443, ""],
-        ]
-
-
-def make_mock_baselinedb():
-    db = MagicMock()
-    db.execute = MagicMock()
-    db.execute.side_effect = baseline_query_mock
-    return db
-
-
-def test_baselines():
-    day = date(2022, 1, 1)
-    domain_name = "ooni.org"
-    db = make_mock_baselinedb()
-
-    dns_baseline = make_dns_baseline(day, domain_name, db)
-    assert len(dns_baseline.failure_cc_asn) == 1
-    assert len(dns_baseline.ok_cc_asn) == 2
-    assert "162.159.137.6" in dns_baseline.tls_consistent_answers
-
-    http_baseline_map = make_http_baseline_map(day, domain_name, db)
-    assert len(http_baseline_map["https://thepiratebay.org/"].failure_cc_asn) == 1
-
-    tcp_baseline_map = make_tcp_baseline_map(day, domain_name, db)
-    assert len(tcp_baseline_map["162.159.137.6:443"].reachable_cc_asn) == 3
-
-
-def test_website_dns_verdict(fingerprintdb, netinfodb, measurements):
-    day = date(2022, 1, 1)
-    domain_name = "ooni.org"
-
-    db = make_mock_baselinedb()
-
-    msmt = load_measurement(
-        msmt_path=measurements[
-            "20220627030703.592775_IR_webconnectivity_80e199b3c572f8d3"
-        ]
-    )
-    dns_baseline = make_dns_baseline(day, domain_name, db)
-    for dns_o in make_dns_observations(
-        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
-    ):
-        verdict = make_website_dns_verdict(
-            dns_o, dns_baseline, fingerprintdb, netinfodb
-        )
-        assert verdict.outcome == Outcome.BLOCKED
-        assert verdict.outcome_detail == "dns.blockpage"
-
-    msmt = load_measurement(
-        msmt_path=measurements[
-            "20220627134426.194308_DE_webconnectivity_15675b61ec62e268"
-        ]
-    )
-    dns_baseline = make_dns_baseline(day, domain_name, db)
-    for dns_o in make_dns_observations(
-        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
-    ):
-        verdict = make_website_dns_verdict(
-            dns_o, dns_baseline, fingerprintdb, netinfodb
-        )
-        assert verdict.outcome == Outcome.BLOCKED
-        assert verdict.outcome_detail == "dns.bogon"
-
-    msmt = load_measurement(
-        msmt_path=measurements[
-            "20220627125833.737451_FR_webconnectivity_bca9ad9d3371919a"
-        ]
-    )
-    dns_baseline = make_dns_baseline(day, domain_name, db)
-    for dns_o in make_dns_observations(
-        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
-    ):
-        verdict = make_website_dns_verdict(
-            dns_o, dns_baseline, fingerprintdb, netinfodb
-        )
-        assert verdict.outcome == Outcome.BLOCKED
-        assert verdict.outcome_detail == "dns.nxdomain"
-
-    msmt = load_measurement(
-        msmt_path=measurements[
-            "20220625234824.235023_HU_webconnectivity_3435a5df0e743d39"
-        ]
-    )
-    dns_baseline = make_dns_baseline(day, domain_name, db)
-    for dns_o in make_dns_observations(
-        msmt, msmt.test_keys.queries, fingerprintdb, netinfodb
-    ):
-        verdict = make_website_dns_verdict(
-            dns_o, dns_baseline, fingerprintdb, netinfodb
-        )
-        assert verdict is None
-        break
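
The `TODO` in `baseline_query_mock` above points at a real weakness: dispatching on full SQL substrings breaks as soon as the control queries are reworded. One possible direction, sketched below under the assumption that the control queries keep naming their `obs_*` tables in the `FROM` clause, is to key the canned rows on the table being read. `CANNED_ROWS` and `make_table_keyed_ctrldb` are illustrative names, not part of oonidata.

```python
import re
from unittest.mock import MagicMock

# Canned control rows keyed by the obs_* table a query reads from
# (illustrative fixture, mirroring the values used in the tests above).
CANNED_ROWS = {
    "obs_tls": [["162.159.137.6"], ["162.159.136.6"], ["2606:4700:7::a29f:8906"]],
    "obs_dns": [
        ["IT", 12345, None, "162.159.137.6"],
        ["GB", 789, None, "162.159.137.6"],
        ["FR", 5410, "dns_nxdomain_error", ""],
    ],
    "obs_tcp": [
        ["IT", 12345, "162.159.137.6", 443, ""],
        ["FR", 5410, "162.159.137.6", 443, ""],
        ["GB", 789, "162.159.137.6", 443, ""],
    ],
}


def make_table_keyed_ctrldb():
    """Like make_mock_ctrldb, but dispatching on the FROM clause, so a
    reworded SELECT list fails loudly instead of silently returning None."""

    def query_mock(q, q_params):
        # Extract the obs_* table name from the query's FROM clause.
        match = re.search(r"FROM\s+(obs_\w+)", q)
        if match is None or match.group(1) not in CANNED_ROWS:
            raise AssertionError("unexpected control query: %s" % q)
        return CANNED_ROWS[match.group(1)]

    db = MagicMock()
    db.execute = MagicMock(side_effect=query_mock)
    return db
```

The two `obs_http` queries would still need a second dispatch key (for example, whether `response_body_sha1` appears in the column list), so this remains a sketch rather than a drop-in replacement for `make_mock_ctrldb`.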