Skip to content

Commit

Permalink
Implement coverage worker feature with simpler thread
Browse files Browse the repository at this point in the history
  • Loading branch information
louismerlin committed Oct 30, 2024
1 parent d21edae commit d9e7c9d
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 200 deletions.
41 changes: 0 additions & 41 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ anyhow = { version = "1.0.83", optional = true }
cargo_metadata = { version = "0.18.1", optional = true }
clap = { version = "4.5.4", features = ["cargo", "derive", "env"], optional = true }
console = { version = "0.15.8", optional = true }
ctrlc = "3.4.4"
env_logger = { version = "0.11.3", optional = true }
fork = { version = "0.1.23", optional = true }
glob = { version = "0.3.1", optional = true }
Expand Down
243 changes: 94 additions & 149 deletions src/bin/cargo-ziggy/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use anyhow::{anyhow, Error};
use console::{style, Term};
use glob::glob;
use std::{
collections::VecDeque,
env,
fs::File,
io::Write,
path::Path,
process::{self, Stdio},
sync::mpsc::{channel, TryRecvError},
sync::{Arc, Mutex},
thread,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
Expand Down Expand Up @@ -81,7 +80,7 @@ impl Fuzz {
}

// Manages the continuous running of fuzzers
pub fn fuzz(&mut self, kill_channel: Receiver<()>) -> Result<(), anyhow::Error> {
pub fn fuzz(&mut self) -> Result<(), anyhow::Error> {
if !self.fuzz_binary() {
let build = Build {
no_afl: !self.afl(),
Expand Down Expand Up @@ -162,140 +161,101 @@ impl Fuzz {
return Err(anyhow!("cannot use --no-afl with --coverage-worker!"));
}

let (cov_worker, cov_worker_messages_rx) = match self.coverage_worker {
true => {
info!("cleaning old coverage files");
Cover::clean_old_cov()?;

// build coverage the runner
info!("building coverage worker");
Cover::build_runner()?;

let workspace_root = cargo_metadata::MetadataCommand::new()
.exec()?
.workspace_root
.to_string();
let coverage_interval = self.coverage_interval;
let target = self.target.clone();
let main_corpus = PathBuf::from(self.output_target())
.join("afl")
.join("mainaflfuzzer")
.join("queue");
let mut cov_worker_last_run = None;
let (cov_tx, mut cov_rx) = channel::<()>();
let (cov_message_tx, cov_message_rx) = channel::<String>();
// start the coverage worker
info!("starting coverage worker");
let cov_worker_thread = thread::spawn(move || -> Result<(), anyhow::Error> {
loop {
if should_thread_exit(&mut cov_rx)? {
return Ok(());
}
cov_message_tx
.send(format!("sleeping for {} minutes", coverage_interval))?;
thread::sleep(Duration::from_secs(coverage_interval * 60));
cov_message_tx.send("i am awake".to_string())?;
let entries = std::fs::read_dir(&main_corpus)?;
let last_run = SystemTime::now();
let mut new_entries = 0;
for entry in entries {
if entry.is_err() {
continue;
}
let entry = entry.unwrap().path();
// We only want to run corpus entries created since the last time we ran.
let created = entry.metadata()?.created()?;
let should_run = match cov_worker_last_run {
None => true,
Some(start_time) => start_time < created && last_run >= created,
};
if should_run {
cov_message_tx.send(format!("running {}", entry.display()))?;
process::Command::new(format!(
"./target/coverage/debug/{}",
&target
))
.arg(format!("{}", entry.display()))
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()?
.wait()?;
new_entries += 1;
}
if should_thread_exit(&mut cov_rx)? {
return Ok(());
}
}
if new_entries > 0 {
cov_message_tx.send("generating grcov report".to_string())?;
Cover::run_grcov(&target, "html", "coverage", &workspace_root)?;
}
cov_worker_last_run = Some(last_run);
cov_message_tx.send(format!(
"last_run = {:?}; new_entries = {}",
last_run.duration_since(UNIX_EPOCH)?.as_secs(),
new_entries
))?;
}
});
(Some((cov_worker_thread, cov_tx)), Some(cov_message_rx))
}
false => (None, None),
};
let mut cov_worker_messages = VecDeque::<String>::new();
// We prepare builds for the coverage worker
if self.coverage_worker {
info!("cleaning old coverage files");
Cover::clean_old_cov()?;
info!("building coverage worker");
Cover::build_runner()?;
}
let last_coverage_time = Arc::new(Mutex::new(Instant::now()));
//let mut last_coverage_time = Instant::now();
let coverage_now_running = Arc::new(Mutex::new(false));
// let mut coverage_now_running = false;
let workspace_root = cargo_metadata::MetadataCommand::new()
.exec()?
.workspace_root
.to_string();
let target = self.target.clone();
let main_corpus = self.corpus();
let output_target = self.output_target();

loop {
let sleep_duration = Duration::from_secs(1);
thread::sleep(sleep_duration);
if self.coverage_worker {
match kill_channel.try_recv() {
Ok(_) => {
// We got ctrl-c time to exit!
println!("Waiting for coverage worker to exit;");
// If the cov worker is sleeping, we can kill it, no bad side effects will happen.
if !cov_worker_messages
.iter()
.last()
.expect("cov worker should have sent a message!")
.starts_with("sleeping")
{
let (cov_worker, cov_tx) = cov_worker.unwrap();
cov_tx.send(())?;
cov_worker
.join()
.expect("the cov_worker exited before we could join!")?;

let coverage_status = match (
self.coverage_worker,
*coverage_now_running.lock().unwrap(),
last_coverage_time.lock().unwrap().elapsed().as_secs() / 60,
) {
(true, false, wait) if wait < self.coverage_interval => String::from("waiting"),
(true, _, _) => String::from("running"),
(false, _, _) => String::from("disabled"),
};

self.print_stats(&coverage_status);

if coverage_status.as_str() == "running" {
//let string_clone = Arc::clone(&shared_string);

//let handle = thread::spawn(move || {
//let mut data = string_clone.lock().unwrap();
//data.push_str(" world");
//});
let mut shared_cov_now_running = coverage_now_running.lock().unwrap();
*shared_cov_now_running = true;

let main_corpus = main_corpus.clone();
let target = target.clone();
let workspace_root = workspace_root.clone();
let output_target = output_target.clone();
let last_coverage_time_thread = Arc::clone(&last_coverage_time);
let coverage_now_running_thread = Arc::clone(&coverage_now_running);

thread::spawn(move || {
let entries = std::fs::read_dir(&main_corpus).unwrap();
let mut new_entries = 0;
let last_cov_time = {
let unlocked = last_coverage_time_thread.lock().unwrap();
*unlocked
};
for entry in entries.flatten() {
let entry = entry.path();
// We only want to run corpus entries created since the last time we ran.
let created = entry
.metadata()
.unwrap()
.created()
.unwrap()
.elapsed()
.unwrap_or_default();
// TODO Handle corpus entries that were created during the last run.
if last_cov_time.elapsed() >= created {
process::Command::new(format!("./target/coverage/debug/{}", &target))
.arg(format!("{}", entry.display()))
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.unwrap()
.wait()
.unwrap();
new_entries += 1;
}
return Ok(());
}
Err(TryRecvError::Disconnected) => {
return Err(anyhow!(
"Ctrl-c channel disconnected; this should not happen!"
))
if new_entries > 0 {
let coverage_dir = output_target + "/coverage";
let _ = fs::remove_dir_all(&coverage_dir);
Cover::run_grcov(&target.clone(), "html", &coverage_dir, &workspace_root)
.unwrap();
}
Err(TryRecvError::Empty) => {
let messages_rx = cov_worker_messages_rx.as_ref().unwrap();
for _i in 0..50 {
match messages_rx.try_recv() {
Ok(message) => {
if cov_worker_messages.len() == 50 {
cov_worker_messages.pop_front();
}
cov_worker_messages.push_back(message)
}
Err(TryRecvError::Empty) => {
break;
}
Err(TryRecvError::Disconnected) => {
return Err(anyhow!(
"Coverage worker died; this should not happen!"
))
}
}
}
}
}
}

self.print_stats(&cov_worker_messages);
let mut unlock_last_cov_time = last_coverage_time_thread.lock().unwrap();
*unlock_last_cov_time = Instant::now();
let mut end_cov_now_running = coverage_now_running_thread.lock().unwrap();
*end_cov_now_running = false;
});
}

if !afl_output_ok {
if let Ok(afl_log) =
Expand Down Expand Up @@ -742,7 +702,7 @@ impl Fuzz {
Ok(())
}

pub fn print_stats(&self, cov_worker_messages: &VecDeque<String>) {
pub fn print_stats(&self, cov_worker_status: &str) {
let fuzzer_name = format!(" {} ", self.target);

let reset = "\x1b[0m";
Expand Down Expand Up @@ -935,13 +895,11 @@ impl Fuzz {
}
if self.coverage_worker {
screen += &format!(
"β”œβ”€ {blue}coverage-worker{reset}β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜\n"
"β”œβ”€ {blue}coverage worker{reset}β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜\n"
);
for message in cov_worker_messages {
screen += &format!("β”œβ”€ {message}\n");
}
screen += &format!("β”œβ”€ {:29.29} β”‚\n", cov_worker_status);
}
screen += "β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜";
screen += "β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜";
eprintln!("{screen}");
}
}
Expand Down Expand Up @@ -1015,16 +973,3 @@ pub fn extract_file_id(file: &Path) -> Option<(u32, String)> {
let file_id = str_id.parse::<u32>().ok()?;
Some((file_id, String::from(file_name)))
}

fn should_thread_exit<T>(rx: &mut Receiver<T>) -> Result<bool, anyhow::Error> {
match rx.try_recv() {
Ok(_) => {
// We got ctrl-c time to exit!
Ok(true)
}
Err(TryRecvError::Disconnected) => {
Err(anyhow!("channel disconnected; this should not happen!"))
}
Err(TryRecvError::Empty) => Ok(false),
}
}
Loading

0 comments on commit d9e7c9d

Please sign in to comment.