Skip to content

Commit

Permalink
update ndc-spec to 0.1.0-rc14 (#283)
Browse files Browse the repository at this point in the history
- depends on: hasura/ndc-hub#93
- todo: add changelog entry

### What

We are updating connector to support the latest ndc-connector spec
(0.1.0-rc14).
While fixing some breaking changes, such as renaming `where` to
`predicate`, we also support new capabilities:

1. Explaining mutations via the `/mutation/explain` endpoint
2. Supporting filtering using `in` by columns and variables

### How

1. Since a mutation request may contain multiple mutation operations,
the explain response will contain a field named `<operation_name> SQL
Mutation` and a field named `<operation_name> Execution Plan` for each
operation.
2. To filter in column or variable, we need to compare against a
subquery of the shape `(select value from unnest(<column-or-variable>)
as table(value))`.
  • Loading branch information
Gil Mizrahi authored Feb 14, 2024
1 parent 121ec33 commit 553ad47
Show file tree
Hide file tree
Showing 115 changed files with 1,878 additions and 528 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
query-engine-metadata = { path = "../query-engine/metadata" }

ndc-sdk = { git = "https://github.com/hasura/ndc-hub.git", rev = "3b6c480" }
ndc-sdk = { git = "https://github.com/hasura/ndc-hub.git", rev = "02d26c1" }

schemars = { version = "0.8.16", features = ["smol_str", "preserve_order"] }
serde = "1.0.196"
Expand Down
6 changes: 3 additions & 3 deletions crates/connectors/ndc-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ query-engine-metadata = { path = "../../query-engine/metadata" }
query-engine-sql = { path = "../../query-engine/sql" }
query-engine-translation = { path = "../../query-engine/translation" }

ndc-sdk = { git = "https://github.com/hasura/ndc-hub.git", rev = "3b6c480" }
ndc-sdk = { git = "https://github.com/hasura/ndc-hub.git", rev = "02d26c1" }

async-trait = "0.1.77"
percent-encoding = "2.3.1"
Expand All @@ -34,8 +34,8 @@ tracing = "0.1.40"
url = "2.5.0"

[dev-dependencies]
ndc-client = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.13" }
ndc-test = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.13" }
ndc-client = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.14" }
ndc-test = { git = "https://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.14" }
tests-common = { path = "../../tests/tests-common" }

axum = "0.6.20"
Expand Down
8 changes: 6 additions & 2 deletions crates/connectors/ndc-postgres/src/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ use ndc_sdk::models;
/// from the NDC specification.
pub fn get_capabilities() -> models::CapabilitiesResponse {
models::CapabilitiesResponse {
versions: "^0.1.0".into(),
version: "0.1.0".into(),
capabilities: models::Capabilities {
explain: Some(models::LeafCapability {}),
query: models::QueryCapabilities {
aggregates: Some(models::LeafCapability {}),
variables: Some(models::LeafCapability {}),
explain: Some(models::LeafCapability {}),
},
mutation: models::MutationCapabilities {
transactional: Some(models::LeafCapability {}),
explain: Some(models::LeafCapability {}),
},
relationships: Some(models::RelationshipCapabilities {
relation_comparisons: Some(models::LeafCapability {}),
Expand Down
35 changes: 30 additions & 5 deletions crates/connectors/ndc-postgres/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use ndc_sdk::models;
use ndc_postgres_configuration as configuration;

use super::capabilities;
use super::explain;
use super::health;
use super::mutation;
use super::query;
Expand Down Expand Up @@ -181,15 +180,41 @@ impl connector::Connector for Postgres {

/// Explain a query by creating an execution plan
///
/// This function implements the [explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html)
/// This function implements the [query/explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html)
/// from the NDC specification.
async fn explain(
async fn query_explain(
configuration: &Self::Configuration,
state: &Self::State,
query_request: models::QueryRequest,
request: models::QueryRequest,
) -> Result<JsonResponse<models::ExplainResponse>, connector::ExplainError> {
let runtime_configuration = configuration::as_runtime_configuration(configuration);
query::explain(runtime_configuration, state, request)
.await
.map_err(|err| {
tracing::error!(
meta.signal_type = "log",
event.domain = "ndc",
event.name = "Explain error",
name = "Explain error",
body = %err,
error = true,
);
err
})
.map(Into::into)
}

/// Explain a mutation by creating an execution plan
///
/// This function implements the [mutation/explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html)
/// from the NDC specification.
async fn mutation_explain(
configuration: &Self::Configuration,
state: &Self::State,
request: models::MutationRequest,
) -> Result<JsonResponse<models::ExplainResponse>, connector::ExplainError> {
let runtime_configuration = configuration::as_runtime_configuration(configuration);
explain::explain(runtime_configuration, state, query_request)
mutation::explain(runtime_configuration, state, request)
.await
.map_err(|err| {
tracing::error!(
Expand Down
1 change: 0 additions & 1 deletion crates/connectors/ndc-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
pub mod capabilities;
pub mod configuration_mapping;
pub mod connector;
pub mod explain;
pub mod health;
pub mod mutation;
pub mod query;
Expand Down
10 changes: 7 additions & 3 deletions crates/connectors/ndc-postgres/src/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! [Native Data Connector Specification](https://hasura.github.io/ndc-spec/specification/mutations/index.html)
//! for further details.
mod explain;
pub use explain::explain;

use tracing::{info_span, Instrument};

use ndc_sdk::connector;
Expand All @@ -26,7 +29,7 @@ pub async fn mutation(
state: &state::State,
request: models::MutationRequest,
) -> Result<JsonResponse<models::MutationResponse>, connector::MutationError> {
let timer = state.metrics.time_query_total();
let timer = state.metrics.time_mutation_total();

// See https://docs.rs/tracing/0.1.29/tracing/span/struct.Span.html#in-asynchronous-code
let result = async move {
Expand All @@ -52,15 +55,16 @@ pub async fn mutation(
timer.complete_with(result)
}

fn plan_mutation(
/// Create a mutation execution plan from a request.
pub fn plan_mutation(
configuration: configuration::RuntimeConfiguration<'_>,
state: &state::State,
request: models::MutationRequest,
) -> Result<
sql::execution_plan::ExecutionPlan<sql::execution_plan::Mutations>,
connector::MutationError,
> {
let timer = state.metrics.time_query_plan();
let timer = state.metrics.time_mutation_plan();
let mutations = request
.operations
.into_iter()
Expand Down
130 changes: 130 additions & 0 deletions crates/connectors/ndc-postgres/src/mutation/explain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//! Implement the `/mutation/explain` endpoint to return a mutation execution plan.
//! See the Hasura
//! [Native Data Connector Specification](https://hasura.github.io/ndc-spec/specification/explain.html)
//! for further details.
use std::collections::BTreeMap;

use tracing::{info_span, Instrument};

use ndc_postgres_configuration as configuration;

use crate::mutation;
use ndc_sdk::connector;
use ndc_sdk::models;

use crate::state;

/// Explain a mutation by creating an execution plan.
///
/// This function implements the [mutation/explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html)
/// from the NDC specification.
pub async fn explain<'a>(
configuration: configuration::RuntimeConfiguration<'_>,
state: &state::State,
mutation_request: models::MutationRequest,
) -> Result<models::ExplainResponse, connector::ExplainError> {
async move {
tracing::info!(
mutation_request_json = serde_json::to_string(&mutation_request).unwrap(),
mutation_request = ?mutation_request
);

// Compile the mutation.
let plan = async { mutation::plan_mutation(configuration, state, mutation_request) }
.instrument(info_span!("Plan mutation"))
.await
.map_err(|err| convert_mutation_error(&err))?;

// Execute an explain query.
let results = query_engine_execution::mutation::explain(
&state.pool,
&state.database_info,
&state.metrics,
plan,
)
.instrument(info_span!("Explain mutation"))
.await
.map_err(|err| {
tracing::error!("{}", err);
log_err_metrics_and_convert_error(state, &err)
})?;

state.metrics.record_successful_explain();

let details: BTreeMap<String, String> = results
.into_iter()
.flat_map(|(name, sql, plan)| {
vec![
(format!("{name} SQL Mutation"), sql),
(format!("{name} Execution Plan"), plan),
]
})
.collect();

let response = models::ExplainResponse { details };

Ok(response)
}
.instrument(info_span!("/explain"))
.await
}

fn log_err_metrics_and_convert_error(
state: &state::State,
err: &query_engine_execution::mutation::Error,
) -> connector::ExplainError {
match err {
query_engine_execution::mutation::Error::Query(err) => {
tracing::error!("{}", err);
// log error metric
match &err {
query_engine_execution::mutation::QueryError::NotSupported(_) => {
state.metrics.error_metrics.record_unsupported_feature();
connector::ExplainError::UnsupportedOperation(err.to_string())
}
query_engine_execution::mutation::QueryError::DBError(_) => {
state.metrics.error_metrics.record_invalid_request();
connector::ExplainError::UnprocessableContent(err.to_string())
}
query_engine_execution::mutation::QueryError::DBConstraintError(_) => {
state.metrics.error_metrics.record_invalid_request();
connector::ExplainError::UnprocessableContent(err.to_string())
}
}
}
query_engine_execution::mutation::Error::DB(err) => {
tracing::error!("{}", err);
state.metrics.error_metrics.record_database_error();
connector::ExplainError::Other(err.to_string().into())
}
query_engine_execution::mutation::Error::Multiple(err1, err2) => {
log_err_metrics_and_convert_error(state, err1);
log_err_metrics_and_convert_error(state, err2);
connector::ExplainError::Other(err.to_string().into())
}
}
}

fn convert_mutation_error(err: &connector::MutationError) -> connector::ExplainError {
match err {
connector::MutationError::InvalidRequest(_) => {
connector::ExplainError::InvalidRequest(err.to_string())
}
connector::MutationError::UnprocessableContent(_) => {
connector::ExplainError::UnprocessableContent(err.to_string())
}
connector::MutationError::UnsupportedOperation(_) => {
connector::ExplainError::UnsupportedOperation(err.to_string())
}
connector::MutationError::Conflict(_) => {
connector::ExplainError::Other(err.to_string().into())
}
connector::MutationError::ConstraintNotMet(_) => {
connector::ExplainError::Other(err.to_string().into())
}
connector::MutationError::Other(_) => {
connector::ExplainError::Other(err.to_string().into())
}
}
}
3 changes: 3 additions & 0 deletions crates/connectors/ndc-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! [Native Data Connector Specification](https://hasura.github.io/ndc-spec/specification/queries/index.html)
//! for further details.
mod explain;
pub use explain::explain;

use tracing::{info_span, Instrument};

use ndc_sdk::connector;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Implement the `/explain` endpoint to return a query execution plan.
//! Implement the `/query/explain` endpoint to return a query execution plan.
//! See the Hasura
//! [Native Data Connector Specification](https://hasura.github.io/ndc-spec/specification/explain.html)
//! for further details.
Expand All @@ -19,7 +19,7 @@ use super::state;

/// Explain a query by creating an execution plan
///
/// This function implements the [explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html)
/// This function implements the [query/explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html)
/// from the NDC specification.
pub async fn explain(
configuration: configuration::RuntimeConfiguration<'_>,
Expand Down
2 changes: 1 addition & 1 deletion crates/connectors/ndc-postgres/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn get_schema(
.map(|(op_name, op_def)| {
(
op_name.clone(),
models::ComparisonOperatorDefinition {
models::ComparisonOperatorDefinition::Custom {
argument_type: models::Type::Named {
name: op_def.argument_type.0.clone(),
},
Expand Down
24 changes: 24 additions & 0 deletions crates/query-engine/execution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub struct Metrics {
query_total_time: Histogram,
query_plan_time: Histogram,
query_execution_time: Histogram,
mutation_total_time: Histogram,
mutation_plan_time: Histogram,
mutation_execution_time: Histogram,
connection_acquisition_wait_time: Histogram,
pool_size: IntGauge,
Expand Down Expand Up @@ -65,6 +67,18 @@ impl Metrics {
"Time taken to execute an already-planned query, in seconds.",
)?;

let mutation_total_time = add_histogram_metric(
metrics_registry,
"ndc_postgres_mutation_total_time",
"Total time taken to plan and execute a mutation, in seconds",
)?;

let mutation_plan_time = add_histogram_metric(
metrics_registry,
"ndc_postgres_mutation_plan_time",
"Time taken to plan a mutation for execution, in seconds.",
)?;

let mutation_execution_time = add_histogram_metric(
metrics_registry,
"ndc_postgres_mutation_execution_time",
Expand Down Expand Up @@ -134,6 +148,8 @@ impl Metrics {
query_total_time,
query_plan_time,
query_execution_time,
mutation_total_time,
mutation_plan_time,
mutation_execution_time,
connection_acquisition_wait_time,
pool_size,
Expand Down Expand Up @@ -172,6 +188,14 @@ impl Metrics {
Timer(self.query_execution_time.start_timer())
}

pub fn time_mutation_total(&self) -> Timer {
Timer(self.mutation_total_time.start_timer())
}

pub fn time_mutation_plan(&self) -> Timer {
Timer(self.mutation_plan_time.start_timer())
}

pub fn time_mutation_execution(&self) -> Timer {
Timer(self.mutation_execution_time.start_timer())
}
Expand Down
Loading

0 comments on commit 553ad47

Please sign in to comment.