Skip to content

Commit

Permalink
[larry-robotics#42] Initial multi-process benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Dec 6, 2023
1 parent edfa0cf commit b5babad
Show file tree
Hide file tree
Showing 8 changed files with 528 additions and 3 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ members = [

"examples",

"benchmarks/publish_subscribe"
"benchmarks/publish_subscribe",
"benchmarks/multi_process_publish_subscribe",
]

[workspace.package]
Expand Down
15 changes: 15 additions & 0 deletions benchmarks/multi_process_publish_subscribe/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "benchmark_multi_process_publish_subscribe"
description = "Elkodon: multi-process benchmark for publish-subscribe messaging pattern"
rust-version = { workspace = true }
version = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }

[dependencies]
elkodon_bb_log = { workspace = true }
elkodon = { workspace = true }
elkodon_bb_posix = { workspace = true }
elkodon_bb_container = { workspace = true }

clap = { version = "4.4", features = ["derive"] }
106 changes: 106 additions & 0 deletions benchmarks/multi_process_publish_subscribe/src/follower.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use crate::setup::*;

use elkodon::prelude::*;

use core::mem::MaybeUninit;
use std::time::{Duration, SystemTime};

pub fn run_follower_process() -> Result<(), Box<dyn std::error::Error>> {
// follower setup
let follower_service = zero_copy::Service::new(&ServiceName::new(FOLLOWER_SERVICE_NAME)?)
.publish_subscribe()
.max_publishers(1)
.max_subscribers(1)
.history_size(0)
.subscriber_max_buffer_size(1)
.enable_safe_overflow(false)
.open_or_create::<BenchTopic<1024>>()?;

let follower_publisher = follower_service.publisher().create()?;

// leader setup
let leader_service = zero_copy::Service::new(&ServiceName::new(LEADER_SERVICE_NAME)?)
.publish_subscribe()
.max_publishers(1)
.max_subscribers(1)
.history_size(0)
.subscriber_max_buffer_size(1)
.enable_safe_overflow(false)
.open_or_create::<BenchTopic<1024>>()?;

let leader_subscriber = leader_service.subscriber().create()?;

// latency result setup
let latency_service = zero_copy::Service::new(&ServiceName::new(LATENCY_SERVICE_NAME)?)
.publish_subscribe()
.max_publishers(2)
.max_subscribers(1)
.history_size(0)
.subscriber_max_buffer_size(2)
.enable_safe_overflow(false)
.open_or_create::<LatencyTopic>()?;

let latency_publisher = latency_service.publisher().create()?;
let mut latency_sample = latency_publisher.loan()?;

// ready setup
let ready_event = zero_copy::Service::new(&ServiceName::new(READY_EVENT_NAME)?)
.event()
.open_or_create()?;

let ready_notifier = ready_event.notifier().create()?;

// signal ready to main process
ready_notifier.notify_with_custom_event_id(FOLLOWER_READY_EVENT_ID)?;

let mut i = 0;
let mut finished = false;
while !finished {
let mut abort_counter = 100_000_000;
let sample = loop {
match leader_subscriber.receive() {
Ok(None) => { /* nothing to do */ }
Ok(Some(sample)) => {
break sample;
}
Err(e) => Err(format!("Error at receiving samples: {:?}", e))?,
}
abort_counter -= 1;
if abort_counter == 0 {
Err("The leader process is not responding")?;
}
};

if !sample.info.warmup {
let receive_timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_nanos();
let latency = receive_timestamp.saturating_sub(sample.info.timestamp);
latency_sample.payload_mut().latencies[i] = latency as u64;
finished = sample.info.last;
i += 1;
}

let send_timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_nanos();
let sample = follower_publisher.loan_uninit()?.write_payload(BenchTopic {
info: Info {
timestamp: send_timestamp,
warmup: sample.info.warmup,
last: finished,
},
data: MaybeUninit::uninit(),
});
follower_publisher.send(sample)?;
}
latency_sample.payload_mut().used_size = i;
latency_publisher.send(latency_sample)?;

println!("Follower finished!");

// FIXME the samples are not received when the process is gone
std::thread::sleep(Duration::from_secs(2));

Ok(())
}
139 changes: 139 additions & 0 deletions benchmarks/multi_process_publish_subscribe/src/leader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use crate::setup::*;

use elkodon::prelude::*;

use core::mem::MaybeUninit;
use core::time::Duration;
use std::time::SystemTime;

pub fn run_leader_process() -> Result<(), Box<dyn std::error::Error>> {
// settings setup
let settings_service = zero_copy::Service::new(&ServiceName::new(SETTINGS_SERVICE_NAME)?)
.publish_subscribe()
.max_publishers(1)
.max_subscribers(2)
.history_size(0)
.subscriber_max_buffer_size(1)
.enable_safe_overflow(false)
.open_or_create::<SettingsTopic>()?;

let settings_subscriber = settings_service.subscriber().create()?;

let settings_event = zero_copy::Service::new(&ServiceName::new(SETTINGS_EVENT_NAME)?)
.event()
.open_or_create()?;

let mut settings_listener = settings_event.listener().create()?;

// leader setup
let leader_service = zero_copy::Service::new(&ServiceName::new(LEADER_SERVICE_NAME)?)
.publish_subscribe()
.max_publishers(1)
.max_subscribers(1)
.history_size(0)
.subscriber_max_buffer_size(1)
.enable_safe_overflow(false)
.open_or_create::<BenchTopic<1024>>()?;

let leader_publisher = leader_service.publisher().create()?;

// follower setup
let follower_service = zero_copy::Service::new(&ServiceName::new(FOLLOWER_SERVICE_NAME)?)
.publish_subscribe()
.max_publishers(1)
.max_subscribers(1)
.history_size(0)
.subscriber_max_buffer_size(1)
.enable_safe_overflow(false)
.open_or_create::<BenchTopic<1024>>()?;

let follower_subscriber = follower_service.subscriber().create()?;

// latency result setup
let latency_service = zero_copy::Service::new(&ServiceName::new(LATENCY_SERVICE_NAME)?)
.publish_subscribe()
.max_publishers(2)
.max_subscribers(1)
.history_size(0)
.subscriber_max_buffer_size(2)
.enable_safe_overflow(false)
.open_or_create::<LatencyTopic>()?;

let latency_publisher = latency_service.publisher().create()?;
let mut latency_sample = latency_publisher.loan()?;

// ready setup
let ready_event = zero_copy::Service::new(&ServiceName::new(READY_EVENT_NAME)?)
.event()
.open_or_create()?;

let ready_notifier = ready_event.notifier().create()?;

// signal ready to main process
ready_notifier.notify_with_custom_event_id(LEADER_READY_EVENT_ID)?;

// wait for settings
match settings_listener.timed_wait(Duration::from_secs(2)) {
Ok(_) => { /* nothing to do */ }
Err(e) => Err(format!("Error while waiting for settings: {:?}", e))?,
}

let settings = settings_subscriber.receive().unwrap().unwrap();

let mut remaining = settings.iterations;
let mut i = 0;
let mut warmup = 10_000;
loop {
let remaining_next = remaining - 1;
let send_timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_nanos();
let sample = leader_publisher.loan_uninit()?.write_payload(BenchTopic {
info: Info {
timestamp: send_timestamp,
warmup: warmup > 0,
last: remaining_next == 0,
},
data: MaybeUninit::uninit(),
});
leader_publisher.send(sample)?;

let mut abort_counter = 100_000_000;
let sample = loop {
match follower_subscriber.receive() {
Ok(None) => { /* nothing to do */ }
Ok(Some(sample)) => {
break sample;
}
Err(e) => Err(format!("Error at receiving samples: {:?}", e))?,
}
abort_counter -= 1;
if abort_counter == 0 {
Err("The follower process is not responding")?;
}
};
if warmup > 0 {
warmup -= 1;
} else {
let receive_timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_nanos();
let latency = receive_timestamp.saturating_sub(sample.info.timestamp);
latency_sample.payload_mut().latencies[i] = latency as u64;
remaining = remaining_next;
i += 1;
if i == settings.iterations {
break;
}
}
}
latency_sample.payload_mut().used_size = settings.iterations;
latency_publisher.send(latency_sample)?;

println!("Leader finished!");

// FIXME the samples are not received when the process is gone
std::thread::sleep(Duration::from_secs(2));

Ok(())
}
Loading

0 comments on commit b5babad

Please sign in to comment.