diff --git a/src/commands/index.rs b/src/commands/index.rs index 4a374c0..5ea6553 100644 --- a/src/commands/index.rs +++ b/src/commands/index.rs @@ -32,7 +32,7 @@ async fn pipe_source_to_index( args: &IndexArgs, config: &IndexConfig, pool: &PgPool, -) -> Result<()> { +) -> Result { let id = uuid::Uuid::now_v7().hyphenated().to_string(); let index_dir = Path::new(&args.build_dir).join(&id); let _ = create_dir_all(&index_dir).await; @@ -45,10 +45,13 @@ async fn pipe_source_to_index( let commit_timeout = sleep(args.commit_interval); tokio::pin!(commit_timeout); + let mut timeout = false; + 'reader_loop: loop { let mut json_obj = if args.stream { select! { _ = &mut commit_timeout => { + timeout = true; break; } maybe_json_obj = source.get_one() => { @@ -90,7 +93,7 @@ async fn pipe_source_to_index( if added == 0 { debug!("Not writing index: no documents added"); - return Ok(()); + return Ok(timeout); } info!("Commiting {added} documents"); @@ -106,7 +109,7 @@ async fn pipe_source_to_index( write_unified_index(&id, &index, &index_dir, &config.name, &config.path, pool).await?; - Ok(()) + Ok(timeout) } pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> { @@ -120,31 +123,17 @@ pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> { let mut source = connect_to_source(args.input.as_deref(), args.stream).await?; - if args.stream { - loop { - pipe_source_to_index( - &mut source, - schema.clone(), - &field_parsers, - dynamic_field, - &args, - &config, - pool, - ) - .await?; - } - } else { - pipe_source_to_index( - &mut source, - schema, - &field_parsers, - dynamic_field, - &args, - &config, - pool, - ) - .await?; - } + while pipe_source_to_index( + &mut source, + schema.clone(), + &field_parsers, + dynamic_field, + &args, + &config, + pool, + ) + .await? + {} Ok(()) } diff --git a/src/commands/sources/mod.rs b/src/commands/sources/mod.rs index f654b40..434ffca 100644 --- a/src/commands/sources/mod.rs +++ b/src/commands/sources/mod.rs @@ -25,11 +25,6 @@ pub async fn connect_to_source(input: Option<&str>, stream: bool) -> Result { - if stream { - bail!("Streaming from stdin is not supported."); - } - Box::new(BufSource::from_stdin()) - } + None => Box::new(BufSource::from_stdin()), }) }