Skip to content

Commit

Permalink
Add moduledocs, refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Jan 14, 2024
1 parent c3f6f3b commit 68c11ae
Show file tree
Hide file tree
Showing 31 changed files with 5,325 additions and 5,020 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Strom

## Flow-based Programming Framework
## Composable primitives for processing streams of data.

Strom provides a set of abstractions for creating, routing and modifying streams of data.

Expand All @@ -25,7 +25,7 @@ flow = %{stream1: Stream.cycle([1, 2, 3]), stream2: ["a", "b", "c"]}
Flow can be empty - `%{}`.


### Operators (functions)
### Components
There are several operators (functions) that can be applied to flows.
Each operator accept flow as input and return a modified flow.

Expand Down
49 changes: 46 additions & 3 deletions lib/composite.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,56 @@
defmodule Strom.Composite do
@moduledoc """
Runs a set of components and is a component itself, meaning that a composite has the same interface - it accepts flow as input and returns a modified flow.
## Example
iex> alias Strom.{Composite, Transformer, Splitter, Source, Sink}
iex> transformer = Transformer.new(:s, &(&1 + 1))
iex> splitter = Splitter.new(:s, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
iex> composite = [transformer, splitter] |> Composite.new() |> Composite.start()
iex> source = :s |> Source.new([1, 2, 3]) |> Source.start()
iex> %{odd: odd, even: even} = %{} |> Source.call(source) |> Composite.call(composite)
iex> {Enum.to_list(odd), Enum.to_list(even)}
{[3], [2, 4]}
## Composites can be created from other composites
iex> alias Strom.{Composite, Transformer, Splitter, Source, Sink}
iex> transformer = Transformer.new(:s, &(&1 + 1))
iex> splitter = Splitter.new(:s, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
iex> c1 = Composite.new([transformer])
iex> c2 = Composite.new([splitter])
iex> source = Source.new(:s, [1, 2, 3])
iex> composite = [source, c1, c2] |> Composite.new() |> Composite.start()
iex> %{odd: odd, even: even} = %{} |> Composite.call(composite)
iex> {Enum.to_list(odd), Enum.to_list(even)}
{[3], [2, 4]}
"""

defstruct pid: nil,
components: []

use GenServer

@type t :: %__MODULE__{}

def start(components) when is_list(components) do
state = %__MODULE__{components: components}
@spec new([struct()]) :: __MODULE__.t()
def new(components) when is_list(components) do
components =
Enum.reduce(components, [], fn component, acc ->
case component do
%__MODULE__{components: components} ->
acc ++ components

component ->
acc ++ [component]
end
end)

%__MODULE__{components: components}
end

{:ok, pid} = GenServer.start_link(__MODULE__, state)
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{} = composite) do
{:ok, pid} = GenServer.start_link(__MODULE__, composite)
__state__(pid)
end

Expand Down Expand Up @@ -48,8 +89,10 @@ defmodule Strom.Composite do
end)
end

@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{pid: pid}), do: GenServer.call(pid, {:call, flow}, :infinity)

@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)
Expand Down
3 changes: 3 additions & 0 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
defmodule Strom.DSL do
@moduledoc """
DSL for building components
"""
alias Strom.{Transformer, Mixer, Renamer, Sink, Source, Splitter}

defmacro source(name, origin) do
Expand Down
6 changes: 4 additions & 2 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
defmodule Strom.GenMix do
@moduledoc """
Generic functionality used by `Strom.Mixer` and `Strom.Splitter'
"""

use GenServer

@buffer 1000
Expand All @@ -14,8 +18,6 @@ defmodule Strom.GenMix do

alias Strom.GenMix.Consumer

def start(opts \\ [])

def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
gen_mix = %{
gen_mix
Expand Down
4 changes: 4 additions & 0 deletions lib/gen_mix/consumer.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
defmodule Strom.GenMix.Consumer do
@moduledoc """
Consumer is used by the generic GenMix component.
"""

use GenServer

defstruct pid: nil,
Expand Down
30 changes: 30 additions & 0 deletions lib/mixer.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,44 @@
defmodule Strom.Mixer do
@moduledoc """
Mix several streams into one. Use Strom.GenMix under the hood
## Example
iex> alias Strom.Mixer
iex> mixer = [:s1, :s2] |> Mixer.new(:stream) |> Mixer.start()
iex> flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
iex> %{stream: stream} = Mixer.call(flow, mixer)
iex> stream |> Enum.to_list() |> Enum.sort()
[1, 2, 3, 4, 5, 6]
## Can also accept a map with functions as values. Works like "filter".
iex> alias Strom.Mixer
iex> inputs = %{s1: &(rem(&1, 2) == 0), s2: &(rem(&1, 2) == 1)}
iex> mixer = inputs |> Mixer.new(:stream) |> Mixer.start()
iex> flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
iex> %{stream: stream} = Mixer.call(flow, mixer)
iex> stream |> Enum.to_list() |> Enum.sort()
[2, 5]
"""
alias Strom.GenMix

defstruct pid: nil,
inputs: [],
output: nil,
opts: []

@type t() :: %__MODULE__{}
@type event() :: any()

@spec new(
[Strom.stream_name()] | %{Strom.stream_name() => (event() -> as_boolean(any))},
Strom.stream_name()
) :: __MODULE__.t()
def new(inputs, output)
when is_list(inputs) or (is_map(inputs) and map_size(inputs) > 0) do
%__MODULE__{inputs: inputs, output: output}
end

@spec start(__MODULE__.t(), buffer: integer()) :: __MODULE__.t()
def start(%__MODULE__{inputs: inputs, output: output} = mixer, opts \\ []) do
inputs =
if is_list(inputs) do
Expand All @@ -33,9 +61,11 @@ defmodule Strom.Mixer do
%{mixer | pid: pid, opts: opts}
end

@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{pid: pid}) do
GenMix.call(flow, pid)
end

@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{pid: pid}), do: GenMix.stop(pid)
end
16 changes: 16 additions & 0 deletions lib/renamer.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
defmodule Strom.Renamer do
@moduledoc """
Renames streams in flow.
## Example
iex> alias Strom.Renamer
iex> flow = %{s1: [1], s2: [2]}
iex> renamer = %{s1: :foo1, s2: :foo2} |> Renamer.new() |> Renamer.start()
iex> Renamer.call(flow, renamer)
%{foo1: [1], foo2: [2]}
"""
defstruct names: %{}

@type t() :: %__MODULE__{}

@spec new(map()) :: __MODULE__.t()
def new(names) when is_map(names) and map_size(names) > 0 do
%__MODULE__{names: names}
end

@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{names: names} = renamer) when is_map(names) and map_size(names) > 0 do
renamer
end

@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{names: names}) do
Enum.reduce(names, flow, fn {name, new_name}, acc ->
acc
Expand All @@ -17,5 +32,6 @@ defmodule Strom.Renamer do
end)
end

@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{}), do: :ok
end
27 changes: 23 additions & 4 deletions lib/sink.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
defmodule Strom.Sink do
@moduledoc """
Runs a given steam and `call` origin on each even in stream.
By default it runs the stream asynchronously (in `Task.async`).
One can pass `true` a the third argument to the `Sink.new/3` function to run a stream synchronously.
## Example
iex> alias Strom.{Sink, Sink.WriteLines}
iex> sink = :strings |> Sink.new(WriteLines.new("test/data/sink.txt"), true) |> Sink.start()
iex> %{} = Sink.call(%{strings: ["a", "b", "c"]}, sink)
iex> File.read!("test/data/sink.txt")
"a\\nb\\nc\\n"
Sink defines a `@behaviour`. One can easily implement their own sinks.
See `Strom.Sink.Writeline`, `Strom.Sink.IOPuts`, `Strom.Sink.Null`
"""
@callback start(map) :: map
@callback call(map, term) :: {:ok, {term, map}} | {:error, {term, map}}
@callback stop(map) :: map
Expand All @@ -10,14 +25,15 @@ defmodule Strom.Sink do
sync: false,
pid: nil

def new(name, origin, sync \\ false) do
unless is_struct(origin) do
raise "Sink origin must be a struct, given: #{inspect(origin)}"
end
@type t() :: %__MODULE__{}
@type event() :: any()

@spec new(Strom.stream_name(), struct(), boolean()) :: __MODULE__.t()
def new(name, origin, sync \\ false) when is_struct(origin) do
%__MODULE__{origin: origin, name: name, sync: sync}
end

@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{origin: origin} = sink) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
sink = %{sink | origin: origin}
Expand All @@ -33,8 +49,10 @@ defmodule Strom.Sink do
@impl true
def init(%__MODULE__{} = sink), do: {:ok, %{sink | pid: self()}}

@spec call(__MODULE__.t(), any()) :: event()
def call(%__MODULE__{pid: pid}, data), do: GenServer.call(pid, {:call, data})

@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{name: name, sync: sync} = sink) when is_map(flow) do
stream = Map.fetch!(flow, name)

Expand All @@ -53,6 +71,7 @@ defmodule Strom.Sink do
Map.delete(flow, name)
end

@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{origin: origin, pid: pid}) do
apply(origin.__struct__, :stop, [origin])
GenServer.call(pid, :stop)
Expand Down
53 changes: 43 additions & 10 deletions lib/source.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
defmodule Strom.Source do
@moduledoc """
Produces stream of events.
## Example with Enumerable
iex> alias Strom.Source
iex> source = :numbers |> Source.new([1, 2, 3]) |> Source.start()
iex> %{numbers: stream} = Source.call(%{}, source)
iex> Enum.to_list(stream)
[1, 2, 3]
## Example with file
iex> alias Strom.{Source, Source.ReadLines}
iex> source = :numbers |> Source.new(ReadLines.new("test/data/numbers1.txt")) |> Source.start()
iex> %{numbers: stream} = Source.call(%{}, source)
iex> Enum.to_list(stream)
["1", "2", "3", "4", "5"]
## If two sources are applied to one stream, the streams will be concatenated (Stream.concat/2)
iex> alias Strom.{Source, Source.ReadLines}
iex> source1 = :numbers |> Source.new([1, 2, 3]) |> Source.start()
iex> source2 = :numbers |> Source.new(ReadLines.new("test/data/numbers1.txt")) |> Source.start()
iex> %{numbers: stream} = %{} |> Source.call(source1) |> Source.call(source2)
iex> Enum.to_list(stream)
[1, 2, 3, "1", "2", "3", "4", "5"]
Source defines a `@behaviour`. One can easily implement their own sources.
See `Strom.Source.ReadLines`, `Strom.Source.Events`, `Strom.Source.IOGets`
"""

@callback start(map) :: map
@callback call(map) :: {:ok, {[term], map}} | {:error, {:halt, map}}
@callback stop(map) :: map
Expand All @@ -10,14 +39,15 @@ defmodule Strom.Source do
name: nil,
pid: nil

def new(name, origin) do
unless is_struct(origin) or is_list(origin) do
raise "Source origin must be a struct or just simple list, given: #{inspect(origin)}"
end
@type t() :: %__MODULE__{}
@type event() :: any()

@spec new(Strom.stream_name(), struct() | [event()]) :: __MODULE__.t()
def new(name, origin) when is_struct(origin) or is_list(origin) do
%__MODULE__{origin: origin, name: name}
end

@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{origin: list} = source) when is_list(list) do
start(%{source | origin: Strom.Source.Events.new(list)})
end
Expand All @@ -37,16 +67,12 @@ defmodule Strom.Source do
@impl true
def init(%__MODULE__{} = source), do: {:ok, %{source | pid: self()}}

@spec call(__MODULE__.t()) :: event()
def call(%__MODULE__{pid: pid}), do: GenServer.call(pid, :call, :infinity)

def infinite?(%__MODULE__{pid: pid}), do: GenServer.call(pid, :infinite)

def stop(%__MODULE__{origin: origin, pid: pid}) do
apply(origin.__struct__, :stop, [origin])

GenServer.call(pid, :stop)
end

@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{name: name} = source) when is_map(flow) do
stream =
Stream.resource(
Expand All @@ -59,6 +85,13 @@ defmodule Strom.Source do
Map.put(flow, name, Stream.concat(prev_stream, stream))
end

@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{origin: origin, pid: pid}) do
apply(origin.__struct__, :stop, [origin])

GenServer.call(pid, :stop)
end

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

@impl true
Expand Down
Loading

0 comments on commit 68c11ae

Please sign in to comment.