diff --git a/README.rst b/README.rst index f2a915b..a29161e 100644 --- a/README.rst +++ b/README.rst @@ -5,11 +5,14 @@ Introduction to *fluxus* ======================== -**FLUXUS** is a Python framework designed by `BCG X `_ to +*fluxus* is a Python framework designed by `BCG X `_ to streamline the development of complex data processing pipelines (called *flows*), enabling users to quickly and efficiently build, test, and deploy data workflows, making complex operations more manageable. +**FLUXUS** is inspired by the data stream paradigm and is designed to be simple, +expressive, and composable. + Introducing Flows ----------------- @@ -55,23 +58,23 @@ With *fluxus*, we can define this flow as follows: dict(greeting="Bonjour!"), ] - def lower(greeting: str) -> dict[str, str]: + def lower(greeting: str): # Convert the greeting to lowercase and keep track of the case change - return dict( + yield dict( greeting=greeting.lower(), case="lower", ) - def upper(greeting: str) -> dict[str, str]: + def upper(greeting: str): # Convert the greeting to uppercase and keep track of the case change - return dict( + yield dict( greeting=greeting.upper(), - tone="upper", + case="upper", ) - def annotate(greeting: str, case: str = "original") -> dict[str, str]: + def annotate(greeting: str, case: str = "original"): # Annotate the greeting with the case change; default to "original" - return dict(greeting=f"{greeting!r} ({case})") + yield dict(greeting=f"{greeting!r} ({case})") flow = ( step("input", input_data) # initial producer step @@ -123,12 +126,12 @@ This gives us the following output in :code:`result`: [ { 'input': {'greeting': 'Hello, World!'}, - 'upper': {'greeting': 'HELLO, WORLD!', 'tone': 'upper'}, + 'upper': {'greeting': 'HELLO, WORLD!', 'case': 'upper'}, 'annotate': {'greeting': "'HELLO, WORLD!' (original)"} }, { 'input': {'greeting': 'Bonjour!'}, - 'upper': {'greeting': 'BONJOUR!', 'tone': 'upper'}, + 'upper': {'greeting': 'BONJOUR!', 'case': 'upper'}, 'annotate': {'greeting': "'BONJOUR!' (original)"} } ], @@ -144,6 +147,11 @@ This gives us the following output in :code:`result`: ] ) +Or, as a *pandas* data frame by calling :code:`result.to_frame()`: + +.. image:: sphinx/source/_images/flow-hello-world-results.png + :alt: "Hello World" flow results + :width: 600px Here's what happened: The flow starts with a single input data item, which is then passed along three parallel paths. Each path applies different transformations to the @@ -158,8 +166,8 @@ The run result not only gives us the final product of the ``annotate`` step but inputs and intermediate products of the ``lower`` and ``upper`` steps. We refer to this extended view of the flow results as the *lineage* of the flow. -For a more thorough introduction to FLUXUS, please visit our `User Guide <#>`_ and -`Examples <#>`_! +For a more thorough introduction to FLUXUS, please visit our +`User Guide `_. Why *fluxus*? @@ -181,10 +189,9 @@ motivations for using *fluxus* include: - **Ease of Use**: *fluxus* provides a functional API that abstracts away the complexities of data processing, making it accessible to developers of all levels. More experienced users can also leverage the advanced features of its underlying - object-oriented implementation for customisation and optimisation (see - `Advanced Features <#>`_ for more details). - - + object-oriented implementation for additional customisation and versatility (see + `User Guide `_ for more + details). Concurrent Processing in *fluxus* --------------------------------- @@ -207,12 +214,15 @@ applications. Getting started =============== -- See the `FLUXUS Documentation <#>`_ for a comprehensive User Guide, Examples, - API reference, and more. -- See `Contributing `_ or visit our detailed `Contributor Guide <#>`_ +- See the + `FLUXUS Documentation `_ + for a comprehensive User Guide, API reference, and more. +- See `Contributing `_ or visit our detailed + `Contributor Guide `_ for information on contributing. -- We have an `FAQ <#>`_ for common questions. For anything else, please reach out to - ARTKIT@bcg.com. +- We have an `FAQ `_ for common + questions. For anything else, please reach out to + `artkit@bcg.com `_. User Installation @@ -266,7 +276,8 @@ or ``conda``: Contributing ------------ -Contributions to ARTKIT are welcome and appreciated! Please see the `Contributing `_ section for information. +Contributions to *fluxus* are welcome and appreciated! Please see the +`Contributing `_ section for information. License diff --git a/sphinx/make/conf_base.py b/sphinx/make/conf_base.py index 7f725e8..6d2c62b 100644 --- a/sphinx/make/conf_base.py +++ b/sphinx/make/conf_base.py @@ -47,6 +47,8 @@ def set_config( get_package_version(package_path=os.path.join(_dir_src, project)) ) + globals_["html_show_sourcelink"] = False + if html_logo: globals_["html_logo"] = html_logo globals_["latex_logo"] = html_logo diff --git a/sphinx/make/make_base.py b/sphinx/make/make_base.py index 1b48320..f7dfd87 100755 --- a/sphinx/make/make_base.py +++ b/sphinx/make/make_base.py @@ -45,7 +45,6 @@ ] assert len(PACKAGE_NAMES) == 1, "only one package per Sphinx build is supported" PROJECT_NAME = PACKAGE_NAMES[0] -EXCLUDE_MODULES = [] DIR_DOCS = os.path.join(DIR_REPO_ROOT, "docs") DIR_DOCS_VERSION = os.path.join(DIR_DOCS, "docs-version") DIR_SPHINX_SOURCE = os.path.join(DIR_SPHINX_ROOT, "source") @@ -211,15 +210,6 @@ def _run(self) -> None: check=True, ) - # remove rst file and directory for excluded modules - for module in EXCLUDE_MODULES: - rst_path = os.path.join( - DIR_SPHINX_API_GENERATED, PROJECT_NAME, f"{PROJECT_NAME}.{module}.rst" - ) - module_path = os.path.join(DIR_SPHINX_API_GENERATED, PROJECT_NAME, module) - os.remove(rst_path) - shutil.rmtree(module_path) - # Adjust the path and filename as per your project's structure api_doc_filename = os.path.join(DIR_SPHINX_API_GENERATED, f"{packages[0]}.rst") new_title = "API Reference" diff --git a/sphinx/source/_images/flow-hello-world-results.png b/sphinx/source/_images/flow-hello-world-results.png new file mode 100644 index 0000000..75fefbb Binary files /dev/null and b/sphinx/source/_images/flow-hello-world-results.png differ diff --git a/sphinx/source/user_guide/introduction_to_fluxus/building_a_flow.ipynb b/sphinx/source/user_guide/introduction_to_fluxus/building_a_flow.ipynb index 26bef1f..e5253b6 100644 --- a/sphinx/source/user_guide/introduction_to_fluxus/building_a_flow.ipynb +++ b/sphinx/source/user_guide/introduction_to_fluxus/building_a_flow.ipynb @@ -4,50 +4,864 @@ "cell_type": "markdown", "id": "563768a5-cf56-4ccd-9db7-cdfe2a117654", "metadata": {}, - "source": "# Building Your First FLUXUS Pipeline" + "source": "# Creating a Flow with custom classes for Producers, Transformers, and Consumers" }, { "cell_type": "markdown", "id": "1cc6cf47-6d2a-433d-a0ca-9abf4b7063d2", "metadata": {}, "source": [ - "## Introduction" + "## Introduction\n", + "\n", + "This notebook introduces the object-oriented API of *fluxus*. You will learn how to:\n", + "\n", + "- create your own classes for producers, transformers, and consumers\n", + "- compose these conduits using the `>>` operator for chaining and the `&` operator for concurrent execution\n", + "- build flows for asynchronous execution using the asynchronous versions of the classes" ] }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Overview\n", + "\n", + "Flows consist of building blocks that create, transform, or collect data items.\n", + "We call these data items *products*, and the building blocks *conduits*.\n", + "\n", + "There are three types of conduits:\n", + "\n", + "- **Producers**: These generate products.\n", + "- **Transformers**: These modify or process the products.\n", + "- **Consumers**: These collect the final products." + ], + "id": "2d94faabac2fca97" + }, { "cell_type": "markdown", "id": "e83b06ba", "metadata": {}, "source": [ - "This notebook introduces the basic building blocks for developing flows with FLUXUS. You will learn how to:\n", + "## Creating Conduit classes\n", + "\n", + "To build your own flow using the object-oriented API, you first need to define classes for the conduits in your flow (for an easier start with *fluxus*, consider first experimenting with the *functional* API where all products are dictionaries, and no custom classes are required).\n", "\n", - "- Build and run flows using the functional API\n", - "- Creating custom flow *conduits* using the underlying object-oriented API" + "All of these classes must be a subclass of one of `Producer`, `Transformer`, or `Consumer`. All of these classes are subclasses of `Conduit`.\n", + "\n", + "We will show examples for defining producers, transformers and consumers in the following sections, and will then use these to build and run a complete flow. " ] }, { "metadata": {}, "cell_type": "markdown", - "source": "", + "source": [ + "### Producers\n", + "\n", + "When creating a `Producer` subclass, you need to specify the type of products it generates using a type hint. Implement the `produce` method to yield products of this type.\n", + "\n", + "Imagine a scenario where the `Producer` generates a sequence of numbers:" + ], "id": "81866155cb418a38" }, { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:04.898322Z", + "start_time": "2024-06-19T21:22:04.280951Z" + } + }, + "cell_type": "code", + "source": [ + "from collections.abc import Iterator\n", + "from fluxus import Producer\n", + "from fluxus.core.transformer._transformer_base import (\n", + " T_SourceProduct_arg,\n", + " T_TransformedProduct_ret,\n", + ")\n", + "\n", + "\n", + "class NumberProducer(Producer[int]):\n", + "\n", + " def __init__(self, n: int) -> None:\n", + " self.n = n\n", + "\n", + " def produce(self) -> Iterator[int]:\n", + " yield from range(1, self.n + 1)" + ], + "id": "6bd146b4e6a45d8b", + "outputs": [], + "execution_count": 1 + }, + { + "metadata": {}, "cell_type": "markdown", - "id": "82774e01-0015-4d0f-9c35-de08e193391d", + "source": [ + "Note that the type hint for the `Producer` parent class matches the type hint for the iterator.\n", + "\n", + "Producers are *iterable* so we can easily test them by iterating over them:" + ], + "id": "60f43964ea37f121" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:04.902237Z", + "start_time": "2024-06-19T21:22:04.899238Z" + } + }, + "cell_type": "code", + "source": "list(NumberProducer(5))", + "id": "887120876ea29b8c", + "outputs": [ + { + "data": { + "text/plain": [ + "[1, 2, 3, 4, 5]" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 2 + }, + { "metadata": {}, + "cell_type": "markdown", "source": [ - "## Functional API\n", + "### Transformers\n", "\n", - "First, we will import the " - ] + "When creating a `Transformer` subclass, you need to specify the input and output (product) types using type hints. Implement the `transform` method to process input products and yield transformed products of the specified output type.\n", + "\n", + "In our example, we will use `Transformer` classes to perform mathematical operations. We will create one transformer that doubles each number and another that calculates the square.\n" + ], + "id": "6bf91983cfd7e3b7" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:04.905303Z", + "start_time": "2024-06-19T21:22:04.903002Z" + } + }, + "cell_type": "code", + "source": [ + "from fluxus import Transformer\n", + "\n", + "\n", + "class DoublingTransformer(Transformer[int, int]):\n", + " def transform(self, x: int) -> Iterator[int]:\n", + " yield x * 2\n", + "\n", + "\n", + "class SquareTransformer(Transformer[int, int]):\n", + " def transform(self, x: int) -> Iterator[int]:\n", + " yield x * x" + ], + "id": "33b35373302ff052", + "outputs": [], + "execution_count": 3 }, { "metadata": {}, + "cell_type": "markdown", + "source": "Once more, note that the type hints used for the `Transformer` base class match the input and output types of the `transform` method.", + "id": "503e3943168bc0dd" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "### Consumers\n", + "\n", + "When creating a `Consumer` subclass, you need to specify the type of products it consumes and the type of output it generates using type hints (use `None` as the output type if there is no output).\n", + "Implement the `consume` method to handle the consumed products.\n", + "\n", + "The following `Consumer` collects all processed numbers into a list.\n", + "Note that the consumer method receives products as an iterator of tuples, where the first element is the index of the path taken through the flow and the second element is the product itself.\n", + "The index is useful when the flow branches into multiple paths, and you want to distinguish between the products generated by each path.\n", + "We will explore this in more detail further down." + ], + "id": "e29e5e86ffec71d4" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:04.908615Z", + "start_time": "2024-06-19T21:22:04.906630Z" + } + }, "cell_type": "code", + "source": [ + "from fluxus import Consumer\n", + "\n", + "\n", + "class NumberConsumer(Consumer[int, list[int]]):\n", + " def consume(self, products: Iterator[tuple[int, int]]) -> list[int]:\n", + " # ignore the path index and return the numbers\n", + " return list(number for _, number in products)" + ], + "id": "2c8f3f63cf547f86", "outputs": [], - "execution_count": null, - "source": "", - "id": "23d1a6c6ac64d1f" + "execution_count": 4 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Building a flow\n", + "\n", + "Now that we have defined the classes for the conduits, we can build a flow by chaining them together using the `>>` operator.\n", + "\n", + "In this example, we will generate numbers from 1 to 5, double then square them, and then collect the results in a list.\n", + "\n", + "The flow is constructed as follows:" + ], + "id": "2e2c756ec5705e28" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:04.915943Z", + "start_time": "2024-06-19T21:22:04.909296Z" + } + }, + "cell_type": "code", + "source": [ + "flow = (\n", + " NumberProducer(n=5)\n", + " >> DoublingTransformer()\n", + " >> SquareTransformer()\n", + " >> NumberConsumer()\n", + ")" + ], + "id": "68d652742e6f0439", + "outputs": [], + "execution_count": 5 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "The `>>` operator chains the conduits together, passing the product of one conduit as the input to the next.\n", + "\n", + "If *Graphviz* is installed, we can visualize the flow:" + ], + "id": "a47f496026270155" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:05.635468Z", + "start_time": "2024-06-19T21:22:04.916525Z" + } + }, + "cell_type": "code", + "source": "flow", + "id": "dd15050a93607640", + "outputs": [ + { + "data": { + "text/plain": [ + "NumberProducer(n=5) >> DoublingTransformer() >> SquareTransformer() >> NumberConsumer()" + ], + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Flow\n", + "\n", + "\n", + "\n", + "NumberProducer_4949763216\n", + "\n", + "NumberProducer\n", + "\n", + "n=5\n", + "\n", + "\n", + "\n", + "DoublingTransformer_4949758352\n", + "\n", + "DoublingTransformer\n", + "\n", + "\n", + "\n", + "NumberProducer_4949763216->DoublingTransformer_4949758352\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "SquareTransformer_4949873680\n", + "\n", + "SquareTransformer\n", + "\n", + "\n", + "\n", + "NumberConsumer_4949878224\n", + "\n", + "NumberConsumer\n", + "\n", + "\n", + "\n", + "SquareTransformer_4949873680->NumberConsumer_4949878224\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "DoublingTransformer_4949758352->SquareTransformer_4949873680\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "image/svg+xml": "\n\n\n\n\n\nFlow\n\n\n\nNumberProducer_4949763216\n\nNumberProducer\n\nn=5\n\n\n\nDoublingTransformer_4949758352\n\nDoublingTransformer\n\n\n\nNumberProducer_4949763216->DoublingTransformer_4949758352\n\n\n\n\n\nSquareTransformer_4949873680\n\nSquareTransformer\n\n\n\nNumberConsumer_4949878224\n\nNumberConsumer\n\n\n\nSquareTransformer_4949873680->NumberConsumer_4949878224\n\n\n\n\n\nDoublingTransformer_4949758352->SquareTransformer_4949873680\n\n\n\n\n\n" + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 6 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "Note that the graph includes the `n` property of the `NumberProducer`.\n", + "All `__init__` parameters of a conduit that have a matching instance attribute will be included in representations of the flow, as is the case with the `n` parameter of the `NumberProducer`.\n", + "\n", + "More styles are available via the `style` argument of the `draw` method:" + ], + "id": "7533943b5f84a29e" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:05.769310Z", + "start_time": "2024-06-19T21:22:05.636444Z" + } + }, + "cell_type": "code", + "source": "flow.draw(style=\"graph_dark\")", + "id": "26be42c28dfec061", + "outputs": [ + { + "data": { + "image/svg+xml": "\n\n\n\n\n\nFlow\n\n\n\nNumberProducer_4949763216\n\nNumberProducer\n\nn=5\n\n\n\nDoublingTransformer_4949758352\n\nDoublingTransformer\n\n\n\nNumberProducer_4949763216->DoublingTransformer_4949758352\n\n\n\n\n\nSquareTransformer_4949873680\n\nSquareTransformer\n\n\n\nNumberConsumer_4949878224\n\nNumberConsumer\n\n\n\nSquareTransformer_4949873680->NumberConsumer_4949878224\n\n\n\n\n\nDoublingTransformer_4949758352->SquareTransformer_4949873680\n\n\n\n\n\n", + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "execution_count": 7 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "We can now run the flow:", + "id": "f783585118c6328d" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:05.773263Z", + "start_time": "2024-06-19T21:22:05.770433Z" + } + }, + "cell_type": "code", + "source": "flow.run()", + "id": "c720b2d0966ba764", + "outputs": [ + { + "data": { + "text/plain": [ + "[4, 16, 36, 64, 100]" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 8 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Parallel flows\n", + "\n", + "In many cases, you will want to run multiple conduits concurrently.\n", + "This can be achieved using the `&` operator.\n", + "\n", + "In the following example, we will generate numbers from 1 to 5.\n", + "For each of these numbers, we will once calculate the square and once the doubled value, in parallel.\n", + "We will then collect the results in a list.\n", + "\n", + "The flow is constructed as follows:" + ], + "id": "a6b4aad1b8958615" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:05.776982Z", + "start_time": "2024-06-19T21:22:05.774048Z" + } + }, + "cell_type": "code", + "source": [ + "flow = (\n", + " NumberProducer(n=5)\n", + " >> (DoublingTransformer() & SquareTransformer())\n", + " >> NumberConsumer()\n", + ")" + ], + "id": "2a55c4a05dbe270f", + "outputs": [], + "execution_count": 9 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "The flow looks like this:", + "id": "b3f040a960b50301" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.041985Z", + "start_time": "2024-06-19T21:22:05.779003Z" + } + }, + "cell_type": "code", + "source": "flow", + "id": "888cc8cd57025b6f", + "outputs": [ + { + "data": { + "text/plain": [ + "NumberProducer(n=5) >> (DoublingTransformer() & SquareTransformer()) >> NumberConsumer()" + ], + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Flow\n", + "\n", + "\n", + "\n", + "SquareTransformer_4970669328\n", + "\n", + "SquareTransformer\n", + "\n", + "\n", + "\n", + "NumberConsumer_4968685840\n", + "\n", + "NumberConsumer\n", + "\n", + "\n", + "\n", + "SquareTransformer_4970669328->NumberConsumer_4968685840\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "NumberProducer_4970669648\n", + "\n", + "NumberProducer\n", + "\n", + "n=5\n", + "\n", + "\n", + "\n", + "NumberProducer_4970669648->SquareTransformer_4970669328\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "DoublingTransformer_4970669584\n", + "\n", + "DoublingTransformer\n", + "\n", + "\n", + "\n", + "NumberProducer_4970669648->DoublingTransformer_4970669584\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "DoublingTransformer_4970669584->NumberConsumer_4968685840\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "image/svg+xml": "\n\n\n\n\n\nFlow\n\n\n\nSquareTransformer_4970669328\n\nSquareTransformer\n\n\n\nNumberConsumer_4968685840\n\nNumberConsumer\n\n\n\nSquareTransformer_4970669328->NumberConsumer_4968685840\n\n\n\n\n\nNumberProducer_4970669648\n\nNumberProducer\n\nn=5\n\n\n\nNumberProducer_4970669648->SquareTransformer_4970669328\n\n\n\n\n\nDoublingTransformer_4970669584\n\nDoublingTransformer\n\n\n\nNumberProducer_4970669648->DoublingTransformer_4970669584\n\n\n\n\n\nDoublingTransformer_4970669584->NumberConsumer_4968685840\n\n\n\n\n\n" + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 10 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "We run the flow as before:", + "id": "c88aa963ca364577" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.045920Z", + "start_time": "2024-06-19T21:22:06.042854Z" + } + }, + "cell_type": "code", + "source": "flow.run()", + "id": "8dfc5d6abac05e43", + "outputs": [ + { + "data": { + "text/plain": [ + "[2, 4, 6, 8, 10, 1, 4, 9, 16, 25]" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 11 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "### The `Passthrough` transformer\n", + "\n", + "In the previous example, we used the `&` operator to run two transformers in parallel.\n", + "A special `Passthrough` transformer can be included in a parallel composition of conduits to pass the input product through without modification.\n", + "\n", + "Applying the `Passthrough` transformer to the previous example, we get:" + ], + "id": "32cdc89988b72e98" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.050259Z", + "start_time": "2024-06-19T21:22:06.046637Z" + } + }, + "cell_type": "code", + "source": [ + "from fluxus import Passthrough\n", + "\n", + "flow_passthrough = (\n", + " NumberProducer(n=5)\n", + " >> (DoublingTransformer() & SquareTransformer() & Passthrough())\n", + " >> NumberConsumer()\n", + ")" + ], + "id": "f38e1febf145ff2a", + "outputs": [], + "execution_count": 12 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "The flow looks like this:", + "id": "d6610c3b17504af2" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.316111Z", + "start_time": "2024-06-19T21:22:06.051045Z" + } + }, + "cell_type": "code", + "source": "flow_passthrough", + "id": "798edb45c7fd9a4e", + "outputs": [ + { + "data": { + "text/plain": [ + "NumberProducer(n=5) >> (DoublingTransformer() & SquareTransformer() & Passthrough()) >> NumberConsumer()" + ], + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Flow\n", + "\n", + "\n", + "\n", + "NumberConsumer_4959518928\n", + "\n", + "NumberConsumer\n", + "\n", + "\n", + "\n", + "DoublingTransformer_4963709264\n", + "\n", + "DoublingTransformer\n", + "\n", + "\n", + "\n", + "DoublingTransformer_4963709264->NumberConsumer_4959518928\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "SquareTransformer_4963698128\n", + "\n", + "SquareTransformer\n", + "\n", + "\n", + "\n", + "SquareTransformer_4963698128->NumberConsumer_4959518928\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "NumberProducer_4959444688\n", + "\n", + "NumberProducer\n", + "\n", + "n=5\n", + "\n", + "\n", + "\n", + "NumberProducer_4959444688->NumberConsumer_4959518928\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "NumberProducer_4959444688->DoublingTransformer_4963709264\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "NumberProducer_4959444688->SquareTransformer_4963698128\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "image/svg+xml": "\n\n\n\n\n\nFlow\n\n\n\nNumberConsumer_4959518928\n\nNumberConsumer\n\n\n\nDoublingTransformer_4963709264\n\nDoublingTransformer\n\n\n\nDoublingTransformer_4963709264->NumberConsumer_4959518928\n\n\n\n\n\nSquareTransformer_4963698128\n\nSquareTransformer\n\n\n\nSquareTransformer_4963698128->NumberConsumer_4959518928\n\n\n\n\n\nNumberProducer_4959444688\n\nNumberProducer\n\nn=5\n\n\n\nNumberProducer_4959444688->NumberConsumer_4959518928\n\n\n\n\n\nNumberProducer_4959444688->DoublingTransformer_4963709264\n\n\n\n\n\nNumberProducer_4959444688->SquareTransformer_4963698128\n\n\n\n\n\n" + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 13 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "Running the flow:", + "id": "550c8f799e0fb5d0" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.320443Z", + "start_time": "2024-06-19T21:22:06.317005Z" + } + }, + "cell_type": "code", + "source": "flow_passthrough.run()", + "id": "e3c441f09673794d", + "outputs": [ + { + "data": { + "text/plain": [ + "[2, 4, 6, 8, 10, 1, 4, 9, 16, 25, 1, 2, 3, 4, 5]" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 14 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Asynchronous Flows\n", + "\n", + "The real power of the object-oriented API is that it allows you to run conduits concurrently using Python's asynchronous capabilities.\n", + "To do this, you need to use the `async` versions of the producer, transformer, and consumer classes:\n", + "`AsyncProducer`, `AsyncTransformer`, and `AsyncConsumer`.\n", + "\n", + "Instead of implementing the `produce`, `transform`, and `consume` methods, you need to implement the `aproduce`, `atransform`, and `aconsume` methods, respectively. Calls to the original methods will create an event loop and will defer the execution of the asynchronous methods.\n", + "\n", + "In the following example, we will run the same flow as before, but this time we will use the asynchronous versions of the classes." + ], + "id": "91b81bf1e5cec1ef" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.326052Z", + "start_time": "2024-06-19T21:22:06.321268Z" + } + }, + "cell_type": "code", + "source": [ + "import random\n", + "import asyncio\n", + "from collections.abc import AsyncIterator\n", + "from fluxus import AsyncProducer, AsyncTransformer, AsyncConsumer\n", + "\n", + "\n", + "class AsyncNumberProducer(AsyncProducer[int]):\n", + "\n", + " def __init__(self, n: int) -> None:\n", + " self.n = n\n", + "\n", + " async def aproduce(self) -> AsyncIterator[int]:\n", + " for i in range(1, self.n + 1):\n", + " yield i\n", + "\n", + "\n", + "class AsyncDoublingTransformer(AsyncTransformer[int, int]):\n", + "\n", + " async def atransform(self, x: int) -> AsyncIterator[int]:\n", + " yield x * 2\n", + "\n", + "\n", + "class AsyncSquareTransformer(AsyncTransformer[int, int]):\n", + "\n", + " async def atransform(self, x: int) -> AsyncIterator[int]:\n", + " yield x * x\n", + "\n", + "\n", + "class AsyncNumberConsumer(AsyncConsumer[int, list[int]]):\n", + "\n", + " async def aconsume(self, products: AsyncIterator[tuple[int, int]]) -> list[int]:\n", + " return [number async for _, number in products]\n", + "\n", + "\n", + "flow_async = (\n", + " AsyncNumberProducer(n=5)\n", + " >> AsyncDoublingTransformer()\n", + " >> AsyncSquareTransformer()\n", + " >> AsyncNumberConsumer()\n", + ")" + ], + "id": "abb809f373cb674c", + "outputs": [], + "execution_count": 15 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "If you are already inside an event loop (as is the case in Jupyter notebooks), you can run the asynchronous flow using the asynchronous `arun` method:", + "id": "bba3fe84fb9b0986" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.330618Z", + "start_time": "2024-06-19T21:22:06.326737Z" + } + }, + "cell_type": "code", + "source": "await flow_async.arun()", + "id": "fd543db281a38cc9", + "outputs": [ + { + "data": { + "text/plain": [ + "[4, 16, 36, 64, 100]" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 16 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "Alternatively, calling the synchronous `run` method on an asynchronous flow will create a new event loop behind the scenes and run the asynchronous methods from there:", + "id": "3061ff1a5cb46fbb" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2024-06-19T21:22:06.336128Z", + "start_time": "2024-06-19T21:22:06.331380Z" + } + }, + "cell_type": "code", + "source": "flow_async.run()", + "id": "43700c5ec95f41ea", + "outputs": [ + { + "data": { + "text/plain": [ + "[4, 16, 36, 64, 100]" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 17 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "While the above is a toy example which does not benefit from the asynchronous capabilities, asynchronous flows will bring dramatic performance improvements when dealing with I/O-bound tasks, such as calls to remote APIs or database queries.", + "id": "955521c0bf01b2c7" }, { "cell_type": "markdown", @@ -56,6 +870,17 @@ "source": [ "## Concluding remarks" ] + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "In this notebook, we have demonstrated how to create custom classes for producers, transformers, and consumers.\n", + "We have built a flow by chaining these conduits together using the `>>` operator.\n", + "We have also shown how to run multiple conduits concurrently using the `&` operator, and have introduced the `Passthrough` transformer, which can be used to pass the input product through without modification.\n", + "Finally, we have demonstrated how to create and run asynchronous flows using the asynchronous versions of the classes." + ], + "id": "6cd1e5ac9c28c054" } ], "metadata": { diff --git a/src/fluxus/_producer.py b/src/fluxus/_producer.py index ab8d81b..ee96219 100644 --- a/src/fluxus/_producer.py +++ b/src/fluxus/_producer.py @@ -79,11 +79,11 @@ def produce(self) -> Iterator[T_Product_ret]: Generate new products, optionally using an existing producer as input. This method is implemented for compatibility with synchronous code, but - preferably, :meth:`.aiter` should be used instead and called from within an + preferably, :meth:`.aproduce` should be used instead and called from within an event loop. When called from outside an event loop, this method will create an event loop - using :meth:`arun`, collect the products from :meth:`aiter` and block the + using :meth:`arun`, collect the products from :meth:`aproduce` and block the current thread until the iteration is complete. The products will then be returned as a list. diff --git a/src/fluxus/core/producer/_producer_base.py b/src/fluxus/core/producer/_producer_base.py index 710a89e..e2e843f 100644 --- a/src/fluxus/core/producer/_producer_base.py +++ b/src/fluxus/core/producer/_producer_base.py @@ -139,7 +139,7 @@ async def aproduce(self) -> AsyncIterator[T_Product_ret]: """ Generate new products asynchronously. - By default, defers to the synchronous variant, :meth:`.iter`. + By default, defers to the synchronous variant, :meth:`.produce`. :return: the new products """ diff --git a/src/fluxus/functional/__init__.py b/src/fluxus/functional/__init__.py index 007ea5c..16cbf3d 100644 --- a/src/fluxus/functional/__init__.py +++ b/src/fluxus/functional/__init__.py @@ -1,5 +1,75 @@ """ Functional API for the flow module. + +The :mod:`artkit.flow` module provides a functional API for defining and executing +complex and asynchronous workflows. The API is inspired by the functional programming +paradigm and is designed to be simple, expressive, and composable. + +The functions :func:`step`, :func:`chain`, and :func:`parallel` are used to +define the steps of a flow. Function :func:`step` defines a single step, function +:func:`chain` composes steps or flows sequentially, and the :func:`parallel` composes +steps or flows in parallel. + +Alternatively, the ``>>`` operator can be used to chain steps or flows, and the +``&`` operator can be used to compose steps or flows in parallel. + +Function :func:`passthrough` can be included in a parallel composition to pass the +unchanged input through to the next step. + +Function :func:`run` is used to execute a flow. The flow is executed asynchronously, +and all results are returned as a :class:`RunResult` instance. Run results include +the input to the flow as well as all intermediate and final results along all paths +through the flow. + +Example: + +.. code-block:: python + + from artkit.flow import step, chain, parallel, passthrough, run + + def add_one(x) -> dict[str, Any]: + # We can return a single dict + return dict(x=x + 1) + + async def add_two(x) -> dict[str, Any]: + # We can make the function asynchronous + return dict(x=x + 2) + + async def add_three_or_five(x) -> AsyncIterator[dict[str, Any]]: + # We can return an iterator to generate multiple values + yield dict(x=x + 3) + + flow = chain( + step("add_one", add_one), + parallel( + step("add_two", add_two), + step("add_three", add_three), + passthrough(), + ), + ) + + result = run(flow, input=[dict(x=1), dict(x=10)]) + + print(result) + +This will output: + +.. code-block:: python + + RunResult( + [ + {'input': {'x': 1}, 'add_one': {'x': 2}, 'add_two': {'x': 4}}, + {'input': {'x': 10}, 'add_one': {'x': 11}, 'add_two': {'x': 13}} + ], + [ + {'input': {'x': 1}, 'add_one': {'x': 2}, 'add_three': {'x': 5}}, + {'input': {'x': 10}, 'add_one': {'x': 11}, 'add_three': {'x': 14}} + ], + [ + {'input': {'x': 1}, 'add_one': {'x': 2}}, + {'input': {'x': 10}, 'add_one': {'x': 11}} + ] + ) """ from ._functions import * diff --git a/src/fluxus/functional/_functions.py b/src/fluxus/functional/_functions.py index e10e77e..be53603 100644 --- a/src/fluxus/functional/_functions.py +++ b/src/fluxus/functional/_functions.py @@ -614,9 +614,10 @@ def run( """ Run the given steps. - If the given steps do not include a leading :func:`input`, then the input - can be provided as an additional argument. If the flow requires an input but none - is provided, then the flow will be run with an empty dictionary as input. + If the given steps do not include a leading `producer`` step (i.e. a step that takes + no parameters), then the input can be provided as an additional argument. + If the flow requires an input but none is provided, then the flow will be run with + an empty dictionary as input. See :class:`.RunResult` for details on the output format. diff --git a/src/fluxus/functional/_result.py b/src/fluxus/functional/_result.py index 2ce4cc2..62a9e81 100644 --- a/src/fluxus/functional/_result.py +++ b/src/fluxus/functional/_result.py @@ -2,8 +2,6 @@ Implementation of ``RunResult``. """ -from __future__ import annotations - import logging from collections.abc import Iterable, Iterator, Mapping, Sequence from types import NoneType @@ -296,7 +294,7 @@ def _dicts_to_frame( def _dict_to_series( d: Mapping[str, Any], *, simplify: bool, max_levels: int | None = None -) -> pd.Series[Any]: +) -> pd.Series: # type: ignore[type-arg] """ Convert a dictionary to a Series, using the keys as index labels. @@ -341,7 +339,9 @@ def _flatten( # We are at the last level of the index and will stop flattening yield new_key, _simplify_complex_types(v) if simplify else v - sr: pd.Series[Any] = pd.Series(dict(_flatten(d, max_levels or 0))) + sr: pd.Series = ( # type: ignore[type-arg] + pd.Series(dict(_flatten(d, max_levels or 0))) + ) return sr diff --git a/test/fluxus_test/test_functional.py b/test/fluxus_test/test_functional.py index d351230..063a056 100644 --- a/test/fluxus_test/test_functional.py +++ b/test/fluxus_test/test_functional.py @@ -520,8 +520,12 @@ def test_timestamps() -> None: ) # Get the start and end columns - col_start_time: pd.Series[pd.Float64Dtype] = df.loc[:, DictProduct.KEY_START_TIME] - col_end_time: pd.Series[pd.Float64Dtype] = df.loc[:, DictProduct.KEY_END_TIME] + col_start_time: pd.Series = ( # type: ignore[type-arg] + df.loc[:, DictProduct.KEY_START_TIME] + ) + col_end_time: pd.Series = ( # type: ignore[type-arg] + df.loc[:, DictProduct.KEY_END_TIME] + ) # Confirm the dtypes of the start and end time columns are a float64 assert col_start_time.dtype == pd.Float64Dtype()