Skip to content

Commit

Permalink
Run migrations in order across repos (#4466)
Browse files Browse the repository at this point in the history
* order migrations

* migrate in 'streaks'

* cleanup

* docs

* tests

* add comment

* changelog

* continue

* run migrations tests in ci

* add pending_streaks cmd

* fix ci

* add docs

* fix test

* simplify test

* only test v2+

* fix test

* return async: true

* cleanup

* add 'no pending migration'

* drop migrate/0 and pending_migrations/0

---------

Co-authored-by: Adrian Gruntkowski <[email protected]>
  • Loading branch information
ruslandoga and zoldar authored Sep 9, 2024
1 parent 9ba9e2a commit 8ee4827
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 40 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ jobs:

- run: make minio
if: env.MIX_ENV == 'test'
- run: mix test --include slow --include minio --max-failures 1 --warnings-as-errors
- run: mix test --include slow --include minio --include migrations --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'test'
env:
MINIO_HOST_FOR_CLICKHOUSE: "172.17.0.1"

- run: mix test --include slow --max-failures 1 --warnings-as-errors
- run: mix test --include slow --include migrations --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'ce_test'

static:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ All notable changes to this project will be documented in this file.
- Don't include imports when showing time series hourly interval. Previously imported data was shown each midnight
- Fix property filter suggestions 500 error when property hasn't been selected
- Bamboo.Mua: add Date and Message-ID headers if missing plausible/analytics#4474
- Fix migration order across `plausible_db` and `plausible_events_db` databases plausible/analytics#4466

## v2.1.1 - 2024-06-06

Expand Down
142 changes: 108 additions & 34 deletions lib/plausible_release.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,120 @@ defmodule Plausible.Release do
end
end

def migrate do
@doc """
Runs all pending migrations across `repos` as a single, version-ordered sequence.

`interweave_migrate/0` is a migration function that:

- Lists all pending migrations across multiple repositories.
- Sorts these migrations into a single list.
- Groups consecutive migrations by repository into "streaks".
- Executes the migrations in the correct order by processing each streak sequentially.

`repos` defaults to `repos()`.

### Why Use This Approach?

This function resolves dependencies between migrations that span across different repositories.
Migrating each repository independently (as the former `migrate/0` did) may result in
migrations running in the wrong order when there are cross-repository dependencies.

Consider the following example (adapted from reality, not 100% accurate):

- **Migration 1**: The PostgreSQL (PG) repository creates a table named `site_imports`.
- **Migration 2**: The ClickHouse (CH) repository creates `import_id` columns in `imported_*` tables.
- **Migration 3**: The PG repository runs a data migration that utilizes both PG and CH databases,
reading from the `import_id` column in `imported_*` tables.

Migrating repository by repository would execute these migrations in the following order:

1. Migration 1 (PG)
2. Migration 3 (PG)
3. Migration 2 (CH)

This sequence would fail at Migration 3, as the `import_id` columns in the CH repository have not been created yet.

`interweave_migrate/0` addresses this issue by consolidating all pending migrations into a single, ordered queue:

1. Migration 1 (PG)
2. Migration 2 (CH)
3. Migration 3 (PG)

This ensures all dependencies are resolved in the correct order.
"""
def interweave_migrate(repos \\ repos()) do
prepare()
Enum.each(repos(), &run_migrations_for/1)
IO.puts("Migrations successful!")

# Gather every pending migration from all repos, then collapse them into
# streaks of `{repo, highest_version_in_streak}`.
pending = all_pending_migrations(repos)
streaks = migration_streaks(pending)

# Process the streaks in order: each step migrates one repo `:up` with
# `to: up_to_version`, the last version of that streak. The match on
# `{:ok, _, _}` crashes the run if a repo cannot be started/migrated.
Enum.each(streaks, fn {repo, up_to_version} ->
{:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, to: up_to_version))
end)
end

# Collapses pending migrations into "streaks".
#
# Takes `{repo, version, name}` tuples, orders them by version (ascending) and
# returns one `{repo, version}` tuple per run of consecutive migrations that
# belong to the same repo, where `version` is the last (highest) version of
# that run. Returns `[]` when there is nothing pending.
defp migration_streaks(pending_migrations) do
  pending_migrations
  |> Enum.map(fn {repo, version, _name} -> {repo, version} end)
  |> Enum.sort_by(&elem(&1, 1), :asc)
  # every chunk is one streak: consecutive migrations of a single repo
  |> Enum.chunk_by(fn {repo, _version} -> repo end)
  # a streak is represented by its final migration only
  |> Enum.map(&List.last/1)
end

# Collects the pending (`:down`) migrations of every repo in `repos` into one
# flat list of `{repo, version, name}` tuples, in the order the repos are given.
@spec all_pending_migrations([Ecto.Repo.t()]) :: [{Ecto.Repo.t(), integer, String.t()}]
defp all_pending_migrations(repos) do
  Enum.flat_map(repos, &pending_migrations_for/1)
end

# Lists the migrations of a single repo via `Ecto.Migrator.with_repo/2` and
# keeps only those whose status is `:down`, tagging each with the repo.
defp pending_migrations_for(repo) do
  {:ok, pending, _started} =
    Ecto.Migrator.with_repo(repo, fn started_repo ->
      for {:down, version, name} <- Ecto.Migrator.migrations(started_repo) do
        {started_repo, version, name}
      end
    end)

  pending
end

def pending_migrations do
@doc """
Prints the pending migrations of `repos`, grouped into the streaks in which
they would be run.

Collects the pending migrations of all `repos` (defaults to `repos()`). When
there are none, prints "No pending migrations!"; otherwise computes the
migration streaks and prints each streak with the migrations it contains.
"""
def pending_streaks(repos \\ repos()) do
prepare()
IO.puts("Pending migrations")
IO.puts("")
Enum.each(repos(), &list_pending_migrations_for/1)
IO.puts("Collecting pending migrations..")

pending = all_pending_migrations(repos)

if pending == [] do
IO.puts("No pending migrations!")
else
# `pending` is passed along with the streaks so that each streak can be
# printed together with the individual migrations that belong to it.
streaks = migration_streaks(pending)
print_migration_streaks(streaks, pending)
end
end

# Prints every streak followed by the pending migrations that belong to it.
#
# Walks the streaks in order; for each one it takes the still-unprinted
# migrations of that repo with a version not greater than the streak's version,
# prints them under a header line, and recurses with the remaining streaks and
# the remaining migrations. Finishes with `:ok` once both lists are exhausted.
defp print_migration_streaks([], []), do: :ok

defp print_migration_streaks([{repo, up_to_version} | remaining_streaks], pending) do
  in_streak? = fn {pending_repo, version, _name} ->
    pending_repo == repo and version <= up_to_version
  end

  {current, leftover} = Enum.split_with(pending, in_streak?)

  migrations_dir =
    repo
    |> Ecto.Migrator.migrations_path()
    |> Path.relative_to_cwd()

  IO.puts("\n#{inspect(repo)} [#{migrations_dir}] streak up to version #{up_to_version}:")

  Enum.each(current, fn {_repo, version, name} ->
    IO.puts(" * #{version}_#{name}")
  end)

  print_migration_streaks(remaining_streaks, leftover)
end

def seed do
prepare()
# Run seed script
Expand Down Expand Up @@ -123,33 +224,6 @@ defmodule Plausible.Release do
end
end

# Announces the repo on stdout, then runs all of its pending migrations `:up`.
# The match on `{:ok, _, _}` crashes the caller if the repo could not be migrated.
defp run_migrations_for(repo) do
  IO.puts("Running migrations for #{repo}")

  migrate_all = fn started_repo ->
    Ecto.Migrator.run(started_repo, :up, all: true)
  end

  {:ok, _, _} = Ecto.Migrator.with_repo(repo, migrate_all)
end

# Prints the pending (`:down`) migrations found in the repo's migrations
# directory, one per line, or "No pending migrations" when there are none.
defp list_pending_migrations_for(repo) do
  IO.puts("Listing pending migrations for #{repo}")
  IO.puts("")

  all_migrations = Ecto.Migrator.migrations(repo, [Ecto.Migrator.migrations_path(repo)])

  case Enum.reject(all_migrations, fn {status, _version, _migration} -> status != :down end) do
    [] ->
      IO.puts("No pending migrations")

    pending ->
      Enum.each(pending, fn {_, version, migration} ->
        IO.puts("* #{version}_#{migration}")
      end)
  end

  IO.puts("")
end

defp ensure_repo_created(repo) do
IO.puts("create #{inspect(repo)} database if it doesn't exist")

Expand Down
2 changes: 1 addition & 1 deletion rel/overlays/migrate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

BIN_DIR=$(dirname "$0")

"${BIN_DIR}"/bin/plausible eval Plausible.Release.migrate
"${BIN_DIR}"/bin/plausible eval Plausible.Release.interweave_migrate
2 changes: 1 addition & 1 deletion rel/overlays/pending-migrations.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

BIN_DIR=$(dirname "$0")

"${BIN_DIR}"/bin/plausible eval Plausible.Release.pending_migrations
"${BIN_DIR}"/bin/plausible eval Plausible.Release.pending_streaks
Loading

0 comments on commit 8ee4827

Please sign in to comment.