tests: Add sanity integration test #5

Merged 2 commits on Jun 11, 2024
440 changes: 420 additions & 20 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -34,3 +34,6 @@ uuid = { version = "1.8.0", features = ["v7"] }

[dev-dependencies]
async-tempfile = "0.5.0"
ctor = "0.2.8"
testcontainers = "0.17.0"
testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
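These three additions back the new integration test: `testcontainers` plus `testcontainers-modules` (with its `postgres` feature) manage a disposable Postgres container, and `ctor` runs process-wide setup before any test in the binary executes. A minimal sketch of the `ctor` pattern, mirroring the `init` function in `tests/config.rs` below:

```rust
// Sketch only: one-time initialization for a test binary. `ctor` runs this
// before `main`, and therefore before any #[tokio::test] in the file.
use ctor::ctor;

#[ctor]
fn init() {
    // Global hooks may only be installed once per process.
    color_eyre::install().unwrap();
}
```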
9 changes: 6 additions & 3 deletions src/commands/create.rs
@@ -6,9 +6,12 @@ use crate::{
config::{FieldType, IndexConfig},
};

pub async fn run_create(args: CreateArgs, pool: PgPool) -> Result<()> {
pub async fn run_create(args: CreateArgs, pool: &PgPool) -> Result<()> {
let config = IndexConfig::from_path(&args.config_path).await?;
run_create_from_config(&config, pool).await
}

pub async fn run_create_from_config(config: &IndexConfig, pool: &PgPool) -> Result<()> {
let array_static_object_exists = config
.schema
.fields
@@ -20,8 +23,8 @@ pub async fn run_create(args: CreateArgs, pool: PgPool) -> Result<()> {

query("INSERT INTO indexes (name, config) VALUES ($1, $2)")
.bind(&config.name)
.bind(&serde_json::to_value(&config)?)
.execute(&pool)
.bind(&serde_json::to_value(config)?)
.execute(pool)
.await?;

info!("Created index: {}", &config.name);
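Changing `pool: PgPool` to `pool: &PgPool` here (and in every command below) is what makes the integration test practical: the old by-value signatures consumed the pool on the first call, while the borrowed form lets one pool serve `run_create_from_config`, `run_index`, `run_search_with_callback`, and `run_drop` in sequence. A hedged sketch of the resulting call pattern, assuming the `run_postgres` helper added in `tests/common/mod.rs` below:

```rust
// Sketch: one pool, many commands, thanks to the &PgPool signatures.
use std::str::FromStr;

use color_eyre::Result;
use toshokan::{commands::create::run_create_from_config, config::IndexConfig};

async fn create_then_reuse() -> Result<()> {
    let postgres = run_postgres().await?; // helper from tests/common/mod.rs
    let config = IndexConfig::from_str(include_str!("../example_config.yaml"))?;

    run_create_from_config(&config, &postgres.pool).await?;
    // The pool was only borrowed, so any other command can use it next.
    Ok(())
}
```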
8 changes: 4 additions & 4 deletions src/commands/drop.rs
@@ -8,13 +8,13 @@ use crate::args::DropArgs;

use super::get_index_path;

pub async fn run_drop(args: DropArgs, pool: PgPool) -> Result<()> {
let base_path = get_index_path(&args.name, &pool).await?;
pub async fn run_drop(args: DropArgs, pool: &PgPool) -> Result<()> {
let base_path = get_index_path(&args.name, pool).await?;

let file_names: Vec<(String,)> =
query_as("SELECT file_name FROM index_files WHERE index_name=$1")
.bind(&args.name)
.fetch_all(&pool)
.fetch_all(pool)
.await?;
let file_names_len = file_names.len();

@@ -30,7 +30,7 @@ pub async fn run_drop(args: DropArgs, pool: PgPool) -> Result<()> {

query("DELETE FROM indexes WHERE name=$1")
.bind(&args.name)
.execute(&pool)
.execute(pool)
.await?;

info!(
8 changes: 5 additions & 3 deletions src/commands/index.rs
@@ -14,8 +14,8 @@ use crate::{args::IndexArgs, commands::field_parser::build_parsers_from_field_co

use super::{dynamic_field_config, get_index_config, write_unified_index, DYNAMIC_FIELD_NAME};

pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
let config = get_index_config(&args.name, &pool).await?;
pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
let config = get_index_config(&args.name, pool).await?;

let mut schema_builder = Schema::builder();
let dynamic_field = schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config());
@@ -30,8 +30,10 @@ pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
index_writer.set_merge_policy(Box::new(NoMergePolicy));

let input: Box<dyn AsyncRead + Unpin> = if let Some(input) = args.input {
debug!("reading from '{}'", &input);
Box::new(File::open(&input).await?)
} else {
debug!("reading from stdin");
Box::new(stdin())
};
let mut reader = BufReader::new(input);
@@ -78,7 +80,7 @@

spawn_blocking(move || index_writer.wait_merging_threads()).await??;

write_unified_index(index, &args.build_dir, &config.name, &config.path, &pool).await?;
write_unified_index(index, &args.build_dir, &config.name, &config.path, pool).await?;

Ok(())
}
10 changes: 5 additions & 5 deletions src/commands/merge.rs
@@ -18,10 +18,10 @@ use super::{get_index_config, open_unified_directories, write_unified_index};

const MIN_TANTIVY_MEMORY: usize = 15_000_000;

pub async fn run_merge(args: MergeArgs, pool: PgPool) -> Result<()> {
let config = get_index_config(&args.name, &pool).await?;
pub async fn run_merge(args: MergeArgs, pool: &PgPool) -> Result<()> {
let config = get_index_config(&args.name, pool).await?;

let (ids, directories): (Vec<_>, Vec<_>) = open_unified_directories(&config.path, &pool)
let (ids, directories): (Vec<_>, Vec<_>) = open_unified_directories(&config.path, pool)
.await?
.into_iter()
.map(|(id, dir)| (id, dir.box_clone()))
@@ -47,11 +47,11 @@ pub async fn run_merge(args: MergeArgs, pool: PgPool) -> Result<()> {

spawn_blocking(move || index_writer.wait_merging_threads()).await??;

write_unified_index(index, &args.merge_dir, &config.name, &config.path, &pool).await?;
write_unified_index(index, &args.merge_dir, &config.name, &config.path, pool).await?;

let delete_result = query("DELETE FROM index_files WHERE id = ANY($1)")
.bind(&ids)
.execute(&pool)
.execute(pool)
.await;

for id in ids {
23 changes: 19 additions & 4 deletions src/commands/search.rs
@@ -108,7 +108,11 @@ where
rx
}

pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
pub async fn run_search_with_callback(
args: SearchArgs,
pool: &PgPool,
on_doc_fn: Box<dyn Fn(String) + Send>,
) -> Result<()> {
if args.limit == 0 {
return Ok(());
}
@@ -129,7 +133,7 @@ pub async fn run_search(args: SearchArgs, pool: PgPool) -> Result<()> {
})
.build()?;

let config = get_index_config(&args.name, &pool).await?;
let config = get_index_config(&args.name, pool).await?;

let indexed_field_names = {
let mut fields = config.schema.fields.get_indexed();
@@ -141,7 +145,7 @@
fields
};

let directories = open_unified_directories(&config.path, &pool)
let directories = open_unified_directories(&config.path, pool)
.await?
.into_iter()
.map(|(_, x)| x)
@@ -197,7 +201,7 @@
let mut rx_handle = spawn(async move {
let mut i = 0;
while let Some(doc) = rx.recv().await {
println!("{}", doc);
on_doc_fn(doc);
i += 1;
if i == args.limit {
break;
@@ -226,3 +230,14 @@

Ok(())
}

pub async fn run_search(args: SearchArgs, pool: &PgPool) -> Result<()> {
run_search_with_callback(
args,
pool,
Box::new(|doc| {
println!("{}", doc);
}),
)
.await
}
14 changes: 11 additions & 3 deletions src/config/mod.rs
@@ -6,7 +6,7 @@ pub mod number;
pub mod static_object;
pub mod text;

use std::{ops::Deref, path::Path, vec::IntoIter};
use std::{ops::Deref, path::Path, str::FromStr, vec::IntoIter};

use color_eyre::eyre::Result;
use serde::{Deserialize, Serialize};
@@ -232,7 +232,7 @@ pub struct IndexSchema {
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct IndexConfig {
pub struct IndexConfig {
pub name: String,
pub path: String,

@@ -246,6 +246,14 @@ pub(crate) struct IndexConfig {
impl IndexConfig {
pub async fn from_path<P: AsRef<Path>>(path: P) -> Result<Self> {
let config_str = read_to_string(path).await?;
Ok(serde_yaml::from_str(&config_str)?)
Self::from_str(&config_str)
}
}

impl FromStr for IndexConfig {
type Err = color_eyre::Report;

fn from_str(s: &str) -> Result<Self> {
Ok(serde_yaml::from_str(s)?)
}
}
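Two small API shifts here serve the tests: `IndexConfig` goes from `pub(crate)` to `pub` so integration tests, which link against the crate from outside, can name it, and `from_path` now delegates to a `FromStr` impl so a config need not exist on disk at all. A minimal hedged sketch of the string-based path:

```rust
// Sketch: parse a config compiled into the binary instead of reading a file.
// The include path is relative to the source file, as in tests/config.rs.
use std::str::FromStr;

use color_eyre::Result;
use toshokan::config::IndexConfig;

fn embedded_config() -> Result<IndexConfig> {
    IndexConfig::from_str(include_str!("../example_config.yaml"))
}
```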
10 changes: 5 additions & 5 deletions src/main.rs
@@ -36,19 +36,19 @@ async fn async_main(args: Args) -> Result<()> {

match args.subcmd {
SubCommand::Create(create_args) => {
run_create(create_args, pool).await?;
run_create(create_args, &pool).await?;
}
SubCommand::Drop(drop_args) => {
run_drop(drop_args, pool).await?;
run_drop(drop_args, &pool).await?;
}
SubCommand::Index(index_args) => {
run_index(index_args, pool).await?;
run_index(index_args, &pool).await?;
}
SubCommand::Merge(merge_args) => {
run_merge(merge_args, pool).await?;
run_merge(merge_args, &pool).await?;
}
SubCommand::Search(search_args) => {
run_search(search_args, pool).await?;
run_search(search_args, &pool).await?;
}
}

55 changes: 55 additions & 0 deletions tests/common/mod.rs
@@ -0,0 +1,55 @@
use std::{
fs::canonicalize,
path::{Path, PathBuf},
};

use color_eyre::Result;
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, PgPool};
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::postgres::Postgres as PostgresContainer;

static MIGRATOR: Migrator = sqlx::migrate!();

const MAX_DB_CONNECTIONS: u32 = 100;

pub struct Postgres {
/// Keep container alive (container is deleted on drop).
_container: ContainerAsync<PostgresContainer>,

/// The underlying sqlx connection to the postgres inside the container.
pub pool: PgPool,
}

async fn open_db_pool(url: &str) -> Result<PgPool> {
Ok(PgPoolOptions::new()
.max_connections(MAX_DB_CONNECTIONS)
.connect(url)
.await?)
}

pub async fn run_postgres() -> Result<Postgres> {
let container = PostgresContainer::default().start().await?;
let pool = open_db_pool(&format!(
"postgres://postgres:[email protected]:{}/postgres",
container.get_host_port_ipv4(5432).await?
))
.await?;

MIGRATOR.run(&pool).await?;

Ok(Postgres {
_container: container,
pool,
})
}

pub fn get_test_file_path(test_file: &str) -> PathBuf {
canonicalize(&Path::new(file!()))
.unwrap()
.parent()
.unwrap()
.parent()
.unwrap()
.join("test_files")
.join(test_file)
}
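`run_postgres` ties the container's lifetime to the returned struct: `ContainerAsync` removes the container when dropped, so every test that calls it gets a fresh, fully migrated database and cleanup happens automatically. A hedged smoke-test sketch of a consumer (testcontainers needs a reachable Docker daemon; the `indexes` table is assumed to come from the migrations, as the commands above imply):

```rust
// Sketch: lease a throwaway Postgres for a single test.
mod common;

use color_eyre::Result;

#[tokio::test]
async fn harness_smoke_test() -> Result<()> {
    let postgres = common::run_postgres().await?;

    // Migrations already ran, so application tables exist but are empty.
    let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM indexes")
        .fetch_one(&postgres.pool)
        .await?;
    assert_eq!(count, 0);

    Ok(())
    // `postgres` drops here, stopping and removing the container.
}
```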
71 changes: 71 additions & 0 deletions tests/config.rs
@@ -0,0 +1,71 @@
mod common;

use std::str::FromStr;

use clap::Parser;
use color_eyre::Result;
use ctor::ctor;
use pretty_env_logger::formatted_timed_builder;
use tokio::sync::mpsc;
use toshokan::{
args::{DropArgs, IndexArgs, SearchArgs},
commands::{
create::run_create_from_config, drop::run_drop, index::run_index,
search::run_search_with_callback,
},
config::IndexConfig,
};

use crate::common::{get_test_file_path, run_postgres};

#[ctor]
fn init() {
color_eyre::install().unwrap();

let mut log_builder = formatted_timed_builder();
log_builder.parse_filters("toshokan=trace,opendal::services=info");
log_builder.try_init().unwrap();
}

#[tokio::test]
async fn test_example_config() -> Result<()> {
let postgres = run_postgres().await?;
let config = IndexConfig::from_str(include_str!("../example_config.yaml"))?;

run_create_from_config(&config, &postgres.pool).await?;

run_index(
IndexArgs::parse_from([
"",
&config.name,
&get_test_file_path("hdfs-logs-multitenants-2.json").to_string_lossy(),
]),
&postgres.pool,
)
.await?;

let (tx, mut rx) = mpsc::channel(1);
run_search_with_callback(
SearchArgs::parse_from([
"",
&config.name,
"tenant_id:>50 AND severity_text:INFO",
"--limit",
"1",
]),
&postgres.pool,
Box::new(move |doc| {
tx.try_send(doc).unwrap();
}),
)
.await?;

assert_eq!(
rx.recv().await.unwrap(),
r#"{"attributes":{"class":"org.apache.hadoop.hdfs.server.datanode.DataNode"},"body":"PacketResponder: BP-108841162-10.10.34.11-1440074360971:blk_1074072698_331874, type=HAS_DOWNSTREAM_IN_PIPELINE terminating","resource":{"service":"datanode/01"},"severity_text":"INFO","tenant_id":58,"timestamp":"2016-04-13T06:46:53Z"}"#
);

run_drop(DropArgs::parse_from(["", &config.name]), &postgres.pool).await?;

Ok(())
}
2 changes: 2 additions & 0 deletions tests/test_files/hdfs-logs-multitenants-2.json
@@ -0,0 +1,2 @@
{"timestamp":1460530013,"severity_text":"INFO","body":"PacketResponder: BP-108841162-10.10.34.11-1440074360971:blk_1074072698_331874, type=HAS_DOWNSTREAM_IN_PIPELINE terminating","resource":{"service":"datanode/01"},"attributes":{"class":"org.apache.hadoop.hdfs.server.datanode.DataNode"},"tenant_id":58}
{"timestamp":1460530014,"severity_text":"INFO","body":"Receiving BP-108841162-10.10.34.11-1440074360971:blk_1074072706_331882 src: /10.10.34.33:42666 dest: /10.10.34.11:50010","resource":{"service":"datanode/01"},"attributes":{"class":"org.apache.hadoop.hdfs.server.datanode.DataNode"},"tenant_id":46}