Skip to content

Commit

Permalink
Implement support for running commands via -c, (#124)
Browse files Browse the repository at this point in the history
This allows `dft` to run single commands from the command line like

```shell
dft -c  'select 1 + 2'
```
  • Loading branch information
alamb authored Sep 9, 2024
1 parent f27bf6d commit abfc169
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 422 deletions.
219 changes: 128 additions & 91 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ datafusion = "41.0.0"
datafusion-common = "41.0.0"
deltalake = { version = "0.19.0", features = ["datafusion"], optional = true }
directories = "5.0.1"
env_logger = "0.11.5"
futures = "0.3.30"
itertools = "0.13.0"
lazy_static = "1.4.0"
Expand Down
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,24 @@ Some of the current and planned features are:

## `dft` CLI

The `dft` CLI is a scriptable engine for executing queries from files. It is used in a similar manner to `datafusion-cli` but with the added benefit of being able to query from multiple data sources.
The `dft` CLI is a scriptable interface to the `tui` engine for executing
queries from files or the command line. The CLI is used in a similar manner to
`datafusion-cli` but with the added benefit of supporting multiple pre-integrated
data sources.

For example you can run the contents of `query.sql` with
### Example: Run the contents of `query.sql`

```shell
$ dft -f query.sql
```

### Example: Run a query from the command line

```shell
$ dft -c "SELECT 1+2"
```


## User Guide

### Installation
Expand Down
20 changes: 18 additions & 2 deletions src/app/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{execute_stream, visit_execution_plan, ExecutionPlanVisitor};
use datafusion::prelude::*;
use datafusion::sql::parser::Statement;
use datafusion::{arrow::util::pretty::pretty_format_batches, physical_plan::ExecutionPlan};
#[cfg(feature = "deltalake")]
use deltalake::delta_datafusion::DeltaTableFactory;
Expand Down Expand Up @@ -121,10 +122,25 @@ impl ExecutionContext {
&self.session_ctx
}

/// Executes the specified parsed DataFusion statement and prints the result to stdout
pub async fn execute_statement(&self, statement: Statement) -> Result<()> {
let plan = self
.session_ctx
.state()
.statement_to_plan(statement)
.await?;
let df = self.session_ctx.execute_logical_plan(plan).await?;
self.execute_stream_dataframe(df).await
}

/// Executes the specified query and prints the result to stdout
pub async fn execute_stream_sql(&self, query: &str) -> Result<()> {
let df = self.session_ctx.sql(query).await.unwrap();
let physical_plan = df.create_physical_plan().await.unwrap();
let df = self.session_ctx.sql(query).await?;
self.execute_stream_dataframe(df).await
}

pub async fn execute_stream_dataframe(&self, df: DataFrame) -> Result<()> {
let physical_plan = df.create_physical_plan().await?;
// We use small batch size because web socket stream comes in small increments (each
// message usually only has at most a few records).
let stream_cfg = SessionConfig::default();
Expand Down
36 changes: 35 additions & 1 deletion src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::{cli, ui};
use color_eyre::eyre::eyre;
use color_eyre::Result;
use crossterm::event as ct;
use datafusion::sql::parser::DFParser;
use datafusion::sql::sqlparser::dialect::GenericDialect;
use futures::FutureExt;
use log::{debug, error, info, trace};
use ratatui::backend::CrosstermBackend;
Expand Down Expand Up @@ -325,7 +327,21 @@ pub async fn run_app(cli: cli::DftCli, state: state::AppState<'_>) -> Result<()>
app.exit()
}

pub async fn execute_files(files: Vec<PathBuf>, state: &state::AppState<'_>) -> Result<()> {
pub async fn execute_files_or_commands(
files: Vec<PathBuf>,
commands: Vec<String>,
state: &state::AppState<'_>,
) -> Result<()> {
match (files.is_empty(), commands.is_empty()) {
(true, true) => Err(eyre!("No files or commands provided to execute")),
(false, true) => execute_files(files, state).await,
(true, false) => execute_commands(commands, state).await,
(false, false) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
}
}
async fn execute_files(files: Vec<PathBuf>, state: &state::AppState<'_>) -> Result<()> {
info!("Executing files: {:?}", files);
let execution = ExecutionContext::new(state.config.execution.clone());

Expand All @@ -335,6 +351,24 @@ pub async fn execute_files(files: Vec<PathBuf>, state: &state::AppState<'_>) ->

Ok(())
}
async fn execute_commands(commands: Vec<String>, state: &state::AppState<'_>) -> Result<()> {
info!("Executing commands: {:?}", commands);
for command in commands {
exec_from_string(&command, state).await?
}

Ok(())
}

async fn exec_from_string(sql: &str, state: &state::AppState<'_>) -> Result<()> {
let dialect = GenericDialect {};
let execution = ExecutionContext::new(state.config.execution.clone());
let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
for statement in statements {
execution.execute_statement(statement).await?;
}
Ok(())
}

/// run and execute SQL statements and commands from a file, against a context
/// with the given print options
Expand Down
21 changes: 19 additions & 2 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Environment Variables
RUST_LOG { trace | debug | info | error }: Standard rust logging level. Default is info.
";

#[derive(Clone, Debug, Parser)]
#[derive(Clone, Debug, Parser, Default)]
#[command(author, version, about, long_about = LONG_ABOUT)]
pub struct DftCli {
#[clap(
Expand All @@ -46,7 +46,16 @@ pub struct DftCli {
)]
pub files: Vec<PathBuf>,

#[clap(short, long, help = "Path to the configuration file")]
#[clap(
short = 'c',
long,
num_args = 0..,
help = "Execute the given SQL string(s), then exit.",
value_parser(parse_command)
)]
pub commands: Vec<String>,

#[clap(long, help = "Path to the configuration file")]
pub config: Option<String>,
}

Expand Down Expand Up @@ -76,3 +85,11 @@ fn parse_valid_file(file: &str) -> Result<PathBuf, String> {
Ok(path)
}
}

fn parse_command(command: &str) -> Result<String, String> {
if !command.is_empty() {
Ok(command.to_string())
} else {
Err("-c flag expects only non empty commands".to_string())
}
}
15 changes: 9 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@

use clap::Parser;
use color_eyre::Result;
use dft::app::state;
use dft::app::{execute_files, run_app};
use dft::app::{execute_files_or_commands, run_app, state};
use dft::cli;
use dft::telemetry;

#[tokio::main]
async fn main() -> Result<()> {
telemetry::initialize_logs()?;
let cli = cli::DftCli::parse();
let state = state::initialize(cli.clone());

// If executing commands from files, do so and then exit
if !cli.files.is_empty() {
execute_files(cli.files.clone(), &state).await?;
if !cli.files.is_empty() || !cli.commands.is_empty() {
// use env_logger to setup logging for CLI
env_logger::init();
let state = state::initialize(cli.clone());
execute_files_or_commands(cli.files.clone(), cli.commands.clone(), &state).await?;
} else {
// use alternate logging for TUI
telemetry::initialize_logs()?;
let state = state::initialize(cli.clone());
run_app(cli.clone(), state).await?;
}

Expand Down
Loading

0 comments on commit abfc169

Please sign in to comment.