Skip to content

Commit

Permalink
Improve fabric streams cleanup on error and timeouts
Browse files Browse the repository at this point in the history
Previously, we performed cleanup only for specific errors such as
`ddoc_updated`, and `insufficient_storage`. In case of other errors, or
timeouts, there was a chance we would leak workers waiting to be either started
or canceled. Those workers would then wait around until the 5 minute rexi
timeout fires, and then they emit an error in the logs. It's not a big deal as
that happens on errors only, and the processes are all waiting in receive,
however, they do hold a Db handle open, so they can waste resources from that
point of view.

To fix that, this commit extends cleanup to other errors and timeouts.

Moreover, in case of timeouts, we log fabric worker timeout errors. In order to
do that we export the `fabric_streams` internal `#stream_acc` record to every
`fabric_streams` user. That's a bit untidy, so make the timeout error return
the defunct workers only, and so, we can avoid leaking the `#stream_acc` record
outside the fabric_streams module.

Related to #5127
  • Loading branch information
nickva committed Aug 2, 2024
1 parent a2241d3 commit 31f0da3
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 59 deletions.
10 changes: 2 additions & 8 deletions src/couch_replicator/src/couch_replicator_fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,8 @@ docs(DbName, Options, QueryArgs, Callback, Acc) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers, waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"replicator docs"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "replicator docs"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down
8 changes: 0 additions & 8 deletions src/fabric/include/fabric.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@
update_seq
}).

-record(stream_acc, {
workers,
ready,
start_fun,
replacements,
ring_opts
}).

-record(view_row, {key, id, value, doc, worker}).

-type row_property_key() :: id | key | value | doc | worker.
Expand Down
55 changes: 49 additions & 6 deletions src/fabric/src/fabric_streams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
add_worker_to_cleaner/2
]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").

-record(stream_acc, {
workers,
ready,
start_fun,
replacements,
ring_opts
}).

-define(WORKER_CLEANER, fabric_worker_cleaner).

% This is the streams equivalent of fabric_util:submit_jobs/4. Besides
Expand Down Expand Up @@ -77,7 +84,12 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
Workers
),
{ok, AckedWorkers};
{timeout, #stream_acc{workers = Defunct}} ->
cleanup(Workers0),
DefunctWorkers = fabric_util:remove_done_workers(Defunct, waiting),
{timeout, DefunctWorkers};
Else ->
cleanup(Workers0),
Else
end.

Expand Down Expand Up @@ -165,10 +177,7 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
{stop, St#stream_acc{workers = [], ready = Ready1}}
end
end;
handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == insufficient_storage ->
WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
cleanup(WaitingWorkers ++ ReadyWorkers),
handle_stream_start({ok, Error}, _, _) when Error == ddoc_updated; Error == insufficient_storage ->
{stop, Error};
handle_stream_start(Else, _, _) ->
exit({invalid_stream_start, Else}).
Expand Down Expand Up @@ -236,7 +245,9 @@ worker_cleaner_test_() ->
?TDEF_FE(should_clean_additional_worker_too),
?TDEF_FE(coordinator_is_killed_if_client_disconnects),
?TDEF_FE(coordinator_is_not_killed_if_client_is_connected),
?TDEF_FE(submit_jobs_sets_up_cleaner)
?TDEF_FE(submit_jobs_sets_up_cleaner),
?TDEF_FE(cleanup_called_on_timeout),
?TDEF_FE(cleanup_called_on_error)
]
}
}.
Expand Down Expand Up @@ -442,7 +453,39 @@ submit_jobs_sets_up_cleaner(_) ->
?assert(is_process_alive(Cleaner))
end.

cleanup_called_on_timeout(_) ->
Ref1 = make_ref(),
Ref2 = make_ref(),
W1 = #shard{node = 'n1', ref = Ref1},
W2 = #shard{node = 'n2', ref = Ref2},
Workers = [W1, W2],
meck:expect(rexi_utils, recv, fun(_, _, _, Acc, _, _) ->
{timeout, Acc#stream_acc{workers = [{W2, waiting}]}}
end),
meck:reset(fabric_util),
Res = start(Workers, #shard.ref, undefined, undefined, []),
?assertEqual({timeout, [W2]}, Res),
?assert(meck:called(fabric_util, cleanup, 1)).

cleanup_called_on_error(_) ->
Ref1 = make_ref(),
Ref2 = make_ref(),
W1 = #shard{node = 'n1', ref = Ref1},
W2 = #shard{node = 'n2', ref = Ref2},
Workers = [W1, W2],
meck:expect(rexi_utils, recv, fun(_, _, _, _, _, _) ->
{error, foo}
end),
meck:reset(fabric_util),
Res = start(Workers, #shard.ref, undefined, undefined, []),
?assertEqual({error, foo}, Res),
?assert(meck:called(fabric_util, cleanup, 1)).

setup() ->
ok = meck:new(rexi_utils, [passthrough]),
ok = meck:new(config, [passthrough]),
ok = meck:new(fabric_util, [passthrough]),
meck:expect(config, get, fun(_, _, Default) -> Default end),
ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
% Speed up disconnect socket timeout for the test to 200 msec
ok = meck:expect(chttpd_util, mochiweb_client_req_check_msec, 0, 200).
Expand Down
10 changes: 2 additions & 8 deletions src/fabric/src/fabric_view_all_docs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,8 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers, waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"all_docs"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "all_docs"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down
11 changes: 2 additions & 9 deletions src/fabric/src/fabric_view_changes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,8 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers,
waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"changes"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "changes"),
throw({error, timeout});
{error, Reason} ->
throw({error, Reason});
Expand Down
12 changes: 2 additions & 10 deletions src/fabric/src/fabric_view_map.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) when
Expand Down Expand Up @@ -66,15 +65,8 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers,
waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"map_view"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "map_view"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down
12 changes: 2 additions & 10 deletions src/fabric/src/fabric_view_reduce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
Expand Down Expand Up @@ -55,15 +54,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers,
waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"reduce_view"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "reduce_view"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down

0 comments on commit 31f0da3

Please sign in to comment.