Improve worker cleanup on early coordinator exit
Previously, if the coordinator process was killed too quickly, before the stream
worker cleanup process had been spawned, remote workers could be left around
waiting until the default 5 minute timeout expired.

In order to reliably clean up processes in that state, we need to start the
cleaner process, with all the job references, before we start submitting the
jobs for execution.

At first, it may seem impossible to monitor a process before it has even been
spawned. That's true for regular processes; rexi, however, operates on plain
references. For each process we spawn remotely, we create a reference on the
coordinator side, which we can then use to track that job. Those are just plain,
manually created references, so nothing stops us from creating them first,
adding them to a cleaner process, and only then submitting the jobs.

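To make that ordering concrete, here is a minimal sketch (not the literal code;
the real implementation is `fabric_streams:submit_jobs/4` in the diff below,
which spawns the cleaner with the whole worker list up front instead of sending
it one message per job):

```erlang
%% Sketch only: tag the job with a reference, register {Node, Ref} with the
%% cleaner, and only then submit over rexi. If the coordinator dies at any
%% point after registration, the cleaner already knows which worker to kill.
submit_one(Cleaner, Node, {_Mod, _Fun, _Args} = MFA) ->
    Ref = make_ref(),
    Cleaner ! {add_node_ref, self(), {Node, Ref}},
    rexi:cast_ref(Ref, Node, MFA),
    Ref.
```
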
That's exactly what this commit accomplishes:

  * Create a streams-specific `fabric_streams:submit_jobs/4` function, which
  spawns the cleanup process early, generates the worker references, and then
  submits the jobs. This way, all the existing streaming `submit_jobs` calls can
  be replaced with a one-line change: `fabric_util` -> `fabric_streams`.

  * The cleanup process operates as before: it monitors the coordinator for
  exits and fires off a `kill_all` message to each node when needed.

  * Create `rexi:cast_ref(...)` variants of the `rexi:cast(...)` calls, where the
  caller specifies the references as arguments. This is what allows us to start
  the cleanup process before the jobs are even submitted. Older calls can simply
  be transformed to call into the `cast_ref` versions with references they
  create themselves, as sketched right after this list.

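As a rough illustration of that last bullet (a sketch only, not the actual
`rexi.erl` code, which is not part of this excerpt), an older-style cast can
delegate to the new variant with a reference it makes itself:

```erlang
%% Sketch only: the pre-existing cast-style API keeps working by creating
%% its own reference and handing it to the caller-supplied-ref variant.
cast(Node, MFA) ->
    cast_ref(make_ref(), Node, MFA).
```
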
I noticed that we don't need to keep the whole list of shards in memory in the
cleaner process. For Q=64, N=3 that can add up to a decent blob of binary
paths. We only need the node names (atoms) and the refs, so the cleaner was
updated to track just a set of `[{Node, Ref}, ...]` entries. A set, since in
theory someone could add the same worker to it twice.

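With Q=64 and N=3 that is 64 * 3 = 192 workers, so the cleaner now holds 192
small `{Node, Ref}` tuples instead of full `#shard{}` records with their binary
shard paths. A sketch of that reduction, mirroring the `shards_to_node_refs/1`
and `set_from_list/1` helpers in the diff below (the `#shard{}` record comes
from `mem3/include/mem3.hrl`):

```erlang
%% Sketch only: keep just {Node, Ref} pairs in a version-2 set; duplicates
%% collapse automatically if the same worker is ever added twice.
%% Assumes -include_lib("mem3/include/mem3.hrl") for the #shard{} record.
node_ref_set(Workers) ->
    sets:from_list([{N, R} || #shard{node = N, ref = R} <- Workers], [{version, 2}]).
```
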
Since we added the new `rexi:cast_ref(...)` variants, make sure to add more test
coverage, including for the streaming logic. Coverage isn't at 100% yet, but
it's getting there.

Also, the comments in `rexi.erl` were full of erldoc stanzas, and we don't
actually build erldocs anywhere, so replace them with something more helpful.
The streaming protocol itself was never really described anywhere, and it can
take some time to figure out (at least it took me a while), so take the chance
to also add a very basic, high-level description of the message flow.

Related: #5127 (comment)
nickva committed Jul 28, 2024
1 parent 4f73e6c commit fe0c893
Showing 7 changed files with 326 additions and 68 deletions.
src/couch_replicator/src/couch_replicator_fabric.erl (2 changes: 1 addition & 1 deletion)
@@ -22,7 +22,7 @@

docs(DbName, Options, QueryArgs, Callback, Acc) ->
Shards = mem3:shards(DbName),
-Workers0 = fabric_util:submit_jobs(
+Workers0 = fabric_streams:submit_jobs(
Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]
),
RexiMon = fabric_util:create_monitors(Workers0),
src/fabric/src/fabric_streams.erl (113 changes: 102 additions & 11 deletions)
@@ -13,6 +13,7 @@
-module(fabric_streams).

-export([
submit_jobs/4,
start/2,
start/3,
start/4,
@@ -27,6 +28,23 @@

-define(WORKER_CLEANER, fabric_worker_cleaner).

% This is the streams equivalent of fabric_util:submit_jobs/4. Besides
% submitting the jobs, it also starts the worker cleaner and adds each job to
% the cleaner before the job is submitted.
%
submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
% Create refs first and add them to the cleaner to ensure that if our process
% gets killed, the remote workers will be cleaned up as well.
RefFun = fun(#shard{} = Shard) -> Shard#shard{ref = make_ref()} end,
Workers = lists:map(RefFun, Shards),
ClientReq = chttpd_util:mochiweb_client_req_get(),
spawn_worker_cleaner(self(), Workers, ClientReq),
SubmitFun = fun(#shard{node = Node, name = ShardName, ref = Ref}) ->
rexi:cast_ref(Ref, Node, {Module, EndPoint, [ShardName | ExtraArgs]})
end,
ok = lists:foreach(SubmitFun, Workers),
Workers.

start(Workers, Keypos) ->
start(Workers, Keypos, undefined, undefined).

Expand Down Expand Up @@ -158,39 +176,49 @@ handle_stream_start(Else, _, _) ->
% Spawn an auxiliary rexi worker cleaner. This will be used in cases
% when the coordinator (request) process is forcibly killed and doesn't
% get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers, ClientReq) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientReq) when
+    is_pid(Coordinator), is_list(Workers)
+->
case get(?WORKER_CLEANER) of
undefined ->
Pid = spawn(fun() ->
erlang:monitor(process, Coordinator),
-cleaner_loop(Coordinator, Workers, ClientReq)
+NodeRefSet = set_from_list(shards_to_node_refs(Workers)),
+cleaner_loop(Coordinator, NodeRefSet, ClientReq)
end),
put(?WORKER_CLEANER, Pid),
Pid;
-ExistingCleaner ->
+ExistingCleaner when is_pid(ExistingCleaner) ->
ExistingCleaner
end.

-cleaner_loop(Pid, Workers, ClientReq) ->
+cleaner_loop(Pid, NodeRefSet, ClientReq) ->
CheckMSec = chttpd_util:mochiweb_client_req_check_msec(),
receive
-{add_worker, Pid, Worker} ->
-cleaner_loop(Pid, [Worker | Workers], ClientReq);
+{add_node_ref, Pid, {_, _} = NodeRef} ->
+cleaner_loop(Pid, sets:add_element(NodeRef, NodeRefSet), ClientReq);
{'DOWN', _, _, Pid, _} ->
-fabric_util:cleanup(Workers)
+rexi:kill_all(sets:to_list(NodeRefSet))
after CheckMSec ->
chttpd_util:stop_client_process_if_disconnected(Pid, ClientReq),
-cleaner_loop(Pid, Workers, ClientReq)
+cleaner_loop(Pid, NodeRefSet, ClientReq)
end.

-add_worker_to_cleaner(CoordinatorPid, Worker) ->
+add_worker_to_cleaner(CoordinatorPid, #shard{node = Node, ref = Ref}) ->
case get(?WORKER_CLEANER) of
CleanerPid when is_pid(CleanerPid) ->
-CleanerPid ! {add_worker, CoordinatorPid, Worker};
+CleanerPid ! {add_node_ref, CoordinatorPid, {Node, Ref}};
_ ->
ok
end.

set_from_list(List) when is_list(List) ->
sets:from_list(List, [{version, 2}]).

shards_to_node_refs(Workers) when is_list(Workers) ->
Fun = fun(#shard{node = Node, ref = Ref}) -> {Node, Ref} end,
lists:map(Fun, Workers).

-ifdef(TEST).

-include_lib("couch/include/couch_eunit.hrl").
@@ -207,7 +235,8 @@ worker_cleaner_test_() ->
?TDEF_FE(does_not_fire_if_cleanup_called),
?TDEF_FE(should_clean_additional_worker_too),
?TDEF_FE(coordinator_is_killed_if_client_disconnects),
-?TDEF_FE(coordinator_is_not_killed_if_client_is_connected)
+?TDEF_FE(coordinator_is_not_killed_if_client_is_connected),
+?TDEF_FE(submit_jobs_sets_up_cleaner)
]
}
}.
@@ -351,6 +380,68 @@ coordinator_is_not_killed_if_client_is_connected(_) ->
{'DOWN', CleanerRef, _, _, _} -> ok
end.

submit_jobs_sets_up_cleaner(_) ->
meck:reset(rexi),
erase(?WORKER_CLEANER),
Shards = [
#shard{node = 'n1'},
#shard{node = 'n2'}
],
meck:expect(rexi, cast_ref, fun(Ref, _, _) -> Ref end),
{Coord, CoordRef} = spawn_monitor(fun() ->
Workers = submit_jobs(Shards, fabric_rpc, potatoes, []),
receive
{get_workers_and_cleaner, From} ->
From ! {Workers, get(?WORKER_CLEANER)},
timer:sleep(999999)
end
end),
Coord ! {get_workers_and_cleaner, self()},
{Workers, Cleaner} =
receive
Msg -> Msg
end,
?assert(is_pid(Cleaner)),
?assert(is_process_alive(Cleaner)),
?assert(is_process_alive(Coord)),
CheckWorkerFun = fun(#shard{node = Node, ref = Ref}) ->
?assert(is_reference(Ref)),
{Node, Ref}
end,
NodeRefs = lists:map(CheckWorkerFun, Workers),
?assertEqual(length(Shards), length(Workers)),
?assertEqual(length(lists:usort(NodeRefs)), length(NodeRefs)),
% Were the jobs actually submitted?
meck:wait(2, rexi, cast_ref, '_', 1000),
% If we kill the coordinator, the cleaner should kill the workers
meck:reset(rexi),
CleanupMon = erlang:monitor(process, Cleaner),
exit(Coord, kill),
receive
{'DOWN', CoordRef, _, _, WorkerReason} ->
?assertEqual(killed, WorkerReason)
after 1000 ->
?assert(is_process_alive(Coord))
end,
% Cleaner should do the cleanup
meck:wait(1, rexi, kill_all, '_', 1000),
History = meck:history(rexi),
?assertMatch([{_, {rexi, kill_all, _}, ok}], History),
[{Pid, {rexi, kill_all, Args}, ok}] = History,
% It was the cleaner who called it
?assertEqual(Cleaner, Pid),
?assertMatch([[{_, _}, {_, _}]], Args),
[NodeRefsKilled] = Args,
% The node refs killed are the ones we expect
?assertEqual(lists:sort(NodeRefs), lists:sort(NodeRefsKilled)),
% Cleanup process should exit when done
receive
{'DOWN', CleanupMon, _, _, CleanerReason} ->
?assertEqual(normal, CleanerReason)
after 1000 ->
?assert(is_process_alive(Cleaner))
end.

setup() ->
ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
% Speed up disconnect socket timeout for the test to 200 msec
src/fabric/src/fabric_view_all_docs.erl (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,7 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) ->
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
DbName = fabric:dbname(Db),
{Shards, RingOpts} = shards(Db, QueryArgs),
-Workers0 = fabric_util:submit_jobs(
+Workers0 = fabric_streams:submit_jobs(
Shards, fabric_rpc, all_docs, [Options, WorkerArgs]
),
RexiMon = fabric_util:create_monitors(Workers0),
src/fabric/src/fabric_view_map.erl (4 changes: 2 additions & 2 deletions)
@@ -40,9 +40,9 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) ->
Repls = fabric_ring:get_shard_replacements(DbName, Shards),
RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
StartFun = fun(Shard) ->
-hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
+hd(fabric_streams:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
end,
-Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
+Workers0 = fabric_streams:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
case
src/fabric/src/fabric_view_reduce.erl (4 changes: 2 additions & 2 deletions)
@@ -31,9 +31,9 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
Repls = fabric_ring:get_shard_replacements(DbName, Shards),
StartFun = fun(Shard) ->
-hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
+hd(fabric_streams:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
end,
-Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, reduce_view, RPCArgs),
+Workers0 = fabric_streams:submit_jobs(Shards, fabric_rpc, reduce_view, RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
case
(2 more changed files not shown)
