Skip to content

Commit

Permalink
Extract execution context to its own module
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 12, 2024
1 parent d0c08bf commit 5513d8d
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 95 deletions.
2 changes: 1 addition & 1 deletion src/app/handlers/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
let execution = Arc::clone(&app.execution);
let _event_tx = app.app_event_tx.clone();
tokio::spawn(async move {
let client = &execution.flightsql_client;
let client = execution.flightsql_client();
let mut query =
FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
let start = Instant::now();
Expand Down
4 changes: 2 additions & 2 deletions src/app/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
AppEvent::ExecuteDDL(ddl) => {
let queries: Vec<String> = ddl.split(';').map(|s| s.to_string()).collect();
queries.into_iter().for_each(|q| {
let ctx = app.execution.session_ctx.clone();
let ctx = app.execution.session_ctx().clone();
tokio::spawn(async move {
match ctx.sql(&q).await {
Ok(df) => {
Expand Down Expand Up @@ -208,7 +208,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
let url: &'static str = Box::leak(url.into_boxed_str());
let execution = Arc::clone(&app.execution);
tokio::spawn(async move {
let client = &execution.flightsql_client;
let client = execution.flightsql_client();
let maybe_channel = Channel::from_static(url).connect().await;
info!("Created channel");
match maybe_channel {
Expand Down
9 changes: 3 additions & 6 deletions src/app/handlers/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@ use log::{error, info};
use ratatui::crossterm::event::{KeyCode, KeyEvent, KeyModifiers};
use tokio_stream::StreamExt;

use crate::app::{
execution::collect_plan_stats, handlers::tab_navigation_handler, state::tabs::sql::Query,
AppEvent,
};

use super::App;
use crate::app::{handlers::tab_navigation_handler, state::tabs::sql::Query, AppEvent};
use crate::execution::collect_plan_stats;

pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
match key.code {
Expand Down Expand Up @@ -89,7 +86,7 @@ pub fn editable_handler(app: &mut App, key: KeyEvent) {
(KeyCode::Esc, _) => app.state.sql_tab.exit_edit(),
(KeyCode::Enter, KeyModifiers::CONTROL) => {
let query = app.state.sql_tab.editor().lines().join("");
let ctx = app.execution.session_ctx.clone();
let ctx = app.execution.session_ctx().clone();
let _event_tx = app.app_event_tx.clone();
// TODO: Maybe this should be on a separate runtime to prevent blocking main thread /
// runtime
Expand Down
145 changes: 79 additions & 66 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

pub mod config;
pub mod execution;
pub mod handlers;
pub mod state;

Expand Down Expand Up @@ -45,9 +44,9 @@ use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;

use self::execution::ExecutionContext;
use self::handlers::{app_event_handler, crossterm_event_handler};
use self::state::tabs::sql::Query;
use crate::execution::ExecutionContext;

#[cfg(feature = "flightsql")]
use self::state::tabs::flightsql::FlightSQLQuery;
Expand Down Expand Up @@ -328,84 +327,98 @@ pub async fn run_app(cli: cli::DftCli, state: state::AppState<'_>) -> Result<()>
app.exit()
}

pub async fn execute_files_or_commands(
files: Vec<PathBuf>,
commands: Vec<String>,
state: &state::AppState<'_>,
) -> Result<()> {
match (files.is_empty(), commands.is_empty()) {
(true, true) => Err(eyre!("No files or commands provided to execute")),
(false, true) => execute_files(files, state).await,
(true, false) => execute_commands(commands, state).await,
(false, false) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
}
/// Encapsulates the command line interface
pub struct CliApp {
/// Execution context for running queries
execution: ExecutionContext,
}
async fn execute_files(files: Vec<PathBuf>, state: &state::AppState<'_>) -> Result<()> {
info!("Executing files: {:?}", files);
let execution = ExecutionContext::new(state.config.execution.clone());

for file in files {
exec_from_file(&execution, &file).await?
impl CliApp {
pub fn new(state: state::AppState<'static>) -> Self {
let execution = ExecutionContext::new(state.config.execution.clone());

Self { execution }
}

Ok(())
}
async fn execute_commands(commands: Vec<String>, state: &state::AppState<'_>) -> Result<()> {
info!("Executing commands: {:?}", commands);
for command in commands {
exec_from_string(&command, state).await?
pub async fn execute_files_or_commands(
&self,
files: Vec<PathBuf>,
commands: Vec<String>,
) -> Result<()> {
match (files.is_empty(), commands.is_empty()) {
(true, true) => Err(eyre!("No files or commands provided to execute")),
(false, true) => self.execute_files(files).await,
(true, false) => self.execute_commands(commands).await,
(false, false) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
}
}

Ok(())
}
async fn execute_files(&self, files: Vec<PathBuf>) -> Result<()> {
info!("Executing files: {:?}", files);
for file in files {
self.exec_from_file(&file).await?
}

async fn exec_from_string(sql: &str, state: &state::AppState<'_>) -> Result<()> {
let dialect = GenericDialect {};
let execution = ExecutionContext::new(state.config.execution.clone());
let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
for statement in statements {
execution.execute_and_print_statement(statement).await?;
Ok(())
}
Ok(())
}

/// run and execute SQL statements and commands from a file, against a context
/// with the given print options
pub async fn exec_from_file(ctx: &ExecutionContext, file: &Path) -> Result<()> {
let file = File::open(file)?;
let reader = BufReader::new(file);
async fn execute_commands(&self, commands: Vec<String>) -> Result<()> {
info!("Executing commands: {:?}", commands);
for command in commands {
self.exec_from_string(&command).await?
}

let mut query = String::new();
Ok(())
}

for line in reader.lines() {
let line = line?;
if line.starts_with("#!") {
continue;
async fn exec_from_string(&self, sql: &str) -> Result<()> {
let dialect = GenericDialect {};
let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
for statement in statements {
self.execution
.execute_and_print_statement(statement)
.await?;
}
if line.starts_with("--") {
continue;
Ok(())
}

/// run and execute SQL statements and commands from a file, against a context
/// with the given print options
pub async fn exec_from_file(&self, file: &Path) -> Result<()> {
let file = File::open(file)?;
let reader = BufReader::new(file);

let mut query = String::new();

for line in reader.lines() {
let line = line?;
if line.starts_with("#!") {
continue;
}
if line.starts_with("--") {
continue;
}

let line = line.trim_end();
query.push_str(line);
// if we found the end of a query, run it
if line.ends_with(';') {
// TODO: if the query errors, should we keep trying to execute
// the other queries in the file? That is what datafusion-cli does...
self.execution.execute_and_print_stream_sql(&query).await?;
query.clear();
} else {
query.push('\n');
}
}

let line = line.trim_end();
query.push_str(line);
// if we found the end of a query, run it
if line.ends_with(';') {
// TODO: if the query errors, should we keep trying to execute
// the other queries in the file? That is what datafusion-cli does...
ctx.execute_and_print_stream_sql(&query).await?;
query.clear();
} else {
query.push('\n');
// run the last line(s) in file if the last statement doesn't contain ‘;’
// ignore if it only consists of '\n'
if query.contains(|c| c != '\n') {
self.execution.execute_and_print_stream_sql(&query).await?;
}
}

// run the last line(s) in file if the last statement doesn't contain ‘;’
// ignore if it only consists of '\n'
if query.contains(|c| c != '\n') {
ctx.execute_and_print_stream_sql(&query).await?;
Ok(())
}

Ok(())
}
2 changes: 1 addition & 1 deletion src/app/state/tabs/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ratatui::style::Style;
use ratatui::widgets::TableState;
use tui_textarea::TextArea;

use crate::app::execution::ExecutionStats;
use crate::execution::ExecutionStats;

#[derive(Clone, Debug)]
pub struct FlightSQLQuery {
Expand Down
2 changes: 1 addition & 1 deletion src/app/state/tabs/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;

use ratatui::widgets::TableState;

use crate::app::execution::ExecutionStats;
use crate::execution::ExecutionStats;

#[derive(Debug)]
pub enum Context {
Expand Down
2 changes: 1 addition & 1 deletion src/app/state/tabs/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ratatui::style::Style;
use ratatui::widgets::TableState;
use tui_textarea::TextArea;

use crate::app::execution::ExecutionStats;
use crate::execution::ExecutionStats;

#[derive(Clone, Debug)]
pub struct Query {
Expand Down
45 changes: 32 additions & 13 deletions src/app/execution.rs → src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! [`ExecutionContext`]: DataFusion based execution context for running SQL queries
//!
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -31,7 +33,6 @@ use deltalake::delta_datafusion::DeltaTableFactory;
use log::{error, info};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "s3")]
use url::Url;
#[cfg(feature = "flightsql")]
Expand All @@ -40,21 +41,34 @@ use {
tonic::transport::Channel,
};

use super::config::ExecutionConfig;
use super::state::tabs::sql::Query;
use super::AppEvent;
use crate::app::config::ExecutionConfig;
use crate::app::state::tabs::sql::Query;
use crate::app::AppEvent;

/// Structure for executing queries either locally or remotely (via FlightSQL)
///
/// This context includes both:
///
/// 1. The configuration of a [`SessionContext`] with various extensions enabled
///
/// 2. The code for running SQL queries
///
/// The design goals for this module are to serve as an example of how to integrate
/// DataFusion into an application and to provide a simple interface for running SQL queries
/// with the various extensions enabled.
///
/// Thus it is important (eventually) not depend on the code in the app crate
pub struct ExecutionContext {
pub session_ctx: SessionContext,
pub config: ExecutionConfig,
pub cancellation_token: CancellationToken,
session_ctx: SessionContext,
#[cfg(feature = "flightsql")]
pub flightsql_client: Mutex<Option<FlightSqlServiceClient<Channel>>>,
flightsql_client: Mutex<Option<FlightSqlServiceClient<Channel>>>,
}

impl ExecutionContext {
#[allow(unused_mut)]
/// Construct a new `ExecutionContext` with the specified configuration
pub fn new(config: ExecutionConfig) -> Self {
let _ = &config; // avoid unused variable warning (it is used when some features are enabled)

let cfg = SessionConfig::default()
.with_batch_size(1)
.with_information_schema(true);
Expand Down Expand Up @@ -91,6 +105,7 @@ impl ExecutionContext {
}
}

#[allow(unused_mut)] // used when deltalake is enabled
let mut state = SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(runtime_env.into())
Expand All @@ -106,12 +121,9 @@ impl ExecutionContext {

{
let session_ctx = SessionContext::new_with_state(state);
let cancellation_token = CancellationToken::new();

Self {
config,
session_ctx,
cancellation_token,
#[cfg(feature = "flightsql")]
flightsql_client: Mutex::new(None),
}
Expand All @@ -122,10 +134,17 @@ impl ExecutionContext {
Ok(())
}

/// Return the inner DataFusion [`SessionContext`]
pub fn session_ctx(&self) -> &SessionContext {
&self.session_ctx
}

/// Return a handle to the underlying FlightSQL client, if any
#[cfg(feature = "flightsql")]
pub fn flightsql_client(&self) -> &Mutex<Option<FlightSqlServiceClient<Channel>>> {
&self.flightsql_client
}

pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) -> Result<()> {
// We need to filter out empty strings to correctly determine the last query for displaying
// results.
Expand Down Expand Up @@ -196,7 +215,7 @@ impl ExecutionContext {
Ok(())
}

/// Execcutes the specified parsed DataFusion statement and discards the result
/// Executes the specified parsed DataFusion statement and discards the result
pub async fn execute_sql(&self, sql: &str, print: bool) -> Result<()> {
let df = self.session_ctx.sql(sql).await?;
self.execute_stream_dataframe(df, print).await
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod app;
pub mod cli;
pub mod execution;
pub mod telemetry;
pub mod ui;
Loading

0 comments on commit 5513d8d

Please sign in to comment.