diff --git a/TODO.md b/TODO.md index d19d1d8..a184dd8 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,4 @@ - restart strategy - renamer can be just a simple module +- source and sink behaviours - tick source with timeout diff --git a/test/examples/parcels/parcels_test.exs b/test/examples/parcels/parcels_test.exs index feeb287..b6cf955 100644 --- a/test/examples/parcels/parcels_test.exs +++ b/test/examples/parcels/parcels_test.exs @@ -172,7 +172,6 @@ defmodule Strom.Examples.ParcelsTest do {[bad], memo} else memo = Map.put(memo, order_number, {amount - 1, order_occurred_at}) - {[], memo} end end @@ -184,11 +183,6 @@ defmodule Strom.Examples.ParcelsTest do end def topology(_opts) do - partitions = %{ - threshold_exceeded: &(&1[:type] == "THRESHOLD_EXCEEDED"), - all_parcels_shipped: &(&1[:type] == "ALL_PARCELS_SHIPPED") - } - [ source(:orders, %ReadLines{path: "test/examples/parcels/orders.csv"}), transform([:orders], &__MODULE__.build_order/1), @@ -197,7 +191,10 @@ defmodule Strom.Examples.ParcelsTest do mix([:orders, :parcels], :mixed), transform([:mixed], &ParcelsFlow.force_order/2, %{}), transform([:mixed], &ParcelsFlow.decide/2, %{}), - split(:mixed, partitions), + split(:mixed, %{ + threshold_exceeded: &(&1[:type] == "THRESHOLD_EXCEEDED"), + all_parcels_shipped: &(&1[:type] == "ALL_PARCELS_SHIPPED") + }), transform([:threshold_exceeded, :all_parcels_shipped], &__MODULE__.to_string/1), sink(:threshold_exceeded, %WriteLines{ path: "test/examples/parcels/threshold_exceeded.csv"