Skip to content

Commit

Permalink
Polling résultats validation enRoute
Browse files Browse the repository at this point in the history
  • Loading branch information
ptitfred committed Nov 27, 2024
1 parent 55bc771 commit f4e5d51
Show file tree
Hide file tree
Showing 10 changed files with 691 additions and 175 deletions.
25 changes: 25 additions & 0 deletions apps/transport/lib/jobs/netex_poller_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Transport.Jobs.NeTExPollerJob do
@moduledoc """
Companion module to the validator for NeTEx files, used to handle long
standing validations.
"""
use Oban.Worker, tags: ["validation"], max_attempts: 180, queue: :resource_validation

alias Transport.Validators.NeTEx

@impl Oban.Worker
def perform(%Oban.Job{
args: %{
"validation_id" => validation_id,
"resource_history_id" => resource_history_id
},
attempt: attempt
}) do
NeTEx.poll_validation_results(validation_id, attempt)
|> NeTEx.handle_validation_results(resource_history_id, fn ^validation_id -> snooze_poller(attempt) end)
end

def snooze_poller(attempt) do
{:snooze, NeTEx.poll_interval(attempt)}
end
end
68 changes: 68 additions & 0 deletions apps/transport/lib/jobs/on_demand_netex_poller_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule Transport.Jobs.OnDemandNeTExPollerJob do
@moduledoc """
Job in charge of polling validation results from enRoute Chouette Valid.
Upon success it stores the result in the database.
"""
use Oban.Worker, tags: ["validation"], max_attempts: 30, queue: :on_demand_validation
alias Transport.Jobs.OnDemandValidationHelpers, as: Helpers
alias Transport.Validators.NeTEx

def later(validation_id, multivalidation_id, url) do
%{validation_id: validation_id, id: multivalidation_id, permanent_url: url}
|> new(schedule_in: {20, :seconds})
|> Oban.insert()

Helpers.delegated_state()
end

def perform(%Oban.Job{args: %{"id" => multivalidation_id} = args, attempt: attempt}) do
check_result(args, attempt)
|> Helpers.handle_validation_result(multivalidation_id)
end

def check_result(%{"permanent_url" => url, "validation_id" => validation_id}, attempt) do
case NeTEx.poll_validation(validation_id, attempt) do
{:error, error_result} -> handle_error(error_result)
{:ok, ok_result} -> handle_success(ok_result, url)
{:pending, _validation_id} -> handle_pending(attempt)
end
end

def handle_error(error_result) do
error_result
|> build_error_validation_result()
|> Helpers.terminal_state()
end

def handle_success(ok_result, url) do
ok_result
|> build_successful_validation_result(url)
|> Helpers.terminal_state()
end

def handle_pending(attempt) do
attempt
|> NeTEx.poll_interval()
|> Helpers.snoozed_state()
end

defp build_successful_validation_result(%{"validations" => validation, "metadata" => metadata}, url) do
%{
result: validation,
metadata: metadata,
data_vis: nil,
validator: NeTEx.validator_name(),
validated_data_name: url,
max_error: NeTEx.get_max_severity_error(validation),
oban_args: Helpers.completed()
}
end

defp build_error_validation_result(%{message: msg}) do
%{
oban_args: Helpers.error(msg),
validator: NeTEx.validator_name()
}
end
end
41 changes: 41 additions & 0 deletions apps/transport/lib/jobs/on_demand_validation_helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Transport.Jobs.OnDemandValidationHelpers do
@moduledoc """
Shared code for jobs implementing the On Demand validation.
"""
import Ecto.Changeset
import Ecto.Query
alias DB.{MultiValidation, Repo}

def terminal_state(result), do: {:terminal, result}
def delegated_state, do: :delegated
def snoozed_state(duration_in_seconds), do: {:snooze, duration_in_seconds}

def completed, do: %{"state" => "completed"}

def error(error_message), do: %{"state" => "error", "error_reason" => error_message}

def handle_validation_result(result, multivalidation_id) do
case result do
{:terminal, changes} -> update_multivalidation(multivalidation_id, changes)
:delegated -> :ok
{:snooze, _duration_in_seconds} -> result
end
end

defp update_multivalidation(multivalidation_id, changes) do
validation = %{oban_args: oban_args} = MultiValidation |> preload(:metadata) |> Repo.get!(multivalidation_id)

# update oban_args with validator output
oban_args = Map.merge(oban_args, Map.get(changes, :oban_args, %{}))
changes = changes |> Map.put(:oban_args, oban_args)

{metadata, changes} = Map.pop(changes, :metadata)

validation
|> change(changes)
|> put_assoc(:metadata, %{metadata: metadata})
|> Repo.update!()

:ok
end
end
89 changes: 32 additions & 57 deletions apps/transport/lib/jobs/on_demand_validation_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,40 @@ defmodule Transport.Jobs.OnDemandValidationJob do
"""
use Oban.Worker, tags: ["validation"], max_attempts: 5, queue: :on_demand_validation
require Logger
import Ecto.Changeset
import Ecto.Query
alias DB.{MultiValidation, Repo}
alias Shared.Validation.JSONSchemaValidator.Wrapper, as: JSONSchemaValidator
alias Shared.Validation.TableSchemaValidator.Wrapper, as: TableSchemaValidator
alias Transport.DataVisualization
alias Transport.Jobs.OnDemandNeTExPollerJob
alias Transport.Jobs.OnDemandValidationHelpers, as: Helpers
alias Transport.Validators.GTFSRT
alias Transport.Validators.GTFSTransport
alias Transport.Validators.NeTEx

@download_timeout_ms 10_000

@impl Oban.Worker
def perform(%Oban.Job{args: %{"id" => multivalidation_id, "state" => "waiting"} = payload}) do
changes =
result =
try do
perform_validation(payload)
rescue
e -> %{oban_args: %{"state" => "error", "error_reason" => inspect(e)}}
e -> %{oban_args: Helpers.error(inspect(e))} |> Helpers.terminal_state()
end

validation = %{oban_args: oban_args} = MultiValidation |> preload(:metadata) |> Repo.get!(multivalidation_id)

# update oban_args with validator output
oban_args = Map.merge(oban_args, changes.oban_args)
changes = changes |> Map.put(:oban_args, oban_args)

{metadata, changes} = Map.pop(changes, :metadata)

validation
|> change(changes)
|> put_assoc(:metadata, %{
metadata: metadata
})
|> Repo.update!()

Helpers.handle_validation_result(result, multivalidation_id)
after
if Map.has_key?(payload, "filename") do
Transport.S3.delete_object!(:on_demand_validation, payload["filename"])
end

:ok
end

defp perform_validation(%{"type" => "gtfs", "permanent_url" => url}) do
validator = GTFSTransport.validator_name()

case GTFSTransport.validate(url) do
{:error, msg} ->
%{oban_args: %{"state" => "error", "error_reason" => msg}, validator: validator}
%{oban_args: Helpers.error(msg), validator: validator}
|> Helpers.terminal_state()

{:ok, %{"validations" => validation, "metadata" => metadata}} ->
%{
Expand All @@ -65,32 +51,22 @@ defmodule Transport.Jobs.OnDemandValidationJob do
command: GTFSTransport.command(url),
validated_data_name: url,
max_error: GTFSTransport.get_max_severity_error(validation),
oban_args: %{
"state" => "completed"
}
oban_args: Helpers.completed()
}
|> Helpers.terminal_state()
end
end

defp perform_validation(%{"type" => "netex", "permanent_url" => url}) do
validator = NeTEx.validator_name()
defp perform_validation(%{"type" => "netex", "id" => multivalidation_id, "permanent_url" => url}) do
case NeTEx.validate(url) do
{:error, error_result} ->
OnDemandNeTExPollerJob.handle_error(error_result)

case NeTEx.validate(url, []) do
{:error, %{message: msg}} ->
%{oban_args: %{"state" => "error", "error_reason" => msg}, validator: validator}
{:ok, ok_result} ->
OnDemandNeTExPollerJob.handle_success(ok_result, url)

{:ok, %{"validations" => validation, "metadata" => metadata}} ->
%{
result: validation,
metadata: metadata,
data_vis: nil,
validator: validator,
validated_data_name: url,
max_error: NeTEx.get_max_severity_error(validation),
oban_args: %{
"state" => "completed"
}
}
{:pending, validation_id} ->
OnDemandNeTExPollerJob.later(validation_id, multivalidation_id, url)
end
end

Expand All @@ -103,12 +79,14 @@ defmodule Transport.Jobs.OnDemandValidationJob do

case TableSchemaValidator.validate(schema_name, url) do
nil ->
%{oban_args: %{"state" => "error", "error_reason" => "could not perform validation"}, validator: validator}
%{oban_args: Helpers.error("could not perform validation"), validator: validator}
|> Helpers.terminal_state()

# https://github.com/etalab/transport-site/issues/2390
# validator name should come from validator module, when it is properly extracted
validation ->
%{oban_args: %{"state" => "completed"}, result: validation, validator: validator}
%{oban_args: Helpers.completed(), result: validation, validator: validator}
|> Helpers.terminal_state()
end
end

Expand All @@ -127,15 +105,14 @@ defmodule Transport.Jobs.OnDemandValidationJob do
) do
nil ->
%{
oban_args: %{
"state" => "error",
"error_reason" => "could not perform validation"
},
oban_args: Helpers.error("could not perform validation"),
validator: validator
}
|> Helpers.terminal_state()

validation ->
%{oban_args: %{"state" => "completed"}, result: validation, validator: validator}
%{oban_args: Helpers.completed(), result: validation, validator: validator}
|> Helpers.terminal_state()
end
end

Expand All @@ -155,11 +132,12 @@ defmodule Transport.Jobs.OnDemandValidationJob do

result
|> Map.merge(%{validated_data_name: gtfs_rt_url, secondary_validated_data_name: gtfs_url})
|> Helpers.terminal_state()
end

defp normalize_download(result) do
case result do
{:error, reason} -> {:error, %{"state" => "error", "error_reason" => reason}}
{:error, reason} -> {:error, Helpers.error(reason)}
{:ok, path, _} -> {:ok, path}
end
end
Expand Down Expand Up @@ -191,26 +169,23 @@ defmodule Transport.Jobs.OnDemandValidationJob do
# https://github.com/etalab/transport-site/issues/2390
# to do: transport-tools version when available
%{
oban_args: %{"state" => "completed"},
oban_args: Helpers.completed(),
result: validation,
validator: GTFSRT.validator_name(),
command: inspect(validator_args)
}

:error ->
%{
oban_args: %{
"state" => "error",
"error_reason" => "Could not run validator. Please provide a GTFS and a GTFS-RT."
}
oban_args: Helpers.error("Could not run validator. Please provide a GTFS and a GTFS-RT.")
}
end

{:error, reason} ->
if not ignore_shapes and String.contains?(reason, "java.lang.OutOfMemoryError") do
run_save_gtfs_rt_validation(gtfs_path, gtfs_rt_path, ignore_shapes: true)
else
%{oban_args: %{"state" => "error", "error_reason" => inspect(reason)}}
%{oban_args: Helpers.error(inspect(reason))}
end
end
end
Expand Down
Loading

0 comments on commit f4e5d51

Please sign in to comment.