From 274dd9399e52647fc4dd93380b83294dbe3e0de2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Thu, 19 Dec 2024 22:13:34 +0100 Subject: [PATCH] Code dans le bon module --- apps/transport/lib/registry/engine.ex | 22 ++++++++--------- apps/transport/lib/registry/extractor.ex | 18 ++------------ apps/transport/lib/registry/gtfs.ex | 24 +++++++++---------- apps/transport/lib/registry/result.ex | 24 +++++++++++++++++++ .../{extractor_test.exs => result_test.exs} | 8 +++---- 5 files changed, 53 insertions(+), 43 deletions(-) create mode 100644 apps/transport/lib/registry/result.ex rename apps/transport/test/registry/{extractor_test.exs => result_test.exs} (75%) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index 0ebbeec0fb..ca70c753e1 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -3,9 +3,9 @@ defmodule Transport.Registry.Engine do Stream eligible resources and run extractors to produce a raw registry at the end. """ - alias Transport.Registry.Extractor alias Transport.Registry.GTFS alias Transport.Registry.Model.Stop + alias Transport.Registry.Result import Ecto.Query @@ -19,13 +19,13 @@ defmodule Transport.Registry.Engine do create_empty_csv_with_headers(output_file) enumerate_gtfs_resources(limit, formats) - |> Extractor.map_result(&prepare_extractor/1) + |> Result.map_result(&prepare_extractor/1) |> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000) # one for Task.async_stream - |> Extractor.cat_results() + |> Result.cat_results() # one for download/1 - |> Extractor.cat_results() - |> Extractor.map_result(&extract_from_archive/1) + |> Result.cat_results() + |> Result.map_result(&extract_from_archive/1) |> dump_to_csv(output_file) end @@ -54,9 +54,9 @@ defmodule Transport.Registry.Engine do Logger.debug("download #{extractor} #{url}") tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat") - error_result = fn msg -> + safe_error = fn msg -> File.rm(tmp_path) - {:error, msg} + Result.error(msg) end http_result = @@ -68,7 +68,7 @@ defmodule Transport.Registry.Engine do case http_result do {:error, error} -> - error_result.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") + safe_error.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") {:ok, %{status: status}} -> cond do @@ -76,15 +76,15 @@ defmodule Transport.Registry.Engine do {:ok, {extractor, tmp_path}} status > 400 -> - error_result.("Error #{status} while downloading the resource from #{url}") + safe_error.("Error #{status} while downloading the resource from #{url}") true -> - error_result.("Unexpected HTTP error #{status} while downloading the resource from #{url}") + safe_error.("Unexpected HTTP error #{status} while downloading the resource from #{url}") end end end - @spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()]) + @spec extract_from_archive({module(), Path.t()}) :: Result.t([Stop.t()]) def extract_from_archive({extractor, file}) do Logger.debug("extract_from_archive #{extractor} #{file}") extractor.extract_from_archive(file) diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index 19ab45308b..0f5db7d9ec 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -6,21 +6,7 @@ defmodule Transport.Registry.Extractor do require Logger alias Transport.Registry.Model.Stop + alias Transport.Registry.Result - @type result(positive) :: {:ok, positive} | {:error, binary()} - - @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) - - @spec cat_results(Stream.t(result(term()))) :: Stream.t(term()) - def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) - - defp keep_ok({:ok, result}), do: [result] - defp keep_ok(_), do: [] - - @spec map_result(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term()) - def map_result(enumerable, mapper) do - enumerable - |> Stream.map(mapper) - |> cat_results() - end + @callback extract_from_archive(path :: Path.t()) :: Result.t([Stop.t()]) end diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index 25ecb7bbea..80c875e0ee 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -5,6 +5,7 @@ defmodule Transport.Registry.GTFS do alias Transport.Registry.Model.Stop alias Transport.Registry.Model.StopIdentifier + alias Transport.Registry.Result alias Transport.GTFS.Utils @@ -18,18 +19,16 @@ defmodule Transport.Registry.GTFS do case file_stream(archive) do {:error, error} -> Logger.error(error) - {:error, error} + Result.error(error) {:ok, content} -> Logger.debug("Valid Zip archive") - stops = - content - |> Utils.to_stream_of_maps() - |> Stream.flat_map(&handle_stop/1) - |> Enum.to_list() - - {:ok, stops} + content + |> Utils.to_stream_of_maps() + |> Stream.flat_map(&handle_stop/1) + |> Enum.to_list() + |> Result.ok() end end @@ -63,18 +62,19 @@ defmodule Transport.Registry.GTFS do case Unzip.new(zip_file) do {:ok, unzip} -> if has_stops?(unzip) do - {:ok, Unzip.file_stream!(unzip, "stops.txt")} + unzip |> Unzip.file_stream!("stops.txt") |> Result.ok() else - {:error, "Missing stops.txt in #{archive}"} + Result.error("Missing stops.txt in #{archive}") end {:error, error} -> - {:error, "Error while unzipping archive #{archive}: #{error}"} + Result.error("Error while unzipping archive #{archive}: #{error}") end end defp has_stops?(unzip) do - Unzip.list_entries(unzip) + unzip + |> Unzip.list_entries() |> Enum.any?(&entry_of_name?("stops.txt", &1)) end diff --git a/apps/transport/lib/registry/result.ex b/apps/transport/lib/registry/result.ex new file mode 100644 index 0000000000..5821cbfa41 --- /dev/null +++ b/apps/transport/lib/registry/result.ex @@ -0,0 +1,24 @@ +defmodule Transport.Registry.Result do + @moduledoc """ + Type and utilities to represent results. + """ + + @type t(positive) :: {:ok, positive} | {:error, binary()} + + def ok(positive), do: {:ok, positive} + + def error(message), do: {:error, message} + + @spec cat_results(Stream.t(t(term()))) :: Stream.t(term()) + def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) + + defp keep_ok({:ok, result}), do: [result] + defp keep_ok(_), do: [] + + @spec map_result(Stream.t(term()), (term() -> t(term()))) :: Stream.t(term()) + def map_result(enumerable, mapper) do + enumerable + |> Stream.map(mapper) + |> cat_results() + end +end diff --git a/apps/transport/test/registry/extractor_test.exs b/apps/transport/test/registry/result_test.exs similarity index 75% rename from apps/transport/test/registry/extractor_test.exs rename to apps/transport/test/registry/result_test.exs index da60497735..d48a8d587b 100644 --- a/apps/transport/test/registry/extractor_test.exs +++ b/apps/transport/test/registry/result_test.exs @@ -1,8 +1,8 @@ -defmodule Transport.Registry.ExtractorTest do +defmodule Transport.Registry.ResultTest do use ExUnit.Case, async: false require Integer - alias Transport.Registry.Extractor + alias Transport.Registry.Result test "cat_results" do assert [] == cat_results([]) @@ -16,11 +16,11 @@ defmodule Transport.Registry.ExtractorTest do end defp cat_results(enumerable) do - enumerable |> Extractor.cat_results() |> Enum.to_list() + enumerable |> Result.cat_results() |> Enum.to_list() end defp map_result(enumerable, mapper) do - enumerable |> Extractor.map_result(mapper) |> Enum.to_list() + enumerable |> Result.map_result(mapper) |> Enum.to_list() end defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i}