Stop registry: first draft #4393

Open · wants to merge 17 commits into master
25 changes: 17 additions & 8 deletions apps/shared/lib/wrapper/wrapper_req.ex
@@ -35,6 +35,18 @@ defmodule Transport.HTTPClient do
"""

def get!(url, options) do
{req, options} = setup_cache(options)

Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
end

def get(url, options) do
{req, options} = setup_cache(options)

Transport.Req.impl().get(req, options |> Keyword.merge(url: url))
end

defp setup_cache(options) do
options =
Keyword.validate!(options, [
:custom_cache_dir,
@@ -48,13 +60,10 @@ defmodule Transport.HTTPClient do

{enable_cache, options} = options |> Keyword.pop!(:enable_cache)

req =
if enable_cache do
req |> Transport.Shared.ReqCustomCache.attach()
else
req
end

Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
if enable_cache do
{req |> Transport.Shared.ReqCustomCache.attach(), options}
else
{req, options}
end
end
end
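For illustration, here is how the two entry points differ from a caller's perspective. This is a sketch: the URL is made up, and the tuple shapes assume Req's standard {:ok, %Req.Response{}} / {:error, exception} contract.

# Raising variant: returns a %Req.Response{} directly, raises on transport errors.
Transport.HTTPClient.get!("https://example.com/data.csv", enable_cache: true)

# Non-raising variant: returns a tagged tuple the caller can match on.
case Transport.HTTPClient.get("https://example.com/data.csv", enable_cache: true) do
  {:ok, %Req.Response{status: 200, body: body}} -> {:ok, body}
  {:ok, %Req.Response{status: status}} -> {:error, {:unexpected_status, status}}
  {:error, exception} -> {:error, exception}
end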
87 changes: 87 additions & 0 deletions apps/transport/lib/gtfs/utils.ex
@@ -0,0 +1,87 @@
defmodule Transport.GTFS.Utils do
@moduledoc """
Some helpers for handling GTFS archives.
"""

def fetch_position(record, field) do
Map.fetch!(record, field) |> convert_text_to_float()
end

@doc """
Convert textual values to float.

iex> convert_text_to_float("")
nil
iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
0.0
iex> convert_text_to_float("12.7")
12.7
iex> convert_text_to_float("-12.7")
-12.7
iex> convert_text_to_float(" -48.7 ")
-48.7
"""
def convert_text_to_float(input) do
if input |> String.trim() != "" do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
else
nil
end
end

@doc """
Variant of csv_get_with_default/3 that raises if a mandatory column is missing.
"""
def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
Contributor commented:
Is there a trim upstream? I imagine that in some cases we could end up with values like " ".
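(One possible shape for that suggestion, hypothetical and not part of this PR: trim binary values before the fallback check.)

# Sketch only: normalize before matching, so " " hits the "" clause below.
value = if is_binary(value), do: String.trim(value), else: value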

v -> v
end
end
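For reference, the bang variant behaves like this (illustrative session, mirroring the doctests of csv_get_with_default/3):

iex> csv_get_with_default!(%{"field" => ""}, "field", 0)
0
iex> csv_get_with_default!(%{}, "field", 0, false)
0
iex> csv_get_with_default!(%{}, "field", 0)
** (KeyError) key "field" not found in: %{}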

@doc """
iex> csv_get_with_default(%{}, "field", 0)
0
iex> csv_get_with_default(%{"other_field" => 1}, "field", 0)
0
iex> csv_get_with_default(%{"field" => 2, "other_field" => 1}, "field", 0)
2
iex> csv_get_with_default(%{"field" => "", "other_field" => 1}, "field", 0)
0
"""
def csv_get_with_default(map, field, default_value) do
value = Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end

@doc """
Transform the stream output by Unzip into a stream of maps, each map
corresponding to a row from the CSV.
"""
def to_stream_of_maps(file_stream) do
Contributor commented:
I wondered whether file_stream "is a" %File.Stream{}.
With Elixir 1.18+ and typing on the way, I have the feeling that putting struct types on parameters is going to be a good idea.

Author replied:
It is an enumerable of iodata (not sure of the exact type in Elixir).
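(An illustrative spec along those lines, hypothetical and not part of the PR:)

# Any enumerable of iodata chunks in, a stream of row maps out.
@spec to_stream_of_maps(Enumerable.t(iodata())) :: Enumerable.t(map())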

file_stream
# transform the stream to a stream of binaries
|> Stream.map(fn c -> IO.iodata_to_binary(c) end)
# stream line by line
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
# transform the stream to a stream of maps %{column_name1: value1, ...}
|> Stream.transform([], fn r, acc ->
if acc == [] do
{%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)}
Contributor commented:
I don't know if you had seen that the BOM can be trimmed directly when opening the stream:

https://hexdocs.pm/elixir/1.18.1/File.html#stream!/3-byte-order-marks-and-read-offset

File.stream!("./test/test.txt", [:trim_bom, encoding: :utf8])

But maybe you had already seen it and it is not practical, etc.

Author replied:
No; I naively reused existing code.

else
{[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
end
end)
end
end
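As a sanity check, this is what to_stream_of_maps/1 yields on a tiny in-memory CSV (illustrative data; any enumerable of iodata chunks works):

# Each chunk is iodata, as produced for example by Unzip.file_stream!/2.
[["stop_id,stop_name\n"], ["S1,Gare Centrale\n"]]
|> Transport.GTFS.Utils.to_stream_of_maps()
|> Enum.to_list()
#=> [%{"stop_id" => "S1", "stop_name" => "Gare Centrale"}]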
65 changes: 9 additions & 56 deletions apps/transport/lib/jobs/gtfs_to_db.ex
@@ -3,33 +3,7 @@ defmodule Transport.Jobs.GtfsToDB do
Get the content of a GTFS ResourceHistory, store it in the DB
"""

@doc """
Convert textual values to float.

iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
0.0
iex> convert_text_to_float("12.7")
12.7
iex> convert_text_to_float("-12.7")
-12.7
iex> convert_text_to_float(" -48.7 ")
-48.7
"""
def convert_text_to_float(input) do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
end

def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end
alias Transport.GTFS.Utils

def import_gtfs_from_resource_history(resource_history_id) do
%{id: data_import_id} = %DB.DataImport{resource_history_id: resource_history_id} |> DB.Repo.insert!()
@@ -61,16 +35,16 @@ defmodule Transport.Jobs.GtfsToDB do
def stops_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
# the map is reshaped for Ecto's needs
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
stop_id: r |> Map.fetch!("stop_id"),
stop_name: r |> Map.fetch!("stop_name"),
stop_lat: r |> Map.fetch!("stop_lat") |> convert_text_to_float(),
stop_lon: r |> Map.fetch!("stop_lon") |> convert_text_to_float(),
location_type: r |> csv_get_with_default!("location_type", "0", false) |> String.to_integer()
stop_lat: r |> Utils.fetch_position("stop_lat"),
stop_lon: r |> Utils.fetch_position("stop_lon"),
location_type: r |> Utils.csv_get_with_default!("location_type", "0", false) |> String.to_integer()
}
end)
|> Stream.chunk_every(1000)
@@ -79,27 +53,6 @@
end)
end

@doc """
Transform the stream output by Unzip into a stream of maps, each map
corresponding to a row from the CSV.
"""
def to_stream_of_maps(file_stream) do
file_stream
# transform the stream to a stream of binaries
|> Stream.map(fn c -> IO.iodata_to_binary(c) end)
# stream line by line
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
# transform the stream to a stream of maps %{column_name1: value1, ...}
|> Stream.transform([], fn r, acc ->
if acc == [] do
{%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)}
else
{[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
end
end)
end

def fill_calendar_from_resource_history(resource_history_id, data_import_id) do
file_stream = file_stream(resource_history_id, "calendar.txt")
calendar_stream_insert(file_stream, data_import_id)
@@ -108,7 +61,7 @@
def calendar_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
res = %{
data_import_id: data_import_id,
@@ -155,7 +108,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
@@ -209,7 +162,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
@@ -235,7 +188,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
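The call sites above only swap the local helpers for their Transport.GTFS.Utils equivalents; behavior is unchanged. For instance (illustrative values):

iex> Transport.GTFS.Utils.fetch_position(%{"stop_lat" => " 48.85 "}, "stop_lat")
48.85
iex> Transport.GTFS.Utils.fetch_position(%{"stop_lat" => ""}, "stop_lat")
nil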
47 changes: 39 additions & 8 deletions apps/transport/lib/netex/netex_archive_parser.ex
@@ -20,16 +20,16 @@ defmodule Transport.NeTEx do
# Entry names ending with a slash `/` are directories. Skip them.
# https://github.com/akash-akya/unzip/blob/689a1ca7a134ab2aeb79c8c4f8492d61fa3e09a0/lib/unzip.ex#L69
String.ends_with?(file_name, "/") ->
[]
{:ok, []}

extension |> String.downcase() == ".zip" ->
raise "Insupported zip inside zip for file #{file_name}"
{:error, "Insupported zip inside zip for file #{file_name}"}

extension |> String.downcase() != ".xml" ->
raise "Insupported file extension (#{extension}) for file #{file_name}"
{:error, "Insupported file extension (#{extension}) for file #{file_name}"}

true ->
{:ok, state} =
parsing_result =
unzip
|> Unzip.file_stream!(file_name)
|> Stream.map(&IO.iodata_to_binary(&1))
@@ -42,7 +42,21 @@
end
})

state.stop_places
case parsing_result do
{:ok, state} -> {:ok, state.stop_places}
{:error, exception} -> {:error, Exception.message(exception)}
{:halt, _state, _rest} -> {:error, "SAX parsing interrupted unexpectedly."}
end
end
end

@doc """
Like read_stop_places/2 but raises on errors.
"""
def read_stop_places!(%Unzip{} = unzip, file_name) do
case read_stop_places(unzip, file_name) do
{:ok, stop_places} -> stop_places
{:error, message} -> raise message
end
end

Expand All @@ -53,8 +67,14 @@ defmodule Transport.NeTEx do
zip_file = Unzip.LocalFile.open(zip_file_name)

try do
{:ok, unzip} = Unzip.new(zip_file)
cb.(unzip)
case Unzip.new(zip_file) do
{:ok, unzip} ->
cb.(unzip)

{:error, message} ->
Logger.error("Error while reading #{zip_file_name}: #{message}")
[]
end
after
Unzip.LocalFile.close(zip_file)
end
@@ -67,6 +87,17 @@
See tests for actual output. Will be refactored soonish.
"""
def read_all_stop_places(zip_file_name) do
read_all(zip_file_name, &read_stop_places/2)
end

@doc """
Like read_all_stop_places/1 but raises on error.
"""
def read_all_stop_places!(zip_file_name) do
read_all(zip_file_name, &read_stop_places!/2)
end

defp read_all(zip_file_name, reader) do
with_zip_file_handle(zip_file_name, fn unzip ->
unzip
|> Unzip.list_entries()
|> Enum.map(fn metadata ->

{
metadata.file_name,
read_stop_places(unzip, metadata.file_name)
reader.(unzip, metadata.file_name)
}
end)
end)
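Taken together, the raising and non-raising variants compose as follows. A minimal sketch, assuming a local netex.zip (the file name is illustrative):

# Non-raising: each entry is {file_name, {:ok, stop_places}} or {file_name, {:error, message}}.
"netex.zip"
|> Transport.NeTEx.read_all_stop_places()
|> Enum.each(fn
  {file_name, {:ok, stop_places}} -> IO.puts("#{file_name}: #{length(stop_places)} stop place(s)")
  {file_name, {:error, message}} -> IO.puts("#{file_name} skipped: #{message}")
end)

# Raising variant: entries are {file_name, stop_places}; any error raises instead.
Transport.NeTEx.read_all_stop_places!("netex.zip")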