Skip to content

Commit

Permalink
Merge 'max-au/fix-pg-monitor-crash' into maint
Browse files Browse the repository at this point in the history
OTP-18833
  • Loading branch information
sverker committed Oct 30, 2023
2 parents 045e704 + c132771 commit d8c25c9
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 24 deletions.
45 changes: 22 additions & 23 deletions lib/kernel/src/pg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,13 @@ handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local
handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMon} = State) ->
%% next line could also be done with iterating over process state, but it appears to be slower
Local = #{G => P || [G,P] <- ets:match(Scope, {'$1', '$2', '_'})},
MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef
MRef = erlang:monitor(process, Pid, [{tag, {'DOWN', scope_monitors}}]), %% to discard it upon termination
{reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};

handle_call({monitor, Group}, {Pid, _Tag}, #state{scope = Scope, group_monitors = GM, monitored_groups = MG} = State) ->
%% ETS cache is writable only from this process - so get_members is safe to use
Members = get_members(Scope, Group),
MRef = erlang:monitor(process, Pid),
MRef = erlang:monitor(process, Pid, [{tag, {'DOWN', group_monitors}}]),
NewMG = maps:update_with(Group, fun (Ex) -> [{Pid, MRef} | Ex] end, [{Pid, MRef}], MG),
{reply, {MRef, Members}, State#state{group_monitors = GM#{MRef => {Pid, Group}}, monitored_groups = NewMG}};

Expand Down Expand Up @@ -401,20 +401,21 @@ handle_info({discover, Peer}, State) ->
handle_info({discover, Peer, _ProtocolVersion}, State) ->
handle_discover(Peer, State);

%% handle local process exit, or a local monitor exit
%% handle local process exit
handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) when node(Pid) =:= node() ->
case maps:take(Pid, Local) of
error ->
{noreply, maybe_drop_monitor(MRef, State)};
%% ignore late monitor: this can only happen when leave request and 'DOWN' are in pg queue
{noreply, State};
{{MRef, Groups}, NewLocal} ->
[leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) || Group <- Groups],
%% send update to all remote peers
broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
{noreply, State#state{local = NewLocal}}
end;

%% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
%% handle remote node down or scope leaving overlay network
handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
case maps:take(Pid, Remote) of
Expand All @@ -423,7 +424,22 @@ handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote =
leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) end, RemoteMap),
{noreply, State#state{remote = NewRemote}};
error ->
{noreply, maybe_drop_monitor(MRef, State)}
{noreply, MRef, State}
end;

%% handle scope monitor exiting
handle_info({{'DOWN', scope_monitors}, MRef, process, _Pid, _Info}, #state{scope_monitors = ScopeMon} = State) ->
{noreply, State#state{scope_monitors = maps:remove(MRef, ScopeMon)}};

%% handle group monitor exiting
handle_info({{'DOWN', group_monitors}, MRef, process, Pid, _Info}, #state{
group_monitors = GMs, monitored_groups = MG} = State) ->
case maps:take(MRef, GMs) of
error ->
{noreply, State};
{{Pid, Group}, NewGM} ->
%% clean up the inverse map
{noreply, State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}}
end;

%% nodedown: ignore, and wait for 'DOWN' signal for monitored process
Expand Down Expand Up @@ -670,23 +686,6 @@ broadcast([Dest | Tail], Msg) ->
erlang:send(Dest, Msg, [noconnect]),
broadcast(Tail, Msg).

%% drops a monitor if DOWN was received
maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon, group_monitors = GMs, monitored_groups = MG} = State) ->
%% could be a local monitor going DOWN. Since it's a rare event, check should
%% not stay in front of any other, more frequent events
case maps:take(MRef, ScopeMon) of
error ->
case maps:take(MRef, GMs) of
error ->
State;
{{Pid, Group}, NewGM} ->
%% clean up the inverse map
State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}
end;
{_Pid, NewScopeMon} ->
State#state{scope_monitors = NewScopeMon}
end.

demonitor_group(Tag, Group, MG) ->
case maps:find(Group, MG) of
{ok, [Tag]} ->
Expand Down
47 changes: 46 additions & 1 deletion lib/kernel/test/pg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
monitor_nonempty_scope/0, monitor_nonempty_scope/1,
monitor_scope/0, monitor_scope/1,
monitor/1,
monitor_self/1,
multi_monitor/1,
protocol_upgrade/1
]).

Expand Down Expand Up @@ -89,7 +91,7 @@ all() ->
groups() ->
[
{basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing,
protocol_upgrade]},
protocol_upgrade, monitor_self, multi_monitor]},
{performance, [], [thundering_herd]},
{cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
Expand Down Expand Up @@ -757,6 +759,49 @@ second_monitor(Msgs) ->
second_monitor([Msg | Msgs])
end.

%% Test for GH-7625: monitor process that joined a group
monitor_self(Config) when is_list(Config) ->
F = fun() ->
%% spawned process both monitor and group-joined
pg:monitor(?FUNCTION_NAME, ?FUNCTION_NAME),
pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())
end,
{Pid, Mon} = spawn_monitor(F),
receive
{'DOWN', Mon, process, Pid, Reason} ->
?assertEqual(normal, Reason)
end,
%% if pg crashes, next expression fails the test
sync(?FUNCTION_NAME).

%% check same process monitoring several things at once,
%% and also joining a few groups
multi_monitor(Config) when is_list(Config) ->
F = fun() ->
Self = self(),
%% spawned process both monitor and group-joined
{RefOne, []} = pg:monitor(?FUNCTION_NAME, one),
{RefTwo, []} = pg:monitor(?FUNCTION_NAME, two),
{RefScope, _} = pg:monitor_scope(?FUNCTION_NAME),
ok = pg:join(?FUNCTION_NAME, one, Self),
ok = pg:join(?FUNCTION_NAME, two, Self),
sync(?FUNCTION_NAME),
%% ensure receiving 4 messages: two per group this process
[wait_message(Ref, join, Group, [Self], "Local") || {Ref, Group} <-
[{RefOne, one}, {RefScope, one}, {RefTwo, two}, {RefScope, two}]]
end,
{Pid, Mon} = spawn_monitor(F),
receive
{'DOWN', Mon, process, Pid, Reason} ->
?assertEqual(normal, Reason)
end,
%% if pg crashes, next expression fails the test
sync(?FUNCTION_NAME),
%% white box: pg should not have any group or scope monitors
{state, _, _, _, SM, GM, _} = sys:get_state(?FUNCTION_NAME),
?assertEqual(#{}, SM),
?assertEqual(#{}, GM).

protocol_upgrade(Config) when is_list(Config) ->
Scope = ?FUNCTION_NAME,
Group = ?FUNCTION_NAME,
Expand Down

0 comments on commit d8c25c9

Please sign in to comment.