Skip to content

Commit

Permalink
Registre d'arrêts : première ébauche (#4393)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptitfred authored Jan 20, 2025
1 parent 410255d commit 7783db7
Show file tree
Hide file tree
Showing 17 changed files with 642 additions and 72 deletions.
25 changes: 17 additions & 8 deletions apps/shared/lib/wrapper/wrapper_req.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ defmodule Transport.HTTPClient do
"""

# Issues a GET request (raising variant), with optional caching configured
# via `setup_cache/1`. The target `url` is merged into the request options.
def get!(url, options) do
  {req, opts} = setup_cache(options)
  Transport.Req.impl().get!(req, Keyword.merge(opts, url: url))
end

# Issues a GET request (tuple-returning variant), with optional caching
# configured via `setup_cache/1`. The target `url` is merged into the options.
def get(url, options) do
  {req, opts} = setup_cache(options)
  Transport.Req.impl().get(req, Keyword.merge(opts, url: url))
end

defp setup_cache(options) do
options =
Keyword.validate!(options, [
:custom_cache_dir,
Expand All @@ -48,13 +60,10 @@ defmodule Transport.HTTPClient do

{enable_cache, options} = options |> Keyword.pop!(:enable_cache)

req =
if enable_cache do
req |> Transport.Shared.ReqCustomCache.attach()
else
req
end

Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
if enable_cache do
{req |> Transport.Shared.ReqCustomCache.attach(), options}
else
{req, options}
end
end
end
87 changes: 87 additions & 0 deletions apps/transport/lib/gtfs/utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
defmodule Transport.GTFS.Utils do
  @moduledoc """
  Some helpers for handling GTFS archives.
  """

  @doc """
  Fetches a mandatory coordinate column (e.g. `"stop_lat"`) from a CSV record
  and converts it to a float (`nil` when the cell is blank).

  Raises if `field` is absent from `record`.
  """
  def fetch_position(record, field) do
    record |> Map.fetch!(field) |> convert_text_to_float()
  end

  @doc """
  Convert textual values to float.
  iex> convert_text_to_float("")
  nil
  iex> convert_text_to_float("0")
  0.0
  iex> convert_text_to_float("0.0")
  0.0
  iex> convert_text_to_float("12.7")
  12.7
  iex> convert_text_to_float("-12.7")
  -12.7
  iex> convert_text_to_float(" -48.7 ")
  -48.7
  """
  def convert_text_to_float(input) do
    # Trim once and branch on the result (the original trimmed twice).
    case String.trim(input) do
      "" -> nil
      trimmed -> trimmed |> Decimal.new() |> Decimal.to_float()
    end
  end

  @doc """
  Variant of csv_get_with_default/3 that raises if a mandatory column is missing.
  """
  def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
    value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

    value_or_default(value, default_value)
  end

  @doc """
  iex> csv_get_with_default(%{}, "field", 0)
  0
  iex> csv_get_with_default(%{"other_field" => 1}, "field", 0)
  0
  iex> csv_get_with_default(%{"field" => 2, "other_field" => 1}, "field", 0)
  2
  iex> csv_get_with_default(%{"field" => "", "other_field" => 1}, "field", 0)
  0
  """
  def csv_get_with_default(map, field, default_value) do
    map |> Map.get(field) |> value_or_default(default_value)
  end

  # Shared fallback rule for CSV cells: absent (`nil`) or blank (`""`) cells
  # are replaced by the caller-provided default.
  defp value_or_default(nil, default_value), do: default_value
  defp value_or_default("", default_value), do: default_value
  defp value_or_default(value, _default_value), do: value

  @doc """
  Transform the stream output by Unzip to a stream of maps, each map
  corresponding to a row from the CSV.
  """
  def to_stream_of_maps(file_stream) do
    file_stream
    # transform the stream to a stream of binaries
    |> Stream.map(fn c -> IO.iodata_to_binary(c) end)
    # stream line by line
    |> NimbleCSV.RFC4180.to_line_stream()
    |> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
    # transform the stream to a stream of maps %{column_name1: value1, ...}
    # The accumulator holds the header row; the first row emits nothing and
    # becomes the accumulator (after stripping an optional UTF-8 BOM).
    |> Stream.transform([], fn row, headers ->
      if headers == [] do
        {[], Enum.map(row, fn h -> String.replace_prefix(h, "\uFEFF", "") end)}
      else
        {[headers |> Enum.zip(row) |> Map.new()], headers}
      end
    end)
  end
end
65 changes: 9 additions & 56 deletions apps/transport/lib/jobs/gtfs_to_db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,7 @@ defmodule Transport.Jobs.GtfsToDB do
Get the content of a GTFS ResourceHistory, store it in the DB
"""

@doc """
Convert textual values to float.
iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
0.0
iex> convert_text_to_float("12.7")
12.7
iex> convert_text_to_float("-12.7")
-12.7
iex> convert_text_to_float(" -48.7 ")
-48.7
"""
def convert_text_to_float(input) do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
end

def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end
alias Transport.GTFS.Utils

def import_gtfs_from_resource_history(resource_history_id) do
%{id: data_import_id} = %DB.DataImport{resource_history_id: resource_history_id} |> DB.Repo.insert!()
Expand Down Expand Up @@ -61,16 +35,16 @@ defmodule Transport.Jobs.GtfsToDB do
def stops_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
# the map is reshaped for Ecto's needs
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
stop_id: r |> Map.fetch!("stop_id"),
stop_name: r |> Map.fetch!("stop_name"),
stop_lat: r |> Map.fetch!("stop_lat") |> convert_text_to_float(),
stop_lon: r |> Map.fetch!("stop_lon") |> convert_text_to_float(),
location_type: r |> csv_get_with_default!("location_type", "0", false) |> String.to_integer()
stop_lat: r |> Utils.fetch_position("stop_lat"),
stop_lon: r |> Utils.fetch_position("stop_lon"),
location_type: r |> Utils.csv_get_with_default!("location_type", "0", false) |> String.to_integer()
}
end)
|> Stream.chunk_every(1000)
Expand All @@ -79,27 +53,6 @@ defmodule Transport.Jobs.GtfsToDB do
end)
end

@doc """
Transform the stream output by Unzip to a stream of maps, each map
corresponding to a row from the CSV.
"""
def to_stream_of_maps(file_stream) do
file_stream
# transform the stream to a stream of binaries
|> Stream.map(fn c -> IO.iodata_to_binary(c) end)
# stream line by line
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
# transform the stream to a stream of maps %{column_name1: value1, ...}
|> Stream.transform([], fn r, acc ->
if acc == [] do
{%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)}
else
{[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
end
end)
end

def fill_calendar_from_resource_history(resource_history_id, data_import_id) do
file_stream = file_stream(resource_history_id, "calendar.txt")
calendar_stream_insert(file_stream, data_import_id)
Expand All @@ -108,7 +61,7 @@ defmodule Transport.Jobs.GtfsToDB do
def calendar_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
res = %{
data_import_id: data_import_id,
Expand Down Expand Up @@ -155,7 +108,7 @@ defmodule Transport.Jobs.GtfsToDB do
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
Expand Down Expand Up @@ -209,7 +162,7 @@ defmodule Transport.Jobs.GtfsToDB do
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
Expand All @@ -235,7 +188,7 @@ defmodule Transport.Jobs.GtfsToDB do
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
Expand Down
47 changes: 39 additions & 8 deletions apps/transport/lib/netex/netex_archive_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ defmodule Transport.NeTEx do
# Entry names ending with a slash `/` are directories. Skip them.
# https://github.com/akash-akya/unzip/blob/689a1ca7a134ab2aeb79c8c4f8492d61fa3e09a0/lib/unzip.ex#L69
String.ends_with?(file_name, "/") ->
[]
{:ok, []}

extension |> String.downcase() == ".zip" ->
raise "Insupported zip inside zip for file #{file_name}"
{:error, "Insupported zip inside zip for file #{file_name}"}

extension |> String.downcase() != ".xml" ->
raise "Insupported file extension (#{extension}) for file #{file_name}"
{:error, "Insupported file extension (#{extension}) for file #{file_name}"}

true ->
{:ok, state} =
parsing_result =
unzip
|> Unzip.file_stream!(file_name)
|> Stream.map(&IO.iodata_to_binary(&1))
Expand All @@ -42,7 +42,21 @@ defmodule Transport.NeTEx do
end
})

state.stop_places
case parsing_result do
{:ok, state} -> {:ok, state.stop_places}
{:error, exception} -> {:error, Exception.message(exception)}
{:halt, _state, _rest} -> {:error, "SAX parsing interrupted unexpectedly."}
end
end
end

@doc """
Like read_stop_places/2 but raises on errors.
"""
# Bang variant of read_stop_places/2: unwraps the {:ok, _} tuple and raises
# (a RuntimeError built from the message string) on {:error, _}.
def read_stop_places!(%Unzip{} = unzip, file_name) do
  result = read_stop_places(unzip, file_name)

  case result do
    {:ok, stop_places} -> stop_places
    {:error, message} -> raise message
  end
end

Expand All @@ -53,8 +67,14 @@ defmodule Transport.NeTEx do
zip_file = Unzip.LocalFile.open(zip_file_name)

try do
{:ok, unzip} = Unzip.new(zip_file)
cb.(unzip)
case Unzip.new(zip_file) do
{:ok, unzip} ->
cb.(unzip)

{:error, message} ->
Logger.error("Error while reading #{zip_file_name}: #{message}")
[]
end
after
Unzip.LocalFile.close(zip_file)
end
Expand All @@ -67,6 +87,17 @@ defmodule Transport.NeTEx do
See tests for actual output. Will be refactored soonish.
"""
# Reads stop places from every entry of the archive, tolerating per-file errors.
def read_all_stop_places(zip_file_name), do: read_all(zip_file_name, &read_stop_places/2)

@doc """
Like read_all_stop_places/1 but raises on error.
"""
# Raising counterpart of read_all_stop_places/1, delegating to the bang reader.
def read_all_stop_places!(zip_file_name), do: read_all(zip_file_name, &read_stop_places!/2)

defp read_all(zip_file_name, reader) do
with_zip_file_handle(zip_file_name, fn unzip ->
unzip
|> Unzip.list_entries()
Expand All @@ -75,7 +106,7 @@ defmodule Transport.NeTEx do

{
metadata.file_name,
read_stop_places(unzip, metadata.file_name)
reader.(unzip, metadata.file_name)
}
end)
end)
Expand Down
Loading

0 comments on commit 7783db7

Please sign in to comment.