Skip to content

Commit

Permalink
Move opts to the new functions
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Jan 16, 2024
1 parent a76763c commit d173485
Show file tree
Hide file tree
Showing 13 changed files with 5,095 additions and 4,990 deletions.
62 changes: 45 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,42 @@ In the "mermaid" notation, I suggest the following shapes:
See the example below.
```mermaid
graph LR;
source1(("source1")) --> mixer{{"mixer"}}
source2(("source2")) --> mixer{{"mixer"}}
source(("source")) --> mixer{{"mixer"}}
mixer{{"mixer"}} --> transformer["transformer"]
transformer["transformer"] --> composite(["composite"])
composite(["composite"]) --> splitter{{"splitter"}}
splitter{{"splitter"}} --> sink1(("sink1"))
splitter{{"splitter"}} --> sink2(("sink2"))
splitter{{"splitter"}} --> sink(("sink"))
```

## Example
### The problem
There are two streams of data. One have to sum pairs of numbers from each stream respectively,
then produce two steams: one with the odd numbers, another with the even ones.

### Solution
The flow chart for possible solution:
## Hello, World!
```mermaid
graph LR;
source1(("numbers1")) --> round_robin(["round-robin mixer"])
source2(("numbers1")) --> round_robin(["round-robin mixer"])
round_robin(["round-robin-mixer"]) --> sum["sum pairs"]
sum["sum pairs"] --> spitter{{"split odd-even"}}
spitter{{"split odd-even"}} --> sink_odd(("puts odd"))
spitter{{"split odd-even"}} --> sink_even(("puts even"))
source(("IO.gets")) --> transformer["greeting"]
transformer["greeting"] --> sink(("IO.puts"))
```

```elixir
io_gets = Strom.Source.IOGets.new()
source = :stream |> Strom.Source.new(io_gets)

function = fn string -> "Hello, #{string}!" end
transformer = :stream |> Strom.Transformer.new(function, nil, buffer: 1)

io_puts = Strom.Sink.IOPuts.new()
sink = :stream |> Strom.Sink.new(io_puts, true)

greeter = Strom.Composite.new([source, transformer, sink])
greeter = Strom.Composite.start(greeter)

Strom.Composite.call(%{}, greeter)
```
Add see:
```shell
iex(13)> Strom.Composite.call(%{}, greeter)
IOGets> world
Hello, world!
```

#### The "flow" data-structure
Strom components operates with "flow" - a named set of streams. It's a map with streams as values and their names as keys:

Expand All @@ -59,6 +70,23 @@ A mixer mixes several streams into one. A splitter does the opposite.

A transformer modifies a stream (or streams).

## A more sophisticated example

### The problem
There are two streams of data. One have to sum pairs of numbers from each stream respectively,
then produce two steams: one with the odd numbers, another with the even ones.

### Solution
The flow chart for possible solution:
```mermaid
graph LR;
source1(("numbers1")) --> round_robin(["round-robin mixer"])
source2(("numbers1")) --> round_robin(["round-robin mixer"])
round_robin(["round-robin-mixer"]) --> sum["sum pairs"]
sum["sum pairs"] --> spitter{{"split odd-even"}}
spitter{{"split odd-even"}} --> sink_odd(("puts odd"))
spitter{{"split odd-even"}} --> sink_even(("puts even"))
```

#### Components
The origins for sources here will be just simple lists of numbers.
Expand Down
12 changes: 6 additions & 6 deletions lib/composite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ defmodule Strom.Composite do
%Strom.Sink{} = sink ->
Strom.Sink.start(sink)

%Strom.Mixer{opts: opts} = mixer ->
Strom.Mixer.start(mixer, opts)
%Strom.Mixer{} = mixer ->
Strom.Mixer.start(mixer)

%Strom.Splitter{opts: opts} = splitter ->
Strom.Splitter.start(splitter, opts)
%Strom.Splitter{} = splitter ->
Strom.Splitter.start(splitter)

%Strom.Transformer{opts: opts} = transformer when is_list(opts) ->
Strom.Transformer.start(transformer, opts)
%Strom.Transformer{} = transformer ->
Strom.Transformer.start(transformer)

%Strom.Renamer{} = renamer ->
Strom.Renamer.start(renamer)
Expand Down
6 changes: 3 additions & 3 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ defmodule Strom.DSL do

defmacro mix(inputs, output, opts \\ []) do
quote do
%{Mixer.new(unquote(inputs), unquote(output)) | opts: unquote(opts)}
Mixer.new(unquote(inputs), unquote(output), unquote(opts))
end
end

defmacro split(input, outputs, opts \\ []) do
quote do
%{Splitter.new(unquote(input), unquote(outputs)) | opts: unquote(opts)}
Splitter.new(unquote(input), unquote(outputs), unquote(opts))
end
end

defmacro transform(names, function, acc \\ nil, opts \\ []) do
quote do
%{Transformer.new(unquote(names), unquote(function), unquote(acc)) | opts: unquote(opts)}
Transformer.new(unquote(names), unquote(function), unquote(acc), unquote(opts))
end
end

Expand Down
13 changes: 7 additions & 6 deletions lib/mixer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ defmodule Strom.Mixer do

@spec new(
[Strom.stream_name()] | %{Strom.stream_name() => (event() -> as_boolean(any))},
Strom.stream_name()
Strom.stream_name(),
list()
) :: __MODULE__.t()
def new(inputs, output)
when is_list(inputs) or (is_map(inputs) and map_size(inputs) > 0) do
def new(inputs, output, opts \\ [])
when is_list(inputs) or (is_map(inputs) and map_size(inputs) > 0 and is_list(opts)) do
%__MODULE__{inputs: inputs, output: output}
end

@spec start(__MODULE__.t(), buffer: integer()) :: __MODULE__.t()
def start(%__MODULE__{inputs: inputs, output: output} = mixer, opts \\ []) do
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{inputs: inputs, output: output, opts: opts} = mixer) do
inputs =
if is_list(inputs) do
Enum.reduce(inputs, %{}, fn name, acc ->
Expand All @@ -58,7 +59,7 @@ defmodule Strom.Mixer do
}

{:ok, pid} = GenMix.start(gen_mix)
%{mixer | pid: pid, opts: opts}
%{mixer | pid: pid}
end

@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
Expand Down
2 changes: 1 addition & 1 deletion lib/source/io_gets.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Strom.Source.IOGets do
@behaviour Strom.Source

defstruct infinite: true
defstruct infinite: false

def new, do: %__MODULE__{}

Expand Down
14 changes: 6 additions & 8 deletions lib/splitter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,16 @@ defmodule Strom.Splitter do

@spec new(
Strom.stream_name(),
[Strom.stream_name()] | %{Strom.stream_name() => (event() -> as_boolean(any))}
[Strom.stream_name()] | %{Strom.stream_name() => (event() -> as_boolean(any))},
list()
) :: __MODULE__.t()
def new(input, outputs) when is_list(outputs) or (is_map(outputs) and map_size(outputs)) > 0 do
def new(input, outputs, opts \\ [])
when is_list(outputs) or ((is_map(outputs) and map_size(outputs)) > 0 and is_list(opts)) do
%Strom.Splitter{input: input, outputs: outputs}
end

@spec start(__MODULE__.t(), buffer: integer()) :: __MODULE__.t()
def start(
%__MODULE__{input: input, outputs: outputs} =
splitter,
opts \\ []
) do
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{input: input, outputs: outputs, opts: opts} = splitter) do
inputs = %{input => fn _el -> true end}

outputs =
Expand Down
17 changes: 7 additions & 10 deletions lib/transformer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,19 @@ defmodule Strom.Transformer do
(event() -> event())
| (event(), acc() -> {[event()], acc()})

@spec new(Strom.stream_name(), func(), acc()) :: __MODULE__.t()
def new(names, function, acc \\ nil) do
@spec new(Strom.stream_name(), func(), acc(), list()) :: __MODULE__.t()
def new(names, function, acc \\ nil, opts \\ []) when is_function(function) and is_list(opts) do
%__MODULE__{
function: function,
acc: acc,
names: names
names: names,
opts: opts
}
end

@spec start(__MODULE__.t(), buffer: integer(), opts: list()) :: __MODULE__.t()
def start(%__MODULE__{} = transformer, opts \\ []) do
transformer = %{
transformer
| opts: opts,
buffer: Keyword.get(opts, :buffer, @buffer)
}
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{opts: opts} = transformer) do
transformer = %{transformer | buffer: Keyword.get(opts, :buffer, @buffer)}

{:ok, pid} = start_link(transformer)
__state__(pid)
Expand Down
Loading

0 comments on commit d173485

Please sign in to comment.