Graceful shutdown on ctrl-c #9
Conversation
Cargo.toml (Outdated)
@@ -21,3 +21,4 @@ handlebars = "3"
colored = "2"
colourado = "0.2.0"
memchr = "2.4.1"
ctrlc = "3.2.1"
You can use tokio::signal::ctrl_c instead, by enabling the signal feature on the tokio dependency. It removes the need for tokio::sync::Mutex<bool> or tokio::sync::Notify altogether.
Oh god, tokio has a lot of things. Are you thinking of something like this?

tokio::spawn(async move {
    loop {
        tokio::select!(
            _ = tokio::signal::ctrl_c() => break,
            _ = async { /* scheduling loop */ } => continue,
        );
    }
});
I suppose this kind of loop is totally OK, if everything in it is cancel safe.
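For reference, a minimal sketch of such a loop, assuming (hypothetically) that the scheduling loop pulls commands from a tokio::sync::mpsc channel; mpsc::Receiver::recv is documented as cancel safe, so losing the select! race to ctrl_c does not drop a queued command:

use tokio::sync::mpsc;

// Hypothetical names: `rx` and the String command type are placeholders,
// not Pegasus's actual types.
async fn scheduling_loop(mut rx: mpsc::Receiver<String>) {
    loop {
        tokio::select! {
            // Resolves once a ctrl-c signal arrives.
            _ = tokio::signal::ctrl_c() => break,
            // recv() is cancel safe: if ctrl_c wins this iteration,
            // no queued command is lost.
            maybe_cmd = rx.recv() => match maybe_cmd {
                Some(cmd) => println!("would run: {cmd}"),
                None => break, // all senders dropped
            },
        }
    }
}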
I think tokio::signal::ctrl_c() can be called multiple times, and every .await on it would return on receiving one ctrl-c signal. However, since I don't have much experience with tokio::signal, I am not so sure.
The scheduling loop is probably not cancel safe. It is designed to check ctrl-c cancellation before fetching a new command from the queue, and I feel like awaiting on Notify::notified or ctrl_c will end up being async with respect to the operations of the scheduling loop, making things difficult. Plus, if I'm going to refactor it as a task, then I should figure out a way to drop(command_txs), etc. when the loop exits. There's a high chance I'm not getting how you imagined ctrl_c would be used?
Very interesting suggestion! I think this will work for broadcast mode, but I'm not sure about queue mode. All in all, this is a major rework of the core logic, so if the current one at least works, I think the refactoring should be done in a separate PR. Also, I feel like the suggestions about notify + tokio::sync::watch and using tokio::signal::ctrl_c are super nice, and I'll try to incorporate those into the current architecture!
> All Sessions are stored in an Arc<HashMap<String, Session>>, with the session name as the key. Since the hosts are loaded at startup, it is perfectly OK to store them in an Arc<HashMap>.

I suppose it can even be an Arc<Vec<Session>>, because sessions aren't really distinguished from each other.
> Then, upon receiving new tasks, search for the corresponding Session and spawn a new task to execute the job.

Similarly, there is no "corresponding" session. In broadcast mode all sessions will be used, so that's not a problem, but in queue mode we need to keep track of which sessions are currently free, and whenever a new command is fetched or a session becomes free, we execute the command on a free session. To do so, we need to track the status (in use or free) of the sessions. Off the top of my head, keeping is_free: Arc<Mutex<Vec<bool>>> and setting the flags at the beginning and end of each task would be an option, but that would require the scheduling loop to poll the vector.
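For concreteness, a rough sketch of that flag-vector idea (all names hypothetical; this is the variant being argued against, since the loop would still have to poll the vector):

use std::sync::Arc;
use tokio::sync::Mutex;

// Mark session `idx` busy for the duration of one command, then free it again.
async fn run_on_session(is_free: Arc<Mutex<Vec<bool>>>, idx: usize) {
    is_free.lock().await[idx] = false;
    // ... run the command on session `idx` ...
    is_free.lock().await[idx] = true;
}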
> The main task can then call tokio::sync::Semaphore::acquire_many to wait for the completion of n tasks and use tokio::sync::SemaphorePermit::forget to prevent the permits from being returned to the tokio::sync::Semaphore.

Definitely works for broadcast mode, but we need something more fine-grained than acquire_many for queue mode.
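As a sketch of the acquire_many + forget idea for broadcast mode (task bodies elided, names hypothetical):

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn wait_for_all_broadcast_tasks(n: u32) {
    // Start with zero permits; each finished task adds one.
    let sem = Arc::new(Semaphore::new(0));

    for _ in 0..n {
        let sem = Arc::clone(&sem);
        tokio::spawn(async move {
            // ... run the broadcast command on one session ...
            sem.add_permits(1);
        });
    }

    // Resolves once all n tasks have reported completion; forget() keeps
    // the permits from being returned to the semaphore.
    let permit = sem.acquire_many(n).await.expect("semaphore closed");
    permit.forget();
}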
> By doing each job in a task instead of running every Session in a task, we can do graceful shutdown easily using tokio::signal::ctrl_c.

I suppose I should start by ditching ctrlc and using tokio::signal::ctrl_c right now:
// Replacing ctrlc::set_handler
tokio::spawn(async move {
    tokio::signal::ctrl_c().await;
    *cancelled_handler.lock().await = true;
});
> Using the architecture I described above, I believe it is also possible to use notify in tokio::spawn combined with tokio::sync::watch to monitor changes of queue.yaml. It is possible since tokio::sync::watch::Receiver::changed is cancel safe.

This is great. Now, instead of just waiting three seconds when queue.yaml is empty, I can await watcher_rx.changed() and, whenever it notifies me, continue to the next iteration, which will immediately call job_queue.next().
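A minimal sketch of what that could look like, assuming some watcher task calls send(()) on the tokio::sync::watch sender whenever queue.yaml changes (the file watcher itself and the names here are hypothetical):

use tokio::sync::watch;

async fn scheduling_loop(mut watcher_rx: watch::Receiver<()>) {
    loop {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => break,
            // changed() is cancel safe, so losing this race to ctrl_c
            // does not permanently miss a notification.
            changed = watcher_rx.changed() => {
                if changed.is_err() {
                    break; // the sender side was dropped
                }
                // queue.yaml changed: call job_queue.next() here instead of sleeping.
            }
        }
    }
}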
> Upon inspection of the implementation of tokio::signal::ctrl_c, I found that it just uses tokio::sync::watch::Receiver to receive the signal, so it should also be cancel safe.

That is good to know, but I didn't quite follow why cancel safety for ctrl_c is needed in your proposed architecture.
> If we actually need to unload/load new hosts dynamically, then I would use HashMap<String, Arc<Session>> to store the sessions.

This could be a pretty good potential feature. Although I currently don't have a need for it, I'll definitely agree if someone adds it.
> Similarly, there is no "corresponding" session.

Oh right, I forgot that in broadcast mode the name of the session doesn't matter. And it doesn't matter in queue mode either.
> Definitely works for broadcast mode, but we need something more fine-grained than acquire_many for queue mode.

We can have a Vec<Arc<Session>> storing all the sessions and create a tokio::sync::mpsc::channel holding every Session that is ready. I would initialize the channel like this:
let mut sessions = Vec::new();
sessions.push(Arc::new(...));
// ...
let (sender, mut receiver) = channel(sessions.len());
for session in sessions.iter() {
    sender.send(session.clone()).await.unwrap();
}
When creating a new task:

// Get a session that is free.
let session = receiver.recv().await.unwrap();
let sender = sender.clone();
tokio::spawn(async move {
    // do something...
    let res = session.run(...).await;
    // handle the error...
    sender.send(session).await.unwrap();
});
This way, we have fine-grained notification of which Session is free for queue mode.
> I suppose I should start by ditching ctrlc and using tokio::signal::ctrl_c right now:

Agreed.

> That is good to know, but I didn't quite follow why cancel safety for ctrl_c is needed in your proposed architecture.

I imagined we would be doing something like this:

tokio::select! {
    _ = ctrl_c() => { /* graceful shutdown */ }
    _ = wait_for_queue_yaml_to_change() => { /* do more work */ }
}
So we're basically tossing around the Session object! That's elegant. And we only have to use one channel. We can also keep a vector of Option<JoinHandle>s where we store the handle returned from tokio::spawn whenever a task begins, and remove it when it's done. Then, on cancellation, we call join_all on that handle vector and then iterate over the sessions vector to gracefully close all the sessions. I acknowledge that switching to your architecture would benefit Pegasus.
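A rough sketch of that shutdown path; Session and close() below are stand-ins for Pegasus's own type and teardown method, so the real signatures may differ:

use std::sync::Arc;
use futures::future::join_all;
use tokio::task::JoinHandle;

// Placeholder for the project's Session type.
struct Session;
impl Session {
    async fn close(&self) { /* tear down the SSH connection */ }
}

async fn graceful_shutdown(handles: Vec<JoinHandle<()>>, sessions: Vec<Arc<Session>>) {
    // Wait for every in-flight command task to finish.
    join_all(handles).await;
    // Then close each SSH session.
    for session in &sessions {
        session.close().await;
    }
}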
But still, as far as I can tell, the performance would not be significantly better (maybe spawning a new task for each command might even slightly degrade performance?), and it doesn't enable anything we couldn't accomplish with the current architecture. Since your architecture is quite a bit simpler, I will definitely keep it in mind and consider refactoring to it when features are added and the code gets too dirty.
You know, I would have gone through the refactoring process if it were Friday night, but... it's Monday soon.. haha.. 😢
> But still, as far as I can tell, the performance would not be significantly better, and it doesn't enable anything we couldn't accomplish with the current architecture.

This is true, but I think a simpler architecture will always make things easier, whether it is to support new features, fix existing bugs, or optimize performance.

> You know, I would have gone through the refactoring process if it were Friday night, but... it's Monday soon.. haha..

Totally understandable.
> This is true, but I think a simpler architecture will always make things easier, whether it is to support new features, fix existing bugs, or optimize performance.

100% agree! Your proposal will not go to waste.
@jaywonchung I recommend explicitly closing …
Thanks for the review! About explicitly closing …
Mostly network disconnected error.
Yes, sorry I mixed up the name.
// terminated.
while let Ok(cmd) = command_rx.recv().await {
    let cmd = cmd.fill_template(&mut registry, &session.host);
    let result = session.run(cmd).await;
I am pretty sure session.run(cmd) is cancel safe.
Cargo.toml (Outdated)
@@ -21,3 +21,4 @@ handlebars = "3"
colored = "2"
colourado = "0.2.0"
memchr = "2.4.1"
ctrlc = "3.2.1"
I read Session::run and I am pretty sure it is cancel safe. Also, you don't have to call ctrl_c in these tasks: you can call tokio::task::JoinHandle::abort to abort them.
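Something like this, I assume (the handle bookkeeping is hypothetical):

use tokio::task::JoinHandle;

async fn abort_all(handles: Vec<JoinHandle<()>>) {
    for handle in &handles {
        handle.abort();
    }
    // Awaiting an aborted handle yields a JoinError with is_cancelled() == true,
    // unless the task had already finished.
    for handle in handles {
        if let Err(e) = handle.await {
            assert!(e.is_cancelled() || e.is_panic());
        }
    }
}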
I just pushed out a commit that defines a …
Since you already …
Now I tried to use …
I also realized this. We could use …
@NobodyXu Thanks a lot for reviewing. If you don't have further comments, mind if I merge?
LGTM
This PR attempts to gracefully shut down SSH sessions when the user presses ctrl-c.
The overall flow of cancellation propagation is:
1. The ctrl-c handler (registered with ctrlc::set_handler) assigns true to the variable cancelled: Arc<Mutex<bool>>.
2. The scheduling loop checks cancelled at the beginning of every iteration. When true, it breaks.
3. The command channels (command_txs) are dropped.
4. When a task sends or recvs on any of its channels, the channel returns an Err and the task breaks out of its command execution loop.
5. The session object is dropped, terminating the SSH session.
As a side note, along the way, this PR fixes stream in Session by explicitly locking stdout. Without this, multiple calls to print! and println! are not coalesced, which led to output lines from different commands mixing with each other.
Closes #3.
@NobodyXu Would you be interested in reviewing? Just asking since you have reviewed a lot of my code recently (and it has benefited both me and Pegasus so much). Plus, I can add you as a collaborator if you'd like :)