Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Measure execution times of streams #214

Merged
merged 12 commits into from
Aug 8, 2024
35 changes: 6 additions & 29 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ fn initialize_tracing(config: &Config) -> eyre::Result<TracingShutdownHandle> {
} else {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().pretty().compact())
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
)
.init();

Ok(TracingShutdownHandle)
Expand Down Expand Up @@ -504,11 +507,9 @@ async fn main() -> eyre::Result<()> {
background_tasks.check_tasks();

let processing_timeout = Duration::from_secs(config.processing_timeout_secs);
let mut total_time = Instant::now();
let mut batch_times = Duration::from_secs(0);

// Main loop
for request_counter in 0..N_BATCHES {
loop {
// **Tensor format of queries**
//
// The functions `receive_batch` and `prepare_query_shares` will prepare the
Expand All @@ -522,11 +523,6 @@ async fn main() -> eyre::Result<()> {
// - The outer Vec is the dimension of the Galois Ring (2):
// - A decomposition of each iris bit into two u8 limbs.

// Skip first iteration
if request_counter == 1 {
total_time = Instant::now();
batch_times = Duration::from_secs(0);
}
let now = Instant::now();

// Skip requests based on the startup sync, only in the first iteration.
Expand Down Expand Up @@ -556,8 +552,7 @@ async fn main() -> eyre::Result<()> {
}

// start trace span - with single TraceId and single ParentTraceID
println!("Received batch in {:?}", now.elapsed());
batch_times += now.elapsed();
tracing::info!("Received batch in {:?}", now.elapsed());
background_tasks.check_tasks();

let result_future = handle.submit_batch_query(batch).await;
Expand All @@ -568,25 +563,7 @@ async fn main() -> eyre::Result<()> {
.map_err(|e| eyre!("ServerActor processing timeout: {:?}", e))?;

tx.send(result).await.unwrap();
println!("CPU time of one iteration {:?}", now.elapsed());

// wrap up span context
}
// drop actor handle to initiate shutdown
drop(handle);

println!(
"Total time for {} iterations: {:?}",
N_BATCHES - 1,
total_time.elapsed() - batch_times
);

// Clean up server tasks, then wait for them to finish
background_tasks.abort_all();
tokio::time::sleep(Duration::from_secs(5)).await;

// Check for background task hangs and shutdown panics
background_tasks.check_tasks_finished();

Ok(())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idk if we need a "clean" shutdown for the Server actor, by putting the loop in a try able block and doing the shutdown here on error?

}
10 changes: 2 additions & 8 deletions src/helpers/device_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,11 @@ impl DeviceManager {
}
}

pub fn create_events(&self, blocking_sync: bool) -> Vec<CUevent> {
let flags = if blocking_sync {
CUevent_flags::CU_EVENT_BLOCKING_SYNC
} else {
CUevent_flags::CU_EVENT_DEFAULT
};

pub fn create_events(&self) -> Vec<CUevent> {
let mut events = vec![];
for idx in 0..self.devices.len() {
self.devices[idx].bind_to_thread().unwrap();
events.push(event::create(flags).unwrap());
events.push(event::create(CUevent_flags::CU_EVENT_DEFAULT).unwrap());
}
events
}
Expand Down
Loading
Loading