Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add S3Stage downloader #13784

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open

feat: add S3Stage downloader #13784

wants to merge 4 commits into from

Conversation

joshieDo
Copy link
Collaborator

@joshieDo joshieDo commented Jan 13, 2025

  • adds S3Stage skeleton logic. It's incomplete and not enabled. TODOs can be disregarded for now.
  • adds S3Stage downloader. Downloads a file in parallel allowing resumes after shutdowns.
/// Downloads file from url to data file path.
///
/// If a `file_hash` is passed, it will verify it at the end.
///
/// ## Details
///
/// 1) A [`Metadata`] file is created or opened in `{target_dir}/download/{filename}.metadata`. It
///    tracks the download progress including total file size, downloaded bytes, chunk sizes, and
///    ranges that still need downloading. Allows for resumability.
/// 2) The target file is preallocated with the total size of the file in
///    `{target_dir}/download/{filename}`.
/// 3) Multiple `workers` are spawned for downloading of specific chunks of the file.
/// 4) `Orchestrator` manages workers, distributes chunk ranges, and ensures the download progresses
///    efficiently by dynamically assigning tasks to workers as they become available.
/// 5) Once the file is downloaded:
///     * If `file_hash` is `Some`, verifies its blake3 hash.
///     * Deletes the metadata file
///     * Moves downloaded file to target directory.

@joshieDo joshieDo added C-enhancement New feature or request A-staged-sync Related to staged sync (pipelines and stages) A-downloaders Related to headers/bodies downloaders labels Jan 13, 2025
@joshieDo joshieDo requested a review from mattsse January 13, 2025 11:44
@joshieDo joshieDo requested a review from Rjected January 13, 2025 11:45
@joshieDo joshieDo force-pushed the joshie/s3stage branch 8 times, most recently from 79066e8 to 43e6188 Compare January 13, 2025 13:28
Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool,

I mostly have some questions re async or rather how we integrate the downloader in the s3stage

}
}

while !metadata.is_done() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should probably emit some traces?

crates/stages/stages/src/stages/s3/downloader/fetch.rs Outdated Show resolved Hide resolved
crates/stages/stages/src/stages/s3/downloader/meta.rs Outdated Show resolved Hide resolved
crates/stages/stages/src/stages/s3/downloader/meta.rs Outdated Show resolved Hide resolved
crates/stages/stages/src/stages/s3/downloader/worker.rs Outdated Show resolved Hide resolved
crates/stages/stages/src/stages/s3/mod.rs Outdated Show resolved Hide resolved
crates/stages/stages/src/stages/s3/mod.rs Outdated Show resolved Hide resolved
@joshieDo joshieDo force-pushed the joshie/s3stage branch 2 times, most recently from 8d67276 to 7efb229 Compare January 14, 2025 16:53
Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have a few more questions about workers <> orchestrators

Comment on lines +135 to +141
let client = Client::new();
let resp = client.head(url).send().await?;
let total_length: usize = resp
.headers()
.get(CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need any additional header setup?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, that's it. just need to know the total size of the file

crates/stages/stages/src/stages/s3/downloader/meta.rs Outdated Show resolved Hide resolved
crates/stages/stages/src/stages/s3/downloader/meta.rs Outdated Show resolved Hide resolved
crates/stages/stages/src/stages/s3/mod.rs Outdated Show resolved Hide resolved
Comment on lines 81 to 82
// Spawns the downloader task
self.spawn_fetch(input);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will always be called when we call the poll function? this seems wrong, because we don't track whether we've downloaded everything

Copy link
Collaborator Author

@joshieDo joshieDo Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, just when self.fetch_rx is None, meaning that there's no task in the background running. I should make it more explicit.

now that i look at it, there's an issue inside spawn_fetch that will always cause it to spawn, when it should only when theres files to be downloaded (maybe that's what you mean?)

Comment on lines +82 to +83
// Distribute chunk ranges to workers when they free up
while let Some(worker_msg) = orchestrator_rx.recv().await {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this more performant than using a single downloader? I'd assume downloading is bound by bandwidth so unclear if using multiple downloaders improves this?

Copy link
Collaborator Author

@joshieDo joshieDo Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although in theory it's bound by bandwidth, many times servers will throttle single connections. Using multiple connections helps

Comment on lines +38 to +39
// Create channels for communication between workers and orchestrator
let (orchestrator_tx, orchestrator_rx) = unbounded_channel();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit confusing to me,
why do we need multiple workers
doesn't this download just one file

@joshieDo joshieDo force-pushed the joshie/s3stage branch 3 times, most recently from 44cc6e6 to c910229 Compare January 15, 2025 23:56
@joshieDo joshieDo force-pushed the joshie/s3stage branch 2 times, most recently from 6c631be to 6ae894e Compare January 16, 2025 11:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-downloaders Related to headers/bodies downloaders A-staged-sync Related to staged sync (pipelines and stages) C-enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants