Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(autoheal): attempt healing complex asymmetric partitions #240

Merged
merged 10 commits into from
Dec 18, 2024
8 changes: 4 additions & 4 deletions .github/workflows/run_test_case.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ jobs:
runs-on: ubuntu-latest

container:
image: erlang:22.1
image: erlang:24

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v4
- name: Run tests
run: |
make eunit
Expand All @@ -23,12 +23,12 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
make coveralls
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v4
if: always()
with:
name: logs
path: _build/test/logs
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v4
with:
name: cover
path: _build/test/cover
10 changes: 8 additions & 2 deletions src/ekka.appup.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"0.8.1.11",
[{"0.8.1.12",
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
[{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
{load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
{"0.8.1.11",
[{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
{load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
{"0.8.1.10",
Expand Down Expand Up @@ -67,7 +70,10 @@
{load_module,ekka_httpc,brutal_purge,soft_purge,[]},
{load_module,ekka_mnesia,brutal_purge,soft_purge,[]},
{load_module,ekka_dist,brutal_purge,soft_purge,[]}]}],
[{"0.8.1.11",
[{"0.8.1.12",
[{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
{load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
{"0.8.1.11",
[{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
{load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
{"0.8.1.10",
Expand Down
131 changes: 95 additions & 36 deletions src/ekka_autoheal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

-module(ekka_autoheal).

-include_lib("snabbkaffe/include/trace.hrl").

-export([ init/0
, enabled/0
, proc/1
Expand All @@ -24,7 +26,7 @@

-record(autoheal, {delay, role, proc, timer}).

-type(autoheal() :: #autoheal{}).
-type autoheal() :: #autoheal{}.

-export_type([autoheal/0]).

Expand All @@ -47,8 +49,7 @@ enabled() ->
end.

proc(undefined) -> undefined;
proc(#autoheal{proc = Proc}) ->
Proc.
proc(#autoheal{proc = Proc}) -> Proc.

handle_msg(Msg, undefined) ->
?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]), undefined;
Expand Down Expand Up @@ -76,12 +77,24 @@ handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, t
Nodes = ekka_mnesia:cluster_nodes(all),
case rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000) of
{Views, []} ->
SplitView = lists:sort(fun compare_view/2, lists:usort(Views)),
ekka_node_monitor:cast(coordinator(SplitView), {heal_partition, SplitView});
SplitView = find_split_view(Nodes, Views),
HealPlan = find_heal_plan(SplitView),
case HealPlan of
{Candidates = [_ | _], Minority} ->
%% Non-empty list of candidates, choose a coordinator.
CoordNode = pick_coordinator(Candidates),
ekka_node_monitor:cast(CoordNode, {heal_cluster, Minority, SplitView});
{[], Cluster} ->
%% It's very unlikely but possible to have empty list of candidates.
ekka_node_monitor:cast(node(), {heal_cluster, Cluster, SplitView});
{} ->
ignore
end,
Autoheal#autoheal{timer = undefined};
{_Views, BadNodes} ->
?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes])
end,
Autoheal#autoheal{timer = undefined};
?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
end;
false ->
Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
end;
Expand All @@ -91,51 +104,98 @@ handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
Autoheal;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
%% NOTE: Backward compatibility.
case SplitView of
%% No partitions.
[] -> Autoheal;
[{_, []}] -> Autoheal;
zmstone marked this conversation as resolved.
Show resolved Hide resolved
%% Partitions.
SplitView ->
Proc = spawn_link(fun() -> heal_partition(SplitView) end),
Autoheal#autoheal{role = coordinator, proc = Proc}
end;

handle_msg({heal_cluster, Minority, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
Proc = spawn_link(fun() ->
?LOG(info, "Healing partition: ~p", [SplitView]),
_ = heal_partition(SplitView)
end),
?tp(notice, "Healing cluster partition", #{
need_reboot => Minority,
split_view => SplitView
}),
reboot_minority(Minority -- [node()])
end),
Autoheal#autoheal{role = coordinator, proc = Proc};

handle_msg({heal_partition, SplitView}, Autoheal= #autoheal{proc = _Proc}) ->
handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
Autoheal;

handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) ->
Autoheal#autoheal{proc = undefined};
handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) ->
?LOG(critical, "Autoheal process crashed: ~s", [Reason]),
?LOG(critical, "Autoheal process crashed: ~p", [Reason]),
_Retry = ekka_node_monitor:run_after(1000, confirm_partition),
Autoheal#autoheal{proc = undefined};

handle_msg(Msg, Autoheal) ->
?LOG(critical, "Unexpected msg: ~p", [Msg, Autoheal]),
Autoheal.

compare_view({Running1, _} , {Running2, _}) ->
Len1 = length(Running1), Len2 = length(Running2),
if
Len1 > Len2 -> true;
Len1 == Len2 -> lists:member(node(), Running1);
true -> false
find_split_view(Nodes, Views) ->
ClusterView = lists:zipwith(
fun(N, {Running, Stopped}) -> {N, Running, Stopped} end,
Nodes,
Views
),
MajorityView = lists:sort(fun compare_node_views/2, ClusterView),
find_split_view(MajorityView).

compare_node_views({_N1, Running1, _}, {_N2, Running2, _}) ->
Len1 = length(Running1),
Len2 = length(Running2),
case Len1 of
%% Prefer partitions with higher number of surviving nodes.
L when L > Len2 -> true;
%% If number of nodes is the same, sort by list of running nodes.
Len2 -> Running1 < Running2;
L when L < Len2 -> false
end.

coordinator([{Nodes, _} | _]) ->
ekka_membership:coordinator(Nodes).

-spec heal_partition(list()) -> list(node()).
heal_partition([]) ->
[];
%% All nodes connected.
heal_partition([{_, []}]) ->
[];
%% Partial partitions happened.
heal_partition([{Nodes, []}|_]) ->
find_split_view([{_Node, _Running, []} | Views]) ->
%% Node observes no partitions, ignore.
find_split_view(Views);
find_split_view([View = {_Node, _Running, Partitioned} | Views]) ->
%% Node observes some nodes as partitioned from it.
%% These nodes need to be rebooted, and as such they should not be part of split view.
Rest = lists:foldl(fun(N, Acc) -> lists:keydelete(N, 1, Acc) end, Views, Partitioned),
[View | find_split_view(Rest)];
find_split_view([]) ->
[].

find_heal_plan([{_Node, R0, P0} | Rest]) ->
%% If we have more than one parition in split view, we need to reboot _all_ of the nodes
%% in each view's partition (i.e. ⋃(Partitions)) for better safety. But then we need to
%% find candidates to do it, as ⋃(Running) ∖ ⋃(Partitions).
{_Nodes, Rs, Ps} = lists:unzip3(Rest),
URunning = ordsets:union(lists:map(fun ordsets:from_list/1, [R0 | Rs])),
UPartitions = ordsets:union(lists:map(fun ordsets:from_list/1, [P0 | Ps])),
{ordsets:subtract(URunning, UPartitions), UPartitions};
find_heal_plan([]) ->
{}.

pick_coordinator(Candidates) ->
case lists:member(node(), Candidates) of
true -> node();
false -> ekka_membership:coordinator(Candidates)
end.

heal_partition([{Nodes, []} | _] = SplitView) ->
%% Symmetric partition.
?LOG(info, "Healing partition: ~p", [SplitView]),
reboot_minority(Nodes -- [node()]);
zmstone marked this conversation as resolved.
Show resolved Hide resolved
heal_partition([{Majority, Minority}, {Minority, Majority}]) ->
reboot_minority(Minority);
heal_partition(SplitView) ->
?LOG(critical, "Cannot heal the partitions: ~p", [SplitView]),
error({unknown_splitview, SplitView}).
heal_partition([{Majority, Minority}, {Minority, Majority}] = SplitView) ->
%% Symmetric partition.
?LOG(info, "Healing partition: ~p", [SplitView]),
reboot_minority(Minority).

reboot_minority(Minority) ->
lists:foreach(fun shutdown/1, Minority),
Expand All @@ -155,4 +215,3 @@ ensure_cancel_timer(undefined) ->
ok;
ensure_cancel_timer(TRef) ->
catch erlang:cancel_timer(TRef).

9 changes: 8 additions & 1 deletion src/ekka_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ handle_cast({confirm, TargetNode, Status}, State) ->

handle_cast(Msg = {report_partition, _Node}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Msg = {heal_partition, _SplitView}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};
handle_cast(Msg = {heal_cluster, _, _}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
Expand Down Expand Up @@ -142,6 +143,12 @@ handle_info({mnesia_system_event, {mnesia_up, Node}},
false -> ok;
true -> ekka_membership:partition_healed(Node)
end,
%% If there was an anymmetric cluster partition, we might need more
%% autoheal iterations to completely bring the cluster back to normal.
case ekka_autoheal:enabled() of
{true, _} -> run_after(3000, confirm_partition);
false -> ignore
end,
{noreply, State#state{partitions = lists:delete(Node, Partitions)}};

handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
Expand Down
Loading
Loading