Skip to content

Commit

Permalink
feat(subscriber): name tasks spawned by the console subscriber (conso…
Browse files Browse the repository at this point in the history
…le-rs#117)

This branch updates the `console-subscriber` crate so that the
"aggregate" and "serve" tasks spawned by the console are spawned with
names. This allows the user to distinguish between tasks spawned by the
console subscriber and tasks spawned by other parts of the application.

Note that this does *not* include the tasks spawned by Tonic to handle
each accepted client connection. Those tasks are spawned inside of
Tonic, rather than by the console-subscriber crate, so we can't easily
add our own names to them. We could probably fix that by using Tonic's
lower level APIs to spawn our own per-connection tasks, and give them
names...but that seems like a follow-up PR.

This is a first pass on console-rs#109 (although it isn't a *complete* solution
due to the above issue with Tonic).

Screenshot: 
![image](https://user-images.githubusercontent.com/2796466/132248558-8417f2c8-317b-4d2a-85a9-c851dacba587.png)
  • Loading branch information
hawkw authored Sep 6, 2021
1 parent 0685482 commit 05b9f5b
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,16 +548,16 @@ impl Server {
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = tokio::spawn(aggregate.run());
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
let addr = self.addr;
let res = builder
let serve = builder
.add_service(proto::instrument::instrument_server::InstrumentServer::new(
self,
))
.serve(addr)
.await;
.serve(addr);
let res = spawn_named(serve, "console::serve").await;
aggregate.abort();
res.map_err(Into::into)
res?.map_err(Into::into)
}
}

Expand Down Expand Up @@ -650,3 +650,18 @@ impl WakeOp {
}
}
}

#[track_caller]
pub(crate) fn spawn_named<T>(
task: impl std::future::Future<Output = T> + Send + 'static,
_name: &str,
) -> tokio::task::JoinHandle<T>
where
T: Send + 'static,
{
#[cfg(tokio_unstable)]
return tokio::task::Builder::new().name(_name).spawn(task);

#[cfg(not(tokio_unstable))]
tokio::spawn(task)
}

0 comments on commit 05b9f5b

Please sign in to comment.