Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve worker cleanup on early coordinator exit #5152

Merged
merged 1 commit into from
Jul 28, 2024

Conversation

nickva
Copy link
Contributor

@nickva nickva commented Jul 26, 2024

Previously, if the coordinator process is killed too quickly, before the stream worker cleanup process is spawned, remote workers may be left around waiting until the default 5 minute timeout expires.

In order to reliably clean up processes in that state, need to start the cleaner process, with all the job references, before we start submitting them for execution.

At first, it may seem impossible to monitor a process until after it's already spawned. That's true for regular processes, however rexi operates on plain references. For each process we spawn remotely we create a reference on the coordinator side, which we can then use to track that job. Those are just plain manually created references. Nothing stops us from creating them first, adding them to a cleaner process, and only then submitting them.

That's exactly what this commit accomplishes:

  • Create a streams specific fabric_streams:submit_jobs/4 function, which spawns the cleanup process early, generates worker references, and then submits the jobs. This way, all the existing streaming submit_jobs can be replaced easily in one line: fabric_util -> fabric_streams.

  • The cleanup process operates as previously: monitors the coordinator for exits, and fires off kill_all message to each node.

  • Create rexi:cast_ref(...) variants of rexi:cast(...) calls, where the caller specifies the references as arguments. This is what allows us to start the cleanup process before the jobs are even submitted. Older calls can just be transformed to call into the cast_ref versions with their own created references.

Noticed that we don't need to keep the whole list of shards in memory in the cleaner process. For Q=64, N=3 that can add up to a decent blob of binary paths. We only need node names (atoms) and refs. So updated to use just a set of [{Node, Ref}, ...]. A set since in theory someone would add the same worker twice to it.

Since we added the new rexi:cast_ref(...) variants, ensure to add more test coverage, including the streaming logic as well. It's not 100% yet, but getting there.

Also, the comments in rexi.erl were full of erldoc stanzas and we don't actually build erldocs anywhere, so replace them with something more helpful. The streaming protocol itself was never quite described anywhere, and it can take sometime to figure it out (at least it took me), so took the chance to also add a very basic, high level description of the message flow.

Related: #5127 (comment)

@nickva nickva force-pushed the improve-rexi-stream-clean-on-coordinator-exit branch from a11236f to fcc0e9e Compare July 26, 2024 22:45
Copy link
Contributor

@jaydoane jaydoane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such an elegant solution to a nasty race condition!

Just a nit, but I thought a couple sentences in the PR description might have contained typos, specifically:

This is what allows us to start the cleanup process before the even get submitted. Older calls can just be easily call into the cast_ref versions with their own created references.

where I have emphasized the confusing parts.

Very nice to see such great test coverage improvement!

Code Coverage:
rexi            :  85%
rexi_app        : 100%
rexi_buffer     :  82%
rexi_monitor    :   0%
rexi_server     :  59%
rexi_server_mon :  43%
rexi_server_sup : 100%
rexi_sup        : 100%
rexi_utils      :  12%

Total           : 56%

src/fabric/src/fabric_streams.erl Show resolved Hide resolved
Comment on lines +152 to +183
%% Stream messages back to the coordinator. Initializes on first use. Limit
%% the number of unacked messsages to Limit, and throw a timeout error if it
%% doesn't receive an ack in Timeout milliseconds.
%%
%% The general protocol looks like this:
%%
%% Coordinator Worker (one of Q*N usually)
%% ---------- --------------------------
%% cast/2,3,4 -> {doit, ...} -> rexi_server:
%% spawn_monitor worker process.
%% First time stream2/1 is called it
%% runs init_stream/1.
%%
%% init_stream/1:
%% <- rexi_STREAM_INIT <- sync send, wait for reply
%%
%% Some workers are told to
%% continue with rexi_STREAM_START.
%% Others are told to stop with
%% rexi_STREAM_CANCEL
%%
%% -> rexi_STREAM_START ->
%% Keep calling rexi:stream2/3
%% to stream data to coordinator...
%%
%% <- Caller ! {Ref, self(), Msg} <-
%% ...
%% Coordinator must acknowledge.
%% -> {rexi_ack, 1} ->
%% Send last message. No need for ack.
%% <- Caller ! Msg <-
%%
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is awesome!

Previously, if the coordinator process is killed too quickly, before the stream
worker cleanup process is spawned, remote workers may be left around
waiting until the default 5 minute timeout expires.

In order to reliably clean up processes in that state, need to start the
cleaner process, with all the job references, before we start submitting them
for execution.

At first, it may seem impossible to monitor a process until after it's already
spawned. That's true for regular processes, however rexi operates on plain
references. For each process we spawn remotely we create a reference on the
coordinator side, which we can then use to track that job. Those are just plain
manually created references. Nothing stops us from creating them first, adding
them to a cleaner process, and only then submitting them.

That's exactly what this commit accomplishes:

  * Create a streams specific `fabric_streams:submit_jobs/4` function, which
  spawns the cleanup process early, generates worker references, and then
  submits the jobs. This way, all the existing streaming submit_jobs calls can
  be replaced easily in one line: `fabric_util` -> `fabric_streams`.

  * The cleanup process operates as previously: monitors the coordinator for
  exits, and fires off `kill_all` message to each node when needed.

  * Create `rexi:cast_ref(...)` variants of `rexi:cast(...)` calls, where the
  caller specifies the references as arguments. This is what allows us to start
  the cleanup process before the jobs are even submitted. Older calls can just
  be transformed to call into the `cast_ref` versions with their own created
  references.

Noticed that we don't need to keep the whole list of shards in memory in the
cleaner process. For Q=64, N=3 that can add up to a decent blob of binary
paths. We only need node names (atoms) and refs. So updated to use just a set
of [{Node, Ref}, ...]. A set since in theory someone would add the same worker
twice to it.

Since we added the new `rexi:cast_ref(...)` variants, ensure to add more test
coverage, including the streaming logic as well. It's not 100% yet, but getting
there.

Also, the comments in `rexi.erl` were full of erldoc stanzas and we don't
actually build erldocs anywhere, so replace them with something more helpful.
The streaming protocol itself was never quite described anywhere, and it can
take sometime to figure it out (at least it took me), so took the chance to
also add a very basic, high level description of the message flow.

Related: #5127 (comment)
@nickva
Copy link
Contributor Author

nickva commented Jul 28, 2024

Just a nit, but I thought a couple sentences in the PR description might have contained typos, specifically:

This is what allows us to start the cleanup process before the even get submitted. Older calls can just be easily call into the cast_ref versions with their own created references.

Thank you. Yeah that is confusing. I reworded it as:

This is what allows us to start the cleanup process before the jobs are even submitted. Older calls can just be transformed to call into the cast_ref versions with their own created references.

@nickva nickva force-pushed the improve-rexi-stream-clean-on-coordinator-exit branch from fcc0e9e to 5639b69 Compare July 28, 2024 03:58
@nickva nickva merged commit fe0c893 into main Jul 28, 2024
18 checks passed
@nickva nickva deleted the improve-rexi-stream-clean-on-coordinator-exit branch July 28, 2024 04:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants