Skip to content

Commit

Permalink
state: Reduce the panics in the parquet flusher, add retries to StorageProvider, send flusher failure. (#445)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackson Newhouse authored Dec 8, 2023
1 parent a7b9099 commit 577da1d
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 25 additions & 7 deletions arroyo-state/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,8 +1253,23 @@ impl ParquetFlusher {
fn start(mut self) {
tokio::spawn(async move {
loop {
if !self.flush_iteration().await.unwrap() {
return;
match self.flush_iteration().await {
Ok(continue_flushing) => {
if !continue_flushing {
return;
}
}
Err(err) => {
self.control_tx
.send(ControlResp::TaskFailed {
operator_id: self.task_info.operator_id.clone(),
task_index: self.task_info.task_index,
error: err.to_string(),
})
.await
.unwrap();
return;
}
}
}
});
Expand All @@ -1271,8 +1286,8 @@ impl ParquetFlusher {
let cursor = Vec::new();
let mut writer = ArrowWriter::try_new(cursor, record_batch.schema(), Some(props)).unwrap();
writer.write(&record_batch)?;
writer.flush().unwrap();
let parquet_bytes = writer.into_inner().unwrap();
writer.flush()?;
let parquet_bytes = writer.into_inner()?;
let bytes = parquet_bytes.len();
self.storage.put(key, parquet_bytes).await?;
Ok(bytes)
Expand Down Expand Up @@ -1494,10 +1509,13 @@ impl ParquetFlusher {
operator_id: self.task_info.operator_id.clone(),
subtask_metadata,
}))
.await
.unwrap();
.await?;
if cp.then_stop {
self.finish_tx.take().unwrap().send(()).unwrap();
self.finish_tx
.take()
.unwrap()
.send(())
.map_err(|_| anyhow::anyhow!("can't send finish"))?;
return Ok(false);
}
Ok(true)
Expand Down
1 change: 1 addition & 0 deletions arroyo-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ default = []
[dependencies]
arroyo-types = { path = "../arroyo-types" }
bytes = "1.4.0"
tracing = "0.1"
# used only for getting local AWS credentials; can be removed once we have a
# better way to do this
rusoto_core = "0.48.0"
Expand Down
83 changes: 55 additions & 28 deletions arroyo-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,31 @@ fn matchers() -> &'static HashMap<Backend, Vec<Regex>> {
})
}

/// Retries a fallible expression with exponential backoff.
///
/// Expands to a block that evaluates `$e` repeatedly — up to `max_retries`
/// (10) additional attempts after the first failure — and yields
/// `Ok(value)` on the first success, or the last `Err` once retries are
/// exhausted. Note that `$e` is re-evaluated from scratch on every
/// iteration, so any owned arguments inside it must be recreated or
/// cloned per attempt.
///
/// Backoff schedule: 100ms * 2^attempt (200ms, 400ms, 800ms, ...),
/// capped at 10 seconds per wait.
///
/// NOTE(review): `std::thread::sleep` blocks the current OS thread. The
/// call sites in this file appear to be async fns (the retried
/// expressions contain `.await`), so each backoff stalls an executor
/// worker thread for up to 10s — consider `tokio::time::sleep(...).await`
/// instead, if tokio is (or can be made) a dependency of this crate.
/// TODO confirm against Cargo.toml.
macro_rules! retry {
    ($e:expr) => {{
        use std::thread::sleep;
        use std::time::Duration;
        use tracing::error;
        let mut retries = 0;
        let max_retries: u32 = 10;
        let backoff_factor: u64 = 2;
        loop {
            match $e {
                // First success wins; break out of the loop with the value.
                Ok(value) => break Ok(value),
                // Transient failure with budget remaining: log, back off, retry.
                Err(e) if retries < max_retries => {
                    retries += 1;
                    error!("Error: {}. Retrying...", e);
                    // exponential backoff, capped at 10 seconds.
                    let backoff_time =
                        Duration::from_millis(10_000.min(100 * backoff_factor.pow(retries)));
                    sleep(backoff_time);
                }
                // Retry budget exhausted: surface the final error to the caller.
                Err(e) => break Err(e),
            }
        }
    }};
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct S3Config {
endpoint: Option<String>,
Expand Down Expand Up @@ -502,11 +527,9 @@ impl StorageProvider {
&self,
path: P,
) -> Result<impl tokio::io::AsyncRead, StorageError> {
let path: String = path.into();
let bytes = self
.object_store
.get(&path.into())
.await
let path: Path = path.into().into();

let bytes = retry!(self.object_store.get(&path).await)
.map_err(|e| Into::<StorageError>::into(e))?
.into_stream();

Expand All @@ -519,9 +542,12 @@ impl StorageProvider {
bytes: Vec<u8>,
) -> Result<String, StorageError> {
let path = path.into().into();
self.object_store
.put(&self.qualify_path(&path), bytes.into())
.await?;
let bytes: Bytes = bytes.into();
retry!(
self.object_store
.put(&self.qualify_path(&path), bytes.clone())
.await
)?;

Ok(format!("{}/{}", self.canonical_url, path))
}
Expand All @@ -546,12 +572,11 @@ impl StorageProvider {
}

pub async fn start_multipart(&self, path: &Path) -> Result<MultipartId, StorageError> {
Ok(self
.object_store
.initiate_multipart_upload(path)
.await
.map_err(|e| Into::<StorageError>::into(e))?
.0)
Ok(
retry!(self.object_store.initiate_multipart_upload(path).await)
.map_err(|e| Into::<StorageError>::into(e))?
.0,
)
}

pub async fn add_multipart(
Expand All @@ -561,13 +586,14 @@ impl StorageProvider {
part_number: usize,
bytes: Bytes,
) -> Result<PartId, StorageError> {
Ok(self
.object_store
.get_put_part(path, multipart_id)
.await?
.put_part(bytes, part_number)
.await
.map_err(|e| Into::<StorageError>::into(e))?)
Ok(retry!(
self.object_store
.get_put_part(path, multipart_id)
.await?
.put_part(bytes.clone(), part_number)
.await
)
.map_err(|e| Into::<StorageError>::into(e))?)
}

pub async fn close_multipart(
Expand All @@ -576,13 +602,14 @@ impl StorageProvider {
multipart_id: &MultipartId,
parts: Vec<PartId>,
) -> Result<(), StorageError> {
Ok(self
.object_store
.get_put_part(path, multipart_id)
.await?
.complete(parts)
.await
.map_err(|e| Into::<StorageError>::into(e))?)
Ok(retry!(
self.object_store
.get_put_part(path, multipart_id)
.await?
.complete(parts.clone())
.await
)
.map_err(|e| Into::<StorageError>::into(e))?)
}

/// Produces a URL representation of this path that can be read by other systems,
Expand Down

0 comments on commit 577da1d

Please sign in to comment.