Merge remote-tracking branch 'origin/master' into registre-arrets/premier-modele
ptitfred committed Jan 20, 2025
2 parents 89dad4a + 410255d commit a6e4446
Showing 9 changed files with 303 additions and 342 deletions.
@@ -0,0 +1,79 @@
defmodule Transport.Jobs.ExpirationAdminProducerNotificationJob do
@moduledoc """
This module is in charge of sending notifications to admins and producers when data is outdated.
It is similar to `Transport.Jobs.ExpirationNotificationJob`, which is dedicated to reusers.
Both could be merged in the future.
"""

use Oban.Worker, max_attempts: 3, tags: ["notifications"]
import Ecto.Query

@type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
@expiration_reason Transport.NotificationReason.reason(:expiration)
# If delay < 0, the resource is already expired
@default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14]

@impl Oban.Worker
def perform(%Oban.Job{id: job_id}) do
outdated_data(job_id)
:ok
end

def outdated_data(job_id) do
for delay <- possible_delays(),
date = Date.add(Date.utc_today(), delay) do
{delay, gtfs_datasets_expiring_on(date)}
end
|> Enum.reject(fn {_, records} -> Enum.empty?(records) end)
|> send_outdated_data_admin_mail()
|> Enum.map(&send_outdated_data_producer_notifications(&1, job_id))
end

@spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
def gtfs_datasets_expiring_on(%Date{} = date) do
DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
|> where(
[metadata: m, resource: r],
fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
)
|> select([dataset: d, resource: r], {d, r})
|> distinct(true)
|> DB.Repo.all()
|> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
|> Enum.to_list()
end

def possible_delays do
@default_outdated_data_delays
|> Enum.uniq()
|> Enum.sort()
end

# A separate email is sent to producers for each delay, listing all datasets expiring at that delay
@spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok
def send_outdated_data_producer_notifications({delay, records}, job_id) do
Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
@expiration_reason
|> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
|> Transport.Mailer.deliver()

DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
end)
end)
end

@spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()]
defp send_outdated_data_admin_mail([] = _records), do: []

defp send_outdated_data_admin_mail(records) do
Transport.AdminNotifier.expiration(records)
|> Transport.Mailer.deliver()

records
end
end
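
Usage sketch (not part of the commit): since the module declares `use Oban.Worker`, the job can be enqueued manually with the generated `new/1` constructor and `Oban.insert/1`; the empty args map is an assumption, as `perform/1` only reads the job id.

# Hypothetical manual enqueue, e.g. from an IEx session
%{}
|> Transport.Jobs.ExpirationAdminProducerNotificationJob.new()
|> Oban.insert()
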
2 changes: 2 additions & 0 deletions apps/transport/lib/jobs/expiration_notification_job.ex
@@ -6,6 +6,8 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
It has 2 `perform/1` methods:
- a dispatcher one in charge of identifying contacts we should get in touch with today
- another in charge of building the daily digest for a specific contact (with only their favorited datasets)
It is similar to `Transport.Jobs.ExpirationAdminProducerNotificationJob`, which is dedicated to producers and admins.
"""
use Oban.Worker,
max_attempts: 3,
21 changes: 19 additions & 2 deletions apps/transport/lib/jobs/new_dataset_notifications_job.ex
@@ -4,10 +4,12 @@ defmodule Transport.Jobs.NewDatasetNotificationsJob do
"""
use Oban.Worker, max_attempts: 3, tags: ["notifications"]
import Ecto.Query
@new_dataset_reason Transport.NotificationReason.reason(:new_dataset)

@impl Oban.Worker
def perform(%Oban.Job{inserted_at: %DateTime{} = inserted_at}) do
inserted_at |> relevant_datasets() |> Transport.DataChecker.send_new_dataset_notifications()

def perform(%Oban.Job{id: job_id, inserted_at: %DateTime{} = inserted_at}) do
inserted_at |> relevant_datasets() |> send_new_dataset_notifications(job_id)
:ok
end

@@ -18,4 +20,19 @@ defmodule Transport.Jobs.NewDatasetNotificationsJob do
|> where([dataset: d], d.inserted_at >= ^datetime_limit)
|> DB.Repo.all()
end

@spec send_new_dataset_notifications([DB.Dataset.t()] | [], pos_integer()) :: no_return() | :ok
def send_new_dataset_notifications([], _job_id), do: :ok

def send_new_dataset_notifications(datasets, job_id) do
@new_dataset_reason
|> DB.NotificationSubscription.subscriptions_for_reason_and_role(:reuser)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.new_datasets(datasets)
|> Transport.Mailer.deliver()

DB.Notification.insert!(subscription, %{dataset_ids: Enum.map(datasets, & &1.id), job_id: job_id})
end)
end
end
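
Hypothetical invocation sketch (not part of the commit), reusing only names visible in this diff — `DB.Dataset.base_query/0`, `DB.Repo.all/1` and `send_new_dataset_notifications/2` — with an arbitrary job id; fetching every dataset is purely illustrative.

# Assumed illustration: notify reuser subscribers about a batch of datasets
datasets = DB.Dataset.base_query() |> DB.Repo.all()
Transport.Jobs.NewDatasetNotificationsJob.send_new_dataset_notifications(datasets, 123)
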
100 changes: 1 addition & 99 deletions apps/transport/lib/transport/data_checker.ex
@@ -1,17 +1,12 @@
defmodule Transport.DataChecker do
@moduledoc """
Use to check data, and act about it, like send email
Used to check datasets and toggle their active status on and off, depending on their status on data.gouv.fr
"""
alias DB.{Dataset, Repo}
import Ecto.Query
require Logger

@type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
@type dataset_status :: :active | :inactive | :ignore | :no_producer | {:archived, DateTime.t()}
@expiration_reason Transport.NotificationReason.reason(:expiration)
@new_dataset_reason Transport.NotificationReason.reason(:new_dataset)
# If delay < 0, the resource is already expired
@default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14]

@doc """
This method is a scheduled job which does two things:
@@ -106,99 +101,6 @@ defmodule Transport.DataChecker do
)
end

def outdated_data do
# Generated as an integer rather than a UUID because `payload.job_id`
# for other notifications are %Oban.Job.id (bigint).
job_id = Enum.random(1..Integer.pow(2, 63))

for delay <- possible_delays(),
date = Date.add(Date.utc_today(), delay) do
{delay, gtfs_datasets_expiring_on(date)}
end
|> Enum.reject(fn {_, records} -> Enum.empty?(records) end)
|> send_outdated_data_mail()
|> Enum.map(&send_outdated_data_notifications(&1, job_id))
end

@spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
def gtfs_datasets_expiring_on(%Date{} = date) do
DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
|> where(
[metadata: m, resource: r],
fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
)
|> select([dataset: d, resource: r], {d, r})
|> distinct(true)
|> DB.Repo.all()
|> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
|> Enum.to_list()
end

def possible_delays do
@default_outdated_data_delays
|> Enum.uniq()
|> Enum.sort()
end

@spec send_new_dataset_notifications([Dataset.t()] | []) :: no_return() | :ok
def send_new_dataset_notifications([]), do: :ok

def send_new_dataset_notifications(datasets) do
# Generated as an integer rather than a UUID because `payload.job_id`
# for other notifications are %Oban.Job.id (bigint).
job_id = Enum.random(1..Integer.pow(2, 63))

@new_dataset_reason
|> DB.NotificationSubscription.subscriptions_for_reason_and_role(:reuser)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.new_datasets(datasets)
|> Transport.Mailer.deliver()

DB.Notification.insert!(subscription, %{dataset_ids: Enum.map(datasets, & &1.id), job_id: job_id})
end)
end

@spec send_outdated_data_notifications(delay_and_records(), integer()) :: delay_and_records()
def send_outdated_data_notifications({delay, records} = payload, job_id) do
Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
@expiration_reason
|> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
|> Transport.Mailer.deliver()

DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
end)
end)

payload
end

@doc """
iex> resource_titles([%DB.Resource{title: "B"}])
"B"
iex> resource_titles([%DB.Resource{title: "B"}, %DB.Resource{title: "A"}])
"A, B"
"""
def resource_titles(resources) do
resources
|> Enum.sort_by(fn %DB.Resource{title: title} -> title end)
|> Enum.map_join(", ", fn %DB.Resource{title: title} -> title end)
end

@spec send_outdated_data_mail([delay_and_records()]) :: [delay_and_records()]
defp send_outdated_data_mail([] = _records), do: []

defp send_outdated_data_mail(records) do
Transport.AdminNotifier.expiration(records)
|> Transport.Mailer.deliver()

records
end

# Do nothing if all lists are empty
defp send_inactive_datasets_mail([] = _reactivated_datasets, [] = _inactive_datasets, [] = _archived_datasets),
do: nil
2 changes: 0 additions & 2 deletions apps/transport/lib/transport/scheduler.ex
@@ -13,8 +13,6 @@ defmodule Transport.Scheduler do
[
# Every day at 4am UTC
{"0 4 * * *", {Transport.ImportData, :import_validate_all, []}},
# Send email for outdated data
{"@daily", {Transport.DataChecker, :outdated_data, []}},
# Set inactive data
{"@daily", {Transport.DataChecker, :inactive_data, []}},
# Watch for new comments on datasets
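
With the `@daily` Quantum entry for `Transport.DataChecker.outdated_data/0` removed, the expiration emails are presumably scheduled through the new Oban job instead; here is a minimal sketch of what such an Oban cron entry could look like (the schedule, config file and plugin setup are assumptions, not shown in this commit).

# config/runtime.exs (assumed location)
import Config

config :transport, Oban,
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # assumed schedule replacing the old "@daily" Quantum entry
       {"0 6 * * *", Transport.Jobs.ExpirationAdminProducerNotificationJob}
     ]}
  ]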