From c10495791acd906eed1dcb4e6004417de38d1aff Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Fri, 12 Jul 2024 15:02:43 +0200 Subject: [PATCH] Cloning async and parallel if possible --- flake.nix | 2 +- src/cli.rs | 2 +- src/cli/cmd.rs | 8 +- src/cli/cmd/project.rs | 2 +- src/cli/cmd/project/clone.rs | 142 ++++++++++++++++++++++------------- src/cli/cmd/version.rs | 2 +- src/cli/opts.rs | 2 +- src/httpclient.rs | 75 ++++++++++++------ 8 files changed, 151 insertions(+), 84 deletions(-) diff --git a/flake.nix b/flake.nix index ddccbdc..eca7f26 100644 --- a/flake.nix +++ b/flake.nix @@ -173,7 +173,7 @@ # Additional dev-shell environment variables can be set directly # MY_CUSTOM_DEVELOPMENT_VAR = "something else"; - RENKU_CLI_RENKU_URL = "https://ci-renku-3668.dev.renku.ch"; + RENKU_CLI_RENKU_URL = "https://ci-renku-3689.dev.renku.ch"; # Enable mold https://github.com/rui314/mold CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER = "${pkgs.clang}/bin/clang"; diff --git a/src/cli.rs b/src/cli.rs index 6fa9a4c..cf93086 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -18,7 +18,7 @@ pub async fn execute_cmd(opts: MainOpts) -> Result<(), CmdError> { let mut app = MainOpts::command(); input.print_completions(&mut app).await; } - SubCommand::Project(input) => input.exec(&ctx).await?, + SubCommand::Project(input) => input.exec(ctx).await?, #[cfg(feature = "user-doc")] SubCommand::UserDoc(input) => input.exec(&ctx).await?, diff --git a/src/cli/cmd.rs b/src/cli/cmd.rs index efad8d4..ca8b533 100644 --- a/src/cli/cmd.rs +++ b/src/cli/cmd.rs @@ -12,19 +12,19 @@ use snafu::{ResultExt, Snafu}; const RENKULAB_IO: &str = "https://renkulab.io"; -pub struct Context<'a> { - pub opts: &'a CommonOpts, +pub struct Context { + pub opts: CommonOpts, pub client: Client, pub renku_url: String, } -impl Context<'_> { +impl Context { pub fn new(opts: &CommonOpts) -> Result { let base_url = get_renku_url(opts); let client = Client::new(&base_url, proxy_settings(opts), &None, false) .context(ContextCreateSnafu)?; Ok(Context { - opts, + opts: opts.clone(), client, renku_url: base_url, }) diff --git a/src/cli/cmd/project.rs b/src/cli/cmd/project.rs index 80e9327..74a82e9 100644 --- a/src/cli/cmd/project.rs +++ b/src/cli/cmd/project.rs @@ -18,7 +18,7 @@ pub struct Input { } impl Input { - pub async fn exec<'a>(&self, ctx: &Context<'a>) -> Result<(), Error> { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { match &self.subcmd { ProjectCommand::Clone(input) => input.exec(ctx).await.context(CloneSnafu), } diff --git a/src/cli/cmd/project/clone.rs b/src/cli/cmd/project/clone.rs index 8d00ece..606e3bc 100644 --- a/src/cli/cmd/project/clone.rs +++ b/src/cli/cmd/project/clone.rs @@ -4,22 +4,29 @@ use super::Context; use crate::cli::sink::Error as SinkError; use crate::httpclient::Error as HttpError; use crate::util::data::{ProjectId, SimpleMessage}; +use std::sync::Arc; use clap::Parser; use git2::{Error as GitError, Repository}; use snafu::{ResultExt, Snafu}; -use std::path::Path; use std::path::PathBuf; +use tokio::task::{JoinError, JoinSet}; -/// Clone a project +/// Clone a project. +/// +/// Clones a renku project by creating a directory with the project +/// slug and cloning each code repository into it. #[derive(Parser, Debug)] pub struct Input { - /// The first argument is the project to clone, identified by - /// either its id or the namespace/slug identifier. The second - /// argument is optional, defining the target directory to create - /// the project in. - #[arg(required = true, num_args = 1..=2)] - pub project_and_target: Vec, + /// The project to clone, identified by either its id or the + /// namespace/slug identifier. + #[arg()] + pub project_ref: String, + + /// Optional target directory to create the project in. By default + /// the current working directory is used. + #[arg()] + pub target_dir: Option, } #[derive(Debug, Snafu)] @@ -43,82 +50,100 @@ pub enum Error { #[snafu(display("Error cloning project: {}", source))] GitClone { source: GitError }, + + #[snafu(display("Error in task: {}", source))] + TaskJoin { source: JoinError }, } impl Input { - pub async fn exec<'a>(&self, ctx: &Context<'a>) -> Result<(), Error> { - let details = match self.project_id()? { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + let project_id = self.project_id()?; + let opt_details = match &project_id { ProjectId::NamespaceSlug { namespace, slug } => ctx .client - .get_project_by_slug(&namespace, &slug, ctx.opts.verbose > 1) + .get_project_by_slug(namespace, slug, ctx.opts.verbose > 1) .await .context(HttpClientSnafu)?, ProjectId::Id(id) => ctx .client - .get_project_by_id(&id, ctx.opts.verbose > 1) + .get_project_by_id(id, ctx.opts.verbose > 1) .await .context(HttpClientSnafu)?, }; - let target = self.target_dir()?.join(&details.slug); - ctx.write_err(&SimpleMessage { - message: format!( - "Cloning {} ({}) into {}...", - details.slug, - details.id, - target.display() - ), - }) - .await - .context(WriteResultSnafu)?; - - clone_project(ctx, &details, &target).await?; - ctx.write_result(&details).await.context(WriteResultSnafu)?; + if let Some(details) = opt_details { + let target = self.target_dir()?.join(&details.slug); + ctx.write_err(&SimpleMessage { + message: format!( + "Cloning {} ({}) into {}...", + details.slug, + details.id, + &target.display() + ), + }) + .await + .context(WriteResultSnafu)?; + + let ctx = clone_project(ctx, &details, target).await?; + ctx.write_result(&details).await.context(WriteResultSnafu)?; + } else { + ctx.write_err(&SimpleMessage { + message: format!("Project '{}' doesn't exist.", &project_id), + }) + .await + .context(WriteResultSnafu)?; + } Ok(()) } fn project_id(&self) -> Result { - self.project_and_target - .first() - .unwrap() // clap makes sure there is at least one element (🤞) + self.project_ref .parse::() .context(ProjectIdParseSnafu) } fn target_dir(&self) -> Result { - match self.project_and_target.get(1) { + match &self.target_dir { Some(dir) => Ok(std::path::PathBuf::from(dir)), None => std::env::current_dir().context(CurrentDirSnafu), } } } -//TODO make async async fn clone_project<'a>( - ctx: &Context<'a>, + ctx: Context, project: &ProjectDetails, - target: &Path, -) -> Result<(), Error> { - std::fs::create_dir_all(target).context(CreateDirSnafu)?; - // TODO use JoinSet or something to propagate errors - futures::future::join_all( - project - .repositories - .iter() - .map(|repo| clone_repository(ctx, &repo, target)), - ) - .await; - // for repo in project.repositories.iter() { - // clone_repository(ctx, &repo, target).await?; - // } - Ok(()) + target: PathBuf, +) -> Result { + tokio::fs::create_dir_all(&target) + .await + .context(CreateDirSnafu)?; + + let mut tasks = JoinSet::new(); + let cc = Arc::new(ctx); + let tt = Arc::new(target); + for repo in project.repositories.iter() { + let cc = cc.clone(); + let tt = tt.clone(); + let rr = repo.to_string(); + tasks.spawn(clone_repository(cc, rr, tt)); + } + + while let Some(res) = tasks.join_next().await { + res.context(TaskJoinSnafu)??; + } + Ok(Arc::into_inner(cc).unwrap()) } -async fn clone_repository<'a>(ctx: &Context<'a>, repo_url: &str, dir: &Path) -> Result<(), Error> { +async fn clone_repository( + ctx: Arc, + repo_url: String, + dir: Arc, +) -> Result<(), Error> { let name = match repo_url.rsplit_once('/') { Some((_, n)) => n, None => "no-name", }; - let local_path = dir.join(&name); + let local_path = dir.join(name); if local_path.exists() { ctx.write_err(&SimpleMessage { message: format!("The repository {} already exists", name), @@ -126,9 +151,22 @@ async fn clone_repository<'a>(ctx: &Context<'a>, repo_url: &str, dir: &Path) -> .await .context(WriteResultSnafu)?; } else { - // TODO use tokio::task::spawn_blocking! - // TODO use the builder to access more options - Repository::clone(repo_url, &local_path).context(GitCloneSnafu)?; + // TODO use the repository builder to access more options, + // show clone progress and provide credentials + let (repo, repo_url, local_path) = tokio::task::spawn_blocking(|| { + let r = Repository::clone(&repo_url, &local_path).context(GitCloneSnafu); + (r, repo_url, local_path) + }) + .await + .context(TaskJoinSnafu)?; + let git_repo = repo?; + if ctx.opts.verbose > 1 { + let head = git_repo + .head() + .ok() + .and_then(|r| r.name().map(str::to_string)); + log::debug!("Checked out ref {:?} for repo {}", head, repo_url); + } ctx.write_err(&SimpleMessage { message: format!("Cloned: {} to {}", repo_url, local_path.display()), diff --git a/src/cli/cmd/version.rs b/src/cli/cmd/version.rs index 26fbac0..234b7de 100644 --- a/src/cli/cmd/version.rs +++ b/src/cli/cmd/version.rs @@ -31,7 +31,7 @@ pub enum Error { } impl Input { - pub async fn exec<'a>(&self, ctx: &Context<'a>) -> Result<(), Error> { + pub async fn exec(&self, ctx: &Context) -> Result<(), Error> { if self.client_only { let vinfo = BuildInfo::default(); ctx.write_result(&vinfo).await.context(WriteResultSnafu)?; diff --git a/src/cli/opts.rs b/src/cli/opts.rs index 5f888de..36f06c9 100644 --- a/src/cli/opts.rs +++ b/src/cli/opts.rs @@ -5,7 +5,7 @@ use std::str::FromStr; /// Main options are available to all commands. They must appear /// before a sub-command. -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone)] #[command()] pub struct CommonOpts { /// Be more verbose when logging. Verbosity increases with each diff --git a/src/httpclient.rs b/src/httpclient.rs index e3048be..794162b 100644 --- a/src/httpclient.rs +++ b/src/httpclient.rs @@ -111,27 +111,52 @@ impl Client { /// expected structure. async fn json_get(&self, path: &str, debug: bool) -> Result { let url = &format!("{}{}", self.base_url, path); + let resp = self + .client + .get(url) + .send() + .await + .context(HttpSnafu { url })?; if debug { - let resp = self - .client - .get(url) - .send() - .await - .context(HttpSnafu { url })? - .text() - .await - .context(DeserializeRespSnafu)?; - log::debug!("GET {} -> {}", url, resp); - serde_json::from_str::(&resp).context(DeserializeJsonSnafu) + let body = resp.text().await.context(DeserializeRespSnafu)?; + log::debug!("GET {} -> {}", url, body); + serde_json::from_str::(&body).context(DeserializeJsonSnafu) } else { - self.client - .get(url) - .send() - .await - .context(HttpSnafu { url })? - .json::() - .await - .context(DeserializeRespSnafu) + resp.json::().await.context(DeserializeRespSnafu) + } + } + + /// Runs a GET request to the given url. When `debug` is true, the + /// response is first decoded into utf8 chars and logged at debug + /// level. Otherwise bytes are directly decoded from JSON into the + /// expected structure. + async fn json_get_option( + &self, + path: &str, + debug: bool, + ) -> Result, Error> { + let url = &format!("{}{}", self.base_url, path); + let resp = self + .client + .get(url) + .send() + .await + .context(HttpSnafu { url })?; + + if debug { + if resp.status() == reqwest::StatusCode::NOT_FOUND { + Ok(None) + } else { + let body = &resp.text().await.context(DeserializeRespSnafu)?; + log::debug!("GET {} -> {}", url, body); + let r = serde_json::from_str::(body).context(DeserializeJsonSnafu)?; + Ok(Some(r)) + } + } else if resp.status() == reqwest::StatusCode::NOT_FOUND { + Ok(None) + } else { + let r = resp.json::().await.context(DeserializeRespSnafu)?; + Ok(Some(r)) } } @@ -152,18 +177,22 @@ impl Client { namespace: &str, slug: &str, debug: bool, - ) -> Result { + ) -> Result, Error> { log::debug!("Get project by namespace/slug: {}/{}", namespace, slug); let path = format!("/api/data/projects/{}/{}", namespace, slug); - let details = self.json_get::(&path, debug).await?; + let details = self.json_get_option::(&path, debug).await?; Ok(details) } /// Get project details by project id. - pub async fn get_project_by_id(&self, id: &str, debug: bool) -> Result { + pub async fn get_project_by_id( + &self, + id: &str, + debug: bool, + ) -> Result, Error> { log::debug!("Get project by id: {}", id); let path = format!("/api/data/projects/{}", id); - let details = self.json_get::(&path, debug).await?; + let details = self.json_get_option::(&path, debug).await?; Ok(details) } }