Release 0.5.1
Co-authored-by: Michał Bartoszkiewicz <[email protected]>
Co-authored-by: Jan Chorowski <[email protected]>
Co-authored-by: Xavier Gendre <[email protected]>
Co-authored-by: Adrian Kosowski <[email protected]>
Co-authored-by: Jakub Kowalski <[email protected]>
Co-authored-by: Sergey Kulik <[email protected]>
Co-authored-by: Mateusz Lewandowski <[email protected]>
Co-authored-by: Mohamed Malhou <[email protected]>
Co-authored-by: Krzysztof Nowicki <[email protected]>
Co-authored-by: Richard Pelgrim <[email protected]>
Co-authored-by: Kamil Piechowiak <[email protected]>
Co-authored-by: Paweł Podhajski <[email protected]>
Co-authored-by: Olivier Ruas <[email protected]>
Co-authored-by: Przemysław Uznański <[email protected]>
Co-authored-by: Sebastian Włudzik <[email protected]>
GitOrigin-RevId: b05a3a337fefce7bce7b748d553d2f46e648178a
16 people committed Oct 4, 2023
1 parent da5e4b5 commit bee5bdd
Showing 23 changed files with 435 additions and 214 deletions.
13 changes: 7 additions & 6 deletions CHANGELOG.md
@@ -6,21 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
-## [0.5.0] - 2023-10-04
+## [0.5.1] - 2023-10-04
 
-### Added
-- `Schema` method `typehints` that returns dict of mypy-compatible typehints.
+### Fixed
+- `select` operates only on consistent states.
 
-### Changed
-- **BREAKING**: renamed `Table` method `dtypes` to `typehints`. It now returns a `dict` of mypy-compatible typehints.
-- **BREAKING**: `Schema.__getitem__` returns a data class `ColumnSchema` containing all related information on particular column.
+## [0.5.0] - 2023-10-04
+
+### Added
+- `Schema` method `typehints` that returns dict of mypy-compatible typehints.
 - Support for JSON parsing from CSV sources.
 - `restrict` method in `Table` to restrict table universe to the universe of the other table.
 - Better support for postgresql types in the output connector.
 
 ### Changed
 - **BREAKING**: renamed `Table` method `dtypes` to `typehints`. It now returns a `dict` of mypy-compatible typehints.
 - **BREAKING**: `Schema.__getitem__` returns a data class `ColumnSchema` containing all related information on particular column.
 - **BREAKING**: `tuple` reducer used after intervals_over window now sorts values by time.
 - **BREAKING**: expressions used in `select`, `filter`, `flatten`, `with_columns`, `with_id`, `with_id_from` have to have the same universe as the table. Earlier it was possible to use an expression from a superset of a table universe. To use expressions from wider universes, one can use `restrict` on the expression source table.
 - **BREAKING**: `pw.universes.promise_are_equal(t1, t2)` no longer allows to use references from `t1` and `t2` in a single expression. To change the universe of a table, use `with_universe_of`.
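For readers scanning the 0.5.0 notes above, here is a minimal sketch of the new universe rules and the `restrict` helper. The data is hypothetical and the usage follows the changelog entries, not code from this commit:

```python
import pathway as pw

# Hypothetical example data.
t_all = pw.debug.table_from_markdown(
    """
    key | value
    1   | 10
    2   | 20
    3   | 30
    """
)
# t_sub's universe (its set of row ids) is a strict subset of t_all's.
t_sub = t_all.filter(pw.this.key <= 2)

# Before 0.5.0, t_sub.select(res=t_all.value * 2) was accepted, since t_all's
# universe is a superset of t_sub's. Universes must now match exactly, so the
# wider table is narrowed first with the new `restrict`:
t_narrowed = t_all.restrict(t_sub)
result = t_sub.select(res=t_narrowed.value * 2)

pw.debug.compute_and_print(result)
```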
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "pathway"
-version = "0.5.0"
+version = "0.5.1"
 edition = "2021"
 publish = false
 rust-version = "1.71.0"
122 changes: 122 additions & 0 deletions integration_tests/s3/test_s3_interops.py
@@ -8,6 +8,7 @@
 
 import boto3
 import pandas as pd
+import pytest
 
 import pathway as pw
 from pathway.internals.monitoring import MonitoringLevel
@@ -342,3 +343,124 @@ def on_end(*args, **kwargs):
         ]
         * 2
     )
+
+
+def test_s3_alternative_path(tmp_path: pathlib.Path):
+    input_s3_path = "integration_tests/test_s3_alternative_path/input.csv"
+    output_path = tmp_path / "output.csv"
+    model_output_path = tmp_path / "model_output.csv"
+
+    input_contents = "key,value\n1,Hello\n2,World"
+
+    put_aws_object(input_s3_path, input_contents)
+    write_lines(model_output_path, input_contents)
+
+    table = pw.io.s3_csv.read(
+        "s3://aws-integrationtest/{}".format(input_s3_path),
+        aws_s3_settings=pw.io.s3_csv.AwsS3Settings(
+            access_key="AKIAX67C7K343BP4QUWN",
+            secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"],
+            region="eu-central-1",
+        ),
+        value_columns=["key", "value"],
+        mode="static",
+        autocommit_duration_ms=1000,
+    )
+
+    pw.io.csv.write(table, str(output_path))
+    pw.run()
+
+    result = pd.read_csv(
+        output_path, usecols=["key", "value"], index_col=["key"]
+    ).sort_index()
+    expected = pd.read_csv(
+        model_output_path, usecols=["key", "value"], index_col=["key"]
+    ).sort_index()
+    assert result.equals(expected)
+
+
+def test_s3_wrong_path(tmp_path: pathlib.Path):
+    input_s3_path = "integration_tests/test_s3_wrong_path/input.csv"
+    output_path = tmp_path / "output.csv"
+
+    table = pw.io.s3_csv.read(
+        "s3://aws-integrationtest/{}".format(input_s3_path),
+        aws_s3_settings=pw.io.s3_csv.AwsS3Settings(
+            access_key="AKIAX67C7K343BP4QUWN",
+            secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"],
+            region="eu-central-1",
+        ),
+        value_columns=["key", "value"],
+        mode="static",
+        autocommit_duration_ms=1000,
+    )
+
+    pw.io.csv.write(table, str(output_path))
+    with pytest.raises(
+        RuntimeError,
+        match="Creating S3 reader failed: no objects to read",
+    ):
+        pw.run()
+
+
+def test_s3_creds_from_profiles(tmp_path: pathlib.Path):
+    input_s3_path = "integration_tests/test_s3_creds_from_profiles/input.csv"
+    output_path = tmp_path / "output.csv"
+    model_output_path = tmp_path / "model_output.csv"
+
+    input_contents = "key,value\n1,Hello\n2,World"
+
+    put_aws_object(input_s3_path, input_contents)
+    write_lines(model_output_path, input_contents)
+
+    table = pw.io.s3_csv.read(
+        "s3://aws-integrationtest/{}".format(input_s3_path),
+        aws_s3_settings=pw.io.s3_csv.AwsS3Settings(region="eu-central-1"),
+        value_columns=["key", "value"],
+        mode="static",
+        autocommit_duration_ms=1000,
+    )
+
+    pw.io.csv.write(table, str(output_path))
+    pw.run()
+
+    result = pd.read_csv(
+        output_path, usecols=["key", "value"], index_col=["key"]
+    ).sort_index()
+    expected = pd.read_csv(
+        model_output_path, usecols=["key", "value"], index_col=["key"]
+    ).sort_index()
+    assert result.equals(expected)
+
+
+def test_s3_full_autodetect(tmp_path: pathlib.Path):
+    input_s3_path = "integration_tests/test_s3_full_autodetect/input.csv"
+    output_path = tmp_path / "output.csv"
+    model_output_path = tmp_path / "model_output.csv"
+
+    input_contents = "key,value\n1,Hello\n2,World"
+
+    put_aws_object(input_s3_path, input_contents)
+    write_lines(model_output_path, input_contents)
+
+    class InputSchema(pw.Schema):
+        key: int
+        value: str
+
+    table = pw.io.s3.read(
+        "s3://aws-integrationtest/{}".format(input_s3_path),
+        format="csv",
+        schema=InputSchema,
+        mode="static",
+    )
+
+    pw.io.csv.write(table, str(output_path))
+    pw.run()
+
+    result = pd.read_csv(
+        output_path, usecols=["key", "value"], index_col=["key"]
+    ).sort_index()
+    expected = pd.read_csv(
+        model_output_path, usecols=["key", "value"], index_col=["key"]
+    ).sort_index()
+    assert result.equals(expected)
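The four new tests cover, in order: reading with explicit credentials from an alternative path layout, the `RuntimeError` raised when the path matches no objects, credentials resolved from the local AWS profile chain (no keys passed), and full autodetection in which `pw.io.s3.read` receives only a path, a format, and a schema.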
21 changes: 11 additions & 10 deletions integration_tests/wordcount/base.py
@@ -226,18 +226,19 @@ def get_pw_program_run_time(
             needs_polling = False
     finally:
         if mode == STREAMING_MODE_NAME:
-            popen.kill()
+            pw_exit_code = popen.poll()
+            if not pw_exit_code:
+                popen.kill()
         else:
             pw_exit_code = popen.wait()
-            if pw_exit_code != 0:
-                warnings.warn(
-                    f"Warning: pw program terminated with non zero exit code: {pw_exit_code}"
-                )
-                assert (
-                    n_retries < 3
-                ), "Number of retries for S3 reconnection exceeded"
-                needs_pw_program_launch = True
-                n_retries += 1
+
+        if pw_exit_code is not None and pw_exit_code != 0:
+            warnings.warn(
+                f"Warning: pw program terminated with non zero exit code: {pw_exit_code}"
+            )
+            assert n_retries < 3, "Number of retries for S3 reconnection exceeded"
+            needs_pw_program_launch = True
+            n_retries += 1
 
     return time.time() - time_start

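The retry logic previously killed a streaming run unconditionally and only inspected exit codes in static mode. It now calls `poll()` first, kills the process only if it has not already failed, and routes any nonzero exit code, from either mode, into the shared warn-and-retry branch.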
18 changes: 5 additions & 13 deletions integration_tests/wordcount/test_new_data.py
@@ -14,21 +14,13 @@
 )
 
 
+@pytest.mark.parametrize("n_cpus", [1, 2, 4])
+@pytest.mark.parametrize("pstorage_type", [S3_STORAGE_NAME, FS_STORAGE_NAME])
 @pytest.mark.parametrize(
-    "n_backfilling_runs,n_cpus,mode,pstorage_type",
+    "n_backfilling_runs,mode",
     [
-        (3, 1, STREAMING_MODE_NAME, S3_STORAGE_NAME),
-        (3, 2, STREAMING_MODE_NAME, S3_STORAGE_NAME),
-        (3, 4, STREAMING_MODE_NAME, S3_STORAGE_NAME),
-        (3, 1, STATIC_MODE_NAME, S3_STORAGE_NAME),
-        (3, 2, STATIC_MODE_NAME, S3_STORAGE_NAME),
-        (3, 4, STATIC_MODE_NAME, S3_STORAGE_NAME),
-        (3, 1, STREAMING_MODE_NAME, FS_STORAGE_NAME),
-        (3, 2, STREAMING_MODE_NAME, FS_STORAGE_NAME),
-        (3, 4, STREAMING_MODE_NAME, FS_STORAGE_NAME),
-        (3, 1, STATIC_MODE_NAME, FS_STORAGE_NAME),
-        (3, 2, STATIC_MODE_NAME, FS_STORAGE_NAME),
-        (3, 4, STATIC_MODE_NAME, FS_STORAGE_NAME),
+        (3, STREAMING_MODE_NAME),
+        (3, STATIC_MODE_NAME),
     ],
 )
 def test_integration_new_data(
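Stacked `pytest.mark.parametrize` decorators multiply out, so the rewrite keeps the same 12 combinations (3 CPU counts × 2 storage types × 2 modes) while listing each axis only once.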
1 change: 1 addition & 0 deletions pyproject.toml
@@ -27,6 +27,7 @@ dependencies = [
     "rich >= 12.6.0",
     "diskcache >= 5.2.1",
    "exceptiongroup >= 1.1.3; python_version < '3.11'",
+    "boto3 >= 1.26.76",
 ]
 
 [project.optional-dependencies]
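`boto3` becomes a runtime dependency to support the bucket-location lookup added in `_io_helpers.py` below; as the in-code comment there explains, the Rust-side S3 crate cannot detect a bucket's region, so it is resolved from Python.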
33 changes: 32 additions & 1 deletion python/pathway/internals/_io_helpers.py
@@ -2,19 +2,25 @@
 
 from __future__ import annotations
 
+import boto3
+
 from pathway.internals import api
 from pathway.internals import dtype as dt
 from pathway.internals import schema
 from pathway.internals.table import Table
 from pathway.internals.trace import trace_user_frame
 
+S3_PATH_PREFIX = "s3://"
+S3_DEFAULT_REGION = "us-east-1"
+S3_LOCATION_FIELD = "LocationConstraint"
+
 
 class AwsS3Settings:
     @trace_user_frame
     def __init__(
         self,
-        bucket_name,
         *,
+        bucket_name=None,
         access_key=None,
         secret_access_key=None,
         with_path_style=False,
@@ -40,6 +46,31 @@ def __init__(
             endpoint,
         )
 
+    @classmethod
+    def new_from_path(cls, s3_path: str):
+        starts_with_prefix = s3_path.startswith(S3_PATH_PREFIX)
+        has_extra_chars = len(s3_path) > len(S3_PATH_PREFIX)
+        if not starts_with_prefix or not has_extra_chars:
+            raise ValueError("Incorrect S3 path: {}".format(s3_path))
+        bucket = s3_path[len(S3_PATH_PREFIX) :].split("/")[0]
+
+        # the crate we use on the Rust-engine side can't detect the location
+        # of a bucket, so it's done on the Python side
+        s3_client = boto3.client("s3")
+        location_response = s3_client.get_bucket_location(Bucket=bucket)
+
+        # Buckets in Region us-east-1 have a LocationConstraint of None
+        location_constraint = location_response[S3_LOCATION_FIELD]
+        if location_constraint is None:
+            region = S3_DEFAULT_REGION
+        else:
+            region = location_constraint.split("|")[0]
+
+        return cls(
+            bucket_name=bucket,
+            region=region,
+        )
+
 
 def _format_output_value_fields(table: Table) -> list[api.ValueField]:
     value_fields = []
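A minimal sketch of how the new classmethod might be called (hypothetical bucket and path; credentials are assumed to come from the standard AWS profile chain, as in `test_s3_creds_from_profiles` above):

```python
import pathway as pw

# Infers the bucket name from the path, then asks S3 for the bucket's
# region, falling back to us-east-1 when LocationConstraint is None.
settings = pw.io.s3_csv.AwsS3Settings.new_from_path("s3://my-bucket/data/input.csv")
```

The `test_s3_full_autodetect` test above suggests the connectors can take this route themselves when no explicit `aws_s3_settings` is supplied.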
5 changes: 0 additions & 5 deletions python/pathway/internals/column.py
@@ -320,11 +320,6 @@ class TableRestrictedRowwiseContext(RowwiseContext):
     table: pw.Table
 
 
-@dataclass(eq=False, frozen=True)
-class CopyContext(Context):
-    """Context used by operators not changing the columns."""
-
-
 @dataclass(eq=False, frozen=True)
 class GroupedContext(Context):
     """Context of `table.groupby().reduce() operation."""
4 changes: 0 additions & 4 deletions python/pathway/internals/decorators.py
@@ -23,10 +23,6 @@ def contextualized_operator(func):
     return _operator_wrapper(func, op.ContextualizedIntermediateOperator)
 
 
-def non_contextualized_operator(func):
-    return _operator_wrapper(func, op.NonContextualizedIntermediateOperator)
-
-
 def _operator_wrapper(func: Callable, operator_cls: type[op.OperatorFromDef]):
     fn_spec = function_spec(func)
 
6 changes: 0 additions & 6 deletions python/pathway/internals/graph_runner/expression_evaluator.py
@@ -810,12 +810,6 @@ def _dereference(self, expression: expr.ColumnReference):
         return super()._dereference(expression)
 
 
-class CopyEvaluator(ExpressionEvaluator, context_type=clmn.CopyContext):
-    def run(self, output_storage: Storage, *input_storages: Storage) -> api.Table:
-        [input_storage] = input_storages
-        return self.state.get_table(input_storage)
-
-
 class FilterEvaluator(ExpressionEvaluator, context_type=clmn.FilterContext):
     context: clmn.FilterContext
 
13 changes: 0 additions & 13 deletions python/pathway/internals/graph_runner/operator_handler.py
@@ -34,7 +34,6 @@
     DebugOperator,
     InputOperator,
     IterateOperator,
-    NonContextualizedIntermediateOperator,
     Operator,
     OutputOperator,
 )
@@ -280,18 +279,6 @@ def _does_not_need_evaluation(self, column) -> bool:
         return self.state.has_column(column) or self.scope_context.skip_column(column)
 
 
-class NonContextualizedIntermediateOperatorHandler(
-    OperatorHandler[NonContextualizedIntermediateOperator],
-    operator_type=NonContextualizedIntermediateOperator,
-):
-    def _run(
-        self,
-        operator: NonContextualizedIntermediateOperator,
-        output_storages: dict[Table, Storage],
-    ):
-        pass
-
-
 class DebugOperatorHandler(
     OperatorHandler[DebugOperator],
     operator_type=DebugOperator,
1 change: 0 additions & 1 deletion python/pathway/internals/graph_runner/path_evaluator.py
@@ -192,7 +192,6 @@ def compute(
 class NoNewColumnsPathEvaluator(
     PathEvaluator,
     context_types=[
-        clmn.CopyContext,
         clmn.FilterContext,
         clmn.ReindexContext,
         clmn.IntersectContext,
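Taken together, the deletions in `column.py`, `decorators.py`, `expression_evaluator.py`, `operator_handler.py`, and `path_evaluator.py` remove the `CopyContext` / `NonContextualizedIntermediateOperator` machinery end to end; plausibly this is the code path behind the "`select` operates only on consistent states" fix named in the changelog, though the commit message does not say so explicitly.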
(Diff truncated: the remaining changed files are not shown.)
