From 4fca74bb8c72e0d03378369bc8462b3d89b6b6c4 Mon Sep 17 00:00:00 2001 From: Manul from Pathway Date: Thu, 16 Nov 2023 11:05:33 +0100 Subject: [PATCH] Release 0.7.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Bartoszkiewicz Co-authored-by: Jan Chorowski Co-authored-by: Xavier Gendre Co-authored-by: Adrian Kosowski Co-authored-by: Jakub Kowalski Co-authored-by: Sergey Kulik Co-authored-by: Mateusz Lewandowski Co-authored-by: Mohamed Malhou Co-authored-by: Krzysztof Nowicki Co-authored-by: Richard Pelgrim Co-authored-by: Kamil Piechowiak Co-authored-by: Paweł Podhajski Co-authored-by: Olivier Ruas Co-authored-by: Przemysław Uznański Co-authored-by: Sebastian Włudzik GitOrigin-RevId: 71c8b3e511c0ea3b530530ca733d3b1cb717a198 --- CHANGELOG.md | 13 +- CONTRIBUTING.md | 2 + Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 75 +++-- .../webserver/test_rest_connector.py | 24 +- python/pathway/debug/__init__.py | 117 +++---- python/pathway/internals/arg_handlers.py | 6 +- python/pathway/internals/column.py | 306 ++++++++++++++---- python/pathway/internals/column_properties.py | 1 - python/pathway/internals/desugaring.py | 7 +- .../internals/graph_runner/__init__.py | 9 +- .../graph_runner/expression_evaluator.py | 4 +- .../internals/graph_runner/path_evaluator.py | 4 +- .../pathway/internals/graph_runner/state.py | 13 - python/pathway/internals/groupby.py | 17 +- python/pathway/internals/join.py | 34 +- python/pathway/internals/schema.py | 41 ++- python/pathway/internals/table.py | 255 +++++---------- python/pathway/internals/table_like.py | 10 +- python/pathway/internals/type_interpreter.py | 9 + python/pathway/internals/universe_solver.py | 4 + python/pathway/internals/universes.py | 3 +- python/pathway/io/_utils.py | 17 + python/pathway/io/fs/__init__.py | 2 - python/pathway/io/http/_server.py | 31 +- python/pathway/io/python/__init__.py | 9 +- python/pathway/io/redpanda/__init__.py | 8 +- python/pathway/stdlib/temporal/__init__.py | 6 +- .../pathway/stdlib/temporal/_interval_join.py | 16 +- python/pathway/stdlib/temporal/_window.py | 52 ++- .../stdlib/temporal/temporal_behavior.py | 63 +++- python/pathway/stdlib/temporal/utils.py | 13 +- python/pathway/stdlib/utils/__init__.py | 8 +- python/pathway/tests/ml/test_index.py | 19 +- .../temporal/test_interval_joins_stream.py | 4 +- python/pathway/tests/temporal/test_windows.py | 4 +- .../tests/temporal/test_windows_stream.py | 175 ++++++++-- .../pathway/tests/test_column_properties.py | 44 +++ python/pathway/tests/test_io.py | 135 ++++++-- python/pathway/tests/test_schema.py | 8 + python/pathway/tests/utils.py | 44 +++ src/connectors/data_storage.rs | 269 ++++++++------- src/connectors/metadata.rs | 19 +- src/connectors/mod.rs | 38 ++- src/python_api.rs | 6 - src/timestamp.rs | 7 + tests/helpers.rs | 3 + tests/test_bytes.rs | 2 +- tests/test_connector_field_defaults.rs | 8 - tests/test_debezium.rs | 2 - tests/test_dsv.rs | 10 +- tests/test_dsv_dir.rs | 7 - tests/test_jsonlines.rs | 11 - tests/test_metadata.rs | 8 - tests/test_seek.rs | 2 - 56 files changed, 1275 insertions(+), 733 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31c992ac..dc031bb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,19 @@ All notable changes to this project will be documented in this file. -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

+### Added
+- class `Behavior`, a superclass of all behavior classes
+- class `ExactlyOnceBehavior` indicating we want to create a `CommonBehavior` that results in each window producing exactly one output (shifted in time by an optional `shift` parameter)
+- function `exactly_once_behavior` creating an instance of `ExactlyOnceBehavior`
+
+### Changed
+- **BREAKING**: `WindowBehavior` is now called `CommonBehavior`, as it can also be used with interval joins
+- **BREAKING**: `window_behavior` is now called `common_behavior`, as it can also be used with interval joins
+- Deprecated the parameter `keep_queries` in `pw.io.http.rest_connector`; use `delete_completed_queries`, which has the opposite meaning, instead. The default is still `delete_completed_queries=True` (equivalent to `keep_queries=False`), but it will soon be required to be set explicitly.
+
## [0.6.0] - 2023-11-10

### Added
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 972c64f3..1329ec0a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -26,6 +26,8 @@ After a pull request is opened it will be reviewed, and merged after passing
continuous integration tests and being accepted by a project or sub-system
maintainer.

+We maintain a [Changelog](https://github.com/pathwaycom/pathway/blob/main/CHANGELOG.md) where all notable changes to this project are documented. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
+
We ask that developers sign our [contributor license agreement](https://cla-assistant.io/pathwaycom/pathway). The process of signing the CLA is automated, and you'll be prompted with instructions
diff --git a/Cargo.lock b/Cargo.lock
index d792224d..0b268c9e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1434,7 +1434,7 @@ dependencies = [

[[package]]
name = "pathway"
-version = "0.6.0"
+version = "0.7.0"
dependencies = [
 "arc-swap",
 "arcstr",
diff --git a/Cargo.toml b/Cargo.toml
index 613dc7c4..dd6ed54d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pathway"
-version = "0.6.0"
+version = "0.7.0"
edition = "2021"
publish = false
rust-version = "1.72.0"
diff --git a/README.md b/README.md
index 38e8297f..43d1badc 100644
--- a/README.md
+++ b/README.md
@@ -16,11 +16,22 @@ alt="follow on Twitter">
follow on LinkedIn
+
+ Getting Started | + Example | + Performance | + Deployment | + Resources | + Documentation | + Blog | + Get Help

-# Pathway + + +# Pathway [Pathway](https://pathway.com) is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received. @@ -37,7 +48,10 @@ In Pathway, data is represented in the form of Tables. Live data streams are als For any questions, you will find the community and team behind the project [on Discord](https://discord.com/invite/pathway). -## Installation +## Getting started + + +### Installation Pathway requires Python 3.10 or above. @@ -49,7 +63,7 @@ $ pip install -U pathway ⚠️ Pathway is available on MacOS and Linux. Users of other systems should run Pathway on a Virtual Machine. -## Getting started +### Running Pathway locally To use Pathway, you only need to import it: @@ -78,7 +92,7 @@ $ pathway spawn --threads 3 python main.py To jumpstart a Pathway project, you can use our [cookiecutter template](https://github.com/pathwaycom/cookiecutter-pathway). -### Example +### Example ```python import pathway as pw @@ -99,7 +113,18 @@ pw.run() Run this example [in Google Colab](https://colab.research.google.com/drive/1kLx5-vKKg0IeQ88ydS-ehtrxSujEZrXK?usp=sharing)! -## Monitoring Pathway +## Deployment + +Do you feel limited by a local run? +If you want to scale your Pathway application, you may be interested in our Pathway for Enterprise. +Pathway for Enterprise is specially tailored towards end-to-end data processing and real time intelligent analytics. +It scales using distributed computing on the cloud and supports Kubernetes deployment. + +You can learn more about the features of Pathway for Enterprise on our [website](https://pathway.com/features). + +If you are interested, don't hesitate to [contact us](mailto:contact@pathway.com) to learn more. + +## Monitoring Pathway Pathway comes with a monitoring dashboard that allows you to keep track of the number of messages sent by each connector and the latency of the system. The dashboard also includes log messages. @@ -109,18 +134,18 @@ This dashboard is enabled by default; you can disable it by passing `monitoring_ In addition to Pathway's built-in dashboard, you can [use Prometheus](https://pathway.com/developers/tutorials/prometheus-monitoring) to monitor your Pathway application. -## Resources +## Resources -See also: **[Pathway Developer Resources](https://pathway.com/developers/)** webpage (including API Docs). +See also: **📖 [Pathway Documentation](https://pathway.com/developers/)** webpage (including API Docs). 
-### Videos about Pathway +### Videos about Pathway [▶️ Building an LLM Application without a vector database](https://www.youtube.com/watch?v=kcrJSk00duw) - by [Jan Chorowski](https://scholar.google.com/citations?user=Yc94070AAAAJ) (7min 56s) [▶️ Linear regression on a Kafka Stream](https://vimeo.com/805069039) - by [Richard Pelgrim](https://twitter.com/richardpelgrim) (7min 53s) [▶️ Introduction to reactive data processing](https://pathway.com/developers/user-guide/introduction/welcome) - by [Adrian Kosowski](https://scholar.google.com/citations?user=om8De_0AAAAJ) (27min 54s) -### Guides +### Guides - [Core concepts of Pathway](https://pathway.com/developers/user-guide/introduction/key-concepts/) - [Basic operations](https://pathway.com/developers/user-guide/introduction/survival-guide/) - [Joins](https://pathway.com/developers/user-guide/table-operations/join-manual/) @@ -132,7 +157,7 @@ See also: **[Pathway Developer Resources](https://pathway.com/developers/)** web - [API docs](https://pathway.com/developers/api-docs/pathway) - [Troubleshooting](https://pathway.com/developers/user-guide/introduction/troubleshooting/) -### Tutorials +### Tutorials - [Linear regression on a Kafka Stream](https://pathway.com/developers/tutorials/linear_regression_with_kafka/) ([video](https://vimeo.com/805069039)) - Joins: - [Interval joins](https://pathway.com/developers/tutorials/fleet_eta_interval_join/) @@ -147,12 +172,12 @@ See also: **[Pathway Developer Resources](https://pathway.com/developers/)** web - [Monitoring Pathway with Prometheus](https://pathway.com/developers/tutorials/prometheus-monitoring/) - [Time between events in a multi-topic event stream](https://pathway.com/developers/tutorials/event_stream_processing_time_between_occurrences/) -### Showcases +### Showcases - [Realtime Twitter Analysis App](https://pathway.com/developers/showcases/twitter/) - [Realtime classification with Nearest Neighbors](https://pathway.com/developers/showcases/lsh/lsh_chapter1/) - [Realtime Fuzzy joins](https://pathway.com/developers/showcases/fuzzy_join/fuzzy_join_chapter1/) -### External and community content +### External and community content - [Real-time linear regression (Data Engineering Weekly)](https://pathway.com/developers/tutorials/unlocking-data-stream-processing-1/) - [Realtime server logs monitoring (Data Engineering Weekly)](https://pathway.com/developers/tutorials/unlocking-data-stream-processing-2/) - [Data enrichment with fuzzy joins (Data Engineering Weekly)](https://pathway.com/developers/tutorials/unlocking-data-stream-processing-3/) @@ -160,13 +185,13 @@ See also: **[Pathway Developer Resources](https://pathway.com/developers/)** web If you would like to share with us some Pathway-related content, please give an admin a shout on [Discord](https://discord.gg/pathway). -### Manul conventions +### Manul conventions Manuls (aka Pallas's Cats) [are creatures with fascinating habits](https://www.youtube.com/watch?v=rlSTBvViflc). As a tribute to them, we usually read `pw`, one of the most frequent tokens in Pathway code, as: `"paw"`. manul -## Performance +## Performance Pathway is made to outperform state-of-the-art technologies designed for streaming and batch data processing tasks, including: Flink, Spark, and Kafka Streaming. It also makes it possible to implement a lot of algorithms/UDF's in streaming mode which are not readily supported by other streaming frameworks (especially: temporal joins, iterative graph algorithms, machine learning routines). 
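As an illustration of the temporal capabilities mentioned above, here is a minimal sketch of a tumbling-window aggregation using the `exactly_once_behavior` introduced in this release's CHANGELOG; the input table, column names, and window duration are illustrative assumptions, not taken from this patch:

```python
import pathway as pw

# Static toy input; in production this would come from a streaming connector.
events = pw.debug.table_from_markdown(
    """
    t
    1
    4
    12
    """
)

result = events.windowby(
    events.t,
    window=pw.temporal.tumbling(duration=10),
    # New in 0.7.0: each window emits exactly one output when it closes.
    behavior=pw.temporal.exactly_once_behavior(),
).reduce(
    window_start=pw.this._pw_window_start,
    count=pw.reducers.count(),
)

pw.debug.compute_and_print(result)
```

Per the CHANGELOG above, `exactly_once_behavior` also accepts an optional `shift` parameter that delays the single per-window output in time.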
@@ -176,29 +201,39 @@ If you are curious, here are [some benchmarks to play with](https://github.com/p

If you try your own benchmarks, please don't hesitate to let us know. We investigate situations in which Pathway is underperforming on par with bugs (i.e., to our knowledge, they shouldn't happen...).

-## Coming soon
+## Coming soon

-Pathway continues to evolve and gain new capabilities. Here are some exciting new features that we plan to incorporate in the near future:
+Here are some features we plan to incorporate in the near future:
- Enhanced monitoring, observability, and data drift detection (integrates with Grafana visualization and other dashboarding tools).
- New connectors: interoperability with Delta Lake and Snowflake data sources.
- Easier connection setup for MongoDB.
- More performant garbage collection.
-Stay up to date with the latest developments and news surrounding Pathway on [our blog](https://pathway.com/blog/), or [subscribe to our newsletter].

-## Dependencies
+## Dependencies

Pathway is made to run in a "clean" Linux/MacOS + Python environment. When installing the pathway package with `pip` (from a wheel), you are likely to encounter a small number of Python package dependencies, such as sqlglot (used in the SQL API) and python-sat (useful for resolving dependencies during compilation). All necessary Rust crates are pre-built; the Rust compiler is not required to install Pathway, unless building from sources. A modified version of Timely/Differential Dataflow (which provides a dataflow assembly layer) is part of this repo.

-## License
+## License

Pathway is distributed on a [BSL 1.1 License](https://github.com/pathwaycom/pathway/blob/main/LICENSE.txt) which allows for unlimited non-commercial use, as well as use of the Pathway package [for most commercial purposes](https://pathway.com/license/), free of charge. Code in this repository automatically converts to Open Source (Apache 2.0 License) after 4 years. Some [public repos](https://github.com/pathwaycom) which are complementary to this one (examples, libraries, connectors, etc.) are licensed as Open Source, under the MIT license.

-## Contribution guidelines
+## Contribution guidelines

If you develop a library or connector which you would like to integrate with this repo, we suggest releasing it first as a separate repo on a MIT/Apache 2.0 license.

For all concerns regarding core Pathway functionalities, Issues are encouraged. For further information, don't hesitate to engage with Pathway's [Discord community](https://discord.gg/pathway).
+
+## Get Help
+
+If you have any questions, issues, or just want to chat about Pathway, we're here to help! Feel free to:
+- Check out the [documentation](https://pathway.com/developers/) for detailed information.
+- [Open an issue on GitHub](https://github.com/pathwaycom/pathway/issues) if you encounter any bugs or have feature requests.
+- Join us on [Discord](https://discord.com/invite/pathway) to connect with other users and get support.
+- Reach out to us via email at [contact@pathway.com](mailto:contact@pathway.com).
+
+Our team is always happy to help you and ensure that you get the most out of Pathway.
+If you would like to understand how best to use Pathway in your project, please don't hesitate to reach out to us.
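The integration-test changes below exercise the `delete_completed_queries` flag that replaces the deprecated `keep_queries`. In application code, the connector is typically wired up as in the following sketch; the port and the uppercasing logic are illustrative assumptions, not part of this patch:

```python
import pathway as pw


class InputSchema(pw.Schema):
    query: str
    user: str


# delete_completed_queries=True is the new spelling of the deprecated
# keep_queries=False; per the CHANGELOG it will soon have to be set explicitly.
queries, response_writer = pw.io.http.rest_connector(
    host="127.0.0.1",
    port=8080,
    schema=InputSchema,
    delete_completed_queries=True,
)
# Answer each query with its uppercased text; select() preserves row ids,
# so the connector can match responses to pending requests.
responses = queries.select(result=pw.apply(str.upper, pw.this.query))
response_writer(responses)
pw.run()
```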
\ No newline at end of file diff --git a/integration_tests/webserver/test_rest_connector.py b/integration_tests/webserver/test_rest_connector.py index 61ced2f5..a36a3e37 100644 --- a/integration_tests/webserver/test_rest_connector.py +++ b/integration_tests/webserver/test_rest_connector.py @@ -32,17 +32,21 @@ def logic(queries: pw.Table) -> pw.Table: def target(): time.sleep(5) - requests.post( + r = requests.post( f"http://127.0.0.1:{port}", json={"query": "one", "user": "sergey"}, - ).raise_for_status() - requests.post( + ) + r.raise_for_status() + assert r.text == '"ONE"', r.text + r = requests.post( f"http://127.0.0.1:{port}", json={"query": "two", "user": "sergey"}, - ).raise_for_status() + ) + r.raise_for_status() + assert r.text == '"TWO"', r.text queries, response_writer = pw.io.http.rest_connector( - host="127.0.0.1", port=port, schema=InputSchema + host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=True ) responses = logic(queries) response_writer(responses) @@ -80,7 +84,11 @@ def target(): ).raise_for_status() queries, response_writer = pw.io.http.rest_connector( - host="127.0.0.1", port=port, schema=InputSchema, route="/endpoint" + host="127.0.0.1", + port=port, + schema=InputSchema, + route="/endpoint", + delete_completed_queries=True, ) responses = logic(queries) response_writer(responses) @@ -118,7 +126,7 @@ def target(): ).raise_for_status() queries, response_writer = pw.io.http.rest_connector( - host="127.0.0.1", port=port, schema=InputSchema + host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=True ) responses = logic(queries) response_writer(responses) @@ -151,7 +159,7 @@ def target(): ).raise_for_status() queries, response_writer = pw.io.http.rest_connector( - host="127.0.0.1", port=port, schema=InputSchema, delete_queries=False + host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=False ) response_writer(queries.select(query_id=queries.id, result=pw.this.v)) diff --git a/python/pathway/debug/__init__.py b/python/pathway/debug/__init__.py index a1f7cb85..acc1f24a 100644 --- a/python/pathway/debug/__init__.py +++ b/python/pathway/debug/__init__.py @@ -122,7 +122,7 @@ def table_to_pandas(table: Table): @runtime_type_check @trace_user_frame -def _table_from_pandas( +def table_from_pandas( df: pd.DataFrame, id_from: list[str] | None = None, unsafe_trusted_ids: bool = False, @@ -205,6 +205,10 @@ def table_to_parquet(table: Table, filename: str | PathLike): return df.to_parquet(filename) +# XXX: clean this up +table_from_markdown = parse_to_table + + class _EmptyConnectorSubject(ConnectorSubject): def run(self): pass @@ -215,7 +219,7 @@ class StreamGenerator: events: dict[tuple[str, int], list[api.SnapshotEvent]] = {} def _get_next_persistent_id(self) -> str: - return str(next(self._persistent_id)) + return str(f"_stream_generator_{next(self._persistent_id)}") def _advance_time_for_all_workers( self, persistent_id: str, workers: Iterable[int], timestamp: int @@ -283,6 +287,15 @@ def table_from_list_of_batches_by_workers( batches: list[dict[int, list[dict[str, api.Value]]]], schema: type[Schema], ) -> Table: + """ + A function that creates a table from a list of batches, where each batch is a mapping + from worker id to a list of rows processed by this worker in this batch. + Each row is a mapping from column name to a value. 
+
+ Args:
+ batches: list of batches to be put in the table
+ schema: schema of the table
+ """
 key = itertools.count()
 schema, api_schema = read_schema(schema=schema)
 value_fields: list[api.ValueField] = api_schema["value_fields"]
@@ -313,6 +326,14 @@ def table_from_list_of_batches(
 batches: list[list[dict[str, api.Value]]],
 schema: type[Schema],
 ) -> Table:
+ """
+ A function that creates a table from a list of batches, where each batch is a list of
+ rows in this batch. Each row is a mapping from column name to a value.
+
+ Args:
+ batches: list of batches to be put in the table
+ schema: schema of the table
+ """
 batches_by_worker = [{0: batch} for batch in batches]
 return self.table_from_list_of_batches_by_workers(batches_by_worker, schema)
@@ -323,6 +344,12 @@ def table_from_pandas(
 unsafe_trusted_ids: bool = False,
 schema: type[Schema] | None = None,
 ) -> Table:
+ """
+ A function for creating a table from a pandas DataFrame. If the DataFrame
+ contains a column ``_time``, rows will be split into batches with timestamps from the ``_time`` column.
+ Then the ``_worker`` column will be interpreted as the id of the worker which will process the row and
+ the ``_diff`` column as an event type, with ``1`` treated as inserting a row and ``-1`` as removing it.
+ """
 if schema is None:
 schema = schema_from_pandas(
 df, exclude_columns=["_time", "_diff", "_worker"]
@@ -337,11 +364,6 @@ def table_from_pandas(
 if "_diff" not in df:
 df["_diff"] = [1] * len(df)
- persistent_id = self._get_next_persistent_id()
- workers = set(df["_worker"])
- for worker in workers:
- self.events[(persistent_id, worker)] = []
 batches: dict[
 int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]]
 ] = {}
@@ -380,10 +402,21 @@ def table_from_markdown(
 unsafe_trusted_ids: bool = False,
 schema: type[Schema] | None = None,
 ) -> Table:
+ """
+ A function for creating a table from its definition in markdown. If it
+ contains a column ``_time``, rows will be split into batches with timestamps from the ``_time`` column.
+ Then the ``_worker`` column will be interpreted as the id of the worker which will process the row and
+ the ``_diff`` column as an event type, with ``1`` treated as inserting a row and ``-1`` as removing it.
+ """
 df = _markdown_to_pandas(table)
 return self.table_from_pandas(df, id_from, unsafe_trusted_ids, schema)

 def persistence_config(self) -> persistence.Config | None:
+ """
+ Returns a persistence config to be used during run. It needs to be passed to ``pw.run``
+ so that tables created using StreamGenerator are filled with data.
+ """
+
 if len(self.events) == 0:
 return None
 return persistence.Config.simple_config(
@@ -391,73 +424,3 @@ def persistence_config(self) -> persistence.Config | None:
 snapshot_access=api.SnapshotAccess.REPLAY,
 replay_mode=api.ReplayMode.SPEEDRUN,
 )
-
-
-stream_generator = StreamGenerator()
-
-
-def table_from_list_of_batches_by_workers(
- batches: list[dict[int, list[dict[str, api.Value]]]],
- schema: type[Schema],
-) -> Table:
- """
- A function that creates a table from a list of batches, where each batch is a mapping
- from worker id to a list of rows processed by this worker in this batch.
- Each row is a mapping from column name to a value.
- - Args: - batches: list of batches to be put in the table - schema: schema of the table - """ - return stream_generator.table_from_list_of_batches_by_workers(batches, schema) - - -def table_from_list_of_batches( - batches: list[list[dict[str, api.Value]]], - schema: type[Schema], -) -> Table: - """ - A function that creates a table from a list of batches, where each batch is a list of - rows in this batch. Each row is a mapping from column name to a value. - - Args: - batches: list of batches to be put in the table - schema: schema of the table - """ - return stream_generator.table_from_list_of_batches(batches, schema) - - -def table_from_pandas( - df: pd.DataFrame, - id_from: list[str] | None = None, - unsafe_trusted_ids: bool = False, - schema: type[Schema] | None = None, -): - """ - A function for creating a table from a pandas DataFrame. If the DataFrame - contains a column ``_time``, rows will be split into batches with timestamps from ``_time`` column. - Then ``_worker`` column will be interpreted as the id of a worker which will process the row and - ``_diff`` column as an event type with ``1`` treated as inserting row and ``-1`` as removing. - """ - if "_time" in df: - return stream_generator.table_from_pandas( - df, id_from, unsafe_trusted_ids, schema - ) - else: - return _table_from_pandas(df, id_from, unsafe_trusted_ids, schema) - - -def table_from_markdown( - table_def: str, - id_from: list[str] | None = None, - unsafe_trusted_ids: bool = False, - schema: type[Schema] | None = None, -) -> Table: - """ - A function for creating a table from its definition in markdown. If it - contains a column ``_time``, rows will be split into batches with timestamps from ``_time`` column. - Then ``_worker`` column will be interpreted as the id of a worker which will process the row and - ``_diff`` column as an event type - with ``1`` treated as inserting row and ``-1`` as removing. - """ - df = _markdown_to_pandas(table_def) - return table_from_pandas(df, id_from, unsafe_trusted_ids, schema) diff --git a/python/pathway/internals/arg_handlers.py b/python/pathway/internals/arg_handlers.py index 67b77a7f..cb1484fa 100644 --- a/python/pathway/internals/arg_handlers.py +++ b/python/pathway/internals/arg_handlers.py @@ -126,11 +126,11 @@ def handler(self, other, *on, **kwargs): if "behavior" in kwargs: behavior = processed_kwargs["behavior"] = kwargs.pop("behavior") - from pathway.stdlib.temporal import WindowBehavior + from pathway.stdlib.temporal import CommonBehavior - if not isinstance(behavior, WindowBehavior): + if not isinstance(behavior, CommonBehavior): raise ValueError( - "The behavior argument of join should be of type pathway.temporal.WindowBehavior." + "The behavior argument of join should be of type pathway.temporal.CommonBehavior." 
) if kwargs: diff --git a/python/pathway/internals/column.py b/python/pathway/internals/column.py index 17f0d252..6b59f94f 100644 --- a/python/pathway/internals/column.py +++ b/python/pathway/internals/column.py @@ -14,15 +14,16 @@ from pathway.internals import column_properties as cp, dtype as dt, trace from pathway.internals.expression import ColumnExpression, ColumnReference from pathway.internals.helpers import SetOnceProperty, StableSet +from pathway.internals.parse_graph import G +from pathway.internals.universe import Universe if TYPE_CHECKING: from pathway.internals.expression import InternalColRef from pathway.internals.operator import OutputHandle from pathway.internals.table import Table - from pathway.internals.universe import Universe -@dataclass(eq=False, frozen=True) +@dataclass(frozen=True) class Lineage: source: OutputHandle """Source handle.""" @@ -32,7 +33,7 @@ def trace(self) -> trace.Trace: return self.source.operator.trace -@dataclass(eq=False, frozen=True) +@dataclass(frozen=True) class ColumnLineage(Lineage): name: str """Original name of a column.""" @@ -63,7 +64,7 @@ def __init__(self, universe: Universe) -> None: self._trace = trace.Trace.from_traceback() def column_dependencies(self) -> StableSet[Column]: - return StableSet([self]) + return StableSet([]) @property def trace(self) -> trace.Trace: @@ -117,7 +118,11 @@ class ColumnWithContext(Column, ABC): context: Context - def __init__(self, context: Context, universe: Universe): + def __init__( + self, + context: Context, + universe: Universe, + ): super().__init__(universe) self.context = context @@ -143,6 +148,20 @@ def context_dtype(self) -> dt.DType: return dt.POINTER +class MaterializedIdColumn(IdColumn): + context: MaterializedContext + + def __init__( + self, context: MaterializedContext, properties: cp.ColumnProperties + ) -> None: + super().__init__(context) + self._properties = properties + + @property + def properties(self) -> cp.ColumnProperties: + return self._properties + + class ColumnWithExpression(ColumnWithContext): """Column holding expression and context.""" @@ -210,26 +229,31 @@ def __post_init__(self): assert all((column.universe == self.universe) for column in self.columns) -def _create_internal_table( - columns: Iterable[Column], universe: Universe, context: Context -) -> Table: +def _create_internal_table(columns: Iterable[Column], context: Context) -> Table: from pathway.internals.table import Table columns_dict = {f"{i}": column for i, column in enumerate(columns)} - return Table(columns_dict, universe, id_column=IdColumn(context)) + return Table(columns_dict, context=context) @dataclass(eq=False, frozen=True) -class Context: +class Context(ABC): """Context of the column evaluation. Context will be mapped to proper evaluator based on its type. """ - universe: Universe - """Resulting universe.""" _column_properties_evaluator: ClassVar[type[cp.ColumnPropertiesEvaluator]] + @cached_property + def id_column(self) -> IdColumn: + return IdColumn(self) + + @property + @abstractmethod + def universe(self) -> Universe: + ... 
+ def column_dependencies_external(self) -> Iterable[Column]: return [] @@ -240,7 +264,8 @@ def column_dependencies(self) -> StableSet[Column]: # columns depend on columns in their context, not dependencies of columns in context return StableSet( chain( - self.column_dependencies_external(), self.column_dependencies_internal() + self.column_dependencies_external(), + self.column_dependencies_internal(), ) ) @@ -269,21 +294,17 @@ def intermediate_tables(self) -> Iterable[Table]: dependencies = list(self.column_dependencies_internal()) if len(dependencies) == 0: return [] - universe = None context = None columns: list[ColumnWithContext] = [] for column in dependencies: assert isinstance( column, ColumnWithContext ), f"Column {column} that is not ColumnWithContext appeared in column_dependencies_internal()" - assert universe is None or universe == column.universe assert context is None or context == column.context columns.append(column) - universe = column.universe context = column.context - assert universe is not None assert context is not None - return [_create_internal_table(columns, universe, context)] + return [_create_internal_table(columns, context)] def column_properties(self, column: ColumnWithContext) -> cp.ColumnProperties: return self._column_properties_evaluator().eval(column) @@ -306,16 +327,42 @@ class RowwiseContext( ): """Context for basic expressions.""" + _id_column: IdColumn + + def column_dependencies_external(self) -> Iterable[Column]: + return [self._id_column] + def universe_dependencies(self) -> Iterable[Universe]: return [self.universe] + @property + def universe(self) -> Universe: + return self._id_column.universe + + +@dataclass(eq=False, frozen=True) +class MaterializedContext(Context): + _universe: Universe + _universe_properties: cp.ColumnProperties = cp.ColumnProperties(dtype=dt.POINTER) + + def universe_dependencies(self) -> Iterable[Universe]: + return [self.universe] + + @property + def id_column(self) -> MaterializedIdColumn: + return MaterializedIdColumn(self, self._universe_properties) + + @property + def universe(self) -> Universe: + return self._universe + @dataclass(eq=False, frozen=True) class GradualBroadcastContext(Context): + orig_id_column: IdColumn lower_column: ColumnWithExpression value_column: ColumnWithExpression upper_column: ColumnWithExpression - apx_value_column: MaterializedColumn def column_dependencies_internal(self) -> Iterable[Column]: return [self.lower_column, self.value_column, self.upper_column] @@ -323,6 +370,14 @@ def column_dependencies_internal(self) -> Iterable[Column]: def universe_dependencies(self) -> Iterable[Universe]: return [self.universe, self.value_column.universe] + @cached_property + def apx_value_column(self): + return MaterializedColumn(self.universe, cp.ColumnProperties(dtype=dt.FLOAT)) + + @property + def universe(self) -> Universe: + return self.orig_id_column.universe + @dataclass(eq=False, frozen=True) class TableRestrictedRowwiseContext(RowwiseContext): @@ -353,6 +408,10 @@ def column_dependencies_external(self) -> Iterable[Column]: def universe_dependencies(self) -> Iterable[Universe]: return [self.inner_context.universe] + @cached_property + def universe(self) -> Universe: + return Universe() + @dataclass(eq=False, frozen=True) class FilterContext( @@ -361,28 +420,42 @@ class FilterContext( """Context of `table.filter() operation.""" filtering_column: ColumnWithExpression - universe_to_filter: Universe + id_column_to_filter: IdColumn def column_dependencies_internal(self) -> Iterable[Column]: return 
[self.filtering_column] + def column_dependencies_external(self) -> Iterable[Column]: + return [self.id_column_to_filter] + def universe_dependencies(self) -> Iterable[Universe]: - return [self.universe_to_filter] + return [self.id_column_to_filter.universe] + + @cached_property + def universe(self) -> Universe: + return self.id_column_to_filter.universe.subset() @dataclass(eq=False, frozen=True) class TimeColumnContext(Context): """Context of operations that use time columns.""" - orig_universe: Universe + orig_id_column: IdColumn threshold_column: ColumnWithExpression time_column: ColumnWithExpression def column_dependencies_internal(self) -> Iterable[Column]: return [self.threshold_column, self.time_column] + def column_dependencies_external(self) -> Iterable[Column]: + return [self.orig_id_column] + def universe_dependencies(self) -> Iterable[Universe]: - return [self.orig_universe] + return [self.orig_id_column.universe] + + @cached_property + def universe(self) -> Universe: + return self.orig_id_column.universe.subset() @dataclass(eq=False, frozen=True) @@ -396,20 +469,34 @@ class ForgetContext(TimeColumnContext): class ForgetImmediatelyContext(Context): """Context of `table._forget_immediately operation.""" - orig_universe: Universe + orig_id_column: IdColumn + + def column_dependencies_external(self) -> Iterable[Column]: + return [self.orig_id_column] def universe_dependencies(self) -> Iterable[Universe]: - return [self.orig_universe] + return [self.orig_id_column.universe] + + @cached_property + def universe(self) -> Universe: + return self.orig_id_column.universe.subset() @dataclass(eq=False, frozen=True) class FilterOutForgettingContext(Context): """Context of `table._filter_out_results_of_forgetting() operation.""" - orig_universe: Universe + orig_id_column: IdColumn + + def column_dependencies_external(self) -> Iterable[Column]: + return [self.orig_id_column] def universe_dependencies(self) -> Iterable[Universe]: - return [self.orig_universe] + return [self.orig_id_column.universe] + + @cached_property + def universe(self) -> Universe: + return self.orig_id_column.universe.superset() @dataclass(eq=False, frozen=True) @@ -434,66 +521,96 @@ def column_dependencies_internal(self) -> Iterable[Column]: def universe_dependencies(self) -> Iterable[Universe]: return [self.reindex_column.universe] + @cached_property + def universe(self) -> Universe: + return Universe() + @dataclass(eq=False, frozen=True) class IxContext(Context): """Context of `table.ix() operation.""" - orig_universe: Universe key_column: Column + orig_id_column: IdColumn optional: bool def column_dependencies_external(self) -> Iterable[Column]: - return [self.key_column] + return [self.orig_id_column, self.key_column] def universe_dependencies(self) -> Iterable[Universe]: - return [self.universe, self.orig_universe] + return [self.universe, self.orig_id_column.universe] + + @cached_property + def universe(self) -> Universe: + return self.key_column.universe @dataclass(eq=False, frozen=True) class IntersectContext(Context): """Context of `table.intersect() operation.""" - intersecting_universes: tuple[Universe, ...] + intersecting_ids: tuple[IdColumn, ...] 
def __post_init__(self): - assert len(self.intersecting_universes) > 0 + assert len(self.intersecting_ids) > 0 def universe_dependencies(self) -> Iterable[Universe]: - return self.intersecting_universes + return [c.universe for c in self.intersecting_ids] + + @cached_property + def universe(self) -> Universe: + return G.universe_solver.get_intersection( + *[c.universe for c in self.intersecting_ids] + ) @dataclass(eq=False, frozen=True) class RestrictContext(Context): """Context of `table.restrict() operation.""" - orig_universe: Universe + orig_id_column: IdColumn + _universe: Universe + + def column_dependencies_external(self) -> Iterable[Column]: + return [self.orig_id_column] def universe_dependencies(self) -> Iterable[Universe]: - return [self.orig_universe, self.universe] + return [self.orig_id_column.universe, self.universe] + + @cached_property + def universe(self) -> Universe: + return self._universe @dataclass(eq=False, frozen=True) class DifferenceContext(Context): """Context of `table.difference() operation.""" - left: Universe - right: Universe + left: IdColumn + right: IdColumn def universe_dependencies(self) -> Iterable[Universe]: - return [self.left, self.right] + return [self.left.universe, self.right.universe] + + @cached_property + def universe(self) -> Universe: + return G.universe_solver.get_difference(self.left.universe, self.right.universe) @dataclass(eq=False, frozen=True) class HavingContext(Context): - orig_universe: Universe + orig_id_column: IdColumn key_column: Column def column_dependencies_external(self) -> Iterable[Column]: return [self.key_column] def universe_dependencies(self) -> Iterable[Universe]: - return [self.orig_universe, self.key_column.universe] + return [self.orig_id_column.universe, self.key_column.universe] + + @cached_property + def universe(self) -> Universe: + return self.key_column.universe.subset() @dataclass(eq=False, frozen=True) @@ -501,41 +618,60 @@ class UpdateRowsContext(Context): """Context of `table.update_rows()` and related operations.""" updates: dict[str, Column] - union_universes: tuple[Universe, ...] + union_ids: tuple[IdColumn, ...] def __post_init__(self): - assert len(self.union_universes) > 0 + assert len(self.union_ids) > 0 def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]: return StableSet([self.updates[ref.name]]) def universe_dependencies(self) -> Iterable[Universe]: - return self.union_universes + return [c.universe for c in self.union_ids] + + @cached_property + def universe(self) -> Universe: + return G.universe_solver.get_union(*[c.universe for c in self.union_ids]) @dataclass(eq=False, frozen=True) -class UpdateCellsContext(UpdateRowsContext): +class UpdateCellsContext(Context): + left: IdColumn + right: IdColumn + updates: dict[str, Column] + def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]: if ref.name in self.updates: - return super().reference_column_dependencies(ref) + return StableSet([self.updates[ref.name]]) return StableSet() + def universe_dependencies(self) -> Iterable[Universe]: + return [self.left.universe, self.right.universe] + + @property + def universe(self) -> Universe: + return self.left.universe + @dataclass(eq=False, frozen=True) class ConcatUnsafeContext(Context): """Context of `table.concat_unsafe()`.""" updates: tuple[dict[str, Column], ...] - union_universes: tuple[Universe, ...] + union_ids: tuple[IdColumn, ...] 
def __post_init__(self): - assert len(self.union_universes) > 0 + assert len(self.union_ids) > 0 def reference_column_dependencies(self, ref: ColumnReference) -> StableSet[Column]: return StableSet([update[ref.name] for update in self.updates]) def universe_dependencies(self) -> Iterable[Universe]: - return self.union_universes + return [c.universe for c in self.union_ids] + + @cached_property + def universe(self) -> Universe: + return G.universe_solver.get_union(*[c.universe for c in self.union_ids]) @dataclass(eq=False, frozen=True) @@ -544,16 +680,24 @@ class PromiseSameUniverseContext( ): """Context of table.unsafe_promise_same_universe_as() operation.""" - orig_universe: Universe + orig_id_column: IdColumn + _id_column: IdColumn def universe_dependencies(self) -> Iterable[Universe]: - return [self.orig_universe, self.universe] + return [self.orig_id_column.universe, self.universe] + + @cached_property + def universe(self) -> Universe: + return self._id_column.universe @dataclass(eq=True, frozen=True) class JoinContext(Context): - """Context of `table.join() operation.""" + """Context for building inner table of a join, where all columns from left and right + are properly unrolled. Uses JoinTypeInterpreter to properly evaluate which columns + should be optionalized.""" + _universe: Universe left_table: pw.Table right_table: pw.Table on_left: ContextTable @@ -579,32 +723,37 @@ def intermediate_tables(self) -> Iterable[Table]: return [ _create_internal_table( self.on_left.columns, - self.on_left.universe, self.left_table._table_restricted_context, ), _create_internal_table( self.on_right.columns, - self.on_right.universe, self.right_table._table_restricted_context, ), ] + @cached_property + def universe(self) -> Universe: + return self._universe + @dataclass(eq=False, frozen=True) class JoinRowwiseContext(RowwiseContext): + """Context for actually evaluating join expressions.""" + temporary_column_to_original: dict[InternalColRef, InternalColRef] original_column_to_temporary: dict[InternalColRef, ColumnReference] @staticmethod def from_mapping( - universe: Universe, + id_column: IdColumn, columns_mapping: dict[InternalColRef, ColumnReference], ) -> JoinRowwiseContext: - temporary_column_to_original = {} - for orig_colref, expression in columns_mapping.items(): - temporary_column_to_original[expression._to_internal()] = orig_colref + temporary_column_to_original = { + expression._to_internal(): orig_colref + for orig_colref, expression in columns_mapping.items() + } return JoinRowwiseContext( - universe, temporary_column_to_original, columns_mapping.copy() + id_column, temporary_column_to_original, columns_mapping.copy() ) def _get_type_interpreter(self): @@ -620,15 +769,13 @@ class FlattenContext(Context): """Context of `table.flatten() operation.""" orig_universe: Universe - flatten_column: Column - flatten_result_column: MaterializedColumn + flatten_column: ColumnWithExpression def column_dependencies_external(self) -> Iterable[Column]: return [self.flatten_column] - @staticmethod - def get_flatten_column_dtype(flatten_column: ColumnWithExpression): - dtype = flatten_column.dtype + def _get_flatten_column_dtype(self): + dtype = self.flatten_column.dtype if isinstance(dtype, dt.List): return dtype.wrapped if isinstance(dtype, dt.Tuple): @@ -645,12 +792,25 @@ def get_flatten_column_dtype(flatten_column: ColumnWithExpression): return dt.ANY else: raise TypeError( - f"Cannot flatten column {flatten_column.expression!r} of type {dtype}." 
+ f"Cannot flatten column {self.flatten_column.expression!r} of type {dtype}." ) def universe_dependencies(self) -> Iterable[Universe]: return [self.orig_universe] + @cached_property + def universe(self) -> Universe: + return Universe() + + @cached_property + def flatten_result_column(self) -> Column: + return MaterializedColumn( + self.universe, + cp.ColumnProperties( + dtype=self._get_flatten_column_dtype(), + ), + ) + @dataclass(eq=False, frozen=True) class SortingContext(Context): @@ -658,11 +818,25 @@ class SortingContext(Context): key_column: ColumnWithExpression instance_column: ColumnWithExpression - prev_column: MaterializedColumn - next_column: MaterializedColumn def column_dependencies_internal(self) -> Iterable[Column]: return [self.key_column, self.instance_column] def universe_dependencies(self) -> Iterable[Universe]: return [self.universe] + + @cached_property + def universe(self) -> Universe: + return self.key_column.universe + + @cached_property + def prev_column(self) -> Column: + return MaterializedColumn( + self.universe, cp.ColumnProperties(dtype=dt.Optional(dt.POINTER)) + ) + + @cached_property + def next_column(self) -> Column: + return MaterializedColumn( + self.universe, cp.ColumnProperties(dtype=dt.Optional(dt.POINTER)) + ) diff --git a/python/pathway/internals/column_properties.py b/python/pathway/internals/column_properties.py index f748889a..10de3998 100644 --- a/python/pathway/internals/column_properties.py +++ b/python/pathway/internals/column_properties.py @@ -40,5 +40,4 @@ def _has_property(self, column: clmn.ColumnWithContext, name: str, value: Any): return all( getattr(col.properties, name) == value for col in column.column_dependencies() - if col != column ) diff --git a/python/pathway/internals/desugaring.py b/python/pathway/internals/desugaring.py index e2bf22ad..8c4e6c48 100644 --- a/python/pathway/internals/desugaring.py +++ b/python/pathway/internals/desugaring.py @@ -131,12 +131,9 @@ def eval_require( class TableCallbackDesugaring(DesugaringTransform): - table_like: table.TableLike - - def __init__(self, table_like: table.TableLike): - from pathway.internals import table + table_like: table.TableLike | groupby.GroupedJoinable - assert isinstance(table_like, table.TableLike) + def __init__(self, table_like: table.TableLike | groupby.GroupedJoinable): self.table_like = table_like @abstractmethod diff --git a/python/pathway/internals/graph_runner/__init__.py b/python/pathway/internals/graph_runner/__init__.py index b05596d2..ab41c3af 100644 --- a/python/pathway/internals/graph_runner/__init__.py +++ b/python/pathway/internals/graph_runner/__init__.py @@ -37,8 +37,6 @@ def __init__( default_logging: bool = True, persistence_config: PersistenceConfig | None = None, ) -> None: - from pathway.debug import stream_generator - self._graph = input_graph self.debug = debug if ignore_asserts is None: @@ -47,11 +45,7 @@ def __init__( self.monitoring_level = monitoring_level self.with_http_server = with_http_server self.default_logging = default_logging - self.persistence_config = ( - persistence_config - or environ.get_replay_config() - or stream_generator.persistence_config() - ) + self.persistence_config = persistence_config or environ.get_replay_config() def run_tables( self, @@ -102,7 +96,6 @@ def logic( for operator in context.nodes if isinstance(operator, ContextualizedIntermediateOperator) ] - monitoring_level = self.monitoring_level.to_internal() with new_event_loop() as event_loop, monitor_stats( diff --git 
a/python/pathway/internals/graph_runner/expression_evaluator.py b/python/pathway/internals/graph_runner/expression_evaluator.py index d8ef92be..71a0e03e 100644 --- a/python/pathway/internals/graph_runner/expression_evaluator.py +++ b/python/pathway/internals/graph_runner/expression_evaluator.py @@ -1001,7 +1001,7 @@ def run_join(self, universe: univ.Universe, *input_storages: Storage) -> Storage def run(self, output_storage: Storage, *input_storages: Storage) -> api.Table: joined_storage = self.run_join(self.context.universe, *input_storages) rowwise_evaluator = RowwiseEvaluator( - clmn.RowwiseContext(self.context.universe), + clmn.RowwiseContext(self.context.id_column), self.scope, self.state, self.scope_context, @@ -1117,7 +1117,7 @@ def run(self, output_storage: Storage, *input_storages: Storage) -> api.Table: return self.scope.update_rows_table(input_table, update_input_table, properties) -class UpdateCellsEvaluator(UpdateRowsEvaluator, context_type=clmn.UpdateCellsContext): +class UpdateCellsEvaluator(ExpressionEvaluator, context_type=clmn.UpdateCellsContext): context: clmn.UpdateCellsContext def run(self, output_storage: Storage, *input_storages: Storage) -> api.Table: diff --git a/python/pathway/internals/graph_runner/path_evaluator.py b/python/pathway/internals/graph_runner/path_evaluator.py index e72cbb2e..55dcd2ef 100644 --- a/python/pathway/internals/graph_runner/path_evaluator.py +++ b/python/pathway/internals/graph_runner/path_evaluator.py @@ -162,7 +162,7 @@ def compute( ) -> Storage: context = self.context output_columns_list = list(output_columns) - source_universe = context.union_universes[0] + source_universe = context.union_ids[0].universe # ensure that keeping structure is possible, # i.e. all sources have the same path to required columns keep_structure = True @@ -328,7 +328,7 @@ def compute( input_storages: dict[Universe, Storage], ) -> Storage: paths: dict[clmn.Column, ColumnPath] = {} - orig_storage = input_storages[self.context.orig_universe] + orig_storage = input_storages[self.context.orig_id_column.universe] new_storage = input_storages[self.context.universe] for column in output_columns: if ( diff --git a/python/pathway/internals/graph_runner/state.py b/python/pathway/internals/graph_runner/state.py index 8032975f..79c943e2 100644 --- a/python/pathway/internals/graph_runner/state.py +++ b/python/pathway/internals/graph_runner/state.py @@ -4,7 +4,6 @@ from collections.abc import Callable, Iterable -import pathway.internals.graph_runner.expression_evaluator as evaluator from pathway.internals import api, column, table, universe from pathway.internals.column_path import ColumnPath from pathway.internals.graph_runner.path_storage import Storage @@ -19,7 +18,6 @@ class ScopeState: legacy_tables: dict[table.Table, api.LegacyTable] universes: dict[universe.Universe, api.Universe] computers: list[Callable] - evaluators: dict[column.Context, evaluator.ExpressionEvaluator] tables: dict[universe.Universe, api.Table] storages: dict[universe.Universe, Storage] @@ -28,7 +26,6 @@ def __init__(self, scope: api.Scope) -> None: self.columns = {} self.universes = {} self.computers = [] - self.evaluators = {} self.legacy_tables = {} self.tables = {} self.storages = {} @@ -142,16 +139,6 @@ def add_computer_logic(self, computer_callback: Callable) -> int: def get_computer_logic(self, id: int) -> Callable: return self.computers[id] - def get_or_create_evaluator( - self, - context: column.Context, - evaluator_factory: Callable[[column.Context], evaluator.ExpressionEvaluator], - ): - 
if context not in self.evaluators: - evaluator = evaluator_factory(context) - self.evaluators[context] = evaluator - return self.evaluators[context] - def get_table(self, key: Storage) -> api.Table: return self.tables[key._universe] diff --git a/python/pathway/internals/groupby.py b/python/pathway/internals/groupby.py index 0aae30ec..8396f5da 100644 --- a/python/pathway/internals/groupby.py +++ b/python/pathway/internals/groupby.py @@ -8,7 +8,6 @@ from functools import lru_cache from typing import TYPE_CHECKING -from pathway.internals import universes from pathway.internals.expression_visitor import IdentityTransform from pathway.internals.trace import trace_user_frame @@ -17,7 +16,7 @@ import pathway.internals.column as clmn import pathway.internals.expression as expr -from pathway.internals import table, table_like, thisclass +from pathway.internals import table, thisclass from pathway.internals.arg_handlers import arg_handler, reduce_args_handler from pathway.internals.decorators import contextualized_operator from pathway.internals.desugaring import ( @@ -34,14 +33,15 @@ from pathway.internals.universe import Universe -class GroupedJoinable(DesugaringContext, table_like.TableLike, OperatorInput): +class GroupedJoinable(DesugaringContext, OperatorInput): _substitution: dict[thisclass.ThisMetaclass, table.Joinable] _joinable_to_group: table.Joinable + _universe: Universe def __init__(self, _universe: Universe, _substitution, _joinable: table.Joinable): - super().__init__(_universe) self._substitution = _substitution self._joinable_to_group = _joinable + self._universe = _universe @property def _desugaring(self) -> TableReduceDesugaring: @@ -198,13 +198,11 @@ def reduce( def _reduce(self, **kwargs: expr.ColumnExpression) -> table.Table: reduced_columns: dict[str, clmn.ColumnWithExpression] = {} - universe = Universe() context = clmn.GroupedContext( table=self._joinable_to_group, - universe=universe, grouping_columns=tuple(self._grouping_columns), set_id=self._set_id, - inner_context=self._joinable_to_group._context, + inner_context=self._joinable_to_group._rowwise_context, sort_by=self._sort_by, ) @@ -214,10 +212,9 @@ def _reduce(self, **kwargs: expr.ColumnExpression) -> table.Table: result: table.Table = table.Table( columns=reduced_columns, - universe=universe, - id_column=clmn.IdColumn(context), + context=context, ) - universes.promise_are_equal(result, self) + G.universe_solver.register_as_equal(self._universe, result._universe) return result def _validate_expression(self, expression: expr.ColumnExpression): diff --git a/python/pathway/internals/join.py b/python/pathway/internals/join.py index a42d1b86..2679476c 100644 --- a/python/pathway/internals/join.py +++ b/python/pathway/internals/join.py @@ -487,7 +487,7 @@ class JoinResult(Joinable, OperatorInput): def __init__( self, - _universe: Universe, + _context: clmn.Context, _inner_table: Table, _columns_mapping: dict[expr.InternalColRef, expr.ColumnReference], _left_table: Table, @@ -498,7 +498,7 @@ def __init__( _joined_on_names: StableSet[str], _join_mode: JoinMode, ): - super().__init__(_universe) + super().__init__(_context) self._inner_table = _inner_table self._columns_mapping = _columns_mapping self._left_table = _left_table @@ -658,17 +658,19 @@ def filter(self, filter_expression: expr.ColumnExpression) -> JoinResult: filter_expression ) inner_table = self._inner_table.filter(desugared_filter_expression) - new_columns_mapping = {} - for int_ref, expression in self._columns_mapping.items(): - new_columns_mapping[int_ref] = 
inner_table[expression.name] + new_columns_mapping = { + int_ref: inner_table[expression.name] + for int_ref, expression in self._columns_mapping.items() + } new_columns_mapping[inner_table.id._to_internal()] = inner_table.id - inner_table._context = clmn.JoinRowwiseContext.from_mapping( - inner_table._universe, new_columns_mapping - ) # FIXME don't set _context property of table + context = clmn.JoinRowwiseContext.from_mapping( + inner_table._id_column, new_columns_mapping + ) + inner_table._rowwise_context = context return JoinResult( - _universe=inner_table._universe, + _context=context, _inner_table=inner_table, _columns_mapping=new_columns_mapping, _left_table=self._left_table, @@ -817,8 +819,7 @@ def _join( return Table( columns=columns, - universe=context.universe, - id_column=clmn.IdColumn(context), + context=context, ) @staticmethod @@ -861,9 +862,12 @@ def _prepare_inner_table_with_mapping( final_mapping[colref._to_internal()] = colref final_mapping[inner_table.id._to_internal()] = inner_table.id - inner_table._context = clmn.JoinRowwiseContext.from_mapping( - inner_table._universe, final_mapping - ) # FIXME don't set _context property of table + rowwise_context = clmn.JoinRowwiseContext.from_mapping( + inner_table._id_column, final_mapping + ) + inner_table._rowwise_context = ( + rowwise_context # FIXME don't set _context property of table + ) return (inner_table, final_mapping) @@ -964,7 +968,7 @@ def _table_join( common_column_names, ) return JoinResult( - universe, + context, inner_table, columns_mapping, left_table, diff --git a/python/pathway/internals/schema.py b/python/pathway/internals/schema.py index 998b817a..a77d3298 100644 --- a/python/pathway/internals/schema.py +++ b/python/pathway/internals/schema.py @@ -34,11 +34,14 @@ def schema_from_columns( ) -> type[Schema]: if _name is None: _name = "schema_from_columns(" + str(list(columns.keys())) + ")" - __dict = { - "__metaclass__": SchemaMetaclass, - "__annotations__": {name: c.dtype for name, c in columns.items()}, - } - return _schema_builder(_name, __dict) + + return schema_builder( + columns={ + name: ColumnDefinition.from_properties(c.properties) + for name, c in columns.items() + }, + name=_name, + ) def _type_converter(series: pd.Series) -> dt.DType: @@ -210,6 +213,18 @@ def _get_column_property(property_name: str, default: Any) -> Any: return columns +def _universe_properties( + columns: list[ColumnSchema], schema_properties: SchemaProperties +) -> ColumnProperties: + append_only: bool = False + if len(columns) > 0: + append_only = any(c.append_only for c in columns) + elif schema_properties.append_only is not None: + append_only = schema_properties.append_only + + return ColumnProperties(dtype=dt.POINTER, append_only=append_only) + + @dataclass(frozen=True) class SchemaProperties: append_only: bool | None = None @@ -219,13 +234,15 @@ class SchemaMetaclass(type): __columns__: dict[str, ColumnSchema] __dtypes__: dict[str, dt.DType] __types__: dict[str, Any] + __universe_properties__: ColumnProperties @trace.trace_user_frame def __init__(self, *args, append_only: bool | None = None, **kwargs) -> None: super().__init__(*args, **kwargs) - - self.__columns__ = _create_column_definitions( - self, SchemaProperties(append_only=append_only) + schema_properties = SchemaProperties(append_only=append_only) + self.__columns__ = _create_column_definitions(self, schema_properties) + self.__universe_properties__ = _universe_properties( + list(self.__columns__.values()), schema_properties ) self.__dtypes__ = { name: column.dtype 
for name, column in self.__columns__.items() @@ -241,6 +258,10 @@ def columns(self) -> Mapping[str, ColumnSchema]: def column_names(self) -> list[str]: return list(self.keys()) + @property + def universe_properties(self) -> ColumnProperties: + return self.__universe_properties__ + def column_properties(self, name: str) -> ColumnProperties: column = self.__columns__[name] return ColumnProperties(dtype=column.dtype, append_only=column.append_only) @@ -510,6 +531,10 @@ class ColumnDefinition: def __post_init__(self): assert self.dtype is None or isinstance(self.dtype, dt.DType) + @classmethod + def from_properties(cls, properties: ColumnProperties) -> ColumnDefinition: + return cls(dtype=properties.dtype, append_only=properties.append_only) + def column_definition( *, diff --git a/python/pathway/internals/table.py b/python/pathway/internals/table.py index 79a18a3a..f7cfc1f3 100644 --- a/python/pathway/internals/table.py +++ b/python/pathway/internals/table.py @@ -17,7 +17,6 @@ reduce_args_handler, select_args_handler, ) -from pathway.internals.column_properties import ColumnProperties from pathway.internals.decorators import ( contextualized_operator, empty_from_schema, @@ -99,27 +98,26 @@ class Table( ) _columns: dict[str, clmn.Column] - _context: clmn.RowwiseContext _schema: type[Schema] _id_column: clmn.IdColumn + _rowwise_context: clmn.RowwiseContext _source: SetOnceProperty[OutputHandle] = SetOnceProperty() """Lateinit by operator.""" def __init__( self, columns: Mapping[str, clmn.Column], - universe: Universe, + context: clmn.Context, schema: type[Schema] | None = None, - id_column: clmn.IdColumn | None = None, ): if schema is None: schema = schema_from_columns(columns) - super().__init__(universe) + super().__init__(context) self._columns = dict(columns) self._schema = schema - self._context = clmn.RowwiseContext(self._universe) - self._id_column = id_column or clmn.IdColumn(self._context) + self._id_column = context.id_column self._substitution = {thisclass.this: self} + self._rowwise_context = clmn.RowwiseContext(self._id_column) @property def id(self) -> expr.ColumnReference: @@ -520,8 +518,7 @@ def _filter(self, filter_expression: expr.ColumnExpression) -> Table[TSchema]: filtering_column = self._eval(filter_expression) assert self._universe == filtering_column.universe - universe = self._universe.subset() - context = clmn.FilterContext(universe, filtering_column, self._universe) + context = clmn.FilterContext(filtering_column, self._id_column) return self._table_with_context(context) @@ -550,23 +547,14 @@ def __gradual_broadcast( value_column, upper_column, ): - apx_value = clmn.MaterializedColumn( - self._universe, ColumnProperties(dtype=dt.FLOAT) - ) - context = clmn.GradualBroadcastContext( - self._universe, + self._id_column, threshold_table._eval(lower_column), threshold_table._eval(value_column), threshold_table._eval(upper_column), - apx_value_column=apx_value, ) - return Table( - columns={"apx_value": apx_value}, - universe=context.universe, - id_column=clmn.IdColumn(context), - ) + return Table(columns={"apx_value": context.apx_value_column}, context=context) @trace_user_frame @desugar @@ -578,10 +566,8 @@ def _forget( time_column: expr.ColumnExpression, mark_forgetting_records: bool, ) -> Table: - universe = self._universe.subset() context = clmn.ForgetContext( - universe, - self._universe, + self._id_column, self._eval(threshold_column), self._eval(time_column), mark_forgetting_records, @@ -595,11 +581,7 @@ def _forget( def _forget_immediately( self, ) -> Table: - 
universe = self._universe.subset() - context = clmn.ForgetImmediatelyContext( - universe, - self._universe, - ) + context = clmn.ForgetImmediatelyContext(self._id_column) return self._table_with_context(context) @trace_user_frame @@ -609,14 +591,10 @@ def _forget_immediately( def _filter_out_results_of_forgetting( self, ) -> Table: - universe = self._universe.superset() # The output universe is a superset of input universe because forgetting entries # are filtered out. At each point in time, the set of keys with +1 diff can be # bigger than a set of keys with +1 diff in an input table. - context = clmn.FilterOutForgettingContext( - universe, - self._universe, - ) + context = clmn.FilterOutForgettingContext(self._id_column) return self._table_with_context(context) @trace_user_frame @@ -628,10 +606,8 @@ def _freeze( threshold_column: expr.ColumnExpression, time_column: expr.ColumnExpression, ) -> Table: - universe = self._universe.subset() context = clmn.FreezeContext( - universe, - self._universe, + self._id_column, self._eval(threshold_column), self._eval(time_column), ) @@ -646,10 +622,8 @@ def _buffer( threshold_column: expr.ColumnExpression, time_column: expr.ColumnExpression, ) -> Table: - universe = self._universe.subset() context = clmn.BufferContext( - universe, - self._universe, + self._id_column, self._eval(threshold_column), self._eval(time_column), ) @@ -687,11 +661,9 @@ def difference(self, other: Table) -> Table[TSchema]: age | owner | pet 10 | Alice | 1 """ - universe = G.universe_solver.get_difference(self._universe, other._universe) context = clmn.DifferenceContext( - universe=universe, - left=self._universe, - right=other._universe, + left=self._id_column, + right=other._id_column, ) return self._table_with_context(context) @@ -734,14 +706,14 @@ def intersect(self, *tables: Table) -> Table[TSchema]: ) universe = G.universe_solver.get_intersection(*intersecting_universes) if universe in intersecting_universes: - context: clmn.Context = clmn.RestrictContext( - universe=universe, - orig_universe=self._universe, - ) + context: clmn.Context = clmn.RestrictContext(self._id_column, universe) else: + intersecting_ids = ( + self._id_column, + *tuple(table._id_column for table in tables), + ) context = clmn.IntersectContext( - universe=universe, - intersecting_universes=intersecting_universes, + intersecting_ids=intersecting_ids, ) return self._table_with_context(context) @@ -789,10 +761,7 @@ def restrict(self, other: TableLike) -> Table[TSchema]: "Table.restrict(): other universe has to be a subset of self universe." + "Consider using Table.promise_universe_is_subset_of() to assert it." 
) - context = clmn.RestrictContext( - universe=other._universe, - orig_universe=self._universe, - ) + context = clmn.RestrictContext(self._id_column, other._universe) columns = { name: self._wrap_column_in_context(context, column, name) @@ -801,8 +770,7 @@ def restrict(self, other: TableLike) -> Table[TSchema]: return Table( columns=columns, - universe=other._universe, - id_column=clmn.IdColumn(context), + context=context, ) @contextualized_operator @@ -832,14 +800,11 @@ def copy(self) -> Table[TSchema]: """ columns = { - name: self._wrap_column_in_context(self._context, column, name) + name: self._wrap_column_in_context(self._rowwise_context, column, name) for name, column in self._columns.items() } - return Table( - columns=columns, - universe=self._universe, - ) + return Table(columns=columns, context=self._rowwise_context) @trace_user_frame @desugar @@ -1041,11 +1006,8 @@ def _ix( key_expression: expr.ColumnReference, optional: bool, ) -> Table: - key_universe_table = key_expression._table - universe = key_universe_table._universe key_column = key_expression._column - - context = clmn.IxContext(universe, self._universe, key_column, optional) + context = clmn.IxContext(key_column, self._id_column, optional) return self._table_with_context(context) @@ -1160,32 +1122,21 @@ def concat(self, *others: Table[TSchema]) -> Table[TSchema]: @trace_user_frame @contextualized_operator def _concat(self, *others: Table[TSchema]) -> Table[TSchema]: - union_universes = (self._universe, *(other._universe for other in others)) - if not G.universe_solver.query_are_disjoint(*union_universes): + union_ids = (self._id_column, *(other._id_column for other in others)) + if not G.universe_solver.query_are_disjoint(*(c.universe for c in union_ids)): raise ValueError( "Universes of the arguments of Table.concat() have to be disjoint.\n" + "Consider using Table.promise_universes_are_disjoint() to assert it.\n" + "(However, untrue assertion might result in runtime errors.)" ) - universe = G.universe_solver.get_union(*union_universes) context = clmn.ConcatUnsafeContext( - universe=universe, - union_universes=union_universes, + union_ids=union_ids, updates=tuple( {col_name: other._columns[col_name] for col_name in self.keys()} for other in others ), ) - columns = { - name: self._wrap_column_in_context(context, column, name) - for name, column in self._columns.items() - } - ret: Table = Table( - columns=columns, - universe=universe, - id_column=clmn.IdColumn(context), - ) - return ret + return self._table_with_context(context) @trace_user_frame @runtime_type_check @@ -1259,24 +1210,12 @@ def _update_cells(self, other: Table) -> Table: + "Consider using Table.promise_is_subset_of() to assert this.\n" + "(However, untrue assertion might result in runtime errors.)" ) - - union_universes = [self._universe] - if other._universe != self._universe: - union_universes.append(other._universe) context = clmn.UpdateCellsContext( - universe=self._universe, - union_universes=tuple(union_universes), + left=self._id_column, + right=other._id_column, updates={name: other._columns[name] for name in other.keys()}, ) - columns = { - name: self._wrap_column_in_context(context, column, name) - for name, column in self._columns.items() - } - return Table( - columns=columns, - universe=self._universe, - id_column=clmn.IdColumn(context), - ) + return self._table_with_context(context) @trace_user_frame @runtime_type_check @@ -1332,36 +1271,27 @@ def update_rows(self, other: Table[TSchema]) -> Table[TSchema]: key: 
dt.types_lca(self.schema.__dtypes__[key], other.schema.__dtypes__[key]) for key in self.keys() } - return Table._update_rows( - self.cast_to_types(**schema), other.cast_to_types(**schema) - ) + union_universes = (self._universe, other._universe) + universe = G.universe_solver.get_union(*union_universes) + if universe == self._universe: + return Table._update_cells( + self.cast_to_types(**schema), other.cast_to_types(**schema) + ) + else: + return Table._update_rows( + self.cast_to_types(**schema), other.cast_to_types(**schema) + ) @trace_user_frame @contextualized_operator @runtime_type_check def _update_rows(self, other: Table[TSchema]) -> Table[TSchema]: - union_universes = (self._universe, other._universe) - universe = G.universe_solver.get_union(*union_universes) - context_cls = ( - clmn.UpdateCellsContext - if universe == self._universe - else clmn.UpdateRowsContext - ) - context = context_cls( - universe=universe, - union_universes=union_universes, + union_ids = (self._id_column, other._id_column) + context = clmn.UpdateRowsContext( updates={col_name: other._columns[col_name] for col_name in self.keys()}, + union_ids=union_ids, ) - columns = { - name: self._wrap_column_in_context(context, column, name) - for name, column in self._columns.items() - } - ret: Table = Table( - columns=columns, - universe=universe, - id_column=clmn.IdColumn(context), - ) - return ret + return self._table_with_context(context) @trace_user_frame @desugar @@ -1497,19 +1427,9 @@ def _with_new_index( reindex_column = self._eval(new_index) assert self._universe == reindex_column.universe - universe = Universe() - context = clmn.ReindexContext(universe, reindex_column) + context = clmn.ReindexContext(reindex_column) - columns = { - name: self._wrap_column_in_context(context, column, name) - for name, column in self._columns.items() - } - - return Table( - columns=columns, - universe=universe, - id_column=clmn.IdColumn(context), - ) + return self._table_with_context(context) @trace_user_frame @desugar @@ -1559,7 +1479,9 @@ def rename_columns(self, **kwargs: str | expr.ColumnReference) -> Table: columns_wrapped = { name: self._wrap_column_in_context( - self._context, column, mapping[name] if name in mapping else name + self._rowwise_context, + column, + mapping[name] if name in mapping else name, ) for name, column in renamed_columns.items() } @@ -1704,7 +1626,7 @@ def without(self, *columns: str | expr.ColumnReference) -> Table: assert isinstance(col, str) new_columns.pop(col) columns_wrapped = { - name: self._wrap_column_in_context(self._context, column, name) + name: self._wrap_column_in_context(self._rowwise_context, column, name) for name, column in new_columns.items() } return self._with_same_universe(*columns_wrapped.items()) @@ -1760,11 +1682,9 @@ def cast_to_types(self, **kwargs: Any) -> Table: @contextualized_operator @runtime_type_check def _having(self, indexer: expr.ColumnReference) -> Table[TSchema]: - universe = indexer.table._universe.subset() context = clmn.HavingContext( - universe=universe, orig_universe=self._universe, key_column=indexer._column + orig_id_column=self._id_column, key_column=indexer._column ) - return self._table_with_context(context) @trace_user_frame @@ -1856,18 +1776,9 @@ def _flatten( flatten_column = self._columns[flatten_name] assert isinstance(flatten_column, clmn.ColumnWithExpression) - universe = Universe() - flatten_result_column = clmn.MaterializedColumn( - universe, - ColumnProperties( - dtype=clmn.FlattenContext.get_flatten_column_dtype(flatten_column), - ), - ) 
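# Editor's aside on the pattern these hunks converge on: a result-producing
# context now owns its universe and any materialized output columns, so
# building the result table only needs the context itself. A minimal
# self-contained sketch of the idea (hypothetical, simplified classes; the
# real ones live in pathway.internals.column and are not shown in this patch):
#
#     from dataclasses import dataclass
#     from functools import cached_property
#
#
#     class Universe:
#         """Stand-in for pathway.internals.universe.Universe."""
#
#
#     @dataclass(frozen=True)
#     class Column:
#         universe: "Universe"
#
#
#     @dataclass(frozen=True)
#     class FlattenContextSketch:
#         """Hypothetical, simplified analogue of clmn.FlattenContext."""
#
#         orig_universe: Universe
#         flatten_column: Column
#
#         @cached_property
#         def universe(self) -> Universe:
#             # flattening changes the number of rows, so the result lives
#             # in a fresh universe owned by the context
#             return Universe()
#
#         @cached_property
#         def flatten_result_column(self) -> Column:
#             # the context, not the caller, materializes its output column
#             return Column(self.universe)
#
# With that shape, _flatten below reduces to constructing the context and
# returning Table(columns={flatten_name: context.flatten_result_column, ...},
# context=context); GradualBroadcastContext and SortingContext in this file
# follow the same pattern.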
context = clmn.FlattenContext( - universe=universe, orig_universe=self._universe, flatten_column=flatten_column, - flatten_result_column=flatten_result_column, ) columns = { @@ -1878,11 +1789,10 @@ def _flatten( return Table( columns={ - flatten_name: flatten_result_column, + flatten_name: context.flatten_result_column, **columns, }, - universe=universe, - id_column=clmn.IdColumn(context), + context=context, ) @trace_user_frame @@ -1944,33 +1854,23 @@ def sort( ^RT0AZWX... | David | 35 | 90 | ^EDPSSB1... | ^T0B95XH... | Eve | 15 | 80 | | ^GBSDEEW... """ - if not isinstance(instance, expr.ColumnExpression): - instance = expr.ColumnConstExpression(instance) - prev_column = clmn.MaterializedColumn( - self._universe, ColumnProperties(dtype=dt.Optional(dt.POINTER)) - ) - next_column = clmn.MaterializedColumn( - self._universe, ColumnProperties(dtype=dt.Optional(dt.POINTER)) - ) + instance = clmn.ColumnExpression._wrap(instance) context = clmn.SortingContext( - self._universe, self._eval(key), self._eval(instance), - prev_column, - next_column, ) return Table( columns={ - "prev": prev_column, - "next": next_column, + "prev": context.prev_column, + "next": context.next_column, }, - universe=self._universe, - id_column=clmn.IdColumn(context), + context=context, ) def _set_source(self, source: OutputHandle): self._source = source - self._id_column.lineage = clmn.ColumnLineage(name="id", source=source) + if not hasattr(self._id_column, "lineage"): + self._id_column.lineage = clmn.ColumnLineage(name="id", source=source) for name, column in self._columns.items(): if not hasattr(column, "lineage"): column.lineage = clmn.ColumnLineage(name=name, source=source) @@ -1980,17 +1880,8 @@ def _set_source(self, source: OutputHandle): @contextualized_operator def _unsafe_promise_universe(self, other: TableLike) -> Table: - context = clmn.PromiseSameUniverseContext(other._universe, self._universe) - columns = { - name: self._wrap_column_in_context(context, column, name) - for name, column in self._columns.items() - } - - return Table( - columns=columns, - universe=context.universe, - id_column=clmn.IdColumn(context), - ) + context = clmn.PromiseSameUniverseContext(self._id_column, other._id_column) + return self._table_with_context(context) def _validate_expression(self, expression: expr.ColumnExpression): for dep in expression._dependencies_above_reducer(): @@ -2027,20 +1918,19 @@ def _table_with_context(self, context: clmn.Context) -> Table: return Table( columns=columns, - universe=context.universe, - id_column=clmn.IdColumn(context), + context=context, ) @functools.cached_property def _table_restricted_context(self) -> clmn.TableRestrictedRowwiseContext: - return clmn.TableRestrictedRowwiseContext(self._universe, self) + return clmn.TableRestrictedRowwiseContext(self._id_column, self) def _eval( self, expression: expr.ColumnExpression, context: clmn.Context | None = None ) -> clmn.ColumnWithExpression: """Desugar expression and wrap it in given context.""" if context is None: - context = self._context + context = self._rowwise_context column = expression._column_with_expression_cls( context=context, universe=context.universe, @@ -2051,6 +1941,7 @@ def _eval( @classmethod def _from_schema(cls, schema: type[Schema]) -> Table: universe = Universe() + context = clmn.MaterializedContext(universe, schema.universe_properties) columns = { name: clmn.MaterializedColumn( universe, @@ -2058,7 +1949,7 @@ def _from_schema(cls, schema: type[Schema]) -> Table: ) for name in schema.column_names() } - return 
cls(columns=columns, universe=universe, schema=schema) + return cls(columns=columns, schema=schema, context=context) def __repr__(self) -> str: return f"" @@ -2070,9 +1961,8 @@ def _with_same_universe( ) -> Table: return Table( columns=dict(columns), - universe=self._universe, schema=schema, - id_column=clmn.IdColumn(self._context), + context=self._rowwise_context, ) def _sort_columns_by_other(self, other: Table): @@ -2093,14 +1983,15 @@ def to(self, sink: DataSink) -> None: table_to_datasink(self, sink) def _materialize(self, universe: Universe): + context = clmn.MaterializedContext(universe) columns = { name: clmn.MaterializedColumn(universe, column.properties) for (name, column) in self._columns.items() } return Table( columns=columns, - universe=universe, schema=self.schema, + context=context, ) @trace_user_frame @@ -2237,7 +2128,7 @@ def typehints(self) -> Mapping[str, Any]: def eval_type(self, expression: expr.ColumnExpression) -> dt.DType: return ( - self._context._get_type_interpreter() + self._rowwise_context._get_type_interpreter() .eval_expression(expression, state=TypeInterpreterState()) ._dtype ) diff --git a/python/pathway/internals/table_like.py b/python/pathway/internals/table_like.py index ab119781..68611b02 100644 --- a/python/pathway/internals/table_like.py +++ b/python/pathway/internals/table_like.py @@ -4,7 +4,7 @@ from typing import TypeVar -from pathway.internals import universes +from pathway.internals import column as clmn, universes from pathway.internals.deprecation_meta import DeprecationSuperclass from pathway.internals.runtime_type_check import runtime_type_check from pathway.internals.universe import Universe @@ -37,9 +37,13 @@ class TableLike(DeprecationSuperclass): """ _universe: Universe + _context: clmn.Context + _id_column: clmn.IdColumn - def __init__(self, universe: Universe): - self._universe = universe + def __init__(self, context: clmn.Context): + self._context = context + self._universe = context.universe + self._id_column = context.id_column @runtime_type_check def promise_universes_are_disjoint( diff --git a/python/pathway/internals/type_interpreter.py b/python/pathway/internals/type_interpreter.py index b8ec6b95..a8377165 100644 --- a/python/pathway/internals/type_interpreter.py +++ b/python/pathway/internals/type_interpreter.py @@ -526,6 +526,11 @@ def eval_unwrap( class JoinTypeInterpreter(TypeInterpreter): + """This type interpreter is used by JoinContext. + It evaluates only column references, and is used to decide which columns + to optionalize when unrolling left and right table columns to internal table columns. + """ + left: Table right: Table optionalize_left: bool @@ -553,6 +558,10 @@ def _eval_column_val( class JoinRowwiseTypeInterpreter(TypeInterpreter): + """Type interpreter for evaluating expressions in join. 
+    Colrefs are already properly optionalized (depending on the type of join and
+    left/right table) and properly unrolled and stored in the internal table."""
+
     temporary_column_to_original: dict[expr.InternalColRef, expr.InternalColRef]
     original_column_to_temporary: dict[expr.InternalColRef, expr.ColumnReference]
 
diff --git a/python/pathway/internals/universe_solver.py b/python/pathway/internals/universe_solver.py
index 24f0802a..07b11f0f 100644
--- a/python/pathway/internals/universe_solver.py
+++ b/python/pathway/internals/universe_solver.py
@@ -19,6 +19,10 @@ def __init__(self):
         self.var_counter = itertools.count(start=1)
         self.universe_vars = defaultdict(lambda: next(self.var_counter))
 
+    def register_as_equal(self, left: Universe, right: Universe) -> None:
+        self.register_as_subset(left, right)
+        self.register_as_subset(right, left)
+
     def register_as_subset(self, subset: Universe, superset: Universe) -> None:
         varA = self.universe_vars[subset]
         varB = self.universe_vars[superset]
diff --git a/python/pathway/internals/universes.py b/python/pathway/internals/universes.py
index 83ac7e88..dec0ad58 100644
--- a/python/pathway/internals/universes.py
+++ b/python/pathway/internals/universes.py
@@ -126,5 +126,4 @@ def promise_are_equal(self: TableLike, *others: TableLike) -> None:
     15 | Alice | tortoise
     """
     for other in others:
-        promise_is_subset_of(self, other)
-        promise_is_subset_of(other, self)
+        G.universe_solver.register_as_equal(self._universe, other._universe)
diff --git a/python/pathway/io/_utils.py b/python/pathway/io/_utils.py
index 46e31ce5..6fe5d582 100644
--- a/python/pathway/io/_utils.py
+++ b/python/pathway/io/_utils.py
@@ -227,6 +227,21 @@ def read_schema(
     )
 
 
+def assert_schema_or_value_columns_not_none(
+    schema: type[Schema] | None,
+    value_columns: list[str] | None,
+    data_format_type: str | None = None,
+):
+    if schema is None and value_columns is None:
+        if data_format_type == "dsv":
+            raise ValueError(
+                "Neither schema nor value_columns were specified. "
" + "Consider using `pw.schema_from_csv` for generating schema from a CSV file" + ) + else: + raise ValueError("Neither schema nor value_columns were specified") + + def construct_schema_and_data_format( format: str, *, @@ -272,6 +287,8 @@ def construct_schema_and_data_format( parse_utf8=(format != "binary"), ) + assert_schema_or_value_columns_not_none(schema, value_columns, data_format_type) + if with_metadata: if schema is not None: schema |= MetadataSchema diff --git a/python/pathway/io/fs/__init__.py b/python/pathway/io/fs/__init__.py index 3f73bd9e..b0aee080 100644 --- a/python/pathway/io/fs/__init__.py +++ b/python/pathway/io/fs/__init__.py @@ -226,7 +226,6 @@ def read( mode=internal_connector_mode(mode), object_pattern=object_pattern, persistent_id=persistent_id, - with_metadata=with_metadata, ) else: data_storage = api.DataStorage( @@ -236,7 +235,6 @@ def read( read_method=internal_read_method(format), object_pattern=object_pattern, persistent_id=persistent_id, - with_metadata=with_metadata, ) schema, data_format = construct_schema_and_data_format( diff --git a/python/pathway/io/http/_server.py b/python/pathway/io/http/_server.py index bdff90ad..880a44ba 100644 --- a/python/pathway/io/http/_server.py +++ b/python/pathway/io/http/_server.py @@ -6,6 +6,7 @@ from collections.abc import Callable from typing import Any from uuid import uuid4 +from warnings import warn from aiohttp import web @@ -18,7 +19,7 @@ class RestServerSubject(io.python.ConnectorSubject): _host: str _port: int _loop: asyncio.AbstractEventLoop - _delete_queries: bool + _delete_completed_queries: bool def __init__( self, @@ -28,7 +29,7 @@ def __init__( loop: asyncio.AbstractEventLoop, tasks: dict[Any, Any], schema: type[pw.Schema], - delete_queries: bool, + delete_completed_queries: bool, format: str = "raw", ) -> None: super().__init__() @@ -38,7 +39,7 @@ def __init__( self._loop = loop self._tasks = tasks self._schema = schema - self._delete_queries = delete_queries + self._delete_completed_queries = delete_completed_queries self._format = format def run(self): @@ -75,7 +76,7 @@ async def handle(self, request: web.Request): self._add(id, data) response = await self._fetch_response(id, event) - if self._delete_queries: + if self._delete_completed_queries: self._remove(id, data) return web.json_response(status=200, data=response) @@ -98,7 +99,8 @@ def rest_connector( route: str = "/", schema: type[pw.Schema] | None = None, autocommit_duration_ms=1500, - delete_queries: bool = False, + keep_queries: bool | None = None, + delete_completed_queries: bool | None = None, ) -> tuple[pw.Table, Callable]: """ Runs a lightweight HTTP server and inputs a collection from the HTTP endpoint, @@ -116,7 +118,8 @@ def rest_connector( autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway's computation graph; - delete_queries: whether to send a deletion entry after the query is processed. + keep_queries: whether to keep queries after processing; defaults to False. [deprecated] + delete_completed_queries: whether to send a deletion entry after the query is processed. Allows to remove it from the system if it is stored by operators such as ``join`` or ``groupby``; Returns: @@ -124,6 +127,20 @@ def rest_connector( response_writer: a callable, where the result table should be provided. 
""" + if delete_completed_queries is None: + if keep_queries is None: + warn( + "delete_completed_queries arg of rest_connector should be set explicitly." + + " It will soon be required." + ) + delete_completed_queries = True + else: + warn( + "DEPRECATED: keep_queries arg of rest_connector is deprecated," + + " use delete_completed_queries with an opposite meaning instead." + ) + delete_completed_queries = not keep_queries + loop = asyncio.new_event_loop() tasks: dict[Any, Any] = {} @@ -141,7 +158,7 @@ def rest_connector( loop=loop, tasks=tasks, schema=schema, - delete_queries=delete_queries, + delete_completed_queries=delete_completed_queries, format=format, ), schema=schema, diff --git a/python/pathway/io/python/__init__.py b/python/pathway/io/python/__init__.py index f71bed38..f0a81c93 100644 --- a/python/pathway/io/python/__init__.py +++ b/python/pathway/io/python/__init__.py @@ -12,7 +12,12 @@ from pathway.internals.runtime_type_check import runtime_type_check from pathway.internals.schema import Schema from pathway.internals.trace import trace_user_frame -from pathway.io._utils import RawDataSchema, get_data_format_type, read_schema +from pathway.io._utils import ( + RawDataSchema, + assert_schema_or_value_columns_not_none, + get_data_format_type, + read_schema, +) SUPPORTED_INPUT_FORMATS: set[str] = { "json", @@ -175,6 +180,8 @@ def read( raise ValueError("raw format must not be used with value_columns property") schema = RawDataSchema + assert_schema_or_value_columns_not_none(schema, value_columns, data_format_type) + schema, api_schema = read_schema( schema=schema, value_columns=value_columns, diff --git a/python/pathway/io/redpanda/__init__.py b/python/pathway/io/redpanda/__init__.py index 77aa7fc1..2728242e 100644 --- a/python/pathway/io/redpanda/__init__.py +++ b/python/pathway/io/redpanda/__init__.py @@ -263,13 +263,17 @@ def write( ... } You want to send a Pathway table t to the Redpanda instance. + + >>> import pathway as pw + >>> t = pw.debug.parse_to_table("age owner pet \\n 1 10 Alice dog \\n 2 9 Bob cat \\n 3 8 Alice cat") + To connect to the topic "animals" and send messages, the connector must be used \ as follows, depending on the format: JSON version: - >>> import pathway as pw - >>> t = pw.io.redpanda.read( + >>> pw.io.redpanda.write( + ... t, ... rdkafka_settings, ... "animals", ... 
format="json", diff --git a/python/pathway/stdlib/temporal/__init__.py b/python/pathway/stdlib/temporal/__init__.py index dc4d114e..c1895f49 100644 --- a/python/pathway/stdlib/temporal/__init__.py +++ b/python/pathway/stdlib/temporal/__init__.py @@ -33,7 +33,7 @@ window_join_outer, window_join_right, ) -from .temporal_behavior import WindowBehavior, window_behavior +from .temporal_behavior import CommonBehavior, common_behavior __all__ = [ "AsofJoinResult", @@ -65,6 +65,6 @@ "tumbling", "sliding", "session", - "window_behavior", - "WindowBehavior", + "common_behavior", + "CommonBehavior", ] diff --git a/python/pathway/stdlib/temporal/_interval_join.py b/python/pathway/stdlib/temporal/_interval_join.py index fbe2a6f8..a2f71f20 100644 --- a/python/pathway/stdlib/temporal/_interval_join.py +++ b/python/pathway/stdlib/temporal/_interval_join.py @@ -24,7 +24,7 @@ from pathway.internals.thisclass import ThisMetaclass from pathway.internals.trace import trace_user_frame -from .temporal_behavior import WindowBehavior +from .temporal_behavior import CommonBehavior from .utils import IntervalType, TimeEventType, check_joint_types, get_default_shift @@ -156,7 +156,7 @@ def __init__( @staticmethod def _apply_temporal_behavior( - table: pw.Table, behavior: WindowBehavior | None + table: pw.Table, behavior: CommonBehavior | None ) -> pw.Table: if behavior is not None: if behavior.delay is not None: @@ -179,7 +179,7 @@ def _interval_join( right_time_expression: pw.ColumnExpression, interval: Interval, *on: pw.ColumnExpression, - behavior: WindowBehavior | None = None, + behavior: CommonBehavior | None = None, mode: pw.JoinMode, ): """Creates an IntervalJoinResult. To perform an interval join uses it uses two @@ -428,7 +428,7 @@ def interval_join( other_time: pw.ColumnExpression, interval: Interval, *on: pw.ColumnExpression, - behavior: WindowBehavior | None = None, + behavior: CommonBehavior | None = None, how: pw.JoinMode = pw.JoinMode.INNER, ) -> IntervalJoinResult: """Performs an interval join of self with other using a time difference @@ -545,7 +545,7 @@ def interval_join_inner( other_time: pw.ColumnExpression, interval: Interval, *on: pw.ColumnExpression, - behavior: WindowBehavior | None = None, + behavior: CommonBehavior | None = None, ) -> IntervalJoinResult: """Performs an interval join of self with other using a time difference and join expressions. If `self_time + lower_bound <= @@ -659,7 +659,7 @@ def interval_join_left( other_time: pw.ColumnExpression, interval: Interval, *on: pw.ColumnExpression, - behavior: WindowBehavior | None = None, + behavior: CommonBehavior | None = None, ) -> IntervalJoinResult: """Performs an interval left join of self with other using a time difference and join expressions. If `self_time + lower_bound <= @@ -778,7 +778,7 @@ def interval_join_right( other_time: pw.ColumnExpression, interval: Interval, *on: pw.ColumnExpression, - behavior: WindowBehavior | None = None, + behavior: CommonBehavior | None = None, ) -> IntervalJoinResult: """Performs an interval right join of self with other using a time difference and join expressions. If `self_time + lower_bound <= @@ -899,7 +899,7 @@ def interval_join_outer( other_time: pw.ColumnExpression, interval: Interval, *on: pw.ColumnExpression, - behavior: WindowBehavior | None = None, + behavior: CommonBehavior | None = None, ) -> IntervalJoinResult: """Performs an interval outer join of self with other using a time difference and join expressions. 
If `self_time + lower_bound <= diff --git a/python/pathway/stdlib/temporal/_window.py b/python/pathway/stdlib/temporal/_window.py index e3d96717..4489135e 100644 --- a/python/pathway/stdlib/temporal/_window.py +++ b/python/pathway/stdlib/temporal/_window.py @@ -19,8 +19,19 @@ from ._interval_join import interval, interval_join from ._window_join import WindowJoinResult -from .temporal_behavior import WindowBehavior -from .utils import IntervalType, TimeEventType, check_joint_types, get_default_shift +from .temporal_behavior import ( + Behavior, + CommonBehavior, + ExactlyOnceBehavior, + common_behavior, +) +from .utils import ( + IntervalType, + TimeEventType, + check_joint_types, + get_default_shift, + zero_length_interval, +) class Window(ABC): @@ -29,7 +40,7 @@ def _apply( self, table: pw.Table, key: pw.ColumnExpression, - behavior: WindowBehavior | None, + behavior: Behavior | None, shard: pw.ColumnExpression | None, ) -> pw.GroupedTable: ... @@ -100,7 +111,7 @@ def _apply( self, table: pw.Table, key: pw.ColumnExpression, - behavior: WindowBehavior | None, + behavior: Behavior | None, shard: pw.ColumnExpression | None, ) -> pw.GroupedTable: if self.max_gap is not None: @@ -307,7 +318,7 @@ def _apply( self, table: pw.Table, key: pw.ColumnExpression, - behavior: WindowBehavior | None, + behavior: Behavior | None, shard: pw.ColumnExpression | None, ) -> pw.GroupedTable: check_joint_types( @@ -318,6 +329,7 @@ def _apply( "window.offset": (self.offset, TimeEventType), } ) + target = table.select( _pw_window=pw.apply_with_type( self._assign_windows, @@ -341,6 +353,31 @@ def _apply( ) if behavior is not None: + if isinstance(behavior, ExactlyOnceBehavior): + duration: IntervalType + # that is split in two if-s, as it helps mypy figure out proper types + # one if impl left either self.ratio or self.duration as optionals + # which won't fit into the duration variable of type IntervalType + if self.duration is not None: + duration = self.duration + elif self.ratio is not None: + duration = self.ratio * self.hop + shift = ( + self.shift + if self.shift is not None + else zero_length_interval(type(duration)) + ) + behavior = common_behavior( + duration + shift, shift, True # type:ignore + ) + elif not isinstance(behavior, CommonBehavior): + raise ValueError( + f"behavior {behavior} unsupported in sliding/tumbling window" + ) + + if behavior.cutoff is not None: + cutoff_threshold = pw.this._pw_window_end + behavior.cutoff + target = target._freeze(cutoff_threshold, pw.this._pw_key) if behavior.delay is not None: target = target._buffer( target._pw_window_start + behavior.delay, target._pw_key @@ -355,7 +392,6 @@ def _apply( if behavior.cutoff is not None: cutoff_threshold = pw.this._pw_window_end + behavior.cutoff - target = target._freeze(cutoff_threshold, pw.this._pw_key) target = target._forget( cutoff_threshold, pw.this._pw_key, behavior.keep_results ) @@ -470,7 +506,7 @@ def _apply( self, table: pw.Table, key: pw.ColumnExpression, - behavior: WindowBehavior | None, + behavior: CommonBehavior | None, shard: pw.ColumnExpression | None, ) -> pw.GroupedTable: check_joint_types( @@ -803,7 +839,7 @@ def windowby( time_expr: pw.ColumnExpression, *, window: Window, - behavior: WindowBehavior | None = None, + behavior: Behavior | None = None, shard: pw.ColumnExpression | None = None, ) -> pw.GroupedTable: """ diff --git a/python/pathway/stdlib/temporal/temporal_behavior.py b/python/pathway/stdlib/temporal/temporal_behavior.py index 45f56378..04806059 100644 --- 
a/python/pathway/stdlib/temporal/temporal_behavior.py
+++ b/python/pathway/stdlib/temporal/temporal_behavior.py
@@ -4,11 +4,17 @@
 from .utils import IntervalType
 
-# TODO - clarify corner cases (which times are exclusive / inclusive)
+
+class Behavior:
+    """
+    A superclass of all classes defining temporal behavior.
+    """
+
+    pass
 
 
 @dataclass
-class WindowBehavior:
+class CommonBehavior(Behavior):
     """Defines temporal behavior of windows and temporal joins."""
 
     delay: IntervalType | None
@@ -16,22 +22,57 @@ class WindowBehavior:
     keep_results: bool
 
 
-def window_behavior(
+def common_behavior(
     delay: IntervalType | None = None,
     cutoff: IntervalType | None = None,
     keep_results: bool = True,
-) -> WindowBehavior:
-    """Creates WindowBehavior
+) -> CommonBehavior:
+    """Creates CommonBehavior
 
     Args:
-        delay: For windows, delays initial output by ``delay`` with respect to the
-            beginning of the window. For interval joins, it delays the time the record
-            is joined by ``delay``. Using `delay` is useful when updates are too frequent.
-        cutoff: For windows, stops updating windows which end earlier than maximal seen
-            time minus ``cutoff``. For interval joins, it ignores entries that are older
+        delay:
+            Optional; for windows, delays initial output by ``delay`` with respect to the
+            beginning of the window. Setting it to ``None`` does not enable the
+            delaying mechanism.
+
+            For interval joins, it delays the time the record is joined by ``delay``.
+
+            Using `delay` is useful when updates are too frequent.
+        cutoff:
+            Optional; for windows, stops updating windows which end earlier than the
+            maximal seen time minus ``cutoff``. Setting ``cutoff`` to ``None`` does not
+            enable the cutoff mechanism.
+
+            For interval joins, it ignores entries that are older
            than maximal seen time minus ``cutoff``. This parameter is also used to clear
            memory. It allows to release memory used by entries that won't change.
+
        keep_results: If set to True, keeps all results of the operator. If set to False,
            keeps only results that are newer than maximal seen time minus ``cutoff``.
+            Can't be set to ``False`` when ``cutoff`` is ``None``.
+    """
+    assert not (cutoff is None and not keep_results)
+    return CommonBehavior(delay, cutoff, keep_results)
+
+
+@dataclass
+class ExactlyOnceBehavior(Behavior):
+    shift: IntervalType | None
+
+
+def exactly_once_behavior(shift: IntervalType | None = None):
+    """Creates an instance of class ExactlyOnceBehavior, indicating that each non-empty
+    window should produce exactly one output.
+
+    Args:
+        shift: optional, defines the moment in time (``window end + shift``) at which
+            the window stops accepting data and sends the results to the output.
+            Setting it to ``None`` is interpreted as ``shift=0``.
+
+    Remark:
+        Note that setting a non-zero shift and demanding exactly one output results in
+        the output being delivered only when the time in the time column reaches
+        ``window end + shift``.
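+
+        For illustration, a sketch of how this is meant to be combined with
+        ``windowby`` (assuming a table ``t`` with an integer ``time`` column, in
+        the spirit of the tests in this patch; note that ``exactly_once_behavior``
+        is reached through the ``temporal_behavior`` submodule there):
+
+            gb = t.windowby(
+                t.time,
+                window=pw.temporal.sliding(duration=5, hop=3),
+                behavior=pw.temporal.temporal_behavior.exactly_once_behavior(),
+            )
+            result = gb.reduce(
+                pw.this._pw_window_end,
+                max_time=pw.reducers.max(pw.this.time),
+            )
+
+        Each non-empty window then emits a single, final row once the values in
+        the time column pass ``window end + shift``.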
+ """ - return WindowBehavior(delay, cutoff, keep_results) + return ExactlyOnceBehavior(shift) diff --git a/python/pathway/stdlib/temporal/utils.py b/python/pathway/stdlib/temporal/utils.py index fcd5af77..945d4f8c 100644 --- a/python/pathway/stdlib/temporal/utils.py +++ b/python/pathway/stdlib/temporal/utils.py @@ -1,7 +1,7 @@ # Copyright © 2023 Pathway import datetime -from typing import Any, Union +from typing import Any, Type, Union from pathway.internals import dtype as dt from pathway.internals.type_interpreter import eval_type @@ -19,6 +19,17 @@ def get_default_shift(interval: IntervalType) -> TimeEventType: return 0.0 +def zero_length_interval(interval_type: Type[IntervalType]) -> IntervalType: + if issubclass(interval_type, datetime.timedelta): + return datetime.timedelta(0) + elif issubclass(interval_type, int): + return 0 + elif issubclass(interval_type, float): + return 0.0 + else: + raise Exception("unsupported interval type") + + def _get_possible_types(type: Any) -> tuple[dt.DType, ...]: if type is TimeEventType: return (dt.INT, dt.FLOAT, dt.DATE_TIME_NAIVE, dt.DATE_TIME_UTC) diff --git a/python/pathway/stdlib/utils/__init__.py b/python/pathway/stdlib/utils/__init__.py index 5ab2d2d8..9ec80cca 100644 --- a/python/pathway/stdlib/utils/__init__.py +++ b/python/pathway/stdlib/utils/__init__.py @@ -4,4 +4,10 @@ from . import async_transformer, bucketing, col, filtering, pandas_transformer -__all__ = ["bucketing", "col", "pandas_transformer", "async_transformer", "filtering"] +__all__ = [ + "bucketing", + "col", + "pandas_transformer", + "async_transformer", + "filtering", +] diff --git a/python/pathway/tests/ml/test_index.py b/python/pathway/tests/ml/test_index.py index 64d478d3..906eafd1 100644 --- a/python/pathway/tests/ml/test_index.py +++ b/python/pathway/tests/ml/test_index.py @@ -72,11 +72,13 @@ def test_all_at_once(): assert_table_equality_wo_index(result, expected) -def stream_points() -> tuple[pw.Table, pw.Table]: +def stream_points() -> tuple[pw.Table, pw.Table, pw.persistence.Config]: """Returns (points, queries).""" points = get_points() - table = pw.debug.table_from_list_of_batches( + stream_generator = pw.debug.StreamGenerator() + + table = stream_generator.table_from_list_of_batches( [[{"coords": point[0], "is_query": point[1]}] for point in points], PointSchema, ).update_types(coords=tuple[int, ...]) @@ -84,11 +86,12 @@ def stream_points() -> tuple[pw.Table, pw.Table]: return ( table.filter(~pw.this.is_query).without(pw.this.is_query), table.filter(pw.this.is_query).without(pw.this.is_query), + stream_generator.persistence_config(), ) def test_update_old(): - points, queries = stream_points() + points, queries, persistence_config = stream_points() index = KNNIndex(points.coords, points, n_dimensions=2, n_and=5) result = queries + index.get_nearest_items(queries.coords, k=2).with_universe_of( queries @@ -101,11 +104,13 @@ def test_update_old(): ((-2, -3), ((-1, 0), (1, -4))), ] ) - assert_table_equality_wo_index(result, expected) + assert_table_equality_wo_index( + result, expected, persistence_config=persistence_config + ) def test_asof_now(): - points, queries = stream_points() + points, queries, persistence_config = stream_points() index = KNNIndex(points.coords, points, n_dimensions=2, n_and=5) result = queries + index.get_nearest_items_asof_now(queries.coords, k=2).select( nn=pw.apply(sort_arrays, pw.this.coords) @@ -118,4 +123,6 @@ def test_asof_now(): ((-2, -3), ((-3, 1), (-1, 0))), ] ) - assert_table_equality_wo_index(result, expected) + 
assert_table_equality_wo_index( + result, expected, persistence_config=persistence_config + ) diff --git a/python/pathway/tests/temporal/test_interval_joins_stream.py b/python/pathway/tests/temporal/test_interval_joins_stream.py index ebfc5e02..d3591981 100644 --- a/python/pathway/tests/temporal/test_interval_joins_stream.py +++ b/python/pathway/tests/temporal/test_interval_joins_stream.py @@ -35,7 +35,7 @@ def test_forgetting(keep_results: bool): t1.t, t2.t, pw.temporal.interval(-0.1, 0.1), - behavior=pw.temporal.window_behavior(0, 2, keep_results=keep_results), + behavior=pw.temporal.common_behavior(0, 2, keep_results=keep_results), ).select(left_t=pw.left.t, right_t=pw.right.t) if keep_results: expected = T( @@ -104,7 +104,7 @@ def test_forgetting_sharded(keep_results: bool): t2.t, pw.temporal.interval(-0.1, 0.1), t1.v == t2.v, - behavior=pw.temporal.window_behavior(0, 2, keep_results=keep_results), + behavior=pw.temporal.common_behavior(0, 2, keep_results=keep_results), ).select(v=pw.this.v, left_t=pw.left.t, right_t=pw.right.t) if keep_results: expected = T( diff --git a/python/pathway/tests/temporal/test_windows.py b/python/pathway/tests/temporal/test_windows.py index 661f925a..13d079bf 100644 --- a/python/pathway/tests/temporal/test_windows.py +++ b/python/pathway/tests/temporal/test_windows.py @@ -224,7 +224,7 @@ def test_sliding_compacting(): gb = t.windowby( t.t, window=pw.temporal.sliding(duration=10, hop=3), - behavior=pw.temporal.window_behavior(delay=0, cutoff=1, keep_results=False), + behavior=pw.temporal.common_behavior(delay=0, cutoff=1, keep_results=False), shard=t.shard, ) @@ -271,7 +271,7 @@ def test_sliding_compacting_2(): gb = t.windowby( t.t, window=pw.temporal.sliding(duration=10, hop=3), - behavior=pw.temporal.window_behavior(delay=0, cutoff=2, keep_results=False), + behavior=pw.temporal.common_behavior(delay=0, cutoff=2, keep_results=False), shard=t.shard, ) diff --git a/python/pathway/tests/temporal/test_windows_stream.py b/python/pathway/tests/temporal/test_windows_stream.py index 92fb58d6..0410c310 100644 --- a/python/pathway/tests/temporal/test_windows_stream.py +++ b/python/pathway/tests/temporal/test_windows_stream.py @@ -6,7 +6,12 @@ import pathway as pw from pathway.internals import api -from pathway.tests.utils import DiffEntry, assert_key_entries_in_stream_consistent, run +from pathway.tests.utils import ( + DiffEntry, + assert_key_entries_in_stream_consistent, + assert_stream_equal, + run, +) class TimeColumnInputSchema(pw.Schema): @@ -31,6 +36,7 @@ def generate_buffer_output( duration, hop, delay, + cutoff, ): now = 0 buffer = {} @@ -44,7 +50,12 @@ def generate_buffer_output( for _pw_window_start, _pw_window_end in windows: shard = None window = (shard, _pw_window_start, _pw_window_end) + freeze_threshold = window[2] + cutoff + if freeze_threshold <= now: + continue + threshold = window[1] + delay + if threshold <= now: to_process.append((window, entry)) else: @@ -58,7 +69,6 @@ def generate_buffer_output( to_process.append((window, entry)) output.extend(to_process) - # print(buffer) return output @@ -68,8 +78,7 @@ def test_keep_results_manual(): "value": lambda x: x, } - # 68 is 4*17, 1 - # 7 is a nice number I chose arbitrarily + # 68 is 4*17, 17 is a nice number I chose arbitrarily # 4 comes from the fact that I wanted 2 old entries and two fresh (possibly late) # entries in a window @@ -84,7 +93,7 @@ def test_keep_results_manual(): gb = t.windowby( t.time, window=pw.temporal.sliding(duration=5, hop=3), - behavior=pw.temporal.window_behavior(delay=0, 
cutoff=0, keep_results=True), + behavior=pw.temporal.common_behavior(delay=0, cutoff=0, keep_results=True), ) expected_entries = [] @@ -146,14 +155,13 @@ def test_keep_results_manual(): pw.this._pw_window_end, max_time=pw.reducers.max(pw.this.time), max_value=pw.reducers.max(pw.this.value), - ).select(pw.this._pw_window_end, pw.this.max_time, pw.this.max_value) - + ) assert_key_entries_in_stream_consistent(expected_entries, result) - run(debug=True) + run() -def parametrized_test(duration, hop, delay, cutoff, keep_results): +def create_windowby_scenario(duration, hop, delay, cutoff, keep_results): value_functions = { "time": lambda x: (x // 2) % 17, "value": lambda x: x, @@ -170,25 +178,38 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results): autocommit_duration_ms=5, input_rate=25, ) + gb = t.windowby( t.time, window=pw.temporal.sliding(duration=duration, hop=hop), - behavior=pw.temporal.window_behavior( + behavior=pw.temporal.common_behavior( delay=delay, cutoff=cutoff, keep_results=keep_results ), ) + result = gb.reduce( + pw.this._pw_window_end, + max_time=pw.reducers.max(pw.this.time), + max_value=pw.reducers.max(pw.this.value), + ) + result.debug("res") + return result + + +def generate_expected(duration, hop, delay, cutoff, keep_results, result_table): entries = [] for i in range(68): entries.append({"value": i, "time": (i // 2) % 17}) - buf_out = generate_buffer_output(entries, duration=duration, hop=hop, delay=delay) + buf_out = generate_buffer_output( + entries, duration=duration, hop=hop, delay=delay, cutoff=cutoff + ) - simulated_state: dict = {} + simulated_state: dict[pw.Pointer, DiffEntry] = {} expected_entries = [] max_global_time = 0 order = 0 - print(buf_out) + for ( window, in_entry, @@ -200,7 +221,7 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results): "_pw_window_end": window[2], } - entry_id = DiffEntry.create_id_from(gb, pk_row) + entry_id = DiffEntry.create_id_from(result_table, pk_row) order = in_entry["value"] max_value = in_entry["value"] @@ -209,13 +230,11 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results): old_entry_state = simulated_state.get(entry_id) if old_entry_state is not None: - # cutoff - if max_global_time < typing.cast( - int, old_entry_state.row["_pw_window_end"] + cutoff - ): - expected_entries.append( - DiffEntry.create(gb, pk_row, order, False, old_entry_state.row) + expected_entries.append( + DiffEntry.create( + result_table, pk_row, order, False, old_entry_state.row ) + ) max_value = max( max_value, typing.cast(int, old_entry_state.row["max_value"]) @@ -229,28 +248,24 @@ def parametrized_test(duration, hop, delay, cutoff, keep_results): "max_value": max_value, "max_time": max_window_time, } - insert_entry = DiffEntry.create(gb, pk_row, order, True, row) - - if ( - max_global_time - < typing.cast(int, insert_entry.row["_pw_window_end"]) + cutoff - ): - simulated_state[entry_id] = insert_entry - expected_entries.append(insert_entry) + insert_entry = DiffEntry.create(result_table, pk_row, order, True, row) + simulated_state[entry_id] = insert_entry + expected_entries.append(insert_entry) if not keep_results: for entry in simulated_state.values(): if entry.row["_pw_window_end"] + cutoff <= max_global_time: expected_entries.append(entry.final_cleanup_entry()) + return expected_entries - result = gb.reduce( - pw.this._pw_window_end, - max_time=pw.reducers.max(pw.this.time), - max_value=pw.reducers.max(pw.this.value), - ).select(pw.this._pw_window_end, pw.this.max_time, pw.this.max_value) - 
assert_key_entries_in_stream_consistent(expected_entries, result) - run(debug=True) +def parametrized_test(duration, hop, delay, cutoff, keep_results): + result_table = create_windowby_scenario(duration, hop, delay, cutoff, keep_results) + expected = generate_expected( + duration, hop, delay, cutoff, keep_results, result_table + ) + assert_key_entries_in_stream_consistent(expected, result_table) + run() def test_keep_results(): @@ -287,3 +302,93 @@ def test_high_delay_high_buffer_keep_results(): def test_non_zero_delay_non_zero_buffer_remove_results(): parametrized_test(5, 3, 1, 1, False) + + +def test_exactly_once(): + duration = 5 + hop = 3 + delay = 6 + cutoff = 1 + keep_results = True + result = create_windowby_scenario(duration, hop, delay, cutoff, keep_results) + expected = [] + for i, window_end in enumerate([2, 5, 8, 11, 14]): + pk_row: dict = { + "_pw_window": (None, window_end - duration, window_end), + "_pw_shard": None, + "_pw_window_start": window_end - duration, + "_pw_window_end": window_end, + } + + row: dict = { + "_pw_window_end": window_end, + "max_time": window_end - 1, + "max_value": 2 * window_end - 1, + } + + expected.append(DiffEntry.create(result, pk_row, i, True, row)) + assert_stream_equal(expected, result) + run() + + +def test_exactly_once_from_behavior(): + duration = 5 + hop = 3 + value_functions = { + "time": lambda x: (x // 2) % 17, + "value": lambda x: x, + } + + # 68 is 4*17, 17 is a nice number I chose arbitrarily + # 4 comes from the fact that I wanted 2 old entries and two fresh (possibly late) + # entries in a window + + t = pw.demo.generate_custom_stream( + value_functions, + schema=TimeColumnInputSchema, + nb_rows=68, + autocommit_duration_ms=5, + input_rate=25, + ) + gb = t.windowby( + t.time, + window=pw.temporal.sliding(duration=duration, hop=hop), + behavior=pw.temporal.temporal_behavior.exactly_once_behavior(), + ) + + result = gb.reduce( + pw.this._pw_window_end, + max_time=pw.reducers.max(pw.this.time), + max_value=pw.reducers.max(pw.this.value), + ) + + expected = [] + for i, window_end in enumerate([2, 5, 8, 11, 14]): + pk_row: dict = { + "_pw_window": (None, window_end - duration, window_end), + "_pw_shard": None, + "_pw_window_start": window_end - duration, + "_pw_window_end": window_end, + } + + row: dict = { + "_pw_window_end": window_end, + "max_time": window_end - 1, + "max_value": 2 * window_end - 1, + } + + expected.append(DiffEntry.create(result, pk_row, i, True, row)) + assert_stream_equal(expected, result) + run() + + +def test_exactly_once_empty(): + duration = 5 + hop = 3 + delay = 6 + cutoff = 1 + keep_results = False + result = create_windowby_scenario(duration, hop, delay, cutoff, keep_results) + expected: list[DiffEntry] = [] + assert_stream_equal(expected, result) + run() diff --git a/python/pathway/tests/test_column_properties.py b/python/pathway/tests/test_column_properties.py index 83f78e77..59181ecf 100644 --- a/python/pathway/tests/test_column_properties.py +++ b/python/pathway/tests/test_column_properties.py @@ -1,6 +1,9 @@ +import pytest + import pathway.internals as pw from pathway.internals import dtype as dt from pathway.internals.column_properties import ColumnProperties +from pathway.internals.decorators import empty_from_schema from pathway.tests.utils import T @@ -68,3 +71,44 @@ def test_preserve_context_dependency_properties(): assert_col_props(res1.a, ColumnProperties(dtype=dt.INT, append_only=True)) assert_col_props(res2.a, ColumnProperties(dtype=dt.INT, append_only=False)) + + 
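# The tests added below check how ``append_only`` flows from schema definitions
# to column and id-column (universe) properties. For reference, a compact
# standalone restatement of the rule implemented by _universe_properties in
# schema.py earlier in this patch (a sketch, not the real code):
#
#     def universe_append_only(
#         column_flags: list[bool], schema_flag: bool | None
#     ) -> bool:
#         # with at least one column, the universe is append-only iff any
#         # column is; for a column-less schema, the schema-level flag
#         # decides (default: False)
#         if column_flags:
#             return any(column_flags)
#         return schema_flag if schema_flag is not None else False
#
#     assert universe_append_only([True, False], None) is True
#     assert universe_append_only([], True) is True
#     assert universe_append_only([], None) is False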
+@pytest.mark.parametrize("append_only", [True, False]) +def test_const_column_properties(append_only): + class Schema(pw.Schema, append_only=append_only): + a: int = pw.column_definition(primary_key=True) + + table = empty_from_schema(Schema) + + result = table.select(ret=42) + + assert table.a._column.properties.append_only == append_only + assert result.ret._column.properties.append_only == append_only + + +@pytest.mark.parametrize("append_only", [True, False]) +def test_universe_properties(append_only): + class Schema(pw.Schema, append_only=append_only): + a: int = pw.column_definition(primary_key=True) + + table = empty_from_schema(Schema) + result = table.select() + + assert table._id_column.properties.append_only == append_only + assert result._id_column.properties.append_only == append_only + + +def test_universe_properties_with_universe_of(): + class Schema(pw.Schema, append_only=True): + a: int = pw.column_definition(primary_key=True) + + table = empty_from_schema(Schema) + + reduced = table.groupby(pw.this.a).reduce(pw.this.a) + reduced_same_universe = ( + table.groupby(pw.this.a).reduce(pw.this.a).with_universe_of(table) + ) + + assert table._id_column.properties.append_only + assert not reduced._id_column.properties.append_only + assert reduced_same_universe._id_column.properties.append_only diff --git a/python/pathway/tests/test_io.py b/python/pathway/tests/test_io.py index face3c23..d81c82d8 100644 --- a/python/pathway/tests/test_io.py +++ b/python/pathway/tests/test_io.py @@ -21,6 +21,7 @@ from pathway.tests.utils import ( CountDifferentTimestampsCallback, CsvLinesNumberChecker, + FileLinesNumberChecker, T, assert_table_equality, assert_table_equality_wo_index, @@ -1184,11 +1185,17 @@ def run(self): def run_replacement_test( - streaming_target, input_format, expected_output_lines, tmp_path, monkeypatch + streaming_target, + input_format, + expected_output_lines, + tmp_path, + monkeypatch, + inputs_path_override=None, + has_only_file_replacements=False, ): monkeypatch.setenv("PATHWAY_PERSISTENT_STORAGE", str(tmp_path / "PStorage")) - inputs_path = tmp_path / "inputs" - os.mkdir(inputs_path) + inputs_path = inputs_path_override or (tmp_path / "inputs") + os.mkdir(tmp_path / "inputs") class InputSchema(pw.Schema): key: int = pw.column_definition(primary_key=True) @@ -1199,22 +1206,52 @@ class InputSchema(pw.Schema): format=input_format, schema=InputSchema, mode="streaming_with_deletions", - autocommit_duration_ms=10, + autocommit_duration_ms=1, + with_metadata=True, ) output_path = tmp_path / "output.csv" - pw.io.csv.write(table, str(output_path)) + pw.io.jsonlines.write(table, str(output_path)) inputs_thread = threading.Thread(target=streaming_target, daemon=True) inputs_thread.start() assert wait_result_with_checker( - CsvLinesNumberChecker(output_path, expected_output_lines), 30 + FileLinesNumberChecker(output_path, expected_output_lines), 30 ) + parsed_rows = [] + with open(output_path, "r") as f: + for row in f: + parsed_row = json.loads(row) + parsed_rows.append(parsed_row) + parsed_rows.sort(key=lambda row: (row["time"], row["diff"])) + + key_metadata = {} + time_removed = {} + for parsed_row in parsed_rows: + key = parsed_row["key"] + metadata = parsed_row["_metadata"] + file_name = metadata["path"] + is_insertion = parsed_row["diff"] == 1 + timestamp = parsed_row["time"] + + if is_insertion: + if has_only_file_replacements and file_name in time_removed: + # If there are only replacement and the file has been removed + # already, then we need to check that the 
insertion and its' + # removal were consolidated, i.e. happened in the same timestamp + assert time_removed[file_name] == timestamp + key_metadata[key] = metadata + else: + # Check that the metadata for the deleted object corresponds to the + # initially reported metadata + assert key_metadata[key] == metadata + time_removed[file_name] = timestamp + @xfail_on_darwin(reason="running pw.run from separate process not supported") -def test_simple_forgetting(tmp_path: pathlib.Path, monkeypatch): +def test_simple_replacement_with_removal(tmp_path: pathlib.Path, monkeypatch): def stream_inputs(): time.sleep(1) first_line = {"key": 1, "value": "one"} @@ -1234,6 +1271,54 @@ def stream_inputs(): ) +@xfail_on_darwin(reason="running pw.run from separate process not supported") +def test_simple_insert_consolidation(tmp_path: pathlib.Path, monkeypatch): + def stream_inputs(): + time.sleep(1) + first_line = {"key": 1, "value": "one"} + second_line = {"key": 2, "value": "two"} + write_lines(tmp_path / "inputs/input1.jsonlines", json.dumps(first_line)) + time.sleep(1) + write_lines(tmp_path / "inputs/input1.jsonlines", json.dumps(second_line)) + time.sleep(1) + write_lines(tmp_path / "inputs/input1.jsonlines", json.dumps(first_line)) + time.sleep(1) + + run_replacement_test( + streaming_target=stream_inputs, + input_format="json", + expected_output_lines=5, + tmp_path=tmp_path, + monkeypatch=monkeypatch, + has_only_file_replacements=True, + ) + + +@xfail_on_darwin(reason="running pw.run from separate process not supported") +def test_simple_replacement_on_file(tmp_path: pathlib.Path, monkeypatch): + def stream_inputs(): + time.sleep(1) + first_line = {"key": 1, "value": "one"} + second_line = {"key": 2, "value": "two"} + third_line = {"key": 3, "value": "three"} + write_lines(tmp_path / "inputs/input.jsonlines", json.dumps(first_line)) + time.sleep(1) + write_lines(tmp_path / "inputs/input.jsonlines", json.dumps(second_line)) + time.sleep(1) + os.remove(tmp_path / "inputs/input.jsonlines") + time.sleep(1) + write_lines(tmp_path / "inputs/input.jsonlines", json.dumps(third_line)) + + run_replacement_test( + streaming_target=stream_inputs, + input_format="json", + expected_output_lines=5, + tmp_path=tmp_path, + monkeypatch=monkeypatch, + inputs_path_override=tmp_path / "inputs/input.jsonlines", + ) + + @xfail_on_darwin(reason="running pw.run from separate process not supported") def test_simple_replacement(tmp_path: pathlib.Path, monkeypatch): def stream_inputs(): @@ -1253,6 +1338,7 @@ def stream_inputs(): expected_output_lines=4, tmp_path=tmp_path, monkeypatch=monkeypatch, + has_only_file_replacements=True, ) @@ -1275,6 +1361,7 @@ def stream_inputs(): expected_output_lines=4, tmp_path=tmp_path, monkeypatch=monkeypatch, + has_only_file_replacements=True, ) @@ -1306,11 +1393,12 @@ def stream_inputs(): expected_output_lines=4, tmp_path=tmp_path, monkeypatch=monkeypatch, + has_only_file_replacements=True, ) @xfail_on_darwin(reason="running pw.run from separate process not supported") -def test_simple_forgetting_autogenerated_key(tmp_path: pathlib.Path, monkeypatch): +def test_file_removal_autogenerated_key(tmp_path: pathlib.Path, monkeypatch): def stream_inputs(): time.sleep(1) first_line = {"key": 1, "value": "one"} @@ -1349,6 +1437,7 @@ def stream_inputs(): expected_output_lines=4, tmp_path=tmp_path, monkeypatch=monkeypatch, + has_only_file_replacements=True, ) @@ -1913,16 +2002,18 @@ def test_stream_generator_from_list(): class InputSchema(pw.Schema): number: int + stream_generator = 
pw.debug.StreamGenerator() + events = [ [{"number": 1}, {"number": 2}, {"number": 5}], [{"number": 4}, {"number": 4}], ] - t = pw.debug.table_from_list_of_batches(events, InputSchema) + t = stream_generator.table_from_list_of_batches(events, InputSchema) on_change = mock.Mock() pw.io.subscribe(t, on_change=on_change) - pw.run() + pw.run(persistence_config=stream_generator.persistence_config()) timestamps = set([call.kwargs["time"] for call in on_change.mock_calls]) assert len(timestamps) == 2 @@ -1967,6 +2058,7 @@ class InputSchema(pw.Schema): def test_stream_generator_from_list_multiple_workers(monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv("PATHWAY_THREADS", "2") + stream_generator = pw.debug.StreamGenerator() class InputSchema(pw.Schema): number: int @@ -1976,11 +2068,11 @@ class InputSchema(pw.Schema): {0: [{"number": 4}], 1: [{"number": 4}]}, ] - t = pw.debug.table_from_list_of_batches_by_workers(events, InputSchema) + t = stream_generator.table_from_list_of_batches_by_workers(events, InputSchema) on_change = mock.Mock() pw.io.subscribe(t, on_change=on_change) - pw.run() + pw.run(persistence_config=stream_generator.persistence_config()) timestamps = set([call.kwargs["time"] for call in on_change.mock_calls]) assert len(timestamps) == 2 @@ -2025,7 +2117,8 @@ class InputSchema(pw.Schema): @pytest.mark.filterwarnings("ignore:timestamps are required to be even") def test_stream_generator_from_markdown(): - t = pw.debug.table_from_markdown( + stream_generator = pw.debug.StreamGenerator() + t = stream_generator.table_from_markdown( """ | colA | colB | _time 1 | 1 | 2 | 1 @@ -2036,7 +2129,7 @@ def test_stream_generator_from_markdown(): on_change = mock.Mock() pw.io.subscribe(t, on_change=on_change) - pw.run() + pw.run(persistence_config=stream_generator.persistence_config()) on_change.assert_has_calls( [ @@ -2065,7 +2158,8 @@ def test_stream_generator_from_markdown(): def test_stream_generator_from_markdown_with_diffs(): - t = pw.debug.table_from_markdown( + stream_generator = pw.debug.StreamGenerator() + t = stream_generator.table_from_markdown( """ | colA | colB | _time | _diff 1 | 1 | 2 | 2 | 1 @@ -2085,17 +2179,20 @@ def test_stream_generator_from_markdown_with_diffs(): """ ) - assert_table_equality(t, expected) + assert_table_equality( + t, expected, persistence_config=stream_generator.persistence_config() + ) def test_stream_generator_two_tables_multiple_workers(monkeypatch: pytest.MonkeyPatch): + stream_generator = pw.debug.StreamGenerator() monkeypatch.setenv("PATHWAY_THREADS", "4") class InputSchema(pw.Schema): colA: int colB: int - t1 = pw.debug.table_from_markdown( + t1 = stream_generator.table_from_markdown( """ colA | colB | _time | _worker 1 | 2 | 2 | 0 @@ -2106,7 +2203,7 @@ class InputSchema(pw.Schema): """ ) - t2 = pw.debug.stream_generator._table_from_dict( + t2 = stream_generator._table_from_dict( { 2: {0: [(1, api.ref_scalar(0), [1, 4])]}, 4: {2: [(1, api.ref_scalar(1), [3, 7])]}, @@ -2124,7 +2221,7 @@ class InputSchema(pw.Schema): on_change = mock.Mock() pw.io.subscribe(t3, on_change=on_change) - pw.run() + pw.run(persistence_config=stream_generator.persistence_config()) on_change.assert_has_calls( [ diff --git a/python/pathway/tests/test_schema.py b/python/pathway/tests/test_schema.py index 20c3d88a..b0c1e681 100644 --- a/python/pathway/tests/test_schema.py +++ b/python/pathway/tests/test_schema.py @@ -267,6 +267,7 @@ class A(pw.Schema, append_only=True): assert A["a"].append_only is True assert A["b"].append_only is True + assert A.universe_properties.append_only 
is True class B(pw.Schema, append_only=False): a: int = pw.column_definition(append_only=False) @@ -274,6 +275,7 @@ class B(pw.Schema, append_only=False): assert B["a"].append_only is False assert B["b"].append_only is False + assert B.universe_properties.append_only is False class C(pw.Schema): a: int = pw.column_definition(append_only=True) @@ -283,3 +285,9 @@ class C(pw.Schema): assert C["a"].append_only is True assert C["b"].append_only is False assert C["c"].append_only is False + assert C.universe_properties.append_only is True + + class D(pw.Schema, append_only=True): + pass + + assert D.universe_properties.append_only is True diff --git a/python/pathway/tests/utils.py b/python/pathway/tests/utils.py index 5c2abe83..52e6602b 100644 --- a/python/pathway/tests/utils.py +++ b/python/pathway/tests/utils.py @@ -127,6 +127,32 @@ def on_end(self): assert not self.state, f"Non empty final state = {self.state!r}" +# this callback does not verify the order of entries, only that all of them were present +class CheckStreamEntriesEqualityCallback(CheckKeyEntriesInStreamCallback): + def __call__( + self, + key: api.Pointer, + row: dict[str, api.Value], + time: int, + is_addition: bool, + ) -> Any: + q = self.state.get(key) + assert ( + q + ), f"Got unexpected entry {key=} {row=} {time=} {is_addition=}, expected entries= {self.state!r}" + + entry = q.popleft() + assert (is_addition, row) == ( + entry.insertion, + entry.row, + ), f"Got unexpected entry {key=} {row=} {time=} {is_addition=}, expected entries= {self.state!r}" + if not q: + self.state.pop(key) + + def on_end(self): + assert not self.state, f"Non empty final state = {self.state!r}" + + # assert_key_entries_in_stream_consistent verifies for each key, whether: # - a sequence of updates in the table is a subsequence # of the sequence of updates defined in expected @@ -136,6 +162,11 @@ def assert_key_entries_in_stream_consistent(expected: list[DiffEntry], table: pw pw.io.subscribe(table, callback, callback.on_end) +def assert_stream_equal(expected: list[DiffEntry], table: pw.Table): + callback = CheckStreamEntriesEqualityCallback(expected) + pw.io.subscribe(table, callback, callback.on_end) + + def assert_equal_tables(t0: api.CapturedTable, t1: api.CapturedTable): assert t0 == t1 @@ -167,6 +198,19 @@ def __call__(self): return len(result) == self.n_lines +class FileLinesNumberChecker: + def __init__(self, path, n_lines): + self.path = path + self.n_lines = n_lines + + def __call__(self): + n_lines_actual = 0 + with open(self.path, "r") as f: + for row in f: + n_lines_actual += 1 + return n_lines_actual == self.n_lines + + def expect_csv_checker(expected, output_path, usecols=("k", "v"), index_col=("k")): expected = ( pw.debug._markdown_to_pandas(expected) diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index 96c4e1fd..a0d3281a 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -22,8 +22,7 @@ use std::str::{from_utf8, Utf8Error}; use std::sync::Arc; use std::thread; use std::thread::sleep; -use std::time::Duration; -use std::time::SystemTime; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use chrono::{DateTime, FixedOffset}; use log::{error, warn}; @@ -41,6 +40,7 @@ use crate::persistence::{ExternalPersistentId, PersistentId}; use crate::python_api::threads::PythonThreadState; use crate::python_api::with_gil_and_pool; use crate::python_api::PythonSubject; +use crate::timestamp::current_unix_timestamp_secs; use bincode::ErrorKind as BincodeError; use elasticsearch::{BulkParts, 
Elasticsearch}; @@ -162,6 +162,7 @@ impl ReaderContext { pub enum ReadResult { Finished, NewSource(Option), + FinishedSource { commit_allowed: bool }, Data(ReaderContext, Offset), } @@ -436,6 +437,7 @@ pub struct FilesystemReader { reader: Option>, filesystem_scanner: FilesystemScanner, total_entries_read: u64, + deferred_read_result: Option, } impl FilesystemReader { @@ -445,15 +447,9 @@ impl FilesystemReader { persistent_id: Option, read_method: ReadMethod, object_pattern: &str, - with_metadata: bool, ) -> Result { - let filesystem_scanner = FilesystemScanner::new( - path, - persistent_id, - streaming_mode, - object_pattern, - with_metadata, - )?; + let filesystem_scanner = + FilesystemScanner::new(path, persistent_id, streaming_mode, object_pattern)?; Ok(Self { persistent_id, @@ -462,6 +458,7 @@ impl FilesystemReader { filesystem_scanner, total_entries_read: 0, read_method, + deferred_read_result: None, }) } } @@ -497,6 +494,10 @@ impl Reader for FilesystemReader { } fn read(&mut self) -> Result { + if let Some(deferred_read_result) = self.deferred_read_result.take() { + return Ok(deferred_read_result); + } + loop { if let Some(reader) = &mut self.reader { let mut line = Vec::new(); @@ -522,6 +523,9 @@ impl Reader for FilesystemReader { .expect("scanner action can't be empty"); if self.read_method == ReadMethod::Full { + self.deferred_read_result = Some(ReadResult::FinishedSource { + commit_allowed: !self.filesystem_scanner.has_planned_insertion(), + }); self.reader = None; } @@ -530,21 +534,20 @@ impl Reader for FilesystemReader { offset, )); } + + self.reader = None; + return Ok(ReadResult::FinishedSource { + commit_allowed: !self.filesystem_scanner.has_planned_insertion(), + }); } - self.reader = None; - if self.filesystem_scanner.next_action_determined()? 
{ - let file = File::open( - self.filesystem_scanner - .current_file() - .as_ref() - .unwrap() - .as_path(), - )?; - self.reader = Some(BufReader::new(file)); - return Ok(ReadResult::NewSource( - self.filesystem_scanner.maybe_current_object_metadata(), - )); + let next_read_result = self.filesystem_scanner.next_action_determined()?; + if let Some(next_read_result) = next_read_result { + if let Some(selected_file) = self.filesystem_scanner.current_file() { + let file = File::open(&*selected_file)?; + self.reader = Some(BufReader::new(file)); + } + return Ok(next_read_result); } if self.filesystem_scanner.is_polling_enabled() { @@ -714,7 +717,8 @@ struct FilesystemScanner { cached_modify_times: HashMap>, inotify: Option, object_pattern: GlobPattern, - with_metadata: bool, + next_file_for_insertion: Option, + cached_metadata: HashMap>, } impl FilesystemScanner { @@ -723,12 +727,10 @@ impl FilesystemScanner { persistent_id: Option, streaming_mode: ConnectorMode, object_pattern: &str, - with_metadata: bool, ) -> Result { - let path = std::fs::canonicalize(path.into())?; - - if !path.exists() { - return Err(io::Error::from(io::ErrorKind::NotFound).into()); + let mut path = path.into(); + if path.exists() || matches!(streaming_mode, ConnectorMode::Static) { + path = std::fs::canonicalize(path)?; } let is_directory = path.is_dir(); @@ -766,10 +768,15 @@ impl FilesystemScanner { cached_modify_times: HashMap::new(), inotify, object_pattern: GlobPattern::new(object_pattern)?, - with_metadata, + next_file_for_insertion: None, + cached_metadata: HashMap::new(), }) } + fn has_planned_insertion(&self) -> bool { + self.next_file_for_insertion.is_some() + } + fn is_polling_enabled(&self) -> bool { self.streaming_mode.is_polling_enabled() } @@ -794,19 +801,6 @@ impl FilesystemScanner { } } - fn maybe_current_object_metadata(&self) -> Option { - if !self.with_metadata { - return None; - } - let path: &Path = match &self.current_action { - Some(PosixScannerAction::Read(path) | PosixScannerAction::Delete(path)) => { - path.as_ref() - } - None => return None, - }; - Some(SourceMetadata::from_fs_path(path)) - } - /// Returns the name of the currently processed file in the input directory fn current_offset_file(&self) -> Option> { match &self.current_action { @@ -822,14 +816,21 @@ impl FilesystemScanner { todo!("seek for snapshot mode"); } + self.known_files.clear(); if !self.is_directory { self.current_action = Some(PosixScannerAction::Read(Arc::new( seek_file_path.to_path_buf(), ))); + let modify_system_time = std::fs::metadata(seek_file_path)?.modified().unwrap(); + let modify_unix_timestamp = modify_system_time + .duration_since(UNIX_EPOCH) + .expect("File modified earlier than UNIX epoch") + .as_secs(); + self.known_files + .insert(seek_file_path.to_path_buf(), modify_unix_timestamp); return Ok(()); } - self.known_files.clear(); let target_modify_time = match std::fs::metadata(seek_file_path) { Ok(metadata) => metadata.modified()?, Err(e) => { @@ -882,37 +883,54 @@ impl FilesystemScanner { } } - fn next_action_determined(&mut self) -> io::Result { + /// Finish reading the current file and find the next one to read from. + /// If there is a file to read from, the method returns a `ReadResult` + /// specifying the action to be provided downstream. + /// + /// It can either be a `NewSource` event when the new action is found or + /// a `FinishedSource` event when we've had a scheduled action but the + /// corresponding file was deleted before we were able to execute this scheduled action. 
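The decision order described in this doc comment (finish a pending re-insertion first, then handle deletions, then pick up new files) can be modelled with a small self-contained sketch. `Scanner` and its fields below are illustrative stand-ins, not the real `FilesystemScanner` state:

```rust
use std::collections::VecDeque;

#[derive(Debug)]
enum ReadResult {
    NewSource(&'static str),                 // begin reading the named object
    FinishedSource { commit_allowed: bool }, // a scheduled action became impossible
}

// Stand-in for Path::exists() so the example is self-contained.
fn still_exists(path: &str) -> bool {
    path != "vanished.txt"
}

struct Scanner {
    next_file_for_insertion: Option<&'static str>, // second half of a "modification"
    deletions: VecDeque<&'static str>,
    insertions: VecDeque<&'static str>,
}

impl Scanner {
    fn next_action_determined(&mut self) -> Option<ReadResult> {
        // 1. A deletion of a modified file must be followed by its re-insertion
        //    before anything else happens; commits stay blocked until then.
        if let Some(path) = self.next_file_for_insertion.take() {
            if still_exists(path) {
                return Some(ReadResult::NewSource(path));
            }
            // The file disappeared again, so the pair is complete after all
            // and commits may resume.
            return Some(ReadResult::FinishedSource {
                commit_allowed: true,
            });
        }
        // 2. Then pending deletions of known files...
        if let Some(path) = self.deletions.pop_front() {
            return Some(ReadResult::NewSource(path));
        }
        // 3. ...and only then brand-new files. `None` means "poll again later".
        self.insertions.pop_front().map(ReadResult::NewSource)
    }
}

fn main() {
    let mut scanner = Scanner {
        next_file_for_insertion: Some("vanished.txt"),
        deletions: VecDeque::from(["removed.txt"]),
        insertions: VecDeque::from(["created.txt"]),
    };
    while let Some(action) = scanner.next_action_determined() {
        println!("{action:?}");
    }
}
```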
+    fn next_action_determined(&mut self) -> io::Result<Option<ReadResult>> {
         // Finalize the current processing action
-        let is_processing_finalized = match take(&mut self.current_action) {
-            Some(PosixScannerAction::Read(_)) => true,
-            Some(PosixScannerAction::Delete(path)) => {
-                let cached_path = self
-                    .cached_file_path(&path)
-                    .expect("in case of enabled deletions cache should exist");
-                std::fs::remove_file(cached_path)?;
-                true
+        if let Some(PosixScannerAction::Delete(path)) = take(&mut self.current_action) {
+            let cached_path = self
+                .cached_file_path(&path)
+                .expect("in case of enabled deletions cache should exist");
+            std::fs::remove_file(cached_path)?;
+        }
+
+        // A file modification is handled as a combination of a deletion and an insertion.
+        // If a file was deleted in the last action, we must now add it back, and only
+        // after that may we allow a commit.
+        if let Some(next_file_for_insertion) = take(&mut self.next_file_for_insertion) {
+            if next_file_for_insertion.exists() {
+                return Ok(Some(
+                    self.initiate_file_insertion(&next_file_for_insertion)?,
+                ));
             }
-            None => false,
-        };
-        if !self.is_directory && is_processing_finalized {
-            return Ok(false);
+            // The scheduled insertion after the deletion is impossible because
+            // the file has been deleted again in the meantime.
+            // The action is now complete, so we can allow commits.
+            return Ok(Some(ReadResult::FinishedSource {
+                commit_allowed: true,
+            }));
         }

         // First check if we need to delete something
         if self.streaming_mode.are_deletions_enabled() {
-            let has_something_to_delete = self.next_file_for_deletion_found();
-            if has_something_to_delete {
-                return Ok(true);
+            let next_for_deletion = self.next_deletion_entry();
+            if next_for_deletion.is_some() {
+                return Ok(next_for_deletion);
             }
         }

         // If there is nothing to delete, ingest the new entries
-        self.next_file_for_insertion_found()
+        self.next_insertion_entry()
     }

-    fn next_file_for_deletion_found(&mut self) -> bool {
+    fn next_deletion_entry(&mut self) -> Option<ReadResult> {
         let mut path_for_deletion: Option<PathBuf> = None;
         for (path, modified_at) in &self.known_files {
             let metadata = std::fs::metadata(path);
@@ -946,11 +964,22 @@ impl FilesystemScanner {
         match path_for_deletion {
             Some(path) => {
+                // Metadata of the deleted file must be the same as when it was added,
+                // so that the deletion event is processed correctly by timely dataflow.
To achieve + // this, we just take the cached metadata + let old_metadata = self + .cached_metadata + .remove(&path) + .expect("inconsistency between known_files and cached_metadata"); + self.known_files.remove(&path.clone().clone()); self.current_action = Some(PosixScannerAction::Delete(Arc::new(path.clone()))); - true + if path.exists() { + self.next_file_for_insertion = Some(path); + } + Some(ReadResult::NewSource(old_metadata)) } - None => false, + None => None, } } @@ -962,7 +991,7 @@ impl FilesystemScanner { }) } - fn next_file_for_insertion_found(&mut self) -> io::Result { + fn next_insertion_entry(&mut self) -> io::Result> { let mut selected_file: Option<(PathBuf, SystemTime)> = None; if self.is_directory { let files_in_directory = std::fs::read_dir(self.path.as_path())?; @@ -1009,31 +1038,38 @@ impl FilesystemScanner { } } } else { + let is_existing_file = self.path.exists() && self.path.is_file(); + if !self.known_files.is_empty() || !is_existing_file { + return Ok(None); + } selected_file = Some((self.path.clone(), SystemTime::now())); } match selected_file { - Some((new_file_name, new_file_modify_time)) => { - let new_file_path = self.path.as_path().join(new_file_name); - - let new_file_modify_timestamp = new_file_modify_time - .duration_since(SystemTime::UNIX_EPOCH) - .expect("System time should be after the Unix epoch") - .as_secs(); - - self.known_files - .insert(new_file_path.clone(), new_file_modify_timestamp); - - let cached_path = self.cached_file_path(&new_file_path); - if let Some(cached_path) = cached_path { - std::fs::copy(&new_file_path, cached_path)?; - } + Some((new_file_name, _)) => Ok(Some(self.initiate_file_insertion(&new_file_name)?)), + None => Ok(None), + } + } - self.current_action = Some(PosixScannerAction::Read(Arc::new(new_file_path))); - Ok(true) - } - None => Ok(false), + fn initiate_file_insertion(&mut self, new_file_name: &PathBuf) -> io::Result { + let new_file_meta = + SourceMetadata::from_fs_meta(new_file_name, &std::fs::metadata(new_file_name)?); + self.cached_metadata + .insert(new_file_name.clone(), Some(new_file_meta.clone())); + self.known_files.insert( + new_file_name.clone(), + new_file_meta + .modified_at + .unwrap_or(current_unix_timestamp_secs()), + ); + + let cached_path = self.cached_file_path(new_file_name); + if let Some(cached_path) = cached_path { + std::fs::copy(new_file_name, cached_path)?; } + + self.current_action = Some(PosixScannerAction::Read(Arc::new(new_file_name.clone()))); + Ok(ReadResult::NewSource(Some(new_file_meta))) } fn sleep_duration() -> Duration { @@ -1103,15 +1139,9 @@ impl CsvFilesystemReader { streaming_mode: ConnectorMode, persistent_id: Option, object_pattern: &str, - with_metadata: bool, ) -> Result { - let filesystem_scanner = FilesystemScanner::new( - path.into(), - persistent_id, - streaming_mode, - object_pattern, - with_metadata, - )?; + let filesystem_scanner = + FilesystemScanner::new(path.into(), persistent_id, streaming_mode, object_pattern)?; Ok(CsvFilesystemReader { parser_builder, persistent_id, @@ -1216,37 +1246,31 @@ impl Reader for CsvFilesystemReader { offset, )); } - if self.filesystem_scanner.next_action_determined()? 
{ - self.reader = Some( - self.parser_builder.from_path( - self.filesystem_scanner - .current_file() - .as_ref() - .unwrap() - .as_path(), - )?, - ); - return Ok(ReadResult::NewSource( - self.filesystem_scanner.maybe_current_object_metadata(), - )); + + let next_read_result = self.filesystem_scanner.next_action_determined()?; + if let Some(next_read_result) = next_read_result { + if let Some(selected_file) = self.filesystem_scanner.current_file() { + self.reader = Some(self.parser_builder.from_path(&*selected_file)?); + } + return Ok(next_read_result); } // The file came to its end, so we should drop the reader self.reader = None; + return Ok(ReadResult::FinishedSource { + commit_allowed: !self.filesystem_scanner.has_planned_insertion(), + }); } None => { - if self.filesystem_scanner.next_action_determined()? { - self.reader = Some( - self.parser_builder.flexible(true).from_path( - self.filesystem_scanner - .current_file() - .as_ref() - .unwrap() - .as_path(), - )?, - ); - return Ok(ReadResult::NewSource( - self.filesystem_scanner.maybe_current_object_metadata(), - )); + let next_read_result = self.filesystem_scanner.next_action_determined()?; + if let Some(next_read_result) = next_read_result { + if let Some(selected_file) = self.filesystem_scanner.current_file() { + self.reader = Some( + self.parser_builder + .flexible(true) + .from_path(&*selected_file)?, + ); + } + return Ok(next_read_result); } } } @@ -2047,6 +2071,7 @@ pub struct S3GenericReader { persistent_id: Option, total_entries_read: u64, current_bytes_read: u64, + deferred_read_result: Option, } impl S3GenericReader { @@ -2066,6 +2091,7 @@ impl S3GenericReader { persistent_id, total_entries_read: 0, current_bytes_read: 0, + deferred_read_result: None, }) } @@ -2133,6 +2159,10 @@ impl Reader for S3GenericReader { } fn read(&mut self) -> Result { + if let Some(deferred_read_result) = self.deferred_read_result.take() { + return Ok(deferred_read_result); + } + loop { match &mut self.reader { Some(reader) => { @@ -2152,6 +2182,9 @@ impl Reader for S3GenericReader { ); if self.read_method == ReadMethod::Full { + self.deferred_read_result = Some(ReadResult::FinishedSource { + commit_allowed: true, + }); self.reader = None; } diff --git a/src/connectors/metadata.rs b/src/connectors/metadata.rs index 8ac65923..3bba468e 100644 --- a/src/connectors/metadata.rs +++ b/src/connectors/metadata.rs @@ -1,4 +1,3 @@ -use log::error; use std::path::Path; use std::time::{SystemTime, UNIX_EPOCH}; @@ -11,7 +10,7 @@ pub struct SourceMetadata { // Creation and modification time may not be available at some platforms // Stored in u64 for easy serialization created_at: Option, - modified_at: Option, + pub modified_at: Option, // Owner may be unavailable at some platforms and on S3 owner: Option, @@ -23,18 +22,10 @@ pub struct SourceMetadata { } impl SourceMetadata { - pub fn from_fs_path(path: &Path) -> Self { - let (created_at, modified_at, owner) = match std::fs::metadata(path) { - Ok(metadata) => ( - metadata_time_to_unix_timestamp(metadata.created().ok()), - metadata_time_to_unix_timestamp(metadata.modified().ok()), - file_owner::get_owner(&metadata), - ), - Err(e) => { - error!("Failed to get metadata for filesystem object {path:?}, details: {e}"); - (None, None, None) - } - }; + pub fn from_fs_meta(path: &Path, meta: &std::fs::Metadata) -> Self { + let created_at = metadata_time_to_unix_timestamp(meta.created().ok()); + let modified_at = metadata_time_to_unix_timestamp(meta.modified().ok()); + let owner = file_owner::get_owner(meta); Self { 
created_at,
diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs
index 86f5d71a..b6bcffa6 100644
--- a/src/connectors/mod.rs
+++ b/src/connectors/mod.rs
@@ -402,6 +402,7 @@ where
     let connector_monitor = Rc::new(RefCell::new(ConnectorMonitor::new(reader_name)));
     let cloned_connector_monitor = connector_monitor.clone();
+    let mut commit_allowed = true;
     let poller = Box::new(move || {
         let iteration_start = SystemTime::now();
         if matches!(replay_mode, ReplayMode::Speedrun)
@@ -413,7 +414,7 @@ where
         if let Some(next_commit_at_timestamp) = next_commit_at {
             if next_commit_at_timestamp <= iteration_start {
-                if backfilling_finished {
+                if backfilling_finished && commit_allowed {
                     /*
                         We don't auto-commit for the initial batch, which consists of the data,
                         which shouldn't trigger any output.
@@ -460,6 +461,7 @@ where
                     &mut snapshot_writer,
                     &offsets_by_time_writer,
                     &mut Some(&mut *connector_monitor.borrow_mut()),
+                    &mut commit_allowed,
                 );
             }
             Err(TryRecvError::Empty) => return ControlFlow::Continue(next_commit_at),
@@ -489,24 +491,38 @@ where
         snapshot_writer: &mut Option,
         offsets_by_time_writer: &Mutex>,
         connector_monitor: &mut Option<&mut ConnectorMonitor>,
+        commit_allowed: &mut bool,
     ) {
         let has_persistent_storage = snapshot_writer.is_some();

         match entry {
             Entry::Realtime(read_result) => match read_result {
                 ReadResult::Finished => {}
+                ReadResult::FinishedSource {
+                    commit_allowed: commit_allowed_external,
+                } => {
+                    *commit_allowed = commit_allowed_external;
+                    if *commit_allowed {
+                        let parsed_entries = vec![ParsedEvent::AdvanceTime];
+                        self.on_parsed_data(
+                            parsed_entries,
+                            None, // no key generation for time advancement
+                            input_session,
+                            values_to_key,
+                            snapshot_writer,
+                            connector_monitor,
+                        );
+                    }
+                }
                 ReadResult::NewSource(metadata) => {
+                    // If a connector produces events of this kind, we consider its
+                    // objects atomic: we never do commits in the middle of processing
+                    // a single data source.
+                    //
+                    // So we block the ability to commit until an event that re-enables
+                    // commits is received.
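The gating these two arms implement can be traced end to end on a toy event stream. The variant shapes mirror `ReadResult`, but the events and the loop below are invented for illustration: a file modification arrives as a deletion followed by an insertion, and commits stay blocked until the pair completes.

```rust
#[derive(Debug)]
enum Event {
    NewSource,                               // an atomic object starts
    Data(&'static str),                      // a row read from that object
    FinishedSource { commit_allowed: bool }, // the object ended
}

fn main() {
    // One file modification, replayed as deletion + insertion.
    let events = [
        Event::NewSource, // old version of the file, emitted as a deletion
        Event::Data("old line, diff = -1"),
        Event::FinishedSource { commit_allowed: false }, // insertion still pending
        Event::NewSource, // new version of the file, emitted as an insertion
        Event::Data("new line, diff = +1"),
        Event::FinishedSource { commit_allowed: true }, // the pair is complete
    ];

    let mut commit_allowed = true;
    println!("commits allowed initially: {commit_allowed}");
    for event in events {
        match event {
            // The object is atomic: no commit may happen while it is open.
            Event::NewSource => commit_allowed = false,
            Event::Data(row) => println!("ingest: {row} (commits allowed: {commit_allowed})"),
            Event::FinishedSource { commit_allowed: allowed } => {
                commit_allowed = allowed;
                if commit_allowed {
                    println!("advance time, commits are possible again");
                }
            }
        }
    }
}
```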
+ *commit_allowed = false; parser.on_new_source_started(metadata.as_ref()); - - let parsed_entries = vec![ParsedEvent::AdvanceTime]; - self.on_parsed_data( - parsed_entries, - None, // no key generation for time advancement - input_session, - values_to_key, - snapshot_writer, - connector_monitor, - ); } ReadResult::Data(reader_context, offset) => { let mut parsed_entries = match parser.parse(&reader_context) { diff --git a/src/python_api.rs b/src/python_api.rs index 830de042..da5bf280 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -2977,7 +2977,6 @@ pub struct DataStorage { persistent_id: Option, max_batch_size: Option, object_pattern: String, - with_metadata: bool, mock_events: Option>>, } @@ -3209,7 +3208,6 @@ impl DataStorage { persistent_id = None, max_batch_size = None, object_pattern = "*".to_string(), - with_metadata = false, mock_events = None, ))] #[allow(clippy::too_many_arguments)] @@ -3229,7 +3227,6 @@ impl DataStorage { persistent_id: Option, max_batch_size: Option, object_pattern: String, - with_metadata: bool, mock_events: Option>>, ) -> Self { DataStorage { @@ -3248,7 +3245,6 @@ impl DataStorage { persistent_id, max_batch_size, object_pattern, - with_metadata, mock_events, } } @@ -3447,7 +3443,6 @@ impl DataStorage { self.internal_persistent_id(), self.read_method, &self.object_pattern, - self.with_metadata, ) .map_err(|e| { PyIOError::new_err(format!("Failed to initialize Filesystem reader: {e}")) @@ -3485,7 +3480,6 @@ impl DataStorage { self.mode, self.internal_persistent_id(), &self.object_pattern, - self.with_metadata, ) .map_err(|e| { PyIOError::new_err(format!("Failed to initialize CsvFilesystem reader: {e}")) diff --git a/src/timestamp.rs b/src/timestamp.rs index 54a24fb2..651a5cff 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -6,3 +6,10 @@ pub fn current_unix_timestamp_ms() -> u128 { .expect("Failed to get the current timestamp") .as_millis() } + +pub fn current_unix_timestamp_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Failed to get the current timestamp") + .as_secs() +} diff --git a/tests/helpers.rs b/tests/helpers.rs index 985c9d5a..bf6b1a31 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -112,6 +112,7 @@ pub fn full_cycle_read( new_parsed_entries.push(event); } } + Entry::Realtime(ReadResult::FinishedSource { .. }) => continue, Entry::Realtime(ReadResult::NewSource(metadata)) => { parser.on_new_source_started(metadata.as_ref()); } @@ -174,6 +175,7 @@ pub fn read_data_from_reader( panic!("Unexpected erroneous reply: {parse_result:?}"); } } + ReadResult::FinishedSource { .. } => continue, ReadResult::NewSource(metadata) => parser.on_new_source_started(metadata.as_ref()), ReadResult::Finished => break, } @@ -249,6 +251,7 @@ pub fn data_parsing_fails( return Ok(true); } } + ReadResult::FinishedSource { .. } => continue, ReadResult::NewSource(metadata) => parser.on_new_source_started(metadata.as_ref()), ReadResult::Finished => break, } diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index fb18343f..608cf9dc 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -13,7 +13,6 @@ fn read_bytes_from_path(path: &str) -> eyre::Result> { None, ReadMethod::Full, "*", - false, )?; let mut parser = IdentityParser::new(vec!["data".to_string()], false); let mut events = Vec::new(); @@ -30,6 +29,7 @@ fn read_bytes_from_path(path: &str) -> eyre::Result> { } } ReadResult::Finished => break, + ReadResult::FinishedSource { .. 
} => continue, ReadResult::NewSource(_) => continue, } } diff --git a/tests/test_connector_field_defaults.rs b/tests/test_connector_field_defaults.rs index 964392e9..8f582bb4 100644 --- a/tests/test_connector_field_defaults.rs +++ b/tests/test_connector_field_defaults.rs @@ -30,7 +30,6 @@ fn test_dsv_with_default_end_of_line() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( @@ -85,7 +84,6 @@ fn test_dsv_with_default_middle_of_line() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( @@ -136,7 +134,6 @@ fn test_dsv_fails_without_default() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( @@ -170,7 +167,6 @@ fn test_dsv_with_default_nullable() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( @@ -215,7 +211,6 @@ fn test_jsonlines_fails_without_default() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -244,7 +239,6 @@ fn test_jsonlines_with_default() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -297,7 +291,6 @@ fn test_jsonlines_with_default_at_jsonpath() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -344,7 +337,6 @@ fn test_jsonlines_explicit_null_not_overridden() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), diff --git a/tests/test_debezium.rs b/tests/test_debezium.rs index be4dfdca..da3ddea1 100644 --- a/tests/test_debezium.rs +++ b/tests/test_debezium.rs @@ -20,7 +20,6 @@ fn test_debezium_reads_ok() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = DebeziumMessageParser::new( Some(vec!["id".to_string()]), @@ -166,7 +165,6 @@ fn test_debezium_mongodb_format() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = DebeziumMessageParser::new( Some(vec!["id".to_string()]), diff --git a/tests/test_dsv.rs b/tests/test_dsv.rs index 36917da9..14ba7bfc 100644 --- a/tests/test_dsv.rs +++ b/tests/test_dsv.rs @@ -21,7 +21,6 @@ fn test_dsv_read_ok() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','), @@ -65,7 +64,6 @@ fn test_dsv_column_does_not_exist() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["c".to_string()], ','), @@ -89,7 +87,6 @@ fn test_dsv_rows_parsing_ignore_type() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','), @@ -126,7 +123,6 @@ fn test_dsv_not_enough_columns() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','), @@ -172,7 +168,6 @@ fn test_dsv_autogenerate_pkey() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut parser = DsvParser::new( DsvSettings::new(None, vec!["a".to_string(), "b".to_string()], ','), @@ -200,6 +195,7 @@ fn 
test_dsv_autogenerate_pkey() -> eyre::Result<()> { } } ReadResult::Finished => break, + ReadResult::FinishedSource { .. } => continue, ReadResult::NewSource(_) => continue, } } @@ -215,7 +211,6 @@ fn test_dsv_composite_pkey() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut parser = DsvParser::new( DsvSettings::new( @@ -246,6 +241,7 @@ fn test_dsv_composite_pkey() -> eyre::Result<()> { } } ReadResult::Finished => break, + ReadResult::FinishedSource { .. } => continue, ReadResult::NewSource(_) => continue, } } @@ -278,7 +274,6 @@ fn test_dsv_read_schema_ok() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut parser = DsvParser::new( DsvSettings::new( @@ -348,7 +343,6 @@ fn test_dsv_read_schema_nonparsable() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut parser = DsvParser::new( DsvSettings::new( diff --git a/tests/test_dsv_dir.rs b/tests/test_dsv_dir.rs index 3b96c694..d3102266 100644 --- a/tests/test_dsv_dir.rs +++ b/tests/test_dsv_dir.rs @@ -20,7 +20,6 @@ fn test_dsv_dir_ok() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new(Some(vec!["key".to_string()]), vec!["foo".to_string()], ','), @@ -54,7 +53,6 @@ fn test_single_file_ok() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new(Some(vec!["a".to_string()]), vec!["b".to_string()], ','), @@ -80,7 +78,6 @@ fn test_custom_delimiter() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( @@ -108,7 +105,6 @@ fn test_escape_fields() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( @@ -155,7 +151,6 @@ fn test_escape_newlines() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( @@ -192,7 +187,6 @@ fn test_nonexistent_file() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, ); assert!(reader.is_err()); @@ -210,7 +204,6 @@ fn test_special_fields() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - false, )?; let parser = DsvParser::new( DsvSettings::new( diff --git a/tests/test_jsonlines.rs b/tests/test_jsonlines.rs index 5fd849a7..8f15b19e 100644 --- a/tests/test_jsonlines.rs +++ b/tests/test_jsonlines.rs @@ -18,7 +18,6 @@ fn test_jsonlines_ok() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -58,7 +57,6 @@ fn test_jsonlines_incorrect_key() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -85,7 +83,6 @@ fn test_jsonlines_incomplete_key_to_null() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -109,7 +106,6 @@ fn test_jsonlines_incorrect_values() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -136,7 +132,6 @@ fn test_jsonlines_types_parsing() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string()]), @@ -187,7 +182,6 @@ fn test_jsonlines_complex_paths() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut routes = HashMap::new(); @@ -244,7 +238,6 @@ fn 
test_jsonlines_complex_paths_error() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut routes = HashMap::new(); @@ -286,7 +279,6 @@ fn test_jsonlines_complex_path_ignore_errors() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let mut routes = HashMap::new(); @@ -325,7 +317,6 @@ fn test_jsonlines_incorrect_key_verbose_error() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -355,7 +346,6 @@ fn test_jsonlines_incorrect_jsonpointer_verbose_error() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( Some(vec!["a".to_string(), "d".to_string()]), @@ -382,7 +372,6 @@ fn test_jsonlines_failed_to_parse_field() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - false, )?; let parser = JsonLinesParser::new( None, diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs index 94d40412..0aa19c5a 100644 --- a/tests/test_metadata.rs +++ b/tests/test_metadata.rs @@ -34,7 +34,6 @@ fn test_metadata_fs_dir() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - true, )?; let parser = DsvParser::new( DsvSettings::new( @@ -65,7 +64,6 @@ fn test_metadata_fs_file() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - true, )?; let parser = DsvParser::new( DsvSettings::new( @@ -97,7 +95,6 @@ fn test_metadata_csv_dir() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - true, )?; let parser = DsvParser::new( DsvSettings::new( @@ -131,7 +128,6 @@ fn test_metadata_csv_file() -> eyre::Result<()> { ConnectorMode::Static, None, "*", - true, )?; let parser = DsvParser::new( DsvSettings::new( @@ -160,7 +156,6 @@ fn test_metadata_json_file() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - true, )?; let parser = JsonLinesParser::new( None, @@ -184,7 +179,6 @@ fn test_metadata_json_dir() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - true, )?; let parser = JsonLinesParser::new( None, @@ -209,7 +203,6 @@ fn test_metadata_identity_file() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - true, )?; let parser = IdentityParser::new(vec!["data".to_string(), "_metadata".to_string()], false); @@ -227,7 +220,6 @@ fn test_metadata_identity_dir() -> eyre::Result<()> { None, ReadMethod::ByLine, "*", - true, )?; let parser = IdentityParser::new(vec!["data".to_string(), "_metadata".to_string()], false); diff --git a/tests/test_seek.rs b/tests/test_seek.rs index d2884ea2..64798126 100644 --- a/tests/test_seek.rs +++ b/tests/test_seek.rs @@ -32,7 +32,6 @@ fn csv_reader_parser_pair(input_path: &Path) -> (Box, Box (Box, Box
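The test updates above all follow one pattern: any loop that drains a reader must now tolerate the new `FinishedSource` variant. A condensed, self-contained sketch of that pattern, with a hypothetical `read_all` helper and string rows standing in for real parser output:

```rust
#[derive(Debug)]
enum ReadResult {
    Data(String),
    NewSource(Option<String>),               // carries optional source metadata
    FinishedSource { commit_allowed: bool }, // new in this release
    Finished,
}

// Drain a sequence of read results into rows, skipping the source-boundary
// markers exactly the way the updated tests do.
fn read_all(results: Vec<ReadResult>) -> Vec<String> {
    let mut rows = Vec::new();
    for result in results {
        match result {
            ReadResult::Data(row) => rows.push(row),
            ReadResult::NewSource(_metadata) => continue,
            ReadResult::FinishedSource { .. } => continue,
            ReadResult::Finished => break,
        }
    }
    rows
}

fn main() {
    let rows = read_all(vec![
        ReadResult::NewSource(None),
        ReadResult::Data("a,1".to_string()),
        ReadResult::FinishedSource {
            commit_allowed: true,
        },
        ReadResult::Finished,
    ]);
    assert_eq!(rows, vec!["a,1".to_string()]);
    println!("{rows:?}");
}
```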