Skip to content
This repository has been archived by the owner on Dec 21, 2024. It is now read-only.

Commit

Permalink
chore: default RunConfig will print to stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Jul 23, 2024
1 parent 4440872 commit d94a158
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 32 deletions.
2 changes: 1 addition & 1 deletion rivet-cli/src/commands/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct Opts {

impl Opts {
pub async fn execute(&self) -> GlobalResult<()> {
let (run_config, _temp_dir) = RunConfig::with_temp_dir()?;
let run_config = RunConfig::empty();

// Check if linked
let output =
Expand Down
2 changes: 1 addition & 1 deletion rivet-cli/src/commands/logout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct Opts {}

impl Opts {
pub async fn execute(&self) -> GlobalResult<()> {
let (run_config, _temp_dir) = RunConfig::with_temp_dir()?;
let run_config = RunConfig::empty();

run_task::<unlink::Task>(run_config.clone(), unlink::Input {}).await?;
eprintln!("Logged out");
Expand Down
48 changes: 30 additions & 18 deletions rivet-toolchain/src/util/task/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,39 @@ pub enum LogEvent {
Stderr(String),
}

pub async fn log_writer(mut log_rx: mpsc::UnboundedReceiver<LogEvent>, log_file: File) {
let mut log_writer = BufWriter::new(log_file);
pub async fn log_writer(mut log_rx: mpsc::UnboundedReceiver<LogEvent>, log_file: Option<File>) {
if let Some(log_file) = log_file {
// Write to file

while let Some(event) = log_rx.recv().await {
// HACK: serde_json::to_writer is not async
let event_json = match serde_json::to_vec(&event) {
Ok(x) => x,
Err(err) => {
eprintln!("Failed to serialize event: {err:?}");
continue;
}
};
let mut log_writer = BufWriter::new(log_file);

if log_writer.write_all(&event_json).await.is_err() {
eprintln!("Failed to write event to stdout log file");
}
if log_writer.write_all(b"\n").await.is_err() {
eprintln!("Failed to write newline to stdout log file");
while let Some(event) = log_rx.recv().await {
// HACK: serde_json::to_writer is not async
let event_json = match serde_json::to_vec(&event) {
Ok(x) => x,
Err(err) => {
eprintln!("Failed to serialize event: {err:?}");
continue;
}
};
if log_writer.write_all(&event_json).await.is_err() {
eprintln!("Failed to write event to stdout log file");
}
if log_writer.write_all(b"\n").await.is_err() {
eprintln!("Failed to write newline to stdout log file");
}
if log_writer.flush().await.is_err() {
eprintln!("Failed to flush stdout log file");
}
}
if log_writer.flush().await.is_err() {
eprintln!("Failed to flush stdout log file");
} else {
// Write to stdout

while let Some(event) = log_rx.recv().await {
match event {
LogEvent::Stdout(x) => println!("{x}"),
LogEvent::Stderr(x) => eprintln!("{x}"),
}
}
}
}
47 changes: 35 additions & 12 deletions rivet-toolchain/src/util/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,32 @@ use crate::tasks::Task;
#[derive(Deserialize, Clone)]
pub struct RunConfig {
/// Path to file that will abort this task if exists.
pub abort_path: String,
///
/// If none provided, the task will not be abortable.
pub abort_path: Option<String>,

/// Path to file to output events.
pub output_path: String,
///
/// If none provided, will be logged to standard output.
pub output_path: Option<String>,
}

impl RunConfig {
pub fn empty() -> Self {
RunConfig {
abort_path: None,
output_path: None,
}
}

/// Creates a new config with paths in a temp dir.
pub fn with_temp_dir() -> GlobalResult<(Self, TempDir)> {
let temp_dir = tempfile::tempdir()?;

Ok((
Self {
abort_path: temp_dir.path().join("abort").display().to_string(),
output_path: temp_dir.path().join("output").display().to_string(),
abort_path: Some(temp_dir.path().join("abort").display().to_string()),
output_path: Some(temp_dir.path().join("output").display().to_string()),
},
temp_dir,
))
Expand All @@ -47,11 +59,17 @@ where
let (log_tx, log_rx) = mpsc::unbounded_channel::<log::LogEvent>();
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);

let output_file = OpenOptions::new()
.create(true)
.append(true)
.open(run_config.output_path)
.await?;
let output_file = if let Some(output_path) = run_config.output_path {
Some(
OpenOptions::new()
.create(true)
.append(true)
.open(output_path)
.await?,
)
} else {
None
};

task::spawn(log::log_writer(log_rx, output_file));

Expand All @@ -60,7 +78,7 @@ where
// Wait for task or abort
let output = tokio::select! {
result = T::run(task_ctx.clone(), input) => result,
_ = wait_for_abort(&run_config.abort_path) => {
_ = wait_for_abort(run_config.abort_path.clone()) => {
Err(err_code!(ERROR, error = "Task aborted"))
},
};
Expand All @@ -74,9 +92,14 @@ where
const POLL_ABORT_INTERVAL: Duration = Duration::from_millis(250);

/// Waits for file to exist before completing.
async fn wait_for_abort(path: &str) {
async fn wait_for_abort(path: Option<String>) {
let Some(path) = path else {
// Wait forever, since this task is not abortable.
return std::future::pending::<()>().await;
};

// HACK: Use file watcher
let path = Path::new(path);
let path = Path::new(&path);
loop {
// TODO: Do this async
if path.exists() {
Expand Down

0 comments on commit d94a158

Please sign in to comment.