diff --git a/rivet-cli/src/commands/login.rs b/rivet-cli/src/commands/login.rs index 650a6d73..6272e514 100644 --- a/rivet-cli/src/commands/login.rs +++ b/rivet-cli/src/commands/login.rs @@ -13,7 +13,7 @@ pub struct Opts { impl Opts { pub async fn execute(&self) -> GlobalResult<()> { - let (run_config, _temp_dir) = RunConfig::with_temp_dir()?; + let run_config = RunConfig::empty(); // Check if linked let output = diff --git a/rivet-cli/src/commands/logout.rs b/rivet-cli/src/commands/logout.rs index 02438e99..f0b0c7b5 100644 --- a/rivet-cli/src/commands/logout.rs +++ b/rivet-cli/src/commands/logout.rs @@ -10,7 +10,7 @@ pub struct Opts {} impl Opts { pub async fn execute(&self) -> GlobalResult<()> { - let (run_config, _temp_dir) = RunConfig::with_temp_dir()?; + let run_config = RunConfig::empty(); run_task::(run_config.clone(), unlink::Input {}).await?; eprintln!("Logged out"); diff --git a/rivet-toolchain/src/util/task/log.rs b/rivet-toolchain/src/util/task/log.rs index 0063874d..f20d1056 100644 --- a/rivet-toolchain/src/util/task/log.rs +++ b/rivet-toolchain/src/util/task/log.rs @@ -11,27 +11,39 @@ pub enum LogEvent { Stderr(String), } -pub async fn log_writer(mut log_rx: mpsc::UnboundedReceiver, log_file: File) { - let mut log_writer = BufWriter::new(log_file); +pub async fn log_writer(mut log_rx: mpsc::UnboundedReceiver, log_file: Option) { + if let Some(log_file) = log_file { + // Write to file - while let Some(event) = log_rx.recv().await { - // HACK: serde_json::to_writer is not async - let event_json = match serde_json::to_vec(&event) { - Ok(x) => x, - Err(err) => { - eprintln!("Failed to serialize event: {err:?}"); - continue; - } - }; + let mut log_writer = BufWriter::new(log_file); - if log_writer.write_all(&event_json).await.is_err() { - eprintln!("Failed to write event to stdout log file"); - } - if log_writer.write_all(b"\n").await.is_err() { - eprintln!("Failed to write newline to stdout log file"); + while let Some(event) = log_rx.recv().await { + // HACK: serde_json::to_writer is not async + let event_json = match serde_json::to_vec(&event) { + Ok(x) => x, + Err(err) => { + eprintln!("Failed to serialize event: {err:?}"); + continue; + } + }; + if log_writer.write_all(&event_json).await.is_err() { + eprintln!("Failed to write event to stdout log file"); + } + if log_writer.write_all(b"\n").await.is_err() { + eprintln!("Failed to write newline to stdout log file"); + } + if log_writer.flush().await.is_err() { + eprintln!("Failed to flush stdout log file"); + } } - if log_writer.flush().await.is_err() { - eprintln!("Failed to flush stdout log file"); + } else { + // Write to stdout + + while let Some(event) = log_rx.recv().await { + match event { + LogEvent::Stdout(x) => println!("{x}"), + LogEvent::Stderr(x) => eprintln!("{x}"), + } } } } diff --git a/rivet-toolchain/src/util/task/mod.rs b/rivet-toolchain/src/util/task/mod.rs index 81f30d79..de02db64 100644 --- a/rivet-toolchain/src/util/task/mod.rs +++ b/rivet-toolchain/src/util/task/mod.rs @@ -19,20 +19,32 @@ use crate::tasks::Task; #[derive(Deserialize, Clone)] pub struct RunConfig { /// Path to file that will abort this task if exists. - pub abort_path: String, + /// + /// If none provided, the task will not be abortable. + pub abort_path: Option, + /// Path to file to output events. - pub output_path: String, + /// + /// If none provided, will be logged to standard output. + pub output_path: Option, } impl RunConfig { + pub fn empty() -> Self { + RunConfig { + abort_path: None, + output_path: None, + } + } + /// Creates a new config with paths in a temp dir. pub fn with_temp_dir() -> GlobalResult<(Self, TempDir)> { let temp_dir = tempfile::tempdir()?; Ok(( Self { - abort_path: temp_dir.path().join("abort").display().to_string(), - output_path: temp_dir.path().join("output").display().to_string(), + abort_path: Some(temp_dir.path().join("abort").display().to_string()), + output_path: Some(temp_dir.path().join("output").display().to_string()), }, temp_dir, )) @@ -47,11 +59,17 @@ where let (log_tx, log_rx) = mpsc::unbounded_channel::(); let (shutdown_tx, shutdown_rx) = broadcast::channel(1); - let output_file = OpenOptions::new() - .create(true) - .append(true) - .open(run_config.output_path) - .await?; + let output_file = if let Some(output_path) = run_config.output_path { + Some( + OpenOptions::new() + .create(true) + .append(true) + .open(output_path) + .await?, + ) + } else { + None + }; task::spawn(log::log_writer(log_rx, output_file)); @@ -60,7 +78,7 @@ where // Wait for task or abort let output = tokio::select! { result = T::run(task_ctx.clone(), input) => result, - _ = wait_for_abort(&run_config.abort_path) => { + _ = wait_for_abort(run_config.abort_path.clone()) => { Err(err_code!(ERROR, error = "Task aborted")) }, }; @@ -74,9 +92,14 @@ where const POLL_ABORT_INTERVAL: Duration = Duration::from_millis(250); /// Waits for file to exist before completing. -async fn wait_for_abort(path: &str) { +async fn wait_for_abort(path: Option) { + let Some(path) = path else { + // Wait forever, since this task is not abortable. + return std::future::pending::<()>().await; + }; + // HACK: Use file watcher - let path = Path::new(path); + let path = Path::new(&path); loop { // TODO: Do this async if path.exists() {