Skip to content

Commit

Permalink
Test DDL (#161)
Browse files Browse the repository at this point in the history
This adds test for running `AppEvent::ExecuteDDL` and then running query
to make sure the DDL statements were correctly processed.

Unfortunately, this isnt a proper end to end test like I was hoping. It
was surprisingly difficult to get the app event loop to run in the test
which is required to get that to work. So, we simply execute a query
directly on the context instead.
  • Loading branch information
matthewmturner authored Sep 24, 2024
1 parent c431b3a commit 16897dc
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 117 deletions.
24 changes: 10 additions & 14 deletions src/app/app_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! [`AppExecution`]: Handles executing queries for the TUI application.
use crate::app::state::tabs::sql::Query;
use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch};
use crate::execution::ExecutionContext;
use color_eyre::eyre::Result;
Expand All @@ -27,7 +26,6 @@ use datafusion::physical_plan::execute_stream;
use futures::StreamExt;
use log::{error, info};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -62,23 +60,25 @@ impl AppExecution {
///
/// Error handling: If an error occurs while executing a query, the error is
/// logged and execution continues
pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) -> Result<()> {
pub async fn run_sqls(
self: Arc<Self>,
sqls: Vec<String>,
sender: UnboundedSender<AppEvent>,
) -> Result<()> {
// We need to filter out empty strings to correctly determine the last query for displaying
// results.
info!("Running sqls: {:?}", sqls);
let non_empty_sqls: Vec<&str> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
let non_empty_sqls: Vec<String> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
info!("Non empty SQLs: {:?}", non_empty_sqls);
let statement_count = non_empty_sqls.len();
for (i, sql) in non_empty_sqls.into_iter().enumerate() {
info!("Running query {}", i);
let _sender = sender.clone();
let mut query =
Query::new(sql.to_string(), None, None, None, Duration::default(), None);
let start = std::time::Instant::now();
if i == statement_count - 1 {
info!("Executing last query and display results");
sender.send(AppEvent::NewExecution)?;
match self.inner.create_physical_plan(sql).await {
match self.inner.create_physical_plan(&sql).await {
Ok(plan) => match execute_stream(plan, self.inner.session_ctx().task_ctx()) {
Ok(stream) => {
self.set_result_stream(stream).await;
Expand All @@ -105,7 +105,7 @@ impl AppExecution {
}
}
Err(stream_err) => {
error!("Error creating physical plan: {:?}", stream_err);
error!("Error executing stream: {:?}", stream_err);
let elapsed = start.elapsed();
let e = ExecutionError {
query: sql.to_string(),
Expand All @@ -127,11 +127,8 @@ impl AppExecution {
}
}
} else {
match self.inner.execute_sql_and_discard_results(sql).await {
Ok(_) => {
let elapsed = start.elapsed();
query.set_execution_time(elapsed);
}
match self.inner.execute_sql_and_discard_results(&sql).await {
Ok(_) => {}
Err(e) => {
// We only log failed queries, we don't want to stop the execution of the
// remaining queries. Perhaps there should be a configuration option for
Expand All @@ -140,7 +137,6 @@ impl AppExecution {
}
}
}
_sender.send(AppEvent::QueryResult(query))?; // Send the query result to the UI
}
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion src/app/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
})
.collect();
let ctx = app.execution.session_ctx().clone();
tokio::spawn(async move {
let handle = tokio::spawn(async move {
for q in queries {
info!("Executing DDL: {:?}", q);
match ctx.sql(&q).await {
Expand All @@ -178,6 +178,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
}
}
});
app.ddl_task = Some(handle);
}
AppEvent::NewExecution => {
app.state.sql_tab.reset_execution_results();
Expand Down
9 changes: 3 additions & 6 deletions src/app/handlers/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
info!("Running query: {}", sql);
let _event_tx = app.event_tx().clone();
let execution = Arc::clone(&app.execution);
// TODO: Extract this into function to be used in both normal and editable handler.
// Only useful if we get Ctrl / Cmd + Enter to work in editable mode though.
tokio::spawn(async move {
let sqls: Vec<&str> = sql.split(';').collect();
let _ = execution.run_sqls(sqls, _event_tx).await;
});
let sqls: Vec<String> = sql.split(';').map(|s| s.to_string()).collect();
let handle = tokio::spawn(execution.run_sqls(sqls, _event_tx));
app.state.sql_tab.set_execution_task(handle);
}
KeyCode::Right => {
let _event_tx = app.event_tx().clone();
Expand Down
36 changes: 28 additions & 8 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tokio_util::sync::CancellationToken;

use self::app_execution::AppExecution;
use self::handlers::{app_event_handler, crossterm_event_handler};
use self::state::tabs::sql::Query;
use crate::execution::ExecutionContext;

#[cfg(feature = "flightsql")]
Expand Down Expand Up @@ -121,7 +120,6 @@ pub enum AppEvent {
Resize(u16, u16),
ExecuteDDL(String),
NewExecution,
QueryResult(Query),
ExecutionResultsNextPage(ExecutionResultsBatch),
ExecutionResultsPreviousPage,
ExecutionResultsError(ExecutionError),
Expand All @@ -138,13 +136,15 @@ pub struct App<'app> {
event_rx: UnboundedReceiver<AppEvent>,
cancellation_token: CancellationToken,
task: JoinHandle<()>,
ddl_task: Option<JoinHandle<()>>,
}

impl<'app> App<'app> {
pub fn new(state: state::AppState<'app>, execution: ExecutionContext) -> Self {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let cancellation_token = CancellationToken::new();
let task = tokio::spawn(async {});
// let ddl_task = tokio::spawn(async {});
let app_execution = Arc::new(AppExecution::new(Arc::new(execution)));

Self {
Expand All @@ -154,13 +154,18 @@ impl<'app> App<'app> {
event_tx,
cancellation_token,
execution: app_execution,
ddl_task: None,
}
}

pub fn event_tx(&self) -> UnboundedSender<AppEvent> {
self.event_tx.clone()
}

pub fn ddl_task(&mut self) -> &mut Option<JoinHandle<()>> {
&mut self.ddl_task
}

pub fn event_rx(&mut self) -> &mut UnboundedReceiver<AppEvent> {
&mut self.event_rx
}
Expand All @@ -181,6 +186,10 @@ impl<'app> App<'app> {
&self.state
}

pub fn state_mut(&mut self) -> &mut state::AppState<'app> {
&mut self.state
}

/// Enter app, optionally setup `crossterm` with UI settings such as alternative screen and
/// mouse capture, then start event loop.
pub fn enter(&mut self, ui: bool) -> Result<()> {
Expand All @@ -194,7 +203,7 @@ impl<'app> App<'app> {
ratatui::crossterm::execute!(std::io::stdout(), event::EnableBracketedPaste)?;
}
}
self.start_event_loop();
self.start_app_event_loop();
Ok(())
}

Expand Down Expand Up @@ -293,8 +302,10 @@ impl<'app> App<'app> {
});
}

/// Execute DDL from users DDL file
pub fn execute_ddl(&mut self) {
if let Some(user_dirs) = directories::UserDirs::new() {
// TODO: Move to ~/.config/ddl
let datafusion_rc_path = user_dirs
.home_dir()
.join(".datafusion")
Expand All @@ -321,11 +332,6 @@ impl<'app> App<'app> {
let _ = self.event_tx().send(AppEvent::EstablishFlightSQLConnection);
}

/// Dispatch to the appropriate event loop based on the command
pub fn start_event_loop(&mut self) {
self.start_app_event_loop()
}

/// Get the next event from event loop
pub async fn next(&mut self) -> Result<AppEvent> {
self.event_rx()
Expand All @@ -349,6 +355,20 @@ impl<'app> App<'app> {
.divider(" ")
.render(area, buf);
}

pub async fn loop_without_render(&mut self) -> Result<()> {
self.enter(true)?;
// Main loop for handling events
loop {
let event = self.next().await?;

self.handle_app_event(event)?;

if self.state.should_quit {
break Ok(());
}
}
}
}

impl Widget for &App<'_> {
Expand Down
98 changes: 12 additions & 86 deletions src/app/state/tabs/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,93 +16,19 @@
// under the License.

use core::cell::RefCell;
use std::time::Duration;

use color_eyre::Result;
use datafusion::arrow::array::RecordBatch;
use datafusion::sql::sqlparser::keywords;
use ratatui::crossterm::event::KeyEvent;
use ratatui::style::palette::tailwind;
use ratatui::style::{Modifier, Style};
use ratatui::widgets::TableState;
use tokio::task::JoinHandle;
use tui_textarea::TextArea;

use crate::app::ExecutionError;
use crate::config::AppConfig;
use crate::execution::ExecutionStats;

#[derive(Clone, Debug)]
pub struct Query {
sql: String,
results: Option<Vec<RecordBatch>>,
num_rows: Option<usize>,
error: Option<String>,
execution_time: Duration,
execution_stats: Option<ExecutionStats>,
}

impl Query {
pub fn new(
sql: String,
results: Option<Vec<RecordBatch>>,
num_rows: Option<usize>,
error: Option<String>,
execution_time: Duration,
execution_stats: Option<ExecutionStats>,
) -> Self {
Self {
sql,
results,
num_rows,
error,
execution_time,
execution_stats,
}
}

pub fn sql(&self) -> &String {
&self.sql
}

pub fn execution_time(&self) -> &Duration {
&self.execution_time
}

pub fn set_results(&mut self, results: Option<Vec<RecordBatch>>) {
self.results = results;
}

pub fn results(&self) -> &Option<Vec<RecordBatch>> {
&self.results
}

pub fn set_num_rows(&mut self, num_rows: Option<usize>) {
self.num_rows = num_rows;
}

pub fn num_rows(&self) -> &Option<usize> {
&self.num_rows
}

pub fn set_error(&mut self, error: Option<String>) {
self.error = error;
}

pub fn error(&self) -> &Option<String> {
&self.error
}

pub fn set_execution_time(&mut self, elapsed_time: Duration) {
self.execution_time = elapsed_time;
}

pub fn execution_stats(&self) -> &Option<ExecutionStats> {
&self.execution_stats
}

pub fn set_execution_stats(&mut self, stats: Option<ExecutionStats>) {
self.execution_stats = stats;
}
}

pub fn get_keywords() -> Vec<String> {
keywords::ALL_KEYWORDS
Expand All @@ -129,11 +55,11 @@ pub fn keyword_style() -> Style {
pub struct SQLTabState<'app> {
editor: TextArea<'app>,
editor_editable: bool,
query: Option<Query>,
query_results_state: Option<RefCell<TableState>>,
result_batches: Option<Vec<RecordBatch>>,
results_page: Option<usize>,
execution_error: Option<ExecutionError>,
execution_task: Option<JoinHandle<Result<()>>>,
}

impl<'app> SQLTabState<'app> {
Expand All @@ -149,11 +75,11 @@ impl<'app> SQLTabState<'app> {
Self {
editor: textarea,
editor_editable: false,
query: None,
query_results_state: None,
result_batches: None,
results_page: None,
execution_error: None,
execution_task: None,
}
}

Expand Down Expand Up @@ -215,14 +141,6 @@ impl<'app> SQLTabState<'app> {
self.editor_editable
}

pub fn set_query(&mut self, query: Query) {
self.query = Some(query);
}

pub fn query(&self) -> &Option<Query> {
&self.query
}

// TODO: Create Editor struct and move this there
pub fn next_word(&mut self) {
self.editor
Expand Down Expand Up @@ -288,4 +206,12 @@ impl<'app> SQLTabState<'app> {
}
}
}

pub fn execution_task(&mut self) -> &mut Option<JoinHandle<Result<()>>> {
&mut self.execution_task
}

pub fn set_execution_task(&mut self, task: JoinHandle<Result<()>>) {
self.execution_task = Some(task);
}
}
Loading

0 comments on commit 16897dc

Please sign in to comment.