diff --git a/apps/shared/lib/wrapper/wrapper_req.ex b/apps/shared/lib/wrapper/wrapper_req.ex
index 17a7e9c6ca..b8fbe93ce5 100644
--- a/apps/shared/lib/wrapper/wrapper_req.ex
+++ b/apps/shared/lib/wrapper/wrapper_req.ex
@@ -35,6 +35,18 @@ defmodule Transport.HTTPClient do
   """
   def get!(url, options) do
+    {req, options} = setup_cache(options)
+
+    Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
+  end
+
+  def get(url, options) do
+    {req, options} = setup_cache(options)
+
+    Transport.Req.impl().get(req, options |> Keyword.merge(url: url))
+  end
+
+  defp setup_cache(options) do
     options =
       Keyword.validate!(options, [
         :custom_cache_dir,
@@ -48,13 +60,10 @@ defmodule Transport.HTTPClient do
 
     {enable_cache, options} = options |> Keyword.pop!(:enable_cache)
 
-    req =
-      if enable_cache do
-        req |> Transport.Shared.ReqCustomCache.attach()
-      else
-        req
-      end
-
-    Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
+    if enable_cache do
+      {req |> Transport.Shared.ReqCustomCache.attach(), options}
+    else
+      {req, options}
+    end
   end
 end
diff --git a/apps/transport/lib/gtfs/utils.ex b/apps/transport/lib/gtfs/utils.ex
new file mode 100644
index 0000000000..27ddc49e7e
--- /dev/null
+++ b/apps/transport/lib/gtfs/utils.ex
@@ -0,0 +1,92 @@
+defmodule Transport.GTFS.Utils do
+  @moduledoc """
+  Some helpers for handling GTFS archives.
+  """
+
+  def fetch_position(record, field) do
+    Map.fetch!(record, field) |> convert_text_to_float()
+  end
+
+  @doc """
+  Convert textual values to float.
+
+  iex> convert_text_to_float("")
+  nil
+  iex> convert_text_to_float("0")
+  0.0
+  iex> convert_text_to_float("0.0")
+  0.0
+  iex> convert_text_to_float("12.7")
+  12.7
+  iex> convert_text_to_float("-12.7")
+  -12.7
+  iex> convert_text_to_float(" -48.7 ")
+  -48.7
+  """
+  def convert_text_to_float(input) do
+    if input |> String.trim() != "" do
+      input |> String.trim() |> Decimal.new() |> Decimal.to_float()
+    else
+      nil
+    end
+  end
+
+  @doc """
+  Variant of csv_get_with_default/3 that raises if a mandatory column is missing.
+  """
+  def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
+    value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)
+
+    case value do
+      nil -> default_value
+      "" -> default_value
+      v -> v
+    end
+  end
+
+  @doc """
+  iex> csv_get_with_default(%{}, "field", 0)
+  0
+  iex> csv_get_with_default(%{"other_field" => 1}, "field", 0)
+  0
+  iex> csv_get_with_default(%{"field" => 2, "other_field" => 1}, "field", 0)
+  2
+  iex> csv_get_with_default(%{"field" => "", "other_field" => 1}, "field", 0)
+  0
+  """
+  def csv_get_with_default(map, field, default_value) do
+    value = Map.get(map, field)
+
+    case value do
+      nil -> default_value
+      "" -> default_value
+      v -> v
+    end
+  end
+
+  @doc """
+  Transform the stream outputted by Unzip to a stream of maps, each map
+  corresponding to a row from the CSV.
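+
+  A minimal sketch of the reshaping (headers become map keys, values stay strings):
+
+  iex> ["stop_id,stop_name\\n", "S1,Gare\\n"] |> to_stream_of_maps() |> Enum.to_list()
+  [%{"stop_id" => "S1", "stop_name" => "Gare"}]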
+ """ + def to_stream_of_maps(file_stream) do + file_stream + # transform the stream to a stream of binaries + |> Stream.map(fn c -> IO.iodata_to_binary(c) end) + # stream line by line + |> NimbleCSV.RFC4180.to_line_stream() + |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) + # transform the stream to a stream of maps %{column_name1: value1, ...} + |> Stream.transform([], fn r, acc -> + if acc == [] do + {%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)} + else + {[acc |> Enum.zip(r) |> Enum.into(%{})], acc} + end + end) + end +end diff --git a/apps/transport/lib/jobs/gtfs_to_db.ex b/apps/transport/lib/jobs/gtfs_to_db.ex index 2b64b38c9f..bc05b4dba7 100644 --- a/apps/transport/lib/jobs/gtfs_to_db.ex +++ b/apps/transport/lib/jobs/gtfs_to_db.ex @@ -3,33 +3,7 @@ defmodule Transport.Jobs.GtfsToDB do Get the content of a GTFS ResourceHistory, store it in the DB """ - @doc """ - Convert textual values to float. - - iex> convert_text_to_float("0") - 0.0 - iex> convert_text_to_float("0.0") - 0.0 - iex> convert_text_to_float("12.7") - 12.7 - iex> convert_text_to_float("-12.7") - -12.7 - iex> convert_text_to_float(" -48.7 ") - -48.7 - """ - def convert_text_to_float(input) do - input |> String.trim() |> Decimal.new() |> Decimal.to_float() - end - - def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do - value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field) - - case value do - nil -> default_value - "" -> default_value - v -> v - end - end + alias Transport.GTFS.Utils def import_gtfs_from_resource_history(resource_history_id) do %{id: data_import_id} = %DB.DataImport{resource_history_id: resource_history_id} |> DB.Repo.insert!() @@ -61,16 +35,16 @@ defmodule Transport.Jobs.GtfsToDB do def stops_stream_insert(file_stream, data_import_id) do DB.Repo.transaction(fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() # the map is reshaped for Ecto's needs |> Stream.map(fn r -> %{ data_import_id: data_import_id, stop_id: r |> Map.fetch!("stop_id"), stop_name: r |> Map.fetch!("stop_name"), - stop_lat: r |> Map.fetch!("stop_lat") |> convert_text_to_float(), - stop_lon: r |> Map.fetch!("stop_lon") |> convert_text_to_float(), - location_type: r |> csv_get_with_default!("location_type", "0", false) |> String.to_integer() + stop_lat: r |> Utils.fetch_position("stop_lat"), + stop_lon: r |> Utils.fetch_position("stop_lon"), + location_type: r |> Utils.csv_get_with_default!("location_type", "0", false) |> String.to_integer() } end) |> Stream.chunk_every(1000) @@ -79,27 +53,6 @@ defmodule Transport.Jobs.GtfsToDB do end) end - @doc """ - Transform the stream outputed by Unzip to a stream of maps, each map - corresponding to a row from the CSV. 
- """ - def to_stream_of_maps(file_stream) do - file_stream - # transform the stream to a stream of binaries - |> Stream.map(fn c -> IO.iodata_to_binary(c) end) - # stream line by line - |> NimbleCSV.RFC4180.to_line_stream() - |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) - # transform the stream to a stream of maps %{column_name1: value1, ...} - |> Stream.transform([], fn r, acc -> - if acc == [] do - {%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)} - else - {[acc |> Enum.zip(r) |> Enum.into(%{})], acc} - end - end) - end - def fill_calendar_from_resource_history(resource_history_id, data_import_id) do file_stream = file_stream(resource_history_id, "calendar.txt") calendar_stream_insert(file_stream, data_import_id) @@ -108,7 +61,7 @@ defmodule Transport.Jobs.GtfsToDB do def calendar_stream_insert(file_stream, data_import_id) do DB.Repo.transaction(fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> res = %{ data_import_id: data_import_id, @@ -155,7 +108,7 @@ defmodule Transport.Jobs.GtfsToDB do DB.Repo.transaction( fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> %{ data_import_id: data_import_id, @@ -209,7 +162,7 @@ defmodule Transport.Jobs.GtfsToDB do DB.Repo.transaction( fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> %{ data_import_id: data_import_id, @@ -235,7 +188,7 @@ defmodule Transport.Jobs.GtfsToDB do DB.Repo.transaction( fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> %{ data_import_id: data_import_id, diff --git a/apps/transport/lib/netex/netex_archive_parser.ex b/apps/transport/lib/netex/netex_archive_parser.ex index 40b2d95fe8..d95e038425 100644 --- a/apps/transport/lib/netex/netex_archive_parser.ex +++ b/apps/transport/lib/netex/netex_archive_parser.ex @@ -20,16 +20,16 @@ defmodule Transport.NeTEx do # Entry names ending with a slash `/` are directories. Skip them. # https://github.com/akash-akya/unzip/blob/689a1ca7a134ab2aeb79c8c4f8492d61fa3e09a0/lib/unzip.ex#L69 String.ends_with?(file_name, "/") -> - [] + {:ok, []} extension |> String.downcase() == ".zip" -> - raise "Insupported zip inside zip for file #{file_name}" + {:error, "Insupported zip inside zip for file #{file_name}"} extension |> String.downcase() != ".xml" -> - raise "Insupported file extension (#{extension}) for file #{file_name}" + {:error, "Insupported file extension (#{extension}) for file #{file_name}"} true -> - {:ok, state} = + parsing_result = unzip |> Unzip.file_stream!(file_name) |> Stream.map(&IO.iodata_to_binary(&1)) @@ -42,7 +42,21 @@ defmodule Transport.NeTEx do end }) - state.stop_places + case parsing_result do + {:ok, state} -> {:ok, state.stop_places} + {:error, exception} -> {:error, Exception.message(exception)} + {:halt, _state, _rest} -> {:error, "SAX parsing interrupted unexpectedly."} + end + end + end + + @doc """ + Like read_stop_places/2 but raises on errors. 
+ """ + def read_stop_places!(%Unzip{} = unzip, file_name) do + case read_stop_places(unzip, file_name) do + {:ok, stop_places} -> stop_places + {:error, message} -> raise message end end @@ -53,8 +67,14 @@ defmodule Transport.NeTEx do zip_file = Unzip.LocalFile.open(zip_file_name) try do - {:ok, unzip} = Unzip.new(zip_file) - cb.(unzip) + case Unzip.new(zip_file) do + {:ok, unzip} -> + cb.(unzip) + + {:error, message} -> + Logger.error("Error while reading #{zip_file_name}: #{message}") + [] + end after Unzip.LocalFile.close(zip_file) end @@ -67,6 +87,17 @@ defmodule Transport.NeTEx do See tests for actual output. Will be refactored soonish. """ def read_all_stop_places(zip_file_name) do + read_all(zip_file_name, &read_stop_places/2) + end + + @doc """ + Like read_all_stop_places/1 but raises on error. + """ + def read_all_stop_places!(zip_file_name) do + read_all(zip_file_name, &read_stop_places!/2) + end + + defp read_all(zip_file_name, reader) do with_zip_file_handle(zip_file_name, fn unzip -> unzip |> Unzip.list_entries() @@ -75,7 +106,7 @@ defmodule Transport.NeTEx do { metadata.file_name, - read_stop_places(unzip, metadata.file_name) + reader.(unzip, metadata.file_name) } end) end) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex new file mode 100644 index 0000000000..59e92b870f --- /dev/null +++ b/apps/transport/lib/registry/engine.ex @@ -0,0 +1,111 @@ +defmodule Transport.Registry.Engine do + @moduledoc """ + Stream eligible resources and run extractors to produce a raw registry at the end. + """ + + alias Transport.Registry.GTFS + alias Transport.Registry.Model.DataSource + alias Transport.Registry.Model.Stop + alias Transport.Registry.NeTEx + alias Transport.Registry.Result + + import Ecto.Query + + require Logger + + @type option :: {:limit, integer()} | {:formats, [String.t()]} + + @doc """ + execute("/tmp/registre-arrets.csv", formats: ~w(GTFS NeTEx), limit: 100) + """ + @spec execute(output_file :: Path.t(), opts :: [option]) :: :ok + def execute(output_file, opts \\ []) do + limit = Keyword.get(opts, :limit, 1_000_000) + formats = Keyword.get(opts, :formats, ~w(GTFS NeTEx)) + + create_empty_csv_with_headers(output_file) + + enumerate_gtfs_resources(limit, formats) + |> Result.map_result(&prepare_extractor/1) + |> Task.async_stream(&download/1, max_concurrency: 12, timeout: 30 * 60_000) + # one for Task.async_stream + |> Result.cat_results() + # one for download/1 + |> Result.cat_results() + |> Result.map_result(&extract_from_archive/1) + |> dump_to_csv(output_file) + end + + def create_empty_csv_with_headers(output_file) do + headers = NimbleCSV.RFC4180.dump_to_iodata([Stop.csv_headers()]) + File.write(output_file, headers) + end + + def enumerate_gtfs_resources(limit, formats) do + DB.Resource.base_query() + |> DB.ResourceHistory.join_resource_with_latest_resource_history() + |> where([resource: r], r.format in ^formats) + |> preload([resource_history: rh], resource_history: rh) + |> limit(^limit) + |> DB.Repo.all() + end + + def prepare_extractor(%DB.Resource{} = resource) do + data_source_id = "PAN:resource:#{resource.id}" + + case resource.format do + "GTFS" -> {:ok, {GTFS, data_source_id, resource.url}} + "NeTEx" -> {:ok, {NeTEx, data_source_id, resource.url}} + _ -> {:error, "Unsupported format"} + end + end + + def download({extractor, data_source_id, url}) do + Logger.debug("download #{extractor} #{data_source_id} #{url}") + tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat") + + safe_error = fn msg -> + 
+      File.rm(tmp_path)
+      Result.error(msg)
+    end
+
+    http_result =
+      Transport.HTTPClient.get(url,
+        decode_body: false,
+        compressed: false,
+        into: File.stream!(tmp_path)
+      )
+
+    case http_result do
+      {:error, error} ->
+        safe_error.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}")
+
+      {:ok, %{status: status}} ->
+        cond do
+          status >= 200 && status < 300 ->
+            {:ok, {extractor, data_source_id, tmp_path}}
+
+          status >= 400 ->
+            safe_error.("Error #{status} while downloading the resource from #{url}")
+
+          true ->
+            safe_error.("Unexpected HTTP status #{status} while downloading the resource from #{url}")
+        end
+    end
+  end
+
+  @spec extract_from_archive({module(), DataSource.data_source_id(), Path.t()}) :: Result.t([Stop.t()])
+  def extract_from_archive({extractor, data_source_id, file}) do
+    Logger.debug("extract_from_archive #{extractor} #{data_source_id} #{file}")
+    extractor.extract_from_archive(data_source_id, file)
+  end
+
+  def dump_to_csv(enumerable, output_file) do
+    enumerable
+    |> Stream.concat()
+    |> Stream.map(&Stop.to_csv/1)
+    |> NimbleCSV.RFC4180.dump_to_stream()
+    |> Stream.into(File.stream!(output_file, [:append, :utf8]))
+    |> Stream.run()
+  end
+end
diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex
new file mode 100644
index 0000000000..fa5f0da5a5
--- /dev/null
+++ b/apps/transport/lib/registry/extractor.ex
@@ -0,0 +1,14 @@
+defmodule Transport.Registry.Extractor do
+  @moduledoc """
+  Interface and utilities for stop extractors.
+  """
+
+  require Logger
+
+  alias Transport.Registry.Model.DataSource
+  alias Transport.Registry.Model.Stop
+  alias Transport.Registry.Result
+
+  @callback extract_from_archive(data_source_id :: DataSource.data_source_id(), path :: Path.t()) ::
+              Result.t([Stop.t()])
+end
diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex
new file mode 100644
index 0000000000..b3f16f0859
--- /dev/null
+++ b/apps/transport/lib/registry/gtfs.ex
@@ -0,0 +1,98 @@
+defmodule Transport.Registry.GTFS do
+  @moduledoc """
+  Implementation of a stop extractor for GTFS resources.
+  """
+
+  alias Transport.Registry.Model.Stop
+  alias Transport.Registry.Model.StopIdentifier
+  alias Transport.Registry.Result
+
+  alias Transport.GTFS.Utils
+
+  require Logger
+
+  @behaviour Transport.Registry.Extractor
+  @doc """
+  Extract stops from a GTFS resource.
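+
+  Returns a result tuple, e.g. (illustrative values):
+
+      extract_from_archive("PAN:resource:42", "some.zip")
+      #=> {:ok, [%Stop{main_id: %StopIdentifier{id: "stop1", type: :main}, ...}]}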
+ """ + def extract_from_archive(data_source_id, archive) do + case file_stream(archive) do + {:error, error} -> + Logger.error(error) + Result.error(error) + + {:ok, content} -> + Logger.debug("Valid Zip archive") + + try do + content + |> Utils.to_stream_of_maps() + |> Stream.flat_map(&handle_stop(data_source_id, &1)) + |> Enum.to_list() + |> Result.ok() + rescue + e in NimbleCSV.ParseError -> + e + |> Exception.message() + |> Result.error() + end + end + end + + defp handle_stop(data_source_id, record) do + latitude = Utils.fetch_position(record, "stop_lat") + longitude = Utils.fetch_position(record, "stop_lon") + + if latitude != nil && longitude != nil do + [ + %Stop{ + main_id: %StopIdentifier{id: Map.fetch!(record, "stop_id"), type: :main}, + display_name: Map.fetch!(record, "stop_name"), + latitude: latitude, + longitude: longitude, + projection: :utm_wgs84, + stop_type: record |> Utils.csv_get_with_default("location_type", "0") |> to_stop_type(), + data_source_format: :gtfs, + data_source_id: data_source_id + } + ] + else + [] + end + end + + defp to_stop_type("0"), do: :quay + defp to_stop_type("1"), do: :stop + defp to_stop_type(_), do: :other + + defp file_stream(archive) do + zip_file = Unzip.LocalFile.open(archive) + + case Unzip.new(zip_file) do + {:ok, unzip} -> + if has_stops?(unzip) do + unzip |> Unzip.file_stream!("stops.txt") |> Result.ok() + else + Result.error("Missing stops.txt in #{archive}") + end + + {:error, error} -> + Result.error("Error while unzipping archive #{archive}: #{error}") + end + end + + defp has_stops?(unzip) do + unzip + |> Unzip.list_entries() + |> Enum.any?(&entry_of_name?("stops.txt", &1)) + end + + defp entry_of_name?(name, %Unzip.Entry{file_name: file_name}) do + file_name == name + end +end diff --git a/apps/transport/lib/registry/model/data_source.ex b/apps/transport/lib/registry/model/data_source.ex new file mode 100644 index 0000000000..d985325ce9 --- /dev/null +++ b/apps/transport/lib/registry/model/data_source.ex @@ -0,0 +1,23 @@ +defmodule Transport.Registry.Model.DataSource do + @moduledoc """ + Common attributes describing a data source. + """ + + defstruct [ + :id, + :checksum, + :last_updated_at, + :validity_period + ] + + @type t :: %__MODULE__{ + id: data_source_id(), + checksum: binary(), + last_updated_at: DateTime.t(), + validity_period: date_time_range() + } + + @type data_source_id :: binary() + + @type date_time_range :: binary() +end diff --git a/apps/transport/lib/registry/model/stop.ex b/apps/transport/lib/registry/model/stop.ex new file mode 100644 index 0000000000..3e9c9b84d4 --- /dev/null +++ b/apps/transport/lib/registry/model/stop.ex @@ -0,0 +1,103 @@ +defmodule Transport.Registry.Model.StopIdentifier do + @moduledoc """ + Representation of a Stop ID. + """ + + defstruct [ + :id, + :type + ] + + @type t :: %__MODULE__{ + id: binary(), + type: identifier_type() + } + + @type identifier_type :: :main | :private_code | :stop_code | :other + + def main(id), do: %__MODULE__{type: :main, id: id} + + @doc """ + iex> to_field(%Transport.Registry.Model.StopIdentifier{id: "stop1", type: :main}) + "main:stop1" + iex> to_field(%Transport.Registry.Model.StopIdentifier{id: "FRPLY", type: :private_code}) + "private_code:FRPLY" + iex> to_field(%Transport.Registry.Model.StopIdentifier{id: "PARIS GDL", type: :other}) + "other:PARIS GDL" + """ + def to_field(%__MODULE__{id: id, type: type}) do + "#{type}:#{id}" + end +end + +defmodule Transport.Registry.Model.Stop do + @moduledoc """ + Common attributes describing a stop. 
+ """ + alias Transport.Registry.Model.DataSource + alias Transport.Registry.Model.StopIdentifier + + defstruct [ + :main_id, + :display_name, + :data_source_id, + :data_source_format, + :parent_id, + :latitude, + :longitude, + projection: :utm_wgs84, + stop_type: :stop, + secondary_ids: [] + ] + + @type t :: %__MODULE__{ + main_id: StopIdentifier.t(), + display_name: binary(), + data_source_id: DataSource.data_source_id(), + data_source_format: data_source_format_type(), + parent_id: StopIdentifier.t() | nil, + latitude: float(), + longitude: float(), + projection: projection(), + stop_type: stop_type(), + secondary_ids: [StopIdentifier.t()] + } + + @type data_source_format_type :: :gtfs | :netex + + @type stop_type :: :stop | :quay | :other + + @type projection :: :utm_wgs84 | :lambert93_rgf93 + + def csv_headers do + ~w( + main_id + display_name + data_source_id + data_source_format + parent_id + latitude + longitude + projection + stop_type + ) + end + + def to_csv(%__MODULE__{} = stop) do + [ + StopIdentifier.to_field(stop.main_id), + stop.display_name, + stop.data_source_id, + stop.data_source_format, + maybe(stop.parent_id, &StopIdentifier.to_field/1, ""), + stop.latitude, + stop.longitude, + stop.projection, + stop.stop_type + ] + end + + @spec maybe(value :: any() | nil, mapper :: (any() -> any()), defaultValue :: any()) :: any() | nil + def maybe(nil, _, defaultValue), do: defaultValue + def maybe(value, mapper, _), do: mapper.(value) +end diff --git a/apps/transport/lib/registry/netex.ex b/apps/transport/lib/registry/netex.ex new file mode 100644 index 0000000000..aa8dcf321d --- /dev/null +++ b/apps/transport/lib/registry/netex.ex @@ -0,0 +1,55 @@ +defmodule Transport.Registry.NeTEx do + @moduledoc """ + Implementation of a stop extractor for NeTEx resources. + """ + + alias Transport.Registry.Model.Stop + alias Transport.Registry.Model.StopIdentifier + alias Transport.Registry.Result + + require Logger + + @behaviour Transport.Registry.Extractor + @doc """ + Extract stops from a NeTEx archive. 
+ """ + def extract_from_archive(data_source_id, archive) do + archive + |> Transport.NeTEx.read_all_stop_places() + |> Enum.flat_map(&process_stop_places(data_source_id, &1)) + |> Result.ok() + end + + defp process_stop_places(data_source_id, {_filename, {:ok, stop_places}}) do + stop_places |> Enum.map(&to_stop(data_source_id, &1)) |> Result.cat_results() + end + + defp process_stop_places(_data_source_id, {filename, {:error, message}}) do + Logger.error("Processing of #{filename}, error: #{message}") + [] + end + + defp to_stop(data_source_id, %{id: id, name: name, latitude: latitude, longitude: longitude}) do + %Stop{ + main_id: StopIdentifier.main(id), + display_name: name, + latitude: latitude, + longitude: longitude, + data_source_format: :netex, + data_source_id: data_source_id + } + |> Result.ok() + end + + defp to_stop(_data_source_id, incomplete_record) do + expected_keys = MapSet.new(~w(id name latitude longitude)) + keys = MapSet.new(Map.keys(incomplete_record)) + + missing_keys = MapSet.difference(expected_keys, keys) |> Enum.to_list() + + message = "Can't build stop, missing keys: #{inspect(missing_keys)}" + + Logger.error(message) + Result.error(message) + end +end diff --git a/apps/transport/lib/registry/result.ex b/apps/transport/lib/registry/result.ex new file mode 100644 index 0000000000..e87f61e274 --- /dev/null +++ b/apps/transport/lib/registry/result.ex @@ -0,0 +1,33 @@ +defmodule Transport.Registry.Result do + @moduledoc """ + Type and utilities to represent results. + """ + require Integer + + @type t(positive) :: {:ok, positive} | {:error, binary()} + + def ok(positive), do: {:ok, positive} + + def error(message), do: {:error, message} + + @doc """ + iex> [{:ok, "valid"}, {:error, "invalid"}, {:ok, "relevant"}] |> cat_results() + ["valid", "relevant"] + """ + @spec cat_results(Stream.t(t(term()))) :: Stream.t(term()) + def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) + + defp keep_ok({:ok, result}), do: [result] + defp keep_ok(_), do: [] + + @doc """ + iex> 1..10 |> map_result(fn v -> if Integer.is_odd(v) do {:ok, v} else {:error, "Even Steven"} end end) + [1, 3, 5, 7, 9] + """ + @spec map_result(Stream.t(term()), (term() -> t(term()))) :: Stream.t(term()) + def map_result(enumerable, mapper) do + enumerable + |> Stream.map(mapper) + |> cat_results() + end +end diff --git a/apps/transport/test/gtfs/utils_test.exs b/apps/transport/test/gtfs/utils_test.exs new file mode 100644 index 0000000000..1e232cf18c --- /dev/null +++ b/apps/transport/test/gtfs/utils_test.exs @@ -0,0 +1,4 @@ +defmodule Transport.GTFS.UtilsTest do + use ExUnit.Case, async: true + doctest Transport.GTFS.Utils, import: true +end diff --git a/apps/transport/test/netex/netex_archive_parser_test.exs b/apps/transport/test/netex/netex_archive_parser_test.exs index a619424ad8..1c7c93d641 100644 --- a/apps/transport/test/netex/netex_archive_parser_test.exs +++ b/apps/transport/test/netex/netex_archive_parser_test.exs @@ -43,6 +43,15 @@ defmodule Transport.NeTEx.ArchiveParserTest do # given a zip netex archive containing 1 file, I want the output I expected [{"arrets.xml", data}] = Transport.NeTEx.read_all_stop_places(tmp_file) + assert data == + {:ok, + [ + %{id: "FR:HELLO:POYARTIN:001", latitude: 43.669, longitude: -0.919, name: "Poyartin"} + ]} + + # given a zip netex archive containing 1 file, I want the output I expected + [{"arrets.xml", data}] = Transport.NeTEx.read_all_stop_places!(tmp_file) + assert data == [ %{id: "FR:HELLO:POYARTIN:001", latitude: 43.669, longitude: 
              ]
diff --git a/apps/transport/test/registry/gtfs_test.exs b/apps/transport/test/registry/gtfs_test.exs
new file mode 100644
index 0000000000..8102b43347
--- /dev/null
+++ b/apps/transport/test/registry/gtfs_test.exs
@@ -0,0 +1,4 @@
+defmodule Transport.Registry.GTFSTest do
+  use ExUnit.Case, async: true
+  doctest Transport.Registry.GTFS, import: true
+end
diff --git a/apps/transport/test/registry/model_test.exs b/apps/transport/test/registry/model_test.exs
new file mode 100644
index 0000000000..23ff7b266a
--- /dev/null
+++ b/apps/transport/test/registry/model_test.exs
@@ -0,0 +1,6 @@
+defmodule Transport.Registry.ModelTest do
+  use ExUnit.Case, async: true
+  doctest Transport.Registry.Model.DataSource, import: true
+  doctest Transport.Registry.Model.Stop, import: true
+  doctest Transport.Registry.Model.StopIdentifier, import: true
+end
diff --git a/apps/transport/test/registry/result_test.exs b/apps/transport/test/registry/result_test.exs
new file mode 100644
index 0000000000..7c500d3186
--- /dev/null
+++ b/apps/transport/test/registry/result_test.exs
@@ -0,0 +1,29 @@
+defmodule Transport.Registry.ResultTest do
+  use ExUnit.Case, async: false
+
+  require Integer
+  alias Transport.Registry.Result
+  doctest Result
+
+  test "cat_results" do
+    assert [] == cat_results([])
+    assert [] == cat_results([{:error, "Error message"}])
+    assert [1, 3] == cat_results([{:ok, 1}, {:error, "Error message"}, {:ok, 3}])
+  end
+
+  test "map_result" do
+    assert [] == map_result([], &even_is_forbidden/1)
+    assert [1, 3, 5, 7, 9] == map_result(1..10, &even_is_forbidden/1)
+  end
+
+  defp cat_results(enumerable) do
+    enumerable |> Result.cat_results() |> Enum.to_list()
+  end
+
+  defp map_result(enumerable, mapper) do
+    enumerable |> Result.map_result(mapper) |> Enum.to_list()
+  end
+
+  defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i}
+  defp even_is_forbidden(_), do: {:error, "Even is forbidden"}
+end
diff --git a/scripts/registre-arrets.exs b/scripts/registre-arrets.exs
new file mode 100644
index 0000000000..d02e89cdce
--- /dev/null
+++ b/scripts/registre-arrets.exs
@@ -0,0 +1,3 @@
+Transport.Registry.Engine.execute("./registre-arrets.csv")
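+# Example with options (illustrative): restrict formats and limit the number of resources.
+# Transport.Registry.Engine.execute("./registre-arrets.csv", formats: ~w(GTFS), limit: 100)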