Skip to content
This repository has been archived by the owner on Dec 21, 2024. It is now read-only.

chore: default RunConfig will print to stdout #290

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rivet-cli/src/commands/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct Opts {

impl Opts {
pub async fn execute(&self) -> GlobalResult<()> {
let (run_config, _temp_dir) = RunConfig::with_temp_dir()?;
let run_config = RunConfig::empty();

// Check if linked
let output =
Expand Down
2 changes: 1 addition & 1 deletion rivet-cli/src/commands/logout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct Opts {}

impl Opts {
pub async fn execute(&self) -> GlobalResult<()> {
let (run_config, _temp_dir) = RunConfig::with_temp_dir()?;
let run_config = RunConfig::empty();

run_task::<unlink::Task>(run_config.clone(), unlink::Input {}).await?;
eprintln!("Logged out");
Expand Down
48 changes: 30 additions & 18 deletions rivet-toolchain/src/util/task/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,39 @@ pub enum LogEvent {
Stderr(String),
}

pub async fn log_writer(mut log_rx: mpsc::UnboundedReceiver<LogEvent>, log_file: File) {
let mut log_writer = BufWriter::new(log_file);
pub async fn log_writer(mut log_rx: mpsc::UnboundedReceiver<LogEvent>, log_file: Option<File>) {
if let Some(log_file) = log_file {
// Write to file

while let Some(event) = log_rx.recv().await {
// HACK: serde_json::to_writer is not async
let event_json = match serde_json::to_vec(&event) {
Ok(x) => x,
Err(err) => {
eprintln!("Failed to serialize event: {err:?}");
continue;
}
};
let mut log_writer = BufWriter::new(log_file);

if log_writer.write_all(&event_json).await.is_err() {
eprintln!("Failed to write event to stdout log file");
}
if log_writer.write_all(b"\n").await.is_err() {
eprintln!("Failed to write newline to stdout log file");
while let Some(event) = log_rx.recv().await {
// HACK: serde_json::to_writer is not async
let event_json = match serde_json::to_vec(&event) {
Ok(x) => x,
Err(err) => {
eprintln!("Failed to serialize event: {err:?}");
continue;
}
};
if log_writer.write_all(&event_json).await.is_err() {
eprintln!("Failed to write event to stdout log file");
}
if log_writer.write_all(b"\n").await.is_err() {
eprintln!("Failed to write newline to stdout log file");
}
if log_writer.flush().await.is_err() {
eprintln!("Failed to flush stdout log file");
}
}
if log_writer.flush().await.is_err() {
eprintln!("Failed to flush stdout log file");
} else {
// Write to stdout

while let Some(event) = log_rx.recv().await {
match event {
LogEvent::Stdout(x) => println!("{x}"),
LogEvent::Stderr(x) => eprintln!("{x}"),
}
}
}
}
47 changes: 35 additions & 12 deletions rivet-toolchain/src/util/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,32 @@ use crate::tasks::Task;
#[derive(Deserialize, Clone)]
pub struct RunConfig {
/// Path to file that will abort this task if exists.
pub abort_path: String,
///
/// If none provided, the task will not be abortable.
pub abort_path: Option<String>,

/// Path to file to output events.
pub output_path: String,
///
/// If none provided, will be logged to standard output.
pub output_path: Option<String>,
}

impl RunConfig {
pub fn empty() -> Self {
RunConfig {
abort_path: None,
output_path: None,
}
}

/// Creates a new config with paths in a temp dir.
pub fn with_temp_dir() -> GlobalResult<(Self, TempDir)> {
let temp_dir = tempfile::tempdir()?;

Ok((
Self {
abort_path: temp_dir.path().join("abort").display().to_string(),
output_path: temp_dir.path().join("output").display().to_string(),
abort_path: Some(temp_dir.path().join("abort").display().to_string()),
output_path: Some(temp_dir.path().join("output").display().to_string()),
},
temp_dir,
))
Expand All @@ -47,11 +59,17 @@ where
let (log_tx, log_rx) = mpsc::unbounded_channel::<log::LogEvent>();
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);

let output_file = OpenOptions::new()
.create(true)
.append(true)
.open(run_config.output_path)
.await?;
let output_file = if let Some(output_path) = run_config.output_path {
Some(
OpenOptions::new()
.create(true)
.append(true)
.open(output_path)
.await?,
)
} else {
None
};

task::spawn(log::log_writer(log_rx, output_file));

Expand All @@ -60,7 +78,7 @@ where
// Wait for task or abort
let output = tokio::select! {
result = T::run(task_ctx.clone(), input) => result,
_ = wait_for_abort(&run_config.abort_path) => {
_ = wait_for_abort(run_config.abort_path.clone()) => {
Err(err_code!(ERROR, error = "Task aborted"))
},
};
Expand All @@ -74,9 +92,14 @@ where
const POLL_ABORT_INTERVAL: Duration = Duration::from_millis(250);

/// Waits for file to exist before completing.
async fn wait_for_abort(path: &str) {
async fn wait_for_abort(path: Option<String>) {
let Some(path) = path else {
// Wait forever, since this task is not abortable.
return std::future::pending::<()>().await;
};

// HACK: Use file watcher
let path = Path::new(path);
let path = Path::new(&path);
loop {
// TODO: Do this async
if path.exists() {
Expand Down
Loading