Skip to content

Commit

Permalink
Add a parameter to the sync to specify the download/upload page
Browse files Browse the repository at this point in the history
  • Loading branch information
m42e committed Oct 4, 2024
1 parent 80e950c commit 2ff4e53
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 10 deletions.
15 changes: 14 additions & 1 deletion crates/atuin-client/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ static APP_USER_AGENT: &str = concat!("atuin/", env!("CARGO_PKG_VERSION"),);
pub struct Client<'a> {
sync_addr: &'a str,
client: reqwest::Client,
headers: HeaderMap,
}

pub async fn register(
Expand Down Expand Up @@ -183,10 +184,11 @@ impl<'a> Client<'a> {
sync_addr,
client: reqwest::Client::builder()
.user_agent(APP_USER_AGENT)
.default_headers(headers)
.default_headers(headers.clone())
.connect_timeout(Duration::new(connect_timeout, 0))
.timeout(Duration::new(timeout, 0))
.build()?,
headers
})
}

Expand Down Expand Up @@ -265,6 +267,13 @@ impl<'a> Client<'a> {
let url = format!("{}/history", self.sync_addr);
let url = Url::parse(url.as_str())?;

for (name, value) in self.headers.iter() {
debug!("{}: {}", name.as_str(), value.to_str().unwrap());
}
for (entry) in history {
debug!("entry: {}", entry.data);
}

let resp = self.client.post(url).json(history).send().await?;
handle_resp_error(resp).await?;

Expand Down Expand Up @@ -306,6 +315,10 @@ impl<'a> Client<'a> {

debug!("uploading {} records to {url}", records.len());

for (name, value) in self.headers.iter() {
debug!("{}: {}", name.as_str(), value.to_str().unwrap());
}

let resp = self.client.post(url).json(records).send().await?;
handle_resp_error(resp).await?;

Expand Down
15 changes: 8 additions & 7 deletions crates/atuin-client/src/record/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ async fn sync_upload(
tag: String,
local: RecordIdx,
remote: Option<RecordIdx>,
page_size: u64,
) -> Result<i64, SyncError> {
let remote = remote.unwrap_or(0);
let expected = local - remote;
let upload_page_size = 100;
let mut progress = 0;

let pb = ProgressBar::new(expected);
Expand All @@ -185,7 +185,7 @@ async fn sync_upload(
// preload with the first entry if remote does not know of this store
loop {
let page = store
.next(host, tag.as_str(), remote + progress, upload_page_size)
.next(host, tag.as_str(), remote + progress, page_size)
.await
.map_err(|e| {
error!("failed to read upload page: {e:?}");
Expand Down Expand Up @@ -219,10 +219,10 @@ async fn sync_download(
tag: String,
local: Option<RecordIdx>,
remote: RecordIdx,
page_size: u64,
) -> Result<Vec<RecordId>, SyncError> {
let local = local.unwrap_or(0);
let expected = remote - local;
let download_page_size = 100;
let mut progress = 0;
let mut ret = Vec::new();

Expand All @@ -242,7 +242,7 @@ async fn sync_download(
// preload with the first entry if remote does not know of this store
loop {
let page = client
.next_records(host, tag.clone(), local + progress, download_page_size)
.next_records(host, tag.clone(), local + progress, page_size)
.await
.map_err(|e| SyncError::RemoteRequestError { msg: e.to_string() })?;

Expand Down Expand Up @@ -270,6 +270,7 @@ pub async fn sync_remote(
operations: Vec<Operation>,
local_store: &impl Store,
settings: &Settings,
page_size: u64,
) -> Result<(i64, Vec<RecordId>), SyncError> {
let client = Client::new(
&settings.sync_address,
Expand All @@ -293,15 +294,15 @@ pub async fn sync_remote(
tag,
local,
remote,
} => uploaded += sync_upload(local_store, &client, host, tag, local, remote).await?,
} => uploaded += sync_upload(local_store, &client, host, tag, local, remote, page_size).await?,

Operation::Download {
host,
tag,
local,
remote,
} => {
let mut d = sync_download(local_store, &client, host, tag, local, remote).await?;
let mut d = sync_download(local_store, &client, host, tag, local, remote, page_size).await?;
downloaded.append(&mut d)
}

Expand All @@ -318,7 +319,7 @@ pub async fn sync(
) -> Result<(i64, Vec<RecordId>), SyncError> {
let (diff, _) = diff(settings, store).await?;
let operations = operations(diff, store).await?;
let (uploaded, downloaded) = sync_remote(operations, store, settings).await?;
let (uploaded, downloaded) = sync_remote(operations, store, settings, 100).await?;

Ok((uploaded, downloaded))
}
Expand Down
7 changes: 6 additions & 1 deletion crates/atuin/src/command/client/store/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ pub struct Pull {
/// This will first wipe the local store, and then download all records from the remote
#[arg(long, default_value = "false")]
pub force: bool,

/// Page Size
/// How many records to upload at once. Defaults to 100
#[arg(long, default_value = "100")]
pub page: u64,
}

impl Pull {
Expand Down Expand Up @@ -67,7 +72,7 @@ impl Pull {
})
.collect();

let (_, downloaded) = sync::sync_remote(operations, &store, settings).await?;
let (_, downloaded) = sync::sync_remote(operations, &store, settings, self.page).await?;

println!("Downloaded {} records", downloaded.len());

Expand Down
7 changes: 6 additions & 1 deletion crates/atuin/src/command/client/store/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ pub struct Push {
/// local store
#[arg(long, default_value = "false")]
pub force: bool,

/// Page Size
/// How many records to upload at once. Defaults to 100
#[arg(long, default_value = "100")]
pub page: u64,
}

impl Push {
Expand Down Expand Up @@ -87,7 +92,7 @@ impl Push {
})
.collect();

let (uploaded, _) = sync::sync_remote(operations, &store, settings).await?;
let (uploaded, _) = sync::sync_remote(operations, &store, settings, self.page).await?;

println!("Uploaded {uploaded} records");

Expand Down

0 comments on commit 2ff4e53

Please sign in to comment.