diff --git a/src/commands/index.rs b/src/commands/index.rs index 8c162a2..9c3b2f4 100644 --- a/src/commands/index.rs +++ b/src/commands/index.rs @@ -1,4 +1,4 @@ -use std::{path::Path, pin::Pin}; +use std::{path::Path, pin::Pin, sync::Arc}; use color_eyre::Result; use futures::{future::pending, Future}; @@ -12,6 +12,7 @@ use tantivy::{ use tokio::{ fs::{create_dir_all, remove_dir_all}, select, spawn, + sync::Mutex, task::spawn_blocking, time::sleep, }; @@ -52,6 +53,7 @@ pub struct IndexRunner { args: IndexArgs, config: IndexConfig, pool: PgPool, + commit_lock: Arc>, } impl IndexRunner { @@ -82,6 +84,7 @@ impl IndexRunner { args, config, pool, + commit_lock: Arc::new(Mutex::new(())), }) } @@ -170,13 +173,16 @@ impl IndexRunner { let commiter = self.index_commiter().await; if self.args.stream && result != BatchResult::Eof { - // Commit in the background to not interfere with next batch in stream. + // Commit in the background to not block and continue indexing next batch in stream. + + let lock = self.commit_lock.clone().lock_owned().await; spawn(async move { if let Err(e) = Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await { error!("Failed to commit index of id '{}': {e}", &id); } + drop(lock); }); } else { Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await?;