Skip to content

Commit

Permalink
typing
Browse files Browse the repository at this point in the history
  • Loading branch information
awmatheson committed Aug 8, 2024
1 parent c9709ee commit 13725e8
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 29 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def _resolve_bytewax_type_aliases(
return None


def setup(app: Sphinx):
def setup(app: Sphinx) -> None:
"""Install our custom Sphinx build hooks."""
app.connect("missing-reference", _ignore_private)
app.connect("missing-reference", _resolve_type_aliases)
Expand Down
4 changes: 2 additions & 2 deletions examples/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
chop.output(
"output_clickhouse",
metrics,
PA_SCHEMA,
"metrics",
CH_SCHEMA,
"admin",
"password",
database="bytewax",
port=8123,
ch_schema=CH_SCHEMA,
order_by=ORDER_BY,
pa_schema=PA_SCHEMA,
timeout=timedelta(seconds=1),
max_size=10,
)
4 changes: 2 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ lint: _assert-venv
vermin --config-file vermin-lib.ini src/ pytests/
vermin --config-file vermin-dev.ini docs/ *.py
ruff check src/ pytests/ docs/
mypy -p bytewax-clickhouse
mypy pytests/ docs/
mypy -p bytewax.clickhouse
mypy docs/

# Manually check that all pre-commit hooks pass; runs in CI
lint-pc: _assert-venv
Expand Down
15 changes: 6 additions & 9 deletions src/bytewax/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
"""

import logging
from typing import List, Optional, TypeVar
from typing import List, TypeVar

from bytewax.outputs import DynamicSink, StatelessSinkPartition
from clickhouse_connect import get_client
from pyarrow import Table, concat_tables
from pyarrow import Table, concat_tables # type: ignore
from typing_extensions import override

K = TypeVar("K")
Expand Down Expand Up @@ -107,12 +107,12 @@ class ClickHouseSink(DynamicSink):
def __init__(
self,
table_name: str,
schema: str,
username: str,
password: str,
host: str = "localhost",
port: int = 8123,
database: Optional[str] = None,
schema: Optional[str] = None,
database: str = "default",
order_by: str = "",
):
"""Initialize the ClickHouseSink.
Expand All @@ -128,7 +128,7 @@ def __init__(
host (str, optional): Hostname of ClickHouse server. Default is "localhost".
port (int, optional): Port number of the ClickHouse server. Default is 8123.
database (Optional[str], optional): Name of the database in ClickHouse.
Defaults to None. If not provided, uses "default".
If not provided, uses "default".
schema (str): Schema definition for the table, used if the table needs
to be created.
order_by (str, optional): Comma-separated list of columns to order by in the
Expand All @@ -147,9 +147,6 @@ def __init__(
self.schema = schema

# init client
if not self.database:
logger.warning("database not set, using 'default'")
self.database = "default"
client = get_client(
host=self.host,
port=self.port,
Expand Down Expand Up @@ -191,7 +188,7 @@ def __init__(
mergetree_type = client.command(mergetree_type_query)
logger.info(f"MergeTree type of the table '{table_name}': {mergetree_type}")

if "ReplacingMergeTree" not in mergetree_type:
if "ReplacingMergeTree" not in str(mergetree_type):
logger.warning(
f"""Table '{table_name}' is not using ReplacingMergeTree.
Consider modifying the table to avoid performance degredation
Expand Down
31 changes: 16 additions & 15 deletions src/bytewax/clickhouse/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
"""

from datetime import timedelta
from typing import List, Optional, Tuple
from typing import List, Tuple

import bytewax.operators as op
import pyarrow as pa
import pyarrow as pa # type: ignore
from bytewax.clickhouse import ClickHouseSink, V
from bytewax.dataflow import Stream, operator
from typing_extensions import TypeAlias
Expand All @@ -43,7 +43,7 @@ def _to_sink(
) -> KeyedStream[List[V]]:
"""Convert records to PyArrow Table."""

def shim_mapper(key__batch: Tuple, pa_schema) -> pa.Table:
def shim_mapper(key__batch: Tuple, pa_schema: pa.Schema) -> pa.Table:
key, batch = key__batch
columns = list(zip(*batch))
arrays = []
Expand All @@ -63,17 +63,17 @@ def shim_mapper(key__batch: Tuple, pa_schema) -> pa.Table:
def output(
step_id: str,
up: KeyedStream[V],
pa_schema: pa.Schema,
table_name: str,
ch_schema: str,
username: str,
password: str,
host: str = "localhost",
port: int = 8123,
database: Optional[str] = None,
ch_schema: Optional[str] = None,
database: str = "default",
order_by: str = "",
pa_schema: Optional[pa.Schema] = None,
timeout: Optional[timedelta] = 5,
max_size: Optional[int] = 50,
timeout: timedelta = timedelta(seconds=1),
max_size: int = 50,
) -> None:
r"""Produce to ClickHouse as an output sink.
Expand All @@ -91,8 +91,13 @@ def output(
:arg up: Stream of records. Key must be a `String`
and value must be serializable into an arrow table.
:arg pa_schema: Arrow schema.
:arg table_name: Table name for the writes.
:arg ch_schema: schema string of format
```column1 UInt32,\\n column2 String,\\n column3 Date```,
:arg username: database username, user must have
correct permissions.
Expand All @@ -105,19 +110,15 @@ def output(
:arg database: optional database name. If omitted
this will use the default database.
:arg ch_schema: schema string of format
```column1 UInt32,\\n column2 String,\\n column3 Date```,
:arg order_by: order by string that determines the sort of
the table for deduplication. Should be of format:
`column1, column2`
:arg pa_schema: Arrow schema.
:arg timeout: a timedelta of the amount of time to wait for
new data before writing
new data before writing. Defaults to 1 second.
:arg max_size: the maximum number of items to collect before writing.
Defaults to 50.
"""
return _to_sink(
Expand All @@ -126,6 +127,6 @@ def output(
op.output,
"kafka_output",
ClickHouseSink(
table_name, username, password, host, port, database, ch_schema, order_by
table_name, ch_schema, username, password, host, port, database, order_by
),
)

0 comments on commit 13725e8

Please sign in to comment.