From c82a548f9befc88aa706fe7fd8717c861cc35688 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Tue, 14 May 2024 18:46:06 +0300 Subject: [PATCH] Move to async via `tokio` --- Cargo.lock | 27 ++++- Cargo.toml | 5 +- src/args.rs | 3 - src/main.rs | 229 +++++++++++++++++++++--------------- src/opendal_file_handle.rs | 57 ++++----- src/unified_index/writer.rs | 70 ++++++----- 6 files changed, 225 insertions(+), 166 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a187ebd..043bd1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,15 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "async-tempfile" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17d4a3286e6518d84be399efb8901aa74a183753b0bf4a8a4afe5bcfedcbb08" +dependencies = [ + "tokio", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -511,6 +520,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -533,6 +543,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -1962,6 +1983,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -1971,10 +1993,11 @@ dependencies = [ name = "toshokan" version = "0.1.0" dependencies = [ + "async-tempfile", "bincode", "clap", "color-eyre", - "itertools", + "futures", "log", "once_cell", "opendal", @@ -1982,8 +2005,8 @@ dependencies = [ "serde", "serde_json", "tantivy", - "tempfile", "tokio", + "tokio-util", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index b466a3f..0297f1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ lto = "thin" bincode = "1.3.3" clap = { version = "4.5.4", features = ["derive"] } color-eyre = { version = "0.6.3", default-features = false } -itertools = "0.12.1" +futures = "0.3.30" log = "0.4.21" once_cell = "1.19.0" opendal = { version = "0.46.0", features = ["layers-blocking", "services-fs"] } @@ -29,7 +29,8 @@ serde = { version = "1.0.201", features = ["derive"] } serde_json = "1.0.117" tantivy = "0.22.0" tokio = { version = "1.37.0", features = ["full"] } +tokio-util = { version = "0.7.11", features = ["compat"] } uuid = { version = "1.8.0", features = ["v7"] } [dev-dependencies] -tempfile = "3.10.1" +async-tempfile = "0.5.0" diff --git a/src/args.rs b/src/args.rs index 35d3652..7ef30f7 100644 --- a/src/args.rs +++ b/src/args.rs @@ -39,9 +39,6 @@ The memory is split evenly between all indexing threads, once a thread reaches i default_value = "1073741824" )] pub memory_budget: usize, - - #[clap(short, long, help = "Merge all created segments into one segment.")] - pub merge: bool, } #[derive(Parser, Debug, Clone)] diff --git a/src/main.rs b/src/main.rs index c08811a..68628f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,31 +3,32 @@ mod bincode; mod opendal_file_handle; mod unified_index; -use std::{ - fs::{create_dir, read_to_string, write, File}, - io::{BufRead, BufReader}, - path::Path, - sync::Arc, -}; +use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; use args::{IndexArgs, SearchArgs}; use color_eyre::eyre::Result; -use itertools::Itertools; -use once_cell::sync::Lazy; -use opendal::{ - layers::{BlockingLayer, LoggingLayer}, - BlockingOperator, Operator, -}; +use futures::future::try_join_all; +use opendal::{layers::LoggingLayer, Operator}; use pretty_env_logger::formatted_timed_builder; use tantivy::{ collector::TopDocs, directory::{FileSlice, MmapDirectory}, + indexer::NoMergePolicy, query::QueryParser, schema::{ DateOptions, DateTimePrecision, JsonObjectOptions, Schema, FAST, INDEXED, STORED, STRING, }, DateTime, Document, Index, IndexWriter, ReloadPolicy, TantivyDocument, }; +use tokio::{ + fs::{create_dir, read_dir, read_to_string, write, File}, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + runtime::Builder, + spawn, + sync::mpsc, + task::spawn_blocking, +}; +use tokio_util::compat::FuturesAsyncWriteCompatExt; use unified_index::directory::UnifiedDirectory; use crate::{ @@ -43,14 +44,7 @@ extern crate log; const DEFAULT_DEBUG_LOG_LEVEL: &str = "toshokan=trace"; const DEFAULT_RELEASE_LOG_LEVEL: &str = "toshokan=info"; -static RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() -}); - -fn index(args: IndexArgs) -> Result<()> { +async fn index(args: IndexArgs) -> Result<()> { let mut schema_builder = Schema::builder(); let dynamic_field = schema_builder.add_json_field( "_dynamic", @@ -63,18 +57,19 @@ fn index(args: IndexArgs) -> Result<()> { let schema = schema_builder.build(); - let _ = create_dir(&args.build_dir); + let _ = create_dir(&args.build_dir).await; let index = Index::open_or_create(MmapDirectory::open(&args.build_dir)?, schema.clone())?; let mut index_writer: IndexWriter = index.writer(args.memory_budget)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); - let mut reader = BufReader::new(File::open(&args.input_path)?); + let mut reader = BufReader::new(File::open(&args.input_path).await?); let mut line = String::new(); let mut i = 0; let mut added = 0; loop { - let len = reader.read_line(&mut line)?; + let len = reader.read_line(&mut line).await?; if len == 0 { break; } @@ -99,50 +94,46 @@ fn index(args: IndexArgs) -> Result<()> { } info!("Commiting {added} documents, after processing {i}"); - index_writer.commit()?; + index_writer.prepare_commit()?.commit_future().await?; - if args.merge { - let segment_ids = index.searchable_segment_ids()?; - info!("Merging {} segments", segment_ids.len()); + let segment_ids = index.searchable_segment_ids()?; + info!("Merging {} segments", segment_ids.len()); + index_writer.merge(&segment_ids).await?; - // Returns Err when a merge is already in progress. - let _ = index_writer.merge(&segment_ids).wait(); - } - - info!("Joining merging threads"); - index_writer.wait_merging_threads()?; + spawn_blocking(move || index_writer.wait_merging_threads()).await??; let unified_index_writer = UnifiedIndexWriter::from_file_paths( Path::new(&args.build_dir), index.directory().list_managed_files(), - )?; + ) + .await?; let mut builder = opendal::services::Fs::default(); builder.root(&args.index_dir); - let _guard = RUNTIME.enter(); - let op: BlockingOperator = Operator::new(builder)? - .layer(BlockingLayer::create()?) + let op = Operator::new(builder)? .layer(LoggingLayer::default()) - .finish() - .blocking(); + .finish(); let id = uuid::Uuid::now_v7(); let mut writer = op .writer_with(&format!("{}.index", id)) .content_type("application/octet-stream") - .buffer(5_000_000) - .call()? - .into_std_write(); + .chunk(5_000_000) + .await? + .into_futures_async_write() + // Turn a futures::AsyncWrite into something that implements tokio::io::AsyncWrite. + .compat_write(); info!("Writing unified index file"); - let (total_len, footer_len) = unified_index_writer.write(&mut writer)?; - writer.close()?; + let (total_len, footer_len) = unified_index_writer.write(&mut writer).await?; + writer.shutdown().await?; write( Path::new(&args.index_dir).join(format!("{}.footer", id)), footer_len.to_string(), - )?; + ) + .await?; debug!( "Index file length: {}. Footer length: {}", @@ -152,80 +143,117 @@ fn index(args: IndexArgs) -> Result<()> { Ok(()) } -fn search(args: SearchArgs) -> Result<()> { +async fn search(args: SearchArgs) -> Result<()> { if args.limit == 0 { return Ok(()); } - let index_ids = std::fs::read_dir(&args.index_dir)? - .filter_map(std::result::Result::ok) - .filter_map(|entry| { - entry.file_name().to_str().map(|filename| { + let mut index_ids = HashSet::new(); + let mut dir_reader = read_dir(&args.index_dir).await?; + while let Some(entry) = dir_reader.next_entry().await? { + if let Some(filename) = entry.file_name().to_str() { + index_ids.insert( filename .chars() .take_while(|&c| c != '.') - .collect::() - }) - }) - .unique() - .collect::>(); + .collect::(), + ); + } + } let mut builder = opendal::services::Fs::default(); builder.root(&args.index_dir); - let _guard = RUNTIME.enter(); - let op: BlockingOperator = Operator::new(builder)? - .layer(BlockingLayer::create()?) + let op = Operator::new(builder)? .layer(LoggingLayer::default()) - .finish() - .blocking(); + .finish(); - let mut printed = 0; - 'ids_loop: for id in index_ids { + let mut indexes_args = Vec::with_capacity(index_ids.len()); + for id in index_ids { let index_filename = format!("{}.index", id); - let reader = op.reader_with(&index_filename).call()?; - let file_slice = FileSlice::new(Arc::new(OpenDalFileHandle::from_path( - &Path::new(&args.index_dir).join(&index_filename), - reader, - )?)); + let reader = op.reader_with(&index_filename).await?; + let file_slice = FileSlice::new(Arc::new( + OpenDalFileHandle::from_path(&Path::new(&args.index_dir).join(&index_filename), reader) + .await?, + )); let footer_len = - read_to_string(&Path::new(&args.index_dir).join(&format!("{}.footer", id)))? + read_to_string(&Path::new(&args.index_dir).join(&format!("{}.footer", id))) + .await? .parse::()?; - let index = Index::open(UnifiedDirectory::open_with_len( - file_slice, - footer_len as usize, - )?)?; - let schema = index.schema(); - - let dynamic_field = schema.get_field("_dynamic")?; - let timestamp_field = schema.get_field("timestamp")?; - - let reader = index - .reader_builder() - .reload_policy(ReloadPolicy::Manual) - .try_into()?; - let searcher = reader.searcher(); - - let query_parser = QueryParser::for_index(&index, vec![dynamic_field, timestamp_field]); - let query = query_parser.parse_query(&args.query)?; - let docs = searcher.search(&query, &TopDocs::with_limit(args.limit - printed))?; - - for (_, doc_address) in docs { - let doc: TantivyDocument = searcher.doc(doc_address)?; - println!("{}", doc.to_json(&schema)); - printed += 1; - if printed >= args.limit { - break 'ids_loop; + indexes_args.push((file_slice, footer_len)); + } + + let (tx, mut rx) = mpsc::channel(args.limit); + let mut tx_handles = Vec::with_capacity(indexes_args.len()); + + // Should be chunked to never starve the thread pool (default in tokio is 500 threads). + for (file_slice, footer_len) in indexes_args { + let tx = tx.clone(); + let query = args.query.clone(); + + // Should use rayon if search ends up being cpu bound (it seems io bound). + tx_handles.push(spawn_blocking(move || -> Result<()> { + if tx.is_closed() { + return Ok(()); } - } + + let index = Index::open(UnifiedDirectory::open_with_len( + file_slice, + footer_len as usize, + )?)?; + + let schema = index.schema(); + + let dynamic_field = schema.get_field("_dynamic")?; + let timestamp_field = schema.get_field("timestamp")?; + + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into()?; + let searcher = reader.searcher(); + + let query_parser = QueryParser::for_index(&index, vec![dynamic_field, timestamp_field]); + let query = query_parser.parse_query(&query)?; + let docs = searcher.search(&query, &TopDocs::with_limit(args.limit))?; + + if tx.is_closed() { + return Ok(()); + } + + for (_, doc_address) in docs { + let doc: TantivyDocument = searcher.doc(doc_address)?; + if tx.blocking_send(doc.to_json(&schema)).is_err() { + return Ok(()); + } + } + + Ok(()) + })); } + let rx_handle = spawn(async move { + let mut i = 0; + while let Some(doc) = rx.recv().await { + println!("{}", doc); + i += 1; + if i == args.limit { + break; + } + } + rx.close(); + Ok::<(), color_eyre::eyre::Error>(()) + }); + + try_join_all(tx_handles).await?; + rx_handle.await??; + Ok(()) } -fn main() -> Result<()> { +async fn async_main() -> Result<()> { color_eyre::install()?; let default_log_level = if cfg!(debug_assertions) { @@ -243,12 +271,19 @@ fn main() -> Result<()> { let args = parse_args(); match args.subcmd { SubCommand::Index(index_args) => { - index(index_args)?; + index(index_args).await?; } SubCommand::Search(search_args) => { - search(search_args)?; + search(search_args).await?; } } Ok(()) } + +fn main() -> Result<()> { + let runtime = Builder::new_multi_thread() + .thread_keep_alive(Duration::from_secs(20)) + .build()?; + runtime.block_on(async_main()) +} diff --git a/src/opendal_file_handle.rs b/src/opendal_file_handle.rs index a03e293..924d27a 100644 --- a/src/opendal_file_handle.rs +++ b/src/opendal_file_handle.rs @@ -1,23 +1,24 @@ -use std::{fs::metadata, ops::Range, path::Path}; +use std::{ops::Range, path::Path}; -use opendal::BlockingReader; +use opendal::Reader; use tantivy::{ directory::{FileHandle, OwnedBytes}, HasLen, }; +use tokio::fs::metadata; pub struct OpenDalFileHandle { size: u64, - reader: BlockingReader, + reader: Reader, } impl OpenDalFileHandle { - pub fn from_path(path: &Path, reader: BlockingReader) -> std::io::Result { - let size = metadata(path)?.len(); + pub async fn from_path(path: &Path, reader: Reader) -> std::io::Result { + let size = metadata(path).await?.len(); Ok(Self::new(size, reader)) } - fn new(size: u64, reader: BlockingReader) -> Self { + fn new(size: u64, reader: Reader) -> Self { Self { size, reader } } } @@ -31,8 +32,11 @@ impl std::fmt::Debug for OpenDalFileHandle { impl FileHandle for OpenDalFileHandle { fn read_bytes(&self, range: Range) -> std::io::Result { let mut bytes = Vec::new(); - self.reader - .read_into(&mut bytes, range.start as u64..range.end as u64) + let fut = self + .reader + .read_into(&mut bytes, range.start as u64..range.end as u64); + tokio::runtime::Handle::current() + .block_on(fut) .map_err(Into::::into)?; Ok(OwnedBytes::new(bytes)) } @@ -46,46 +50,37 @@ impl HasLen for OpenDalFileHandle { #[cfg(test)] mod tests { - use std::io::Write; - + use async_tempfile::TempFile; use color_eyre::eyre::Result; - use opendal::{layers::BlockingLayer, BlockingOperator, Operator}; - use tempfile::NamedTempFile; - - use crate::RUNTIME; + use opendal::Operator; + use tokio::io::AsyncWriteExt; use super::*; - #[test] - fn opendal_reader_read_file() -> Result<()> { - let mut file = NamedTempFile::new()?; + #[tokio::test] + async fn opendal_reader_read_file() -> Result<()> { + let mut file = TempFile::new().await?; - file.write_all(b"abcdefgh")?; - let path = file.into_temp_path(); - let path_buf = path.to_path_buf(); + file.write_all(b"abcdefgh").await?; + let path = file.file_path(); let mut builder = opendal::services::Fs::default(); - builder.root(path_buf.parent().unwrap().to_str().unwrap()); + builder.root(path.parent().unwrap().to_str().unwrap()); - let _guard = RUNTIME.enter(); - let op: BlockingOperator = Operator::new(builder)? - .layer(BlockingLayer::create()?) - .finish() - .blocking(); + let op = Operator::new(builder)?.finish(); let reader = OpenDalFileHandle::from_path( &path, - op.reader_with(&path_buf.file_name().unwrap().to_str().unwrap()) - .call()?, - )?; + op.reader_with(&path.file_name().unwrap().to_str().unwrap()) + .await?, + ) + .await?; assert_eq!( reader.read_bytes(0..reader.len())?.to_vec(), b"abcdefgh".to_vec() ); - path.close()?; - Ok(()) } } diff --git a/src/unified_index/writer.rs b/src/unified_index/writer.rs index 5947150..a196422 100644 --- a/src/unified_index/writer.rs +++ b/src/unified_index/writer.rs @@ -1,32 +1,35 @@ use std::{ collections::{HashMap, HashSet}, - fs::File, - io::{Cursor, Read}, + io::Cursor, ops::Range, path::{Path, PathBuf}, }; use bincode::Options; use color_eyre::eyre::{bail, Result}; +use tokio::{ + fs::File, + io::{AsyncRead, AsyncWrite}, +}; use crate::bincode::bincode_options; use super::IndexFooter; struct FileReader { - reader: Box, + reader: Box, file_name: PathBuf, } impl FileReader { - fn from_path(dir: &Path, file_name: PathBuf) -> std::io::Result { + async fn from_path(dir: &Path, file_name: PathBuf) -> std::io::Result { Ok(Self::new( - Box::new(File::open(dir.join(&file_name))?), + Box::new(File::open(dir.join(&file_name)).await?), file_name, )) } - fn new(reader: Box, file_name: PathBuf) -> Self { + fn new(reader: Box, file_name: PathBuf) -> Self { Self { reader, file_name } } } @@ -37,11 +40,14 @@ pub struct UnifiedIndexWriter { } impl UnifiedIndexWriter { - pub fn from_file_paths(dir: &Path, file_names: HashSet) -> std::io::Result { - let file_readers = file_names - .into_iter() - .map(|path| FileReader::from_path(dir, path)) - .collect::>>()?; + pub async fn from_file_paths( + dir: &Path, + file_names: HashSet, + ) -> std::io::Result { + let mut file_readers = Vec::with_capacity(file_names.len()); + for file_name in file_names { + file_readers.push(FileReader::from_path(dir, file_name).await?); + } Ok(Self::new(file_readers)) } @@ -52,19 +58,22 @@ impl UnifiedIndexWriter { } } - pub fn write(mut self, writer: &mut W) -> Result<(u64, u64)> { + pub async fn write(mut self, writer: &mut W) -> Result<(u64, u64)> + where + W: AsyncWrite + Unpin, + { let mut written = 0u64; for mut file_reader in self.file_readers { let start = written; - written += std::io::copy(&mut file_reader.reader, writer)?; - self.file_offsets - .insert(file_reader.file_name, start..written); + let file_name = file_reader.file_name; + written += tokio::io::copy(&mut file_reader.reader, writer).await?; + self.file_offsets.insert(file_name, start..written); } let footer_bytes = bincode_options().serialize(&IndexFooter::new(self.file_offsets))?; let footer_len = footer_bytes.len() as u64; - let footer_written = std::io::copy(&mut Cursor::new(footer_bytes), writer)?; + let footer_written = tokio::io::copy(&mut Cursor::new(footer_bytes), writer).await?; if footer_written < footer_len { bail!( "written less than expected: {} < {}", @@ -79,31 +88,30 @@ impl UnifiedIndexWriter { #[cfg(test)] mod tests { - use std::{ - io::{Seek, SeekFrom, Write}, - path::Path, - sync::Arc, - }; + use std::{io::SeekFrom, path::Path, sync::Arc}; + use async_tempfile::TempFile; use color_eyre::eyre::Result; + use futures::try_join; use tantivy::{ directory::{FileSlice, OwnedBytes}, Directory, }; - use tempfile::tempfile; + use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use crate::unified_index::directory::UnifiedDirectory; use super::*; - #[test] - fn unified_index_write_then_read_2_files() -> Result<()> { - let mut file1 = tempfile()?; - let mut file2 = tempfile()?; - file1.write_all(b"hello")?; - file2.write_all(b"world")?; - file1.seek(SeekFrom::Start(0))?; - file2.seek(SeekFrom::Start(0))?; + #[tokio::test] + async fn unified_index_write_then_read_2_files() -> Result<()> { + let mut file1 = TempFile::new().await?; + let mut file2 = TempFile::new().await?; + try_join!(file1.write_all(b"hello"), file2.write_all(b"world"))?; + try_join!( + file1.seek(SeekFrom::Start(0)), + file2.seek(SeekFrom::Start(0)) + )?; let writer = UnifiedIndexWriter::new(vec![ FileReader::new(Box::new(file1), PathBuf::from("a")), @@ -111,7 +119,7 @@ mod tests { ]); let mut buf = vec![]; - let (_, footer_len) = writer.write(&mut buf)?; + let (_, footer_len) = writer.write(&mut buf).await?; let file_slice = FileSlice::new(Arc::new(OwnedBytes::new(buf))); let dir = UnifiedDirectory::open_with_len(file_slice, footer_len as usize)?;