Skip to content

Commit

Permalink
index: kafka: stream: Ensure only one background commit
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Jun 27, 2024
1 parent 7f8f001 commit e66e958
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/commands/index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{path::Path, pin::Pin};
use std::{path::Path, pin::Pin, sync::Arc};

use color_eyre::Result;
use futures::{future::pending, Future};
Expand All @@ -12,6 +12,7 @@ use tantivy::{
use tokio::{
fs::{create_dir_all, remove_dir_all},
select, spawn,
sync::Mutex,
task::spawn_blocking,
time::sleep,
};
Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct IndexRunner {
args: IndexArgs,
config: IndexConfig,
pool: PgPool,
commit_lock: Arc<Mutex<()>>,
}

impl IndexRunner {
Expand Down Expand Up @@ -82,6 +84,7 @@ impl IndexRunner {
args,
config,
pool,
commit_lock: Arc::new(Mutex::new(())),
})
}

Expand Down Expand Up @@ -170,13 +173,16 @@ impl IndexRunner {

let commiter = self.index_commiter().await;
if self.args.stream && result != BatchResult::Eof {
// Commit in the background to not interfere with next batch in stream.
// Commit in the background to not block and continue indexing next batch in stream.

let lock = self.commit_lock.clone().lock_owned().await;
spawn(async move {
if let Err(e) =
Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await
{
error!("Failed to commit index of id '{}': {e}", &id);
}
drop(lock);
});
} else {
Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await?;
Expand Down

0 comments on commit e66e958

Please sign in to comment.