Skip to content

Commit

Permalink
Readme
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Jan 15, 2024
1 parent 11d42dd commit ea886b6
Show file tree
Hide file tree
Showing 12 changed files with 5,067 additions and 5,080 deletions.
307 changes: 131 additions & 176 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,232 +4,187 @@

### Strom provides a set of abstractions for creating, routing and modifying streams of data.

## Notation
<img src="images/components.png" alt="Implicit components" width="800"/>

<img src="images/transformer.png" width="60"/>
In the "mermaid" notation, I suggest the following shapes:
- circles for a sink and a source.
- diamonds for a mixer and a splitter.
- simple rectangle for a transformer.
- rounded rectangle for a composite.

See the example below.
```mermaid
graph TD;
A(<img src='https://raw.githubusercontent.com/antonmi/Strom/update_readme/images/transformer.png' width='60'/>)-->B;
A-->C;
B["This ❤ Unicode"]-->D;
C-->D;
graph LR;
source1(("source1")) --> mixer{{"mixer"}}
source2(("source2")) --> mixer{{"mixer"}}
mixer{{"mixer"}} --> transformer["transformer"]
transformer["transformer"] --> composite(["composite"])
composite(["composite"]) --> splitter{{"splitter"}}
splitter{{"splitter"}} --> sink1(("sink1"))
splitter{{"splitter"}} --> sink2(("sink2"))
```

## Example
### The problem
There are two streams of data. One have to sum pairs of numbers from each stream respectively,
then produce two steams: one with the odd numbers, another with the even ones.

### Data
The data abstractions are:

#### Event
Any piece of data - number, string, list, map, struct, etc.

#### Stream
A sequence (can be infinite) of events made available over time.

See [Elixir Stream](https://hexdocs.pm/elixir/1.15/Stream.html).

#### Flow
Flow - is a named set of streams.
### Solution
The flow chart for possible solution:
```mermaid
graph LR;
source1(("numbers1")) --> round_robin(["round-robin mixer"])
source2(("numbers1")) --> round_robin(["round-robin mixer"])
round_robin(["round-robin-mixer"]) --> sum["sum pairs"]
sum["sum pairs"] --> spitter{{"split odd-even"}}
spitter{{"split odd-even"}} --> sink_odd(("puts odd"))
spitter{{"split odd-even"}} --> sink_even(("puts even"))
```
#### The "flow" data-structure
Strom components operates with "flow" - a named set of streams. It's a map with streams as values and their names as keys:

For example:
```elixir
flow = %{stream1: Stream.cycle([1, 2, 3]), stream2: ["a", "b", "c"]}
flow = %{
stream1: Stream.cycle([1, 2, 3]),
stream2: ["a", "b", "c"]
}
```
Flow can be empty - `%{}`.

A source adds a new stream to flow. A sink runs the stream of given name and removes it from flow.

### Components
There are several operators (functions) that can be applied to flows.
Each operator accept flow as input and return a modified flow.
A mixer mixes several streams into one. A splitter does the opposite.

A transformer modifies a stream (or streams).

#### Source (source)
Adds a stream of "external data" to a flow.
```elixir
%{} -> source(Src, :foo) -> %{foo: sfoo}
%{bar: Sbar} -> source(Src, :foo) -> %{foo: sfoo, bar: sbar}
```

#### Sink (sink)
Writes a stream data back to somewhere.
#### Components
The origins for sources here will be just simple lists of numbers.
See [sources](https://github.com/antonmi/Strom/blob/main/lib/source/) for other examples of sources. It's easy to implement your own source.
```elixir
%{foo: sfoo} -> sink(Snk, :foo) -> %{}
%{foo: sfoo, bar: sbar} -> sink(Snk, :foo) -> %{bar: sbar}
source1 = Strom.Source.new(:numbers1, [1, 2, 3, 4, 5])
source2 = Strom.Source.new(:numbers2, [10, 20, 30, 40, 50])
```

#### Mixer (mix)
Mixes several streams.
Sinks will use simple IOPuts origin. See more examples here: [sinks](https://github.com/antonmi/Strom/blob/main/lib/sink/)
```elixir
%{foo: sfoo, bar: sbar} -> mix([:foo, :bar], :mixed) -> %{mixed: smixed}
```
origin_odd = Strom.Sink.IOPuts.new("odd: ")
sink_odd = Strom.Sink.new(:odd, origin_odd)

#### Splitter (split)
Split a stream into several streams.
```elixir
%{foo: sfoo} -> split(:foo, [:bar, :baz]) -> %{bar: sbar, baz: sbaz}
origin_even = Strom.Sink.IOPuts.new("even: ")
sink_even = Strom.Sink.new(:even, origin_even)
```

#### Transformer (transform)
Applies a function to each event of a stream or streams.
Now comes a tricky part - the round-robin mixer. It's a composite component that has four components inside:

```elixir
%{foo: sfoo, bar: sbar} -> transform(:foo, F) -> %{foo: F(sfoo)}
%{foo: sfoo, bar: sbar} -> transform([:foo, :bar], F) -> %{foo: F(sfoo), bar: F(sbar}
```mermaid
graph LR;
add_label1["add label :first"] --> mixer{{"mix"}}
add_label2["add label :second"] --> mixer{{"mix"}}
mixer{{"mix"}} --> emit_when_have_both["emit when have both"]
```

A function gets an event as input and must return a modified event.
So, it's the map operation. Think about &Stream.map/2, which is used under the hood.


### Symbolic representation



### Implementation details and interface

Under the hood, each operation is performed inside "components".
Component is a separate process - GenServer.

A component can be:
- build - `new/2`, `new/3`
- started - `start/1`
- stopped - `stop/1`
- and called - `call/2`

#### Example
Let's say one wants to stream a file:
The round-robin mixer first adds labels to each event in order to now from which stream comes a number. Then it mixes streams.
The last transformer will wait until it has numbers from both streams and then emit a pair of events.

```elixir
alias Strom.Source
alias Strom.Source.ReadLines

source =
:lines
|> Source.new(ReadLines.new("input.txt"))
|> Source.start()

%{lines: stream} = Source.call(%{}, source)
# adds the :lines stream to the empty flow (%{})

Enum.to_list(stream)
# runs the stream and returns a list of strings

Source.stop(source)
# stops the source process
```

Here the `Strom.Source.ReadLines` module is used to read line from file.

To specify a custom source, one can implement a module with the `Strom.Source` behaviour.

Strom provides a couple of simple sources, see [sources](https://github.com/antonmi/Strom/blob/main/lib/source/).

The same for sinks.
defmodule RoundRobinMixer do
alias Strom.{Mixer, Transformer}

def add_label(event, label) do
{[{event, label}], label}
end

def call({number, label}, acc) do
[another] = Enum.reject(Map.keys(acc), &(&1 == label))

case Map.fetch!(acc, another) do
[hd | tl] ->
{[hd, number], Map.put(acc, another, tl)}

[] ->
numbers = Map.fetch!(acc, label)
{[], Map.put(acc, label, numbers ++ [number])}
end
end

def components() do
[
Transformer.new(:first, &__MODULE__.add_label/2, :first),
Transformer.new(:second, &__MODULE__.add_label/2, :second),
Mixer.new([:first, :second], :numbers),
Transformer.new(:numbers, &__MODULE__.call/2, %{first: [], second: []})
]
end
end

Then, for example, one wants to split the stream into two streams, one with short lines, another - with long ones:
round_robin = Strom.Composite.new(RoundRobinMixer.components())
```

The "sum pairs" transformer is simple. It will save first number in accumulator and waits the second one to produce the sum.
```elixir
alias Strom.Splitter

parts = %{
long: &(String.length(&1) > 3),
short: &(String.length(&1) <= 3)
}

splitter =
:lines
|> Splitter.new(parts)
|> Splitter.start()

stream = ["Hey", "World"]

%{long: long, short: short} = Splitter.call(%{lines: stream}, splitter)
function = fn number, acc ->
if acc do
{[number + acc], nil}
else
{[], number}
end
end

# Splits the :lines stream into the :long and :short streams based on rules defined in parts
sum_pairs = Strom.Transformer.new(:numbers, function, nil)
```

And then, one wants to save the streams into two files:
The splitter will split the `:numbers` stream into two streams: `:odd` and `:even`

```elixir
alias Strom.Sink
alias Strom.Sink.WriteLines

sink_short =
:short
|> Sink.new(WriteLines.new("short.txt"))
|> Sink.start()

sink_long =
:long
|> Sink.new(%WriteLines{path: "long.txt"})
|> Sink.start()

%{} =
%{long: long, short: short}
|> Sink.call(sink_short)
|> Sink.call(sink_long)
splitter = Strom.Splitter.new(:numbers, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 1)})
```

#### Transformer
With the Function component everything is straightforward.
Let's calculate the length of each string and produce a stream of numbers:

```elixir
alias Strom.Transformer

function = &String.length(&1)

transformer =
:stream
|> Transformer.new(function)
|> Transformer.start()

%{stream: stream} = Transformer.call(%{stream: ["Hey", "World"]} , transformer)
Ok, it's almost done. One thing that you may have noticed - the sources produces `:numbers1` and `:number2` streams.
However the round-robin composite operates with the `:first` and `:second` streams. One should simple rename the streams in flow.

# now the stream is the stream of numbers
```

The function can be applied to several steams simultaneously:
For consistency there is the `Renamer` component:

```elixir
transformer =
[:short, :long]
|> Transformer.new(function)
|> Transformer.start()

flow = %{short: ["Hey"], long: ["World"]}
%{short: short, long: long} = Transformer.call(flow, transformer)
renamer = Strom.Renamer.new(%{numbers1: :first, numbers2: :second})
```

Transformer can operate 2-arity functions with accumulator.

The function must return 2-elements tuple.
Ok. Now we are ready to combine all the components. There will be another composite.
```elixir
{list(event), acc}
final_composite = [
source1,
source2,
renamer,
round_robin,
sum_pairs,
splitter,
sink_odd,
sink_even
] |> Strom.Composite.new()
```
The first element is a list of events that will be returned from the component.
The second is a new accumulator.

Now, just start it and call on an empty flow:
```elixir
alias Strom.Transformer

function = fn event, acc ->
{[event * acc, :new_event], acc + 1}
end

transformer = :events |> Transformer.new(function, 0) |> Transformer.start()

%{events: stream} = Transformer.call(%{events: [1, 2, 3]}, transformer)

Enum.to_list(stream)
# returns
[0, :new_event, 2, :new_event, 6, :new_event]
final_composite = Strom.Composite.start(final_composite)
Strom.Composite.call(%{}, final_composite)
```

### Strom.Composite and Strom.DSL
Add see smth like that in console:
```shell
iex(18)> Strom.Composite.call(%{}, final_composite)
%{}
even: 11
odd: 11
even: 33
odd: 33
even: 55
odd: 55
```

Since the operations have a similar interface and behaviour, it's possible to compose them.
## More info:

See [composite_test.exs](https://github.com/antonmi/Strom/blob/main/test/composite_test.exs).
Read `@moduledoc` for components.

See [examples](https://github.com/antonmi/Strom/blob/main/test/examples/).
See [examples](https://github.com/antonmi/Strom/blob/main/test/examples/) in tests.
Binary file modified images/components.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/composite.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/mixer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/sink.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/source.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/splitter.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion lib/sink/io_puts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Strom.Sink.IOPuts do

defstruct line_sep: "", prefix: ""

def new, do: %__MODULE__{}
def new(prefix \\ "", line_sep \\ ""), do: %__MODULE__{line_sep: line_sep, prefix: prefix}

@impl true
def start(%__MODULE__{} = io_puts), do: io_puts
Expand Down
Loading

0 comments on commit ea886b6

Please sign in to comment.