Skip to content

Commit

Permalink
Rename acc to memo in composer
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Nov 25, 2023
1 parent 8892378 commit 4c076bd
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 56 deletions.
2 changes: 0 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
TODOs:
- stages_from, stage_set_ref, stage or component? need more consistency
- delete Decomposer, Recomposer, and Clone. Cleanup the ip and ip_error structs
- Composer memory per stream_ref
- Can we split one stream into two
34 changes: 17 additions & 17 deletions lib/components/composer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ defmodule ALF.Components.Composer do
module: nil,
function: nil,
source_code: nil,
acc: nil,
stream_accs: %{},
memo: nil,
stream_memos: %{},
collected_ips: [],
new_collected_ips: %{}
]

alias ALF.{DSLError, IP}

@dsl_options [:name, :count, :opts, :acc]
@dsl_options [:name, :count, :opts, :memo]

@spec start_link(t()) :: GenServer.on_start()
def start_link(%__MODULE__{} = state) do
Expand Down Expand Up @@ -64,10 +64,10 @@ defmodule ALF.Components.Composer do

defp process_ip(current_ip, state) do
history = history(current_ip, state)
acc = Map.get(state.stream_accs, current_ip.stream_ref, state.acc)
memo = Map.get(state.stream_memos, current_ip.stream_ref, state.memo)

case call_function(state.module, state.function, current_ip.event, acc, state.opts) do
{:ok, {events, acc}} when is_list(events) ->
case call_function(state.module, state.function, current_ip.event, memo, state.opts) do
{:ok, {events, memo}} when is_list(events) ->
ips =
Enum.map(events, fn event ->
ip = build_ip(event, current_ip, history)
Expand All @@ -76,16 +76,16 @@ defmodule ALF.Components.Composer do
end)

send_result(current_ip, :destroyed)
stream_accs = Map.put(state.stream_accs, current_ip.stream_ref, acc)
{ips, %{state | stream_accs: stream_accs}}
stream_memos = Map.put(state.stream_memos, current_ip.stream_ref, memo)
{ips, %{state | stream_memos: stream_memos}}

{:error, error, stacktrace} ->
send_error_result(current_ip, error, stacktrace, state)
{[], state}

{:ok, other} ->
error =
"Composer \"#{state.name}\" must return the {[event], acc} tuple. Got #{inspect(other)}"
"Composer \"#{state.name}\" must return the {[event], memo} tuple. Got #{inspect(other)}"

send_error_result(current_ip, error, [], state)
{[], state}
Expand All @@ -108,12 +108,12 @@ defmodule ALF.Components.Composer do
end

defp do_sync_process(ip, state) do
acc = get_from_process_dict({state.pid, ip.stream_ref}) || state.acc
memo = get_from_process_dict({state.pid, ip.stream_ref}) || state.memo
history = history(ip, state)

case call_function(state.module, state.function, ip.event, acc, state.opts) do
{:ok, {events, acc}} when is_list(events) ->
put_to_process_dict({state.pid, ip.stream_ref}, acc)
case call_function(state.module, state.function, ip.event, memo, state.opts) do
{:ok, {events, memo}} when is_list(events) ->
put_to_process_dict({state.pid, ip.stream_ref}, memo)

case Enum.map(events, &build_ip(&1, ip, history)) do
[] -> nil
Expand All @@ -125,7 +125,7 @@ defmodule ALF.Components.Composer do

{:ok, other} ->
error =
"Composer \"#{state.name}\" must return the {[event], acc} tuple. Got #{inspect(other)}"
"Composer \"#{state.name}\" must return the {[event], memo} tuple. Got #{inspect(other)}"

send_error_result(ip, error, [], state)
end
Expand Down Expand Up @@ -161,9 +161,9 @@ defmodule ALF.Components.Composer do
end
end

defp call_function(module, function, event, acc, opts)
defp call_function(module, function, event, memo, opts)
when is_atom(module) and is_atom(function) do
{:ok, apply(module, function, [event, acc, opts])}
{:ok, apply(module, function, [event, memo, opts])}
rescue
error ->
{:error, error, __STACKTRACE__}
Expand All @@ -174,5 +174,5 @@ defmodule ALF.Components.Composer do

defp get_from_process_dict(key), do: Process.get(key, nil)

defp put_to_process_dict(key, acc), do: Process.put(key, acc)
defp put_to_process_dict(key, memo), do: Process.put(key, memo)
end
4 changes: 2 additions & 2 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ defmodule ALF.DSL do

defmacro composer(name, options \\ [opts: []]) do
opts = options[:opts] || []
acc = options[:acc]
memo = options[:memo]
count = options[:count] || 1

quote do
Expand All @@ -124,7 +124,7 @@ defmodule ALF.DSL do
composer =
Basic.build_component(Composer, unquote(name), unquote(count), unquote(opts), __MODULE__)

%{composer | acc: unquote(acc)}
%{composer | memo: unquote(memo)}
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/components/composer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ defmodule ALF.Components.ComposerTest do
[error_ip] = TestConsumer.ips(consumer_pid)

assert error_ip.error ==
"Composer \"test_composer1\" must return the {[event], acc} tuple. Got \"wrong_return\""
"Composer \"test_composer1\" must return the {[event], memo} tuple. Got \"wrong_return\""
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule ALF.Examples.DistrTrans.TwoPhaseCommitParallel.TPCPipeline do
)
]

def decompose(event, _acc, _) do
def decompose(event, _memo, _) do
{[%{event | route_to: :first}, %{event | route_to: :second}], nil}
end

Expand Down
44 changes: 22 additions & 22 deletions test/examples/parcels/parcels_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ defmodule ALF.Examples.Parcels.OrderingPipeline do
use ALF.DSL

@components [
composer(:check_ordering, acc: MapSet.new()),
composer(:accumulate_waiting, acc: %{})
composer(:check_ordering, memo: MapSet.new()),
composer(:accumulate_waiting, memo: %{})
]

def check_ordering(event, order_numbers, _) do
Expand Down Expand Up @@ -71,23 +71,23 @@ defmodule ALF.Examples.Parcels.Pipeline do
use ALF.DSL

@components [
composer(:check_expired, acc: []),
composer(:check_parcels_count, acc: %{})
composer(:check_expired, memo: []),
composer(:check_parcels_count, memo: %{})
]

@seconds_in_week 3600 * 24 * 7

def check_expired(event, acc, _) do
def check_expired(event, memo, _) do
order_number = event[:order_number]

case event[:type] do
"ORDER_CREATED" ->
acc = [{order_number, event[:occurred_at]} | acc]
{[event], acc}
memo = [{order_number, event[:occurred_at]} | memo]
{[event], memo}

"PARCEL_SHIPPED" ->
{expired, still_valid} =
Enum.split_while(Enum.reverse(acc), fn {_, order_time} ->
Enum.split_while(Enum.reverse(memo), fn {_, order_time} ->
DateTime.diff(event[:occurred_at], order_time, :second) > @seconds_in_week
end)

Expand All @@ -100,20 +100,20 @@ defmodule ALF.Examples.Parcels.Pipeline do
end
end

def check_parcels_count(event, acc, _) do
def check_parcels_count(event, memo, _) do
order_number = event[:order_number]

case event[:type] do
"ORDER_CREATED" ->
# putting order time here, it's always less than parcels time
acc = Map.put(acc, order_number, {event[:to_ship], event[:occurred_at]})
{[], acc}
memo = Map.put(memo, order_number, {event[:to_ship], event[:occurred_at]})
{[], memo}

"PARCEL_SHIPPED" ->
case Map.get(acc, order_number) do
case Map.get(memo, order_number) do
# was deleted in THRESHOLD_EXCEEDED
nil ->
{[], acc}
{[], memo}

{1, last_occurred_at} ->
last_occurred_at = latest_occurred_at(event[:occurred_at], last_occurred_at)
Expand All @@ -124,22 +124,22 @@ defmodule ALF.Examples.Parcels.Pipeline do
occurred_at: last_occurred_at
}

acc = Map.put(acc, order_number, :all_parcels_shipped)
{[ok_event], acc}
memo = Map.put(memo, order_number, :all_parcels_shipped)
{[ok_event], memo}

{amount, last_occurred_at} when amount > 1 ->
last_occurred_at = latest_occurred_at(event[:occurred_at], last_occurred_at)
acc = Map.put(acc, order_number, {amount - 1, last_occurred_at})
{[], acc}
memo = Map.put(memo, order_number, {amount - 1, last_occurred_at})
{[], memo}
end

"THRESHOLD_EXCEEDED" ->
case Map.get(acc, order_number) do
case Map.get(memo, order_number) do
:all_parcels_shipped ->
{[], Map.delete(acc, order_number)}
{[], Map.delete(memo, order_number)}

_count ->
{[event], Map.delete(acc, order_number)}
{[event], Map.delete(memo, order_number)}
end
end
end
Expand Down Expand Up @@ -261,8 +261,8 @@ defmodule ALF.Examples.Parcels.ParcelsTest do
[
switch(:route_event,
branches:
Enum.reduce(0..(@partitions_count - 1), %{}, fn i, acc ->
Map.put(acc, i, main_stages)
Enum.reduce(0..(@partitions_count - 1), %{}, fn i, memo ->
Map.put(memo, i, main_stages)
end)
)
]
Expand Down
2 changes: 1 addition & 1 deletion test/examples/stream/chunk_by_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule ALF.Examples.Stream.ChunkByPipeline do
end

@components [
composer(:chunk_by, acc: {nil, []}, opts: %{fun: &ChunkBy.call/1})
composer(:chunk_by, memo: {nil, []}, opts: %{fun: &ChunkBy.call/1})
]

def chunk_by(:end, acc, _opts) do
Expand Down
14 changes: 7 additions & 7 deletions test/examples/stream/chunk_every_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ defmodule ALF.Examples.Stream.ChunkEveryPipeline do
use ALF.DSL

@components [
composer(:chunk_every, acc: [], opts: %{count: 2})
composer(:chunk_every, memo: [], opts: %{count: 2})
]

def chunk_every(:end, acc, _), do: {[Enum.reverse(acc)], []}
def chunk_every(:end, memo, _), do: {[Enum.reverse(memo)], []}

def chunk_every(event, acc, opts) do
acc = [event | acc]
def chunk_every(event, memo, opts) do
memo = [event | memo]

if length(acc) >= opts[:count] do
{[Enum.reverse(acc)], []}
if length(memo) >= opts[:count] do
{[Enum.reverse(memo)], []}
else
{[], acc}
{[], memo}
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/examples/telegram_with_composer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule ALF.Examples.TelegramWithComposer.Pipeline do

@components [
composer(:split_to_words),
composer(:create_lines, acc: [])
composer(:create_lines, memo: [])
]

@length_limit 50
Expand Down
48 changes: 48 additions & 0 deletions test/integration/composer_with_streams_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
defmodule ALF.Examples.ComposerWithStreamsTest do
use ExUnit.Case, async: true

defmodule Pipeline do
use ALF.DSL

@components [
composer(:sum_and_emit, memo: 0)
]

def sum_and_emit(event, memo, _) do
sum = event + memo

if sum > 5 do
{[sum], 0}
else
{[], event + memo}
end
end
end

setup do
Pipeline.start(sync: true)
on_exit(&Pipeline.stop/0)
end

test "one stream" do
results =
[1, 2, 3, 4, 5]
|> Pipeline.stream()
|> Enum.to_list()

assert results == [6, 9]
end

test "several streams" do
stream1 = Pipeline.stream(1..5)
stream2 = Pipeline.stream(1..5)
stream3 = Pipeline.stream(1..5)

[result1, result2, result3] =
[stream1, stream2, stream3]
|> Enum.map(&Task.async(fn -> Enum.to_list(&1) end))
|> Task.await_many()

assert [result1, result2, result3] == [[6, 9], [6, 9], [6, 9]]
end
end
4 changes: 2 additions & 2 deletions test/sync_run/compose_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule ALF.SyncRun.ComposeTest do

@components [
composer(:decomposer_function),
composer(:recomposer_function, acc: [])
composer(:recomposer_function, memo: [])
]

def decomposer_function(event, nil, _) do
Expand Down Expand Up @@ -65,7 +65,7 @@ defmodule ALF.SyncRun.ComposeTest do

@components [
composer(:split_to_words),
composer(:create_lines, acc: [])
composer(:create_lines, memo: [])
]

@length_limit 10
Expand Down

0 comments on commit 4c076bd

Please sign in to comment.