Skip to content

Commit

Permalink
refactor(torture): address various follow-ups
Browse files Browse the repository at this point in the history
+ update docs
+ `scheduled_rollback` can either be forced to crash or not,
  stop treating rollback and rollback crash as two different things
+ handle delays with `Duration`s
  • Loading branch information
gabriele-0201 authored and pepyakin committed Jan 30, 2025
1 parent 8318629 commit daa4a87
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 104 deletions.
6 changes: 2 additions & 4 deletions torture/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub async fn run(input: UnixStream) -> Result<()> {
stream
.send(Envelope {
reqno,
message: ToSupervisor::CommitSuccessful(elapsed.as_nanos() as u64),
message: ToSupervisor::CommitSuccessful(elapsed),
})
.await?;
}
Expand Down Expand Up @@ -157,10 +157,9 @@ pub async fn run(input: UnixStream) -> Result<()> {
}

/// Execute the provided `task` and make it crash after the specified `crash_delay`.
/// The delay must be specified in nanoseconds.
async fn crash_task(
task: impl Future<Output = ()> + Send + 'static,
crash_delay: u64,
crash_delay: Duration,
op: &'static str,
) {
// TODO: implement this in the future.
Expand All @@ -177,7 +176,6 @@ async fn crash_task(
let barrier_2 = barrier_1.clone();
let task_1 = tokio::spawn(async move {
barrier_1.wait().await;
let crash_delay = Duration::from_nanos(crash_delay);
sleep(crash_delay).await;
tracing::info!("aborting {op} after {}ms", crash_delay.as_millis());
std::process::abort();
Expand Down
13 changes: 6 additions & 7 deletions torture/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//! This is used only for inter-process communication and thus doesn't need to care about versioning
//! or compatibility.
use std::time::Duration;

use serde::{Deserialize, Serialize};

pub type Key = [u8; 32];
Expand Down Expand Up @@ -47,7 +49,7 @@ pub struct InitPayload {
/// Only used upon creation of a new NOMT db.
pub bitbox_seed: [u8; 16],
/// Whether the agent is supposed to handle rollbacks.
/// If `Some`, the maximum amount of supported blocks in a single rollback is specified.
/// If `Some`, the maximum number of supported blocks in a single rollback is specified.
pub rollback: Option<u32>,
}

Expand All @@ -60,8 +62,7 @@ pub struct CommitPayload {
pub changeset: Vec<KeyValueChange>,
/// If Some the supervisor expects the commit to crash,
/// the crash should happen after the specified amount of time.
/// Time is specified in nanoseconds.
pub should_crash: Option<u64>,
pub should_crash: Option<Duration>,
}

/// The parameters for the [`ToAgent::Rollback`] message.
Expand All @@ -71,8 +72,7 @@ pub struct RollbackPayload {
pub n_commits: usize,
/// If Some the supervisor expects the rollback to crash,
/// the crash should happen after the specified amount of time.
/// Time is specified in nanoseconds.
pub should_crash: Option<u64>,
pub should_crash: Option<Duration>,
}

/// The maximum size of an envelope, in the serialized form.
Expand Down Expand Up @@ -118,8 +118,7 @@ pub enum ToSupervisor {
/// A generic acknowledgment message.
Ack,
/// The response to a successful commit, it contains the elapsed time to perform the commit.
/// Time is measured in nanoseconds.
CommitSuccessful(u64),
CommitSuccessful(Duration),
/// The response to a query for a key-value pair.
QueryValue(Option<Value>),
/// The response to a query for the current sequence number of the database.
Expand Down
121 changes: 28 additions & 93 deletions torture/src/supervisor/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl WorkloadState {
}

/// Replaces the committed state with the one stored in `snapshot`.
///
/// Applying a rollback also advances the committed `sync_seqn` by one,
/// exactly as a regular commit would.
fn rollback(&mut self, snapshot: Snapshot) {
    let committed = &mut self.committed;
    // A rollback is itself a sync, so the sequence number moves forward first.
    committed.sync_seqn += 1;
    committed.state = snapshot.state;
}
Expand Down Expand Up @@ -246,8 +247,8 @@ pub struct Workload {
workload_id: u64,
/// The seed for bitbox generated for this workload.
bitbox_seed: [u8; 16],
/// Data collected to evaluate the average commit time in nanoseconds.
tot_commit_time: u64,
/// Data collected to evaluate the average commit time.
tot_commit_time: Duration,
n_successfull_commit: u64,
/// Whether to ensure the correct application of the changeset after every commit.
ensure_changeset: bool,
Expand All @@ -257,18 +258,9 @@ pub struct Workload {
sample_snapshot: bool,
/// The max number of commits involved in a rollback.
max_rollback_commits: u32,
/// If `Some` there are rollbacks waiting to be applied.
scheduled_rollbacks: ScheduledRollbacks,
}

/// Tracker of rollbacks waiting for being applied.
struct ScheduledRollbacks {
/// If `Some` there is rollback waiting to be applied.
rollback: Option<ScheduledRollback>,
/// If `Some` there is rollback waiting to be applied,
/// alongside the delay after which the rollback process should panic,
/// measured in nanoseconds.
rollback_crash: Option<(ScheduledRollback, u64)>,
/// possibly alongside the delay after which the rollback process should panic.
scheduled_rollback: Option<(ScheduledRollback, Option<Duration>)>,
}

/// Contains the information required to apply a rollback.
Expand Down Expand Up @@ -317,16 +309,13 @@ impl Workload {
state,
workload_id,
bitbox_seed,
tot_commit_time: 0,
tot_commit_time: Duration::ZERO,
n_successfull_commit: 0,
ensure_changeset: workload_params.ensure_changeset,
ensure_snapshot: workload_params.ensure_snapshot,
sample_snapshot: workload_params.sample_snapshot,
max_rollback_commits: workload_params.max_rollback_commits,
scheduled_rollbacks: ScheduledRollbacks {
rollback: None,
rollback_crash: None,
},
scheduled_rollback: None,
}
}

Expand Down Expand Up @@ -360,28 +349,20 @@ impl Workload {
let rr = agent.rr().clone();
trace!("run_iteration");

if let Some((scheduled_rollback, should_crash)) = self
.scheduled_rollbacks
.is_time(self.state.committed.sync_seqn)
{
if self.scheduled_rollback.as_ref().map_or(false, |(r, _)| {
r.sync_seqn == self.state.committed.sync_seqn
}) {
// UNWRAP: scheduled_rollback has just been checked to be `Some`
let (scheduled_rollback, should_crash) = self.scheduled_rollback.take().unwrap();
self.perform_scheduled_rollback(&rr, scheduled_rollback, should_crash)
.await?;
return Ok(());
}

// Do not schedule new rollbacks if they are already scheduled.
let mut rollback = self.state.biases.rollback;
if self.scheduled_rollbacks.is_rollback_scheduled() {
rollback = 0.0;
}

let mut rollback_crash = self.state.biases.rollback_crash;
if self.scheduled_rollbacks.is_rollback_crash_scheduled() {
rollback_crash = 0.0;
}

if self.state.rng.gen_bool(rollback) {
if self.state.rng.gen_bool(rollback_crash) {
let is_rollback_scheduled = self.scheduled_rollback.is_some();
if !is_rollback_scheduled && self.state.rng.gen_bool(self.state.biases.rollback) {
if self.state.rng.gen_bool(self.state.biases.rollback_crash) {
self.schedule_rollback(true /*should_crash*/).await?
} else {
self.schedule_rollback(false /*should_crash*/).await?
Expand Down Expand Up @@ -437,19 +418,20 @@ impl Workload {

// The agent should crash after `crash_delay_millis` milliseconds.
// If no data is available, crash after 300ms.
let mut crash_delay = self
let mut crash_delay_millis = self
.tot_commit_time
.checked_div(self.n_successfull_commit)
.unwrap_or(Duration::from_millis(300).as_nanos() as u64);
.as_millis()
.checked_div(self.n_successfull_commit as u128)
.unwrap_or(300) as u64;
// Crash a little bit earlier than the average commit time to increase the
// possibilities of crashing during sync.
crash_delay = (crash_delay as f64 * 0.98) as u64;
crash_delay_millis = (crash_delay_millis as f64 * 0.98) as u64;

trace!("exercising commit crash");
rr.send_request(crate::message::ToAgent::Commit(
crate::message::CommitPayload {
changeset: changeset.clone(),
should_crash: Some(crash_delay),
should_crash: Some(Duration::from_millis(crash_delay_millis)),
},
))
.await?;
Expand Down Expand Up @@ -478,10 +460,6 @@ impl Workload {

async fn schedule_rollback(&mut self, should_crash: bool) -> anyhow::Result<()> {
let n_commits_to_rollback = self.state.rng.gen_range(1..self.max_rollback_commits) as usize;
if n_commits_to_rollback == 0 {
trace!("No available commits to perform rollback with");
return Ok(());
}

let last_snapshot = &self.state.committed;
let rollback_sync_seqn = last_snapshot.sync_seqn + n_commits_to_rollback as u32;
Expand All @@ -491,14 +469,15 @@ impl Workload {
snapshot: last_snapshot.clone(),
};

if should_crash {
let maybe_crash_delay = if should_crash {
// TODO: more complex crash delay evaluation for rollbacks.
let crash_delay = Duration::from_millis(10).as_nanos() as u64;
self.scheduled_rollbacks.rollback_crash = Some((scheduled_rollback, crash_delay));
Some(Duration::from_millis(10))
} else {
self.scheduled_rollbacks.rollback = Some(scheduled_rollback);
None
};

self.scheduled_rollback = Some((scheduled_rollback, maybe_crash_delay));

trace!(
"scheduled rollback {}for sync_seqn: {} of {} commits",
if should_crash { "crash " } else { "" },
Expand All @@ -513,7 +492,7 @@ impl Workload {
&mut self,
rr: &comms::RequestResponse,
scheduled_rollback: ScheduledRollback,
should_crash: Option<u64>,
should_crash: Option<Duration>,
) -> anyhow::Result<()> {
let ScheduledRollback {
n_commits,
Expand Down Expand Up @@ -560,7 +539,7 @@ impl Workload {
rr: &comms::RequestResponse,
n_commits_to_rollback: usize,
snapshot: Snapshot,
crash_delay: u64,
crash_delay: Duration,
) -> anyhow::Result<()> {
trace!(
"exercising rollback crash of {} commits",
Expand Down Expand Up @@ -778,47 +757,3 @@ impl Workload {
self.workdir
}
}

impl ScheduledRollbacks {
fn is_time(&mut self, curr_sync_seqn: u32) -> Option<(ScheduledRollback, Option<u64>)> {
if self
.rollback
.as_ref()
.map_or(false, |ScheduledRollback { sync_seqn, .. }| {
curr_sync_seqn == *sync_seqn
})
{
// UNWRAP: self.rollback has just been checked to be Some.
let scheduled_rollback = self.rollback.take().unwrap();

// The probability of having scheduled at the same time both a rollback and
// a rollback crash is very low, but if it happens, only the rollback will be applied.
// Discard the rollback crash data.
let _ = self.is_time(curr_sync_seqn);

return Some((scheduled_rollback, None));
}

if self
.rollback_crash
.as_ref()
.map_or(false, |(ScheduledRollback { sync_seqn, .. }, _)| {
curr_sync_seqn == *sync_seqn
})
{
// UNWRAP: self.rollback has just been checked to be Some.
let (scheduled_rollback, crash_delay) = self.rollback_crash.take().unwrap();
return Some((scheduled_rollback, Some(crash_delay)));
}

return None;
}

fn is_rollback_scheduled(&self) -> bool {
self.rollback.is_some()
}

fn is_rollback_crash_scheduled(&self) -> bool {
self.rollback_crash.is_some()
}
}

0 comments on commit daa4a87

Please sign in to comment.