diff --git a/meetup.livemd b/meetup.livemd new file mode 100644 index 0000000..18abf14 --- /dev/null +++ b/meetup.livemd @@ -0,0 +1,450 @@ + + +# Streams processing. Keep it simple. + +```elixir +Mix.install([{:strom, "0.6.0"}]) +``` + +## Anton Mishchuk, Berlin Elixir Meetup, January 11, 2024 + +## The problem + +https://engineering.zalando.com/posts/2017/07/complex-event-generation-for-business-process-monitoring-using-apache-flink.html + +```elixir +order_creaded = %{ + type: "ORDER_CREATED", + occurred_at: "2023-12-16T22:19:27", + order_number: 123, + to_ship: 3 +} + +parcel_shipped = %{ + type: "PARCEL_SHIPPED", + occurred_at: "2023-12-17T19:23:27", + order_number: 123 +} + +nil +``` + + + +```mermaid +graph LR; + ORDER_CREATED-->Logic; + PARCEL_SHIPPED-->Logic; + Logic-->ALL_PARCELS_SHIPPED; + Logic-->THRESHOLD_EXCEEDED; +``` + + + +Let's quickly go through the article. + +## The problem + +## How Elixir engineer see the problem + +In Elixir we have the Stream abstraction - composable, lazy enumerables. + +```elixir +stream = + [1, 2, 3, 4, 5] + |> Stream.map(&(&1 * 2)) + |> Stream.each(&IO.inspect/1) + |> Stream.take_every(2) +``` + +```elixir +Stream.run(stream) +# or +Enum.to_list(stream) +``` + +Using Streams one can see the problem in quite straightforward way: + +* One need a "source" that produces a stream of the "orders" events (file, db, Kafka, Rabbit, etc). +* One need a "source" that produces a stream of the "parcels" events. +* The streams need to be merged (mixed) into one, so they can be processed. +* The mixed stream then goes to a "function" that transforms it and produces a stream of the "all_parcels_shipped" and "threshold_exceeded" events. +* The stream needs to be splitted into two, based on the type of events. +* "all_parcels_shipped" should be directed into a "sink" (file, db, Kafka, Rabbit, etc). +* "threshold_exceeded" should be directed into another sink. + + + +### + + + + + +```mermaid +graph LR; + source_orders --> merge; + source_parcesl --> merge; + merge --> transform; + transform --> split; + split --> sink_all_parcels_shipped; + split --> sink_threshold_exceeded; +``` + +## Stream operations + +Elixir (a many other programming languages) has the Stream abstraction - a sequence of data made available over time. + +Since a stream is just a data sequence without any predefined size (potentially infinite), the set of operations that can be performed on streams is quite limited: + +create/destroy + +* nil -> source() -> stream +* stream -> sink() -> nil + +mix/split + +* {stream, stream} -> mix -> stream +* stream -> split -> {stream, stream} + +transform + +* stream -> transform -> stream + +## Implementation of the operations + +### See the Strom library + +https://github.com/antonmi/Strom + + + +### The "flow" datastructure - a Map of Streams + +```elixir +flow = %{numbers1: [1, 2, 3], numbers2: Stream.cycle([1, 2, 3])} + +# empty flow +flow = %{} +``` + +### Strom.Source + +```elixir +# Enum +source = Strom.Source.start([1, 2, 3]) +flow = %{} +flow = Strom.Source.call(flow, source, :numbers) +``` + +```elixir +Enum.to_list(flow[:numbers]) +``` + +```elixir +flow = %{before: [10, 20, 30]} +source = Strom.Source.start([1, 2, 3]) +flow = Strom.Source.call(flow, source, :numbers) +``` + +```elixir +# with files +path1 = "/Users/anton.mishchukkloeckner.com/elixir/strom/test/data/numbers1.txt" +source = Strom.Source.start(%Strom.Source.ReadLines{path: path1}) + +flow = Strom.Source.call(%{}, source, :numbers1) +``` + +```elixir +Enum.to_list(flow[:numbers1]) +``` + +One can easily implement a custom source. It's just a module that implements `start/1`, `call/1`, and `stop/1` functions. + +See https://github.com/antonmi/Strom/tree/main/lib/source + +## Strom.Sink + +`Sink` runs (destroy) a stream. + +```elixir +# IO.puts +sink = Strom.Sink.start(%Strom.Sink.IOPuts{}) + +flow = %{data: [1, 2, 3], another: ["x", "y", "z"]} + +Strom.Sink.call(flow, sink, :data) +``` + +See: https://github.com/antonmi/Strom/tree/main/lib/sink + +## Strom.Mixer + +```elixir +mixer = Strom.Mixer.start(buffer: 2) + +flow = %{numbers1: [1, 2, 3], numbers2: [10, 20, 30]} + +flow = Strom.Mixer.call(flow, mixer, [:numbers1, :numbers2], :mixed) +``` + +```elixir +Enum.to_list(flow[:mixed]) +``` + +## Strom.Splitter + +```elixir +splitter = Strom.Splitter.start() + +partitions = %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)} + +flow = %{numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9]} + +flow = Strom.Splitter.call(flow, splitter, :numbers, partitions) +``` + +```elixir +{Enum.to_list(flow[:odd]), Enum.to_list(flow[:even])} +``` + +## Strom.Transform + +```elixir +# simple map + +transformer = Strom.Transformer.start() + +function = &(&1 * 2) + +flow = %{numbers: [1, 2, 3]} + +flow = Strom.Transformer.call(flow, transformer, :numbers, function) +``` + +```elixir +Enum.to_list(flow[:numbers]) +``` + +```elixir +# reduce + +transformer = Strom.Transformer.start() + +function = fn event, acc -> + {[event, event + acc], acc + 10} +end + +flow = %{numbers: [1, 2, 3]} + +flow = Strom.Transformer.call(flow, transformer, :numbers, {function, 10}) +``` + +```elixir +Enum.to_list(flow[:numbers]) +``` + +```elixir +# Transormer function can be applied to several streams + +transformer = Strom.Transformer.start() + +flow = %{numbers1: [1, 2, 3], numbers2: [10, 20, 30]} + +flow = Strom.Transformer.call(flow, transformer, [:numbers1, :numbers2], &(&1 * 2)) + +{Enum.to_list(flow[:numbers1]), Enum.to_list(flow[:numbers2])} +``` + +## Strom.DSL and Flow module + +The problem is: + +Read numbers from two files ("numbers1.txt" and "numbers2.txt"). Add one to each number. Then split the numbers into two files: even numbers goes to "even.txt", odd - to "odd.txt" + +Consider the following topology: + + + + + +```mermaid +graph LR; + source_numbers1 --> mix; + source_numbers2 --> mix; + mix --> to_integer; + to_integer --> add_one; + add_one --> split; + split --> to_string_odd; + split --> to_string_even; + to_string_odd --> sink_odd; + to_string_even --> sink_event; +``` + +```elixir +defmodule OddEvenFlow do + use Strom.DSL + alias Strom.Sink.IOPuts + + def topology(_opts) do + partitions = %{ + odd: &__MODULE__.odd_fun/1, + even: &__MODULE__.even_fun/1 + } + + [ + source(:numbers1, [1, 2, 3, 4, 5]), + source(:numbers2, [10, 20, 30, 40, 50]), + mix([:numbers1, :numbers2], :mixed), + transform(:mixed, &__MODULE__.add_one/1), + split(:mixed, partitions), + transform([:odd, :even], &__MODULE__.to_string/1), + sink(:odd, %IOPuts{prefix: "odd"}, true), + sink(:even, %IOPuts{prefix: "even"}, true) + ] + end + + def odd_fun(event), do: rem(event, 2) == 1 + + def even_fun(event), do: rem(event, 2) == 0 + + def add_one(event), do: event + 1 + + def to_string(el), do: "#{el}" +end +``` + +See: +https://github.com/antonmi/Strom/blob/main/test/dsl_test.exs + +## Finally back to the "parcels" problem + + + +```mermaid +graph LR; + source_orders(source) -- orders --> build_orders; + source_parcesl(source) -- parcels --> build_parcels; + build_orders --> mix{mix}; + build_parcels --> mix{mix}; + mix -- mixed --> force_order; + force_order -- ordered --> decide; + decide --> split{split}; + split -- all_shipped --> to_string1[to_string]; + split -- threshold --> to_string2[to_string]; + to_string1 --> sink_all_shipped(sink); + to_string2 --> sink_threshold(sink); +``` + +```elixir +def topology(_opts) do + [ + source(:orders, %ReadLines{path: "test/examples/parcels/orders.csv"}), + transform([:orders], &__MODULE__.build_order/1), + source(:parcels, %ReadLines{path: "test/examples/parcels/parcels.csv"}), + transform([:parcels], &__MODULE__.build_parcel/1), + mix([:orders, :parcels], :mixed), + transform([:mixed], &ParcelsFlow.force_order/2, %{}), + transform([:mixed], &ParcelsFlow.decide/2, %{}), + split(:mixed, %{ + threshold_exceeded: &(&1[:type] == "THRESHOLD_EXCEEDED"), + all_parcels_shipped: &(&1[:type] == "ALL_PARCELS_SHIPPED") + }), + transform([:threshold_exceeded, :all_parcels_shipped], &__MODULE__.to_string/1), + sink(:threshold_exceeded, %WriteLines{ + path: "test/examples/parcels/threshold_exceeded.csv" + }), + sink( + :all_parcels_shipped, + %WriteLines{path: "test/examples/parcels/all_parcels_shipped.csv"}, + true + ) + ] +end +``` + +```elixir +def force_order(event, memo) do + order_number = event[:order_number] + + case event[:type] do + "PARCEL_SHIPPED" -> + case Map.get(memo, order_number) do + nil -> + {[], Map.put(memo, order_number, [event])} + + true -> + {[event], memo} + + parcels -> + {[], Map.put(memo, order_number, parcels ++ [event])} + end + + "ORDER_CREATED" -> + case Map.get(memo, order_number) do + nil -> + {[event], Map.put(memo, order_number, true)} + + parcels -> + {[event | parcels], Map.put(memo, order_number, true)} + end + end +end +``` + +```elixir +def decide(event, memo) do + order_number = event[:order_number] + + case event[:type] do + "ORDER_CREATED" -> + memo = Map.put(memo, order_number, {event[:to_ship], event[:occurred_at]}) + {[], memo} + + "PARCEL_SHIPPED" -> + case Map.get(memo, order_number) do + # THRESHOLD_EXCEEDED was sent already + nil -> + {[], memo} + + {1, order_occurred_at} -> + good_or_bad = + if DateTime.diff(event[:occurred_at], order_occurred_at, :second) > + @seconds_in_week do + %{ + type: "THRESHOLD_EXCEEDED", + order_number: order_number, + occurred_at: event[:occurred_at] + } + else + %{ + type: "ALL_PARCELS_SHIPPED", + order_number: order_number, + occurred_at: event[:occurred_at] + } + end + + memo = Map.delete(memo, order_number) + {[good_or_bad], memo} + + {amount, order_occurred_at} when amount > 1 -> + if DateTime.diff(event[:occurred_at], order_occurred_at, :second) > + @seconds_in_week do + bad = %{ + type: "THRESHOLD_EXCEEDED", + order_number: order_number, + occurred_at: event[:occurred_at] + } + + memo = Map.delete(memo, order_number) + {[bad], memo} + else + memo = Map.put(memo, order_number, {amount - 1, order_occurred_at}) + {[], memo} + end + end + end +end +``` + +