From b5babad71496e9dab296e302b027093990650daa Mon Sep 17 00:00:00 2001
From: Mathias Kraus
Date: Tue, 5 Dec 2023 01:16:32 +0100
Subject: [PATCH] [#42] Initial multi-process benchmark

---
 Cargo.toml                                         |   3 +-
 .../Cargo.toml                                     |  15 ++
 .../src/follower.rs                                | 106 ++++++++++
 .../src/leader.rs                                  | 139 +++++++++++++
 .../src/main.rs                                    | 196 ++++++++++++++++++
 .../src/params.rs                                  |  17 ++
 .../src/setup.rs                                   |  51 +++++
 elkodon/src/port/event_id.rs                       |   4 +-
 8 files changed, 528 insertions(+), 3 deletions(-)
 create mode 100644 benchmarks/multi_process_publish_subscribe/Cargo.toml
 create mode 100644 benchmarks/multi_process_publish_subscribe/src/follower.rs
 create mode 100644 benchmarks/multi_process_publish_subscribe/src/leader.rs
 create mode 100644 benchmarks/multi_process_publish_subscribe/src/main.rs
 create mode 100644 benchmarks/multi_process_publish_subscribe/src/params.rs
 create mode 100644 benchmarks/multi_process_publish_subscribe/src/setup.rs

diff --git a/Cargo.toml b/Cargo.toml
index faf1d19..fa95731 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,8 @@ members = [
 
     "examples",
 
-    "benchmarks/publish_subscribe"
+    "benchmarks/publish_subscribe",
+    "benchmarks/multi_process_publish_subscribe",
 ]
 
 [workspace.package]
diff --git a/benchmarks/multi_process_publish_subscribe/Cargo.toml b/benchmarks/multi_process_publish_subscribe/Cargo.toml
new file mode 100644
index 0000000..af1b4ba
--- /dev/null
+++ b/benchmarks/multi_process_publish_subscribe/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "benchmark_multi_process_publish_subscribe"
+description = "Elkodon: multi-process benchmark for publish-subscribe messaging pattern"
+rust-version = { workspace = true }
+version = { workspace = true }
+repository = { workspace = true }
+edition = { workspace = true }
+
+[dependencies]
+elkodon_bb_log = { workspace = true }
+elkodon = { workspace = true }
+elkodon_bb_posix = { workspace = true }
+elkodon_bb_container = { workspace = true }
+
+clap = { version = "4.4", features = ["derive"] }
diff --git a/benchmarks/multi_process_publish_subscribe/src/follower.rs b/benchmarks/multi_process_publish_subscribe/src/follower.rs
new file mode 100644
index 0000000..38d4014
--- /dev/null
+++ b/benchmarks/multi_process_publish_subscribe/src/follower.rs
@@ -0,0 +1,106 @@
+use crate::setup::*;
+
+use elkodon::prelude::*;
+
+use core::mem::MaybeUninit;
+use std::time::{Duration, SystemTime};
+
+pub fn run_follower_process() -> Result<(), Box<dyn std::error::Error>> {
+    // follower setup
+    let follower_service = zero_copy::Service::new(&ServiceName::new(FOLLOWER_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(1)
+        .max_subscribers(1)
+        .history_size(0)
+        .subscriber_max_buffer_size(1)
+        .enable_safe_overflow(false)
+        .open_or_create::>()?;
+
+    let follower_publisher = follower_service.publisher().create()?;
+
+    // leader setup
+    let leader_service = zero_copy::Service::new(&ServiceName::new(LEADER_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(1)
+        .max_subscribers(1)
+        .history_size(0)
+        .subscriber_max_buffer_size(1)
+        .enable_safe_overflow(false)
+        .open_or_create::>()?;
+
+    let leader_subscriber = leader_service.subscriber().create()?;
+
+    // latency result setup
+    let latency_service = zero_copy::Service::new(&ServiceName::new(LATENCY_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(2)
+        .max_subscribers(1)
+        .history_size(0)
+        .subscriber_max_buffer_size(2)
+        .enable_safe_overflow(false)
+        .open_or_create::<LatencyTopic>()?;
+
+    let latency_publisher = latency_service.publisher().create()?;
+    let mut latency_sample = latency_publisher.loan()?;
+
+    // ready setup
+    let ready_event = zero_copy::Service::new(&ServiceName::new(READY_EVENT_NAME)?)
+        .event()
+        .open_or_create()?;
+
+    let ready_notifier = ready_event.notifier().create()?;
+
+    // signal ready to main process
+    ready_notifier.notify_with_custom_event_id(FOLLOWER_READY_EVENT_ID)?;
+
+    let mut i = 0;
+    let mut finished = false;
+    while !finished {
+        let mut abort_counter = 100_000_000;
+        let sample = loop {
+            match leader_subscriber.receive() {
+                Ok(None) => { /* nothing to do */ }
+                Ok(Some(sample)) => {
+                    break sample;
+                }
+                Err(e) => Err(format!("Error at receiving samples: {:?}", e))?,
+            }
+            abort_counter -= 1;
+            if abort_counter == 0 {
+                Err("The leader process is not responding")?;
+            }
+        };
+
+        if !sample.info.warmup {
+            let receive_timestamp = SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)?
+                .as_nanos();
+            let latency = receive_timestamp.saturating_sub(sample.info.timestamp);
+            latency_sample.payload_mut().latencies[i] = latency as u64;
+            finished = sample.info.last;
+            i += 1;
+        }
+
+        let send_timestamp = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)?
+            .as_nanos();
+        let sample = follower_publisher.loan_uninit()?.write_payload(BenchTopic {
+            info: Info {
+                timestamp: send_timestamp,
+                warmup: sample.info.warmup,
+                last: finished,
+            },
+            data: MaybeUninit::uninit(),
+        });
+        follower_publisher.send(sample)?;
+    }
+    latency_sample.payload_mut().used_size = i;
+    latency_publisher.send(latency_sample)?;
+
+    println!("Follower finished!");
+
+    // FIXME the samples are not received when the process is gone
+    std::thread::sleep(Duration::from_secs(2));
+
+    Ok(())
+}
diff --git a/benchmarks/multi_process_publish_subscribe/src/leader.rs b/benchmarks/multi_process_publish_subscribe/src/leader.rs
new file mode 100644
index 0000000..5e07219
--- /dev/null
+++ b/benchmarks/multi_process_publish_subscribe/src/leader.rs
@@ -0,0 +1,139 @@
+use crate::setup::*;
+
+use elkodon::prelude::*;
+
+use core::mem::MaybeUninit;
+use core::time::Duration;
+use std::time::SystemTime;
+
+pub fn run_leader_process() -> Result<(), Box<dyn std::error::Error>> {
+    // settings setup
+    let settings_service = zero_copy::Service::new(&ServiceName::new(SETTINGS_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(1)
+        .max_subscribers(2)
+        .history_size(0)
+        .subscriber_max_buffer_size(1)
+        .enable_safe_overflow(false)
+        .open_or_create::<SettingsTopic>()?;
+
+    let settings_subscriber = settings_service.subscriber().create()?;
+
+    let settings_event = zero_copy::Service::new(&ServiceName::new(SETTINGS_EVENT_NAME)?)
+        .event()
+        .open_or_create()?;
+
+    let mut settings_listener = settings_event.listener().create()?;
+
+    // leader setup
+    let leader_service = zero_copy::Service::new(&ServiceName::new(LEADER_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(1)
+        .max_subscribers(1)
+        .history_size(0)
+        .subscriber_max_buffer_size(1)
+        .enable_safe_overflow(false)
+        .open_or_create::>()?;
+
+    let leader_publisher = leader_service.publisher().create()?;
+
+    // follower setup
+    let follower_service = zero_copy::Service::new(&ServiceName::new(FOLLOWER_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(1)
+        .max_subscribers(1)
+        .history_size(0)
+        .subscriber_max_buffer_size(1)
+        .enable_safe_overflow(false)
+        .open_or_create::>()?;
+
+    let follower_subscriber = follower_service.subscriber().create()?;
+
+    // latency result setup
+    let latency_service = zero_copy::Service::new(&ServiceName::new(LATENCY_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(2)
+        .max_subscribers(1)
+        .history_size(0)
+        .subscriber_max_buffer_size(2)
+        .enable_safe_overflow(false)
+        .open_or_create::<LatencyTopic>()?;
+
+    let latency_publisher = latency_service.publisher().create()?;
+    let mut latency_sample = latency_publisher.loan()?;
+
+    // ready setup
+    let ready_event = zero_copy::Service::new(&ServiceName::new(READY_EVENT_NAME)?)
+        .event()
+        .open_or_create()?;
+
+    let ready_notifier = ready_event.notifier().create()?;
+
+    // signal ready to main process
+    ready_notifier.notify_with_custom_event_id(LEADER_READY_EVENT_ID)?;
+
+    // wait for settings
+    match settings_listener.timed_wait(Duration::from_secs(2)) {
+        Ok(_) => { /* nothing to do */ }
+        Err(e) => Err(format!("Error while waiting for settings: {:?}", e))?,
+    }
+
+    let settings = settings_subscriber.receive().unwrap().unwrap();
+
+    let mut remaining = settings.iterations;
+    let mut i = 0;
+    let mut warmup = 10_000;
+    loop {
+        let remaining_next = remaining - 1;
+        let send_timestamp = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)?
+            .as_nanos();
+        let sample = leader_publisher.loan_uninit()?.write_payload(BenchTopic {
+            info: Info {
+                timestamp: send_timestamp,
+                warmup: warmup > 0,
+                last: remaining_next == 0,
+            },
+            data: MaybeUninit::uninit(),
+        });
+        leader_publisher.send(sample)?;
+
+        let mut abort_counter = 100_000_000;
+        let sample = loop {
+            match follower_subscriber.receive() {
+                Ok(None) => { /* nothing to do */ }
+                Ok(Some(sample)) => {
+                    break sample;
+                }
+                Err(e) => Err(format!("Error at receiving samples: {:?}", e))?,
+            }
+            abort_counter -= 1;
+            if abort_counter == 0 {
+                Err("The follower process is not responding")?;
+            }
+        };
+        if warmup > 0 {
+            warmup -= 1;
+        } else {
+            let receive_timestamp = SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)?
+                .as_nanos();
+            let latency = receive_timestamp.saturating_sub(sample.info.timestamp);
+            latency_sample.payload_mut().latencies[i] = latency as u64;
+            remaining = remaining_next;
+            i += 1;
+            if i == settings.iterations {
+                break;
+            }
+        }
+    }
+    latency_sample.payload_mut().used_size = settings.iterations;
+    latency_publisher.send(latency_sample)?;
+
+    println!("Leader finished!");
+
+    // FIXME the samples are not received when the process is gone
+    std::thread::sleep(Duration::from_secs(2));
+
+    Ok(())
+}
diff --git a/benchmarks/multi_process_publish_subscribe/src/main.rs b/benchmarks/multi_process_publish_subscribe/src/main.rs
new file mode 100644
index 0000000..cc2e905
--- /dev/null
+++ b/benchmarks/multi_process_publish_subscribe/src/main.rs
@@ -0,0 +1,196 @@
+mod follower;
+mod leader;
+mod params;
+mod setup;
+
+use crate::setup::*;
+
+use elkodon::prelude::*;
+use elkodon_bb_log::set_log_level;
+
+use clap::Parser;
+
+use std::process::Command;
+use std::time::{Duration, SystemTime};
+
+fn run_main_process(iterations: usize) -> Result<(), Box<dyn std::error::Error>> {
+    if iterations > MAX_ITERATIONS {
+        Err(format!("Exceeding max iterations of: {}", MAX_ITERATIONS))?;
+    }
+
+    // ready setup
+    let ready_event = zero_copy::Service::new(&ServiceName::new(READY_EVENT_NAME)?)
+        .event()
+        .open_or_create()?;
+
+    let mut ready_listener = ready_event.listener().create()?;
+
+    // settings setup
+    let settings_service = zero_copy::Service::new(&ServiceName::new(SETTINGS_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(1)
+        .max_subscribers(2)
+        .history_size(0)
+        .subscriber_max_buffer_size(1)
+        .enable_safe_overflow(false)
+        .open_or_create::<SettingsTopic>()?;
+
+    let settings_publisher = settings_service.publisher().create()?;
+
+    let settings_event = zero_copy::Service::new(&ServiceName::new(SETTINGS_EVENT_NAME)?)
+        .event()
+        .open_or_create()?;
+
+    let settings_notifier = settings_event.notifier().create()?;
+
+    // latency result setup
+    let latency_service = zero_copy::Service::new(&ServiceName::new(LATENCY_SERVICE_NAME)?)
+        .publish_subscribe()
+        .max_publishers(2)
+        .max_subscribers(1)
+        .history_size(0)
+        .subscriber_max_buffer_size(2)
+        .enable_safe_overflow(false)
+        .open_or_create::<LatencyTopic>()?;
+
+    let latency_subscriber = latency_service.subscriber().create()?;
+
+    println!("Spawning 'leader' and 'follower' processes ...");
+    let process_path = std::env::current_exe()?;
+    let mut follower_process = Command::new(&process_path).arg("--follower").spawn()?;
+    let mut leader_process = Command::new(&process_path).arg("--leader").spawn()?;
+    println!("... done");
+
+    // wait for leader and follower processes to be ready
+    let mut leader_ready = false;
+    let mut follower_ready = false;
+    let wait_for_ready_start_time = SystemTime::now();
+    loop {
+        match ready_listener.timed_wait(Duration::from_secs(1)) {
+            Ok(event_ids) => {
+                for &id in event_ids {
+                    match id {
+                        LEADER_READY_EVENT_ID => {
+                            leader_ready = true;
+                        }
+                        FOLLOWER_READY_EVENT_ID => {
+                            follower_ready = true;
+                        }
+                        _ => Err(format!("Received invalid event id: {:?}", id))?,
+                    }
+                }
+            }
+            _ => {}
+        }
+
+        if (leader_ready && follower_ready)
+            || wait_for_ready_start_time.elapsed()? > Duration::from_secs(20)
+        {
+            break;
+        }
+    }
+
+    if !leader_ready || !follower_ready {
+        let _ = follower_process.kill();
+        let _ = leader_process.kill();
+        if !leader_ready && !follower_ready {
+            Err("The leader and follower processes are not ready!")?;
+        } else if !leader_ready {
+            Err("The leader process is not ready!")?;
+        } else {
+            Err("The follower process is not ready!")?;
+        }
+    }
+
+    settings_publisher.send_copy(SettingsTopic { iterations })?;
+    settings_notifier.notify()?;
+
+    println!("Waiting for benchmark to finish ...");
+
+    let mut samples: Vec<_> = Vec::new();
+    while samples.len() < 2 {
+        if let Ok(Some(sample)) = latency_subscriber.receive() {
+            samples.push(sample);
+        }
+        std::thread::sleep(Duration::from_millis(100));
+    }
+
+    follower_process.wait()?;
+    leader_process.wait()?;
+
+    println!("... finished");
finished"); + + println!("Processing ..."); + + // let sample1 = latency_subscriber.receive()?.unwrap(); + // let sample2 = latency_subscriber.receive()?.unwrap(); + let mut total_time = 0; + let mut total_iterations = 0; + let mut max_latency = 0; + let mut min_latency = 1_000_000_000; + let mut max_latency_index = 0; + let mut latency_larger_than_10us_count = 0; + let mut latency_larger_than_100us_count = 0; + let mut latency_larger_than_1000us_count = 0; + for sample in samples { + for i in 0..sample.used_size { + let latency = sample.latencies[i]; + total_time += latency; + if latency < min_latency { + min_latency = latency; + } + if latency > max_latency { + max_latency = latency; + max_latency_index = i; + } + if latency > 10_000 { + latency_larger_than_10us_count += 1; + } + if latency > 100_000 { + latency_larger_than_100us_count += 1; + } + if latency > 1_000_000 { + latency_larger_than_1000us_count += 1; + } + } + total_iterations += sample.used_size; + } + + println!("total transmissions: {}", total_iterations); + println!("min latency: {}ns", min_latency); + println!("avg latency: {}ns", total_time / (total_iterations as u64)); + println!( + "max latency: {}ns at index: {}", + max_latency, max_latency_index + ); + println!( + "latency larger than 10us count: {}", + latency_larger_than_10us_count + ); + println!( + "latency larger than 100us count: {}", + latency_larger_than_100us_count + ); + println!( + "latency larger than 1000us count: {}", + latency_larger_than_1000us_count + ); + + Ok(()) +} + +fn main() -> Result<(), Box> { + let params = params::Params::parse(); + + set_log_level(elkodon_bb_log::LogLevel::Error); + + if params.leader { + leader::run_leader_process()?; + } else if params.follower { + follower::run_follower_process()?; + } else { + run_main_process(params.iterations)?; + } + + Ok(()) +} diff --git a/benchmarks/multi_process_publish_subscribe/src/params.rs b/benchmarks/multi_process_publish_subscribe/src/params.rs new file mode 100644 index 0000000..f5f0fe7 --- /dev/null +++ b/benchmarks/multi_process_publish_subscribe/src/params.rs @@ -0,0 +1,17 @@ +use clap::Parser; + +/// Multi-process benchmark for the publish-subscribe messaging pattern +#[derive(Parser, Debug)] +#[command(author, version, about, long_about)] +pub struct Params { + /// For internal use only + #[arg(long)] + pub leader: bool, + /// For internal use only + #[arg(long)] + pub follower: bool, + /// The number of iterations to run + #[arg(long, default_value_t = 10_000_000)] + pub iterations: usize, + // TODO add mode with poll and event +} diff --git a/benchmarks/multi_process_publish_subscribe/src/setup.rs b/benchmarks/multi_process_publish_subscribe/src/setup.rs new file mode 100644 index 0000000..e38c8d0 --- /dev/null +++ b/benchmarks/multi_process_publish_subscribe/src/setup.rs @@ -0,0 +1,51 @@ +use elkodon::prelude::*; + +pub const MAX_ITERATIONS: usize = 10_000_000; + +pub const LEADER_READY_EVENT_ID: EventId = EventId::new(1); +pub const FOLLOWER_READY_EVENT_ID: EventId = EventId::new(2); + +pub const READY_EVENT_NAME: &'static str = "bench/event/ready"; + +pub const SETTINGS_SERVICE_NAME: &'static str = "bench/settings"; +pub const SETTINGS_EVENT_NAME: &'static str = "bench/event/settings"; + +pub const LEADER_SERVICE_NAME: &'static str = "bench/leader"; +pub const FOLLOWER_SERVICE_NAME: &'static str = "bench/follower"; + +pub const LATENCY_SERVICE_NAME: &'static str = "bench/latency"; + +use core::mem::MaybeUninit; + +#[derive(Debug)] +pub struct SettingsTopic { + pub iterations: 
+}
+
+#[derive(Debug)]
+pub struct Info {
+    pub timestamp: u128,
+    pub warmup: bool,
+    pub last: bool,
+}
+
+#[derive(Debug)]
+pub struct BenchTopic<const N: usize> {
+    pub info: Info,
+    pub data: MaybeUninit<[u8; N]>,
+}
+
+#[derive(Debug)]
+pub struct LatencyTopic {
+    pub used_size: usize,
+    pub latencies: [u64; MAX_ITERATIONS],
+}
+
+impl Default for LatencyTopic {
+    fn default() -> Self {
+        Self {
+            used_size: 0,
+            latencies: [0; MAX_ITERATIONS],
+        }
+    }
+}
diff --git a/elkodon/src/port/event_id.rs b/elkodon/src/port/event_id.rs
index d60a84d..f2be8aa 100644
--- a/elkodon/src/port/event_id.rs
+++ b/elkodon/src/port/event_id.rs
@@ -34,12 +34,12 @@ pub struct EventId(u64);
 
 impl EventId {
     /// Creates a new [`EventId`] from a given integer value.
-    pub fn new(value: u64) -> Self {
+    pub const fn new(value: u64) -> Self {
         EventId(value)
     }
 
     /// Returns the underlying integer value of the [`EventId`].
-    pub fn as_u64(&self) -> u64 {
+    pub const fn as_u64(&self) -> u64 {
         self.0
     }
 }
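
Note on the elkodon/src/port/event_id.rs hunk: `EventId::new()` and `as_u64()` are turned into `const fn` so that the benchmark can declare its ready-signal ids as compile-time constants in setup.rs. A minimal sketch of what this enables (the prelude import and `EventId` come from the patch above, the rest is illustrative):

    use elkodon::prelude::*;

    // only compiles because EventId::new() is now a const fn
    pub const LEADER_READY_EVENT_ID: EventId = EventId::new(1);
    pub const FOLLOWER_READY_EVENT_ID: EventId = EventId::new(2);

    fn main() {
        // as_u64() is usable in const contexts as well
        const RAW_ID: u64 = LEADER_READY_EVENT_ID.as_u64();
        assert_eq!(RAW_ID, 1);
    }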
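
Note on the measurement itself: the leader stamps every BenchTopic with the wall-clock send time, the follower records receive time minus send time for the leader-to-follower direction, and the leader does the same for the reply, so each stored value is a one-way cross-process latency. This relies on both processes running on the same machine and therefore sharing the same SystemTime clock. A standalone sketch of that arithmetic, using only the standard library and illustrative names:

    use std::time::SystemTime;

    /// Nanoseconds since the UNIX epoch, as carried in `Info::timestamp`.
    fn timestamp_ns() -> u128 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system clock is set before the UNIX epoch")
            .as_nanos()
    }

    /// One-way latency in nanoseconds; saturating_sub guards against the
    /// wall clock stepping backwards between the two timestamp reads.
    fn one_way_latency_ns(send_timestamp_ns: u128) -> u64 {
        timestamp_ns().saturating_sub(send_timestamp_ns) as u64
    }

    fn main() {
        let sent = timestamp_ns();
        // ... the sample would travel leader -> follower through the service here ...
        println!("latency: {} ns", one_way_latency_ns(sent));
    }

The benchmark is started through the main process only, e.g. `cargo run --release -p benchmark_multi_process_publish_subscribe -- --iterations 1000000`; it then re-spawns its own executable with `--leader` and `--follower` and collects the two LatencyTopic samples before printing the statistics.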