tests: Add sanity integration test
The test creates an index, indexes a small HDFS logs file into it,
runs a search, and then drops the index.

Using `testcontainers-rs`, the test runs a Postgres instance inside
a container.
tontinton committed Jun 11, 2024
1 parent c162ba0 commit 10c51be
Showing 12 changed files with 606 additions and 47 deletions.
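
Before the file-by-file diff, a minimal sketch (an illustration, not part of the commit) of the testcontainers-rs flow the commit message describes. The real setup lives in tests/common/mod.rs below; the sketch assumes a local Docker daemon and the tokio test runtime.

use testcontainers::runners::AsyncRunner;
use testcontainers_modules::postgres::Postgres;

#[tokio::test]
async fn postgres_in_a_container() -> color_eyre::Result<()> {
    // Start a throwaway postgres container; it is removed when the handle drops.
    let container = Postgres::default().start().await?;
    // Ask Docker which host port was mapped to postgres' 5432.
    let port = container.get_host_port_ipv4(5432).await?;
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    // ...connect with sqlx and run migrations here, as tests/common/mod.rs does.
    let _ = url;
    Ok(())
}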
440 changes: 420 additions & 20 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -34,3 +34,6 @@ uuid = { version = "1.8.0", features = ["v7"] }
 
 [dev-dependencies]
 async-tempfile = "0.5.0"
+ctor = "0.2.8"
+testcontainers = "0.17.0"
+testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
9 changes: 6 additions & 3 deletions src/commands/create.rs
@@ -6,9 +6,12 @@ use crate::{
     config::{FieldType, IndexConfig},
 };
 
-pub async fn run_create(args: CreateArgs, pool: PgPool) -> Result<()> {
+pub async fn run_create(args: CreateArgs, pool: &PgPool) -> Result<()> {
     let config = IndexConfig::from_path(&args.config_path).await?;
+    run_create_from_config(&config, pool).await
+}
+
+pub async fn run_create_from_config(config: &IndexConfig, pool: &PgPool) -> Result<()> {
     let array_static_object_exists = config
         .schema
         .fields
@@ -20,8 +23,8 @@ pub async fn run_create(args: CreateArgs, pool: PgPool) -> Result<()> {
 
     query("INSERT INTO indexes (name, config) VALUES ($1, $2)")
         .bind(&config.name)
-        .bind(&serde_json::to_value(&config)?)
-        .execute(&pool)
+        .bind(&serde_json::to_value(config)?)
+        .execute(pool)
         .await?;
 
     info!("Created index: {}", &config.name);
8 changes: 4 additions & 4 deletions src/commands/drop.rs
@@ -8,13 +8,13 @@ use crate::args::DropArgs;
 
 use super::get_index_path;
 
-pub async fn run_drop(args: DropArgs, pool: PgPool) -> Result<()> {
-    let base_path = get_index_path(&args.name, &pool).await?;
+pub async fn run_drop(args: DropArgs, pool: &PgPool) -> Result<()> {
+    let base_path = get_index_path(&args.name, pool).await?;
 
     let file_names: Vec<(String,)> =
         query_as("SELECT file_name FROM index_files WHERE index_name=$1")
            .bind(&args.name)
-            .fetch_all(&pool)
+            .fetch_all(pool)
            .await?;
     let file_names_len = file_names.len();

@@ -30,7 +30,7 @@ pub async fn run_drop(args: DropArgs, pool: PgPool) -> Result<()> {
 
     query("DELETE FROM indexes WHERE name=$1")
         .bind(&args.name)
-        .execute(&pool)
+        .execute(pool)
         .await?;
 
     info!(
8 changes: 5 additions & 3 deletions src/commands/index.rs
@@ -14,8 +14,8 @@ use crate::{args::IndexArgs, commands::field_parser::build_parsers_from_field_config};
 
 use super::{dynamic_field_config, get_index_config, write_unified_index, DYNAMIC_FIELD_NAME};
 
-pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
-    let config = get_index_config(&args.name, &pool).await?;
+pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
+    let config = get_index_config(&args.name, pool).await?;
 
     let mut schema_builder = Schema::builder();
     let dynamic_field = schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config());
@@ -30,8 +30,10 @@ pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
     index_writer.set_merge_policy(Box::new(NoMergePolicy));
 
     let input: Box<dyn AsyncRead + Unpin> = if let Some(input) = args.input {
+        debug!("reading from '{}'", &input);
         Box::new(File::open(&input).await?)
     } else {
+        debug!("reading from stdin");
         Box::new(stdin())
     };
     let mut reader = BufReader::new(input);
@@ -78,7 +80,7 @@ pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
 
     spawn_blocking(move || index_writer.wait_merging_threads()).await??;
 
-    write_unified_index(index, &args.build_dir, &config.name, &config.path, &pool).await?;
+    write_unified_index(index, &args.build_dir, &config.name, &config.path, pool).await?;
 
     Ok(())
 }
10 changes: 5 additions & 5 deletions src/commands/merge.rs
@@ -18,10 +18,10 @@ use super::{get_index_config, open_unified_directories, write_unified_index};
 
 const MIN_TANTIVY_MEMORY: usize = 15_000_000;
 
-pub async fn run_merge(args: MergeArgs, pool: PgPool) -> Result<()> {
-    let config = get_index_config(&args.name, &pool).await?;
+pub async fn run_merge(args: MergeArgs, pool: &PgPool) -> Result<()> {
+    let config = get_index_config(&args.name, pool).await?;
 
-    let (ids, directories): (Vec<_>, Vec<_>) = open_unified_directories(&config.path, &pool)
+    let (ids, directories): (Vec<_>, Vec<_>) = open_unified_directories(&config.path, pool)
         .await?
         .into_iter()
         .map(|(id, dir)| (id, dir.box_clone()))
@@ -47,11 +47,11 @@ pub async fn run_merge(args: MergeArgs, pool: PgPool) -> Result<()> {
 
     spawn_blocking(move || index_writer.wait_merging_threads()).await??;
 
-    write_unified_index(index, &args.merge_dir, &config.name, &config.path, &pool).await?;
+    write_unified_index(index, &args.merge_dir, &config.name, &config.path, pool).await?;
 
     let delete_result = query("DELETE FROM index_files WHERE id = ANY($1)")
         .bind(&ids)
-        .execute(&pool)
+        .execute(pool)
         .await;
 
     for id in ids {
23 changes: 19 additions & 4 deletions src/commands/search.rs
@@ -108,7 +108,11 @@ where
     rx
 }
 
-pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
+pub async fn run_search_with_callback(
+    args: SearchArgs,
+    pool: &PgPool,
+    on_doc_fn: Box<dyn Fn(String) + Send>,
+) -> Result<()> {
     if args.limit == 0 {
         return Ok(());
     }
@@ -129,7 +133,7 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
     })
     .build()?;
 
-    let config = get_index_config(&args.name, &pool).await?;
+    let config = get_index_config(&args.name, pool).await?;
 
     let indexed_field_names = {
         let mut fields = config.schema.fields.get_indexed();
@@ -141,7 +145,7 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
         fields
     };
 
-    let directories = open_unified_directories(&config.path, &pool)
+    let directories = open_unified_directories(&config.path, pool)
         .await?
         .into_iter()
         .map(|(_, x)| x)
@@ -197,7 +201,7 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
     let mut rx_handle = spawn(async move {
         let mut i = 0;
         while let Some(doc) = rx.recv().await {
-            println!("{}", doc);
+            on_doc_fn(doc);
             i += 1;
             if i == args.limit {
                 break;
@@ -226,3 +230,14 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
 
     Ok(())
 }
+
+pub async fn run_search(args: SearchArgs, pool: &PgPool) -> Result<()> {
+    run_search_with_callback(
+        args,
+        pool,
+        Box::new(|doc| {
+            println!("{}", doc);
+        }),
+    )
+    .await
+}
14 changes: 11 additions & 3 deletions src/config/mod.rs
@@ -6,7 +6,7 @@ pub mod number;
 pub mod static_object;
 pub mod text;
 
-use std::{ops::Deref, path::Path, vec::IntoIter};
+use std::{ops::Deref, path::Path, str::FromStr, vec::IntoIter};
 
 use color_eyre::eyre::Result;
 use serde::{Deserialize, Serialize};
@@ -232,7 +232,7 @@ pub struct IndexSchema {
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct IndexConfig {
+pub struct IndexConfig {
     pub name: String,
     pub path: String,
 
@@ -246,6 +246,14 @@ pub(crate) struct IndexConfig {
 impl IndexConfig {
     pub async fn from_path<P: AsRef<Path>>(path: P) -> Result<Self> {
         let config_str = read_to_string(path).await?;
-        Ok(serde_yaml::from_str(&config_str)?)
+        Self::from_str(&config_str)
     }
 }
+
+impl FromStr for IndexConfig {
+    type Err = color_eyre::Report;
+
+    fn from_str(s: &str) -> Result<Self> {
+        Ok(serde_yaml::from_str(s)?)
+    }
+}
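
The new FromStr impl is what lets the integration test below parse a config embedded at compile time instead of reading it from disk. A small sketch of the call (mirroring tests/config.rs; the include_str! path assumes the caller sits under tests/):

use std::str::FromStr;

use toshokan::config::IndexConfig;

fn load_embedded_config() -> color_eyre::Result<IndexConfig> {
    // The YAML is baked into the binary at compile time.
    IndexConfig::from_str(include_str!("../example_config.yaml"))
}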
10 changes: 5 additions & 5 deletions src/main.rs
@@ -36,19 +36,19 @@ async fn async_main(args: Args) -> Result<()> {
 
     match args.subcmd {
         SubCommand::Create(create_args) => {
-            run_create(create_args, pool).await?;
+            run_create(create_args, &pool).await?;
         }
         SubCommand::Drop(drop_args) => {
-            run_drop(drop_args, pool).await?;
+            run_drop(drop_args, &pool).await?;
         }
         SubCommand::Index(index_args) => {
-            run_index(index_args, pool).await?;
+            run_index(index_args, &pool).await?;
         }
         SubCommand::Merge(merge_args) => {
-            run_merge(merge_args, pool).await?;
+            run_merge(merge_args, &pool).await?;
         }
         SubCommand::Search(search_args) => {
-            run_search(search_args, pool).await?;
+            run_search(search_args, &pool).await?;
         }
     }
 
55 changes: 55 additions & 0 deletions tests/common/mod.rs
@@ -0,0 +1,55 @@
use std::{
fs::canonicalize,
path::{Path, PathBuf},
};

use color_eyre::Result;
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, PgPool};
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::postgres::Postgres as PostgresContainer;

static MIGRATOR: Migrator = sqlx::migrate!();

const MAX_DB_CONNECTIONS: u32 = 100;

pub struct Postgres {
/// Keep container alive (container is deleted on drop).
_container: ContainerAsync<PostgresContainer>,

/// The underlying sqlx connection to the postgres inside the container.
pub pool: PgPool,
}

async fn open_db_pool(url: &str) -> Result<PgPool> {
Ok(PgPoolOptions::new()
.max_connections(MAX_DB_CONNECTIONS)
.connect(url)
.await?)
}

pub async fn run_postgres() -> Result<Postgres> {
let container = PostgresContainer::default().start().await?;
let pool = open_db_pool(&format!(
"postgres://postgres:[email protected]:{}/postgres",
container.get_host_port_ipv4(5432).await?
))
.await?;

MIGRATOR.run(&pool).await?;

Ok(Postgres {
_container: container,
pool,
})
}

pub fn get_test_file_path(test_file: &str) -> PathBuf {
canonicalize(&Path::new(file!()))
.unwrap()
.parent()
.unwrap()
.parent()
.unwrap()
.join("test_files")
.join(test_file)
}
71 changes: 71 additions & 0 deletions tests/config.rs
@@ -0,0 +1,71 @@
mod common;

use std::str::FromStr;

use clap::Parser;
use color_eyre::Result;
use ctor::ctor;
use pretty_env_logger::formatted_timed_builder;
use tokio::sync::mpsc;
use toshokan::{
args::{DropArgs, IndexArgs, SearchArgs},
commands::{
create::run_create_from_config, drop::run_drop, index::run_index,
search::run_search_with_callback,
},
config::IndexConfig,
};

use crate::common::{get_test_file_path, run_postgres};

#[ctor]
fn init() {
color_eyre::install().unwrap();

let mut log_builder = formatted_timed_builder();
log_builder.parse_filters("toshokan=trace,opendal::services=info");
log_builder.try_init().unwrap();
}

#[tokio::test]
async fn test_example_config() -> Result<()> {
let postgres = run_postgres().await?;
let config = IndexConfig::from_str(include_str!("../example_config.yaml"))?;

run_create_from_config(&config, &postgres.pool).await?;

run_index(
IndexArgs::parse_from([
"",
&config.name,
&get_test_file_path("hdfs-logs-multitenants-2.json").to_string_lossy(),
]),
&postgres.pool,
)
.await?;

let (tx, mut rx) = mpsc::channel(1);
run_search_with_callback(
SearchArgs::parse_from([
"",
&config.name,
"tenant_id:>50 AND severity_text:INFO",
"--limit",
"1",
]),
&postgres.pool,
Box::new(move |doc| {
tx.try_send(doc).unwrap();
}),
)
.await?;

assert_eq!(
rx.recv().await.unwrap(),
r#"{"attributes":{"class":"org.apache.hadoop.hdfs.server.datanode.DataNode"},"body":"PacketResponder: BP-108841162-10.10.34.11-1440074360971:blk_1074072698_331874, type=HAS_DOWNSTREAM_IN_PIPELINE terminating","resource":{"service":"datanode/01"},"severity_text":"INFO","tenant_id":58,"timestamp":"2016-04-13T06:46:53Z"}"#
);

run_drop(DropArgs::parse_from(["", &config.name]), &postgres.pool).await?;

Ok(())
}
2 changes: 2 additions & 0 deletions tests/test_files/hdfs-logs-multitenants-2.json
@@ -0,0 +1,2 @@
{"timestamp":1460530013,"severity_text":"INFO","body":"PacketResponder: BP-108841162-10.10.34.11-1440074360971:blk_1074072698_331874, type=HAS_DOWNSTREAM_IN_PIPELINE terminating","resource":{"service":"datanode/01"},"attributes":{"class":"org.apache.hadoop.hdfs.server.datanode.DataNode"},"tenant_id":58}
{"timestamp":1460530014,"severity_text":"INFO","body":"Receiving BP-108841162-10.10.34.11-1440074360971:blk_1074072706_331882 src: /10.10.34.33:42666 dest: /10.10.34.11:50010","resource":{"service":"datanode/01"},"attributes":{"class":"org.apache.hadoop.hdfs.server.datanode.DataNode"},"tenant_id":46}
