Skip to content

Commit

Permalink
Merge pull request #14 from antonmi/generic_composer
Browse files Browse the repository at this point in the history
Add generic Composer, it will replace recomposer and decomposer
  • Loading branch information
antonmi authored Nov 20, 2023
2 parents 21c0d81 + 6f17a88 commit 22bc3d9
Show file tree
Hide file tree
Showing 12 changed files with 878 additions and 4 deletions.
21 changes: 19 additions & 2 deletions lib/builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule ALF.Builder do
Unplug,
Decomposer,
Recomposer,
Composer,
Tbd
}

Expand Down Expand Up @@ -182,15 +183,23 @@ defmodule ALF.Builder do
%Plug{} = plug ->
plug =
plug
|> Map.merge(%{stage_set_ref: make_ref(), pipeline_module: pipeline_module, telemetry: telemetry})
|> Map.merge(%{
stage_set_ref: make_ref(),
pipeline_module: pipeline_module,
telemetry: telemetry
})
|> start_stage(supervisor_pid, prev_stages)

{[plug], stages ++ [plug]}

%Unplug{} = unplug ->
unplug =
unplug
|> Map.merge(%{stage_set_ref: make_ref(), pipeline_module: pipeline_module, telemetry: telemetry})
|> Map.merge(%{
stage_set_ref: make_ref(),
pipeline_module: pipeline_module,
telemetry: telemetry
})
|> start_stage(supervisor_pid, prev_stages)

{[unplug], stages ++ [unplug]}
Expand All @@ -211,6 +220,14 @@ defmodule ALF.Builder do

{[recomposer], stages ++ [recomposer]}

%Composer{} = composer ->
composer =
composer
|> Map.merge(%{stage_set_ref: make_ref(), telemetry: telemetry})
|> start_stage(supervisor_pid, prev_stages)

{[composer], stages ++ [composer]}

%Tbd{} = tbd ->
tbd =
tbd
Expand Down
176 changes: 176 additions & 0 deletions lib/components/composer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
defmodule ALF.Components.Composer do
use ALF.Components.Basic

defstruct Basic.common_attributes() ++
[
type: :composer,
module: nil,
function: nil,
source_code: nil,
acc: nil,
collected_ips: [],
new_collected_ips: %{}
]

alias ALF.{DSLError, IP}

@dsl_options [:name, :opts, :acc]

@spec start_link(t()) :: GenServer.on_start()
def start_link(%__MODULE__{} = state) do
GenStage.start_link(__MODULE__, state)
end

@impl true
def init(state) do
state = %{
state
| pid: self(),
opts: init_opts(state.module, state.opts),
source_code: state.source_code || read_source_code(state.module, state.function)
}

component_added(state)
{:producer_consumer, state}
end

def init_sync(state, telemetry) do
%{
state
| pid: make_ref(),
opts: init_opts(state.module, state.opts),
source_code: state.source_code || read_source_code(state.module, state.function),
telemetry: telemetry
}
end

@impl true
def handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry: true} = state) do
:telemetry.span(
[:alf, :component],
telemetry_data(ip, state),
fn ->
{ips, state} = process_ip(ip, state)
{{:noreply, ips, state}, telemetry_data(ips, state)}
end
)
end

def handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry: false} = state) do
{ips, state} = process_ip(ip, state)
{:noreply, ips, state}
end

defp process_ip(current_ip, state) do
history =
if current_ip.debug, do: [{state.name, current_ip.event} | current_ip.history], else: []

case call_function(state.module, state.function, current_ip.event, state.acc, state.opts) do
{:ok, {events, acc}} ->
ips =
Enum.map(events, fn event ->
ip = build_ip(event, current_ip, history)
send_result(ip, :composed)
ip
end)

send_result(current_ip, :destroyed)
{ips, %{state | acc: acc}}

{:error, error, stacktrace} ->
send_error_result(current_ip, error, stacktrace, state)
{[], state}

{:ok, other} ->
error =
"Composer \"#{state.name}\" must return the {[event], acc} tuple. Got #{inspect(other)}"

send_error_result(current_ip, error, [], state)
{[], state}
end
end

def sync_process(ip, %__MODULE__{telemetry: false} = state) do
do_sync_process(ip, state)
end

def sync_process(ip, %__MODULE__{telemetry: true} = state) do
:telemetry.span(
[:alf, :component],
telemetry_data(ip, state),
fn ->
ips = do_sync_process(ip, state)
{ips, telemetry_data(ips, state)}
end
)
end

defp do_sync_process(ip, state) do
acc = get_from_process_dict({state.pid, ip.stream_ref}) || state.acc
history = history(ip, state)

case call_function(state.module, state.function, ip.event, acc, state.opts) do
{:ok, {events, acc}} when is_list(events) ->
put_to_process_dict({state.pid, ip.stream_ref}, acc)

case Enum.map(events, &build_ip(&1, ip, history)) do
[] -> nil
ips -> ips
end

{:error, error, stacktrace} ->
send_error_result(ip, error, stacktrace, state)

{:ok, other} ->
error =
"Composer \"#{state.name}\" must return the {[event], acc} tuple. Got #{inspect(other)}"

send_error_result(ip, error, [], state)
end
end

defp build_ip(event, ip, history) do
%IP{
stream_ref: ip.stream_ref,
ref: ip.ref,
destination: ip.destination,
init_event: event,
event: event,
pipeline_module: ip.pipeline_module,
composed: true,
debug: ip.debug,
history: history,
sync_path: ip.sync_path,
plugs: ip.plugs
}
end

def validate_options(name, options) do
wrong_options = Keyword.keys(options) -- @dsl_options

unless is_atom(name) do
raise DSLError, "Composer name must be an atom: #{inspect(name)}"
end

if Enum.any?(wrong_options) do
raise DSLError,
"Wrong options for the #{name} composer: #{inspect(wrong_options)}. " <>
"Available options are #{inspect(@dsl_options)}"
end
end

defp call_function(module, function, event, acc, opts)
when is_atom(module) and is_atom(function) do
{:ok, apply(module, function, [event, acc, opts])}
rescue
error ->
{:error, error, __STACKTRACE__}
catch
kind, value ->
{:error, kind, value}
end

defp get_from_process_dict(key), do: Process.get(key, nil)

defp put_to_process_dict(key, acc), do: Process.put(key, acc)
end
1 change: 0 additions & 1 deletion lib/components/unplug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ defmodule ALF.Components.Unplug do

defp process_ip(ip, state) do
ip = %{ip | history: history(ip, state)}

prev_event = Map.fetch!(ip.plugs, state.name)
ip_plugs = Map.delete(ip.plugs, state.name)
ip = %{ip | plugs: ip_plugs}
Expand Down
22 changes: 22 additions & 0 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule ALF.DSL do
Unplug,
Decomposer,
Recomposer,
Composer,
Tbd
}

Expand Down Expand Up @@ -152,6 +153,27 @@ defmodule ALF.DSL do
end
end

defmacro composer(atom, options \\ [opts: []]) do
opts = options[:opts]
name = options[:name]
acc = options[:acc]

quote do
Composer.validate_options(unquote(atom), unquote(options))

composer =
Basic.build_component(
Composer,
unquote(atom),
unquote(name),
unquote(opts),
__MODULE__
)

%{composer | acc: unquote(acc)}
end
end

defmacro stages_from(module, options \\ [opts: [], count: 1]) do
count = options[:count]
opts = options[:opts]
Expand Down
2 changes: 2 additions & 0 deletions lib/ip.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule ALF.IP do
history: list(),
decomposed: boolean(),
recomposed: boolean(),
composed: boolean(),
plugs: map(),
sync_path: nil | list()
}
Expand All @@ -28,6 +29,7 @@ defmodule ALF.IP do
history: [],
decomposed: false,
recomposed: false,
composed: false,
plugs: %{},
sync_path: nil
end
2 changes: 1 addition & 1 deletion lib/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ defmodule ALF.Manager do
{^ref, :created_recomposer} ->
wait_result(ref, acc, {timeout, initial_ip})

{^ref, reason} when reason in [:created_decomposer, :cloned] ->
{^ref, reason} when reason in [:created_decomposer, :cloned, :composed] ->
wait_result(
ref,
acc ++ wait_result(ref, [], {timeout, initial_ip}),
Expand Down
Loading

0 comments on commit 22bc3d9

Please sign in to comment.