Skip to content

Commit

Permalink
index: Add support to stdin stream indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Jun 16, 2024
1 parent 2c5feb1 commit 8fd5d59
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 34 deletions.
45 changes: 17 additions & 28 deletions src/commands/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn pipe_source_to_index(
args: &IndexArgs,
config: &IndexConfig,
pool: &PgPool,
) -> Result<()> {
) -> Result<bool> {
let id = uuid::Uuid::now_v7().hyphenated().to_string();
let index_dir = Path::new(&args.build_dir).join(&id);
let _ = create_dir_all(&index_dir).await;
Expand All @@ -45,10 +45,13 @@ async fn pipe_source_to_index(
let commit_timeout = sleep(args.commit_interval);
tokio::pin!(commit_timeout);

let mut timeout = false;

'reader_loop: loop {
let mut json_obj = if args.stream {
select! {
_ = &mut commit_timeout => {
timeout = true;
break;
}
maybe_json_obj = source.get_one() => {
Expand Down Expand Up @@ -90,7 +93,7 @@ async fn pipe_source_to_index(

if added == 0 {
debug!("Not writing index: no documents added");
return Ok(());
return Ok(timeout);
}

info!("Commiting {added} documents");
Expand All @@ -106,7 +109,7 @@ async fn pipe_source_to_index(

write_unified_index(&id, &index, &index_dir, &config.name, &config.path, pool).await?;

Ok(())
Ok(timeout)
}

pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
Expand All @@ -120,31 +123,17 @@ pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {

let mut source = connect_to_source(args.input.as_deref(), args.stream).await?;

if args.stream {
loop {
pipe_source_to_index(
&mut source,
schema.clone(),
&field_parsers,
dynamic_field,
&args,
&config,
pool,
)
.await?;
}
} else {
pipe_source_to_index(
&mut source,
schema,
&field_parsers,
dynamic_field,
&args,
&config,
pool,
)
.await?;
}
while pipe_source_to_index(
&mut source,
schema.clone(),
&field_parsers,
dynamic_field,
&args,
&config,
pool,
)
.await?
{}

Ok(())
}
7 changes: 1 addition & 6 deletions src/commands/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ pub async fn connect_to_source(input: Option<&str>, stream: bool) -> Result<Box<
}
Box::new(BufSource::from_path(path).await?)
}
None => {
if stream {
bail!("Streaming from stdin is not supported.");
}
Box::new(BufSource::from_stdin())
}
None => Box::new(BufSource::from_stdin()),
})
}

0 comments on commit 8fd5d59

Please sign in to comment.