Skip to content

Commit

Permalink
Merge pull request #74 from hasura/djh/NDAT-1031/add-metrics
Browse files Browse the repository at this point in the history
Add query metrics and spans
  • Loading branch information
danieljharvey authored Nov 16, 2023
2 parents 26b01db + c12eef1 commit fea9824
Show file tree
Hide file tree
Showing 16 changed files with 1,024 additions and 211 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/ndc-sqlserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ path = "bin/main.rs"
ndc-sdk = { git = "https://github.com/hasura/ndc-hub.git", rev = "f2a2a75", package = "ndc-sdk" }

query-engine-execution = { path = "../query-engine/execution" }
query-engine-translation = { path = "../query-engine/translation" }
query-engine-metadata = {path = "../query-engine/metadata"}
query-engine-sql = { path = "../query-engine/sql" }
query-engine-translation = { path = "../query-engine/translation" }

tiberius = { version = "0.12.2", default-features = false, features = ["rustls"] }
bb8 = "0.8.1"
Expand Down
19 changes: 8 additions & 11 deletions crates/ndc-sqlserver/src/configuration/version1.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Configuration and state for our connector.
use crate::metrics;
use crate::configuration::introspection;
use ndc_sdk::connector;
use query_engine_execution::metrics;
use query_engine_metadata::metadata;
use query_engine_metadata::metadata::{database, Nullable};
use schemars::JsonSchema;
Expand Down Expand Up @@ -47,7 +47,7 @@ pub struct Configuration {
#[derive(Debug, Clone)]
pub struct State {
pub mssql_pool: bb8::Pool<bb8_tiberius::ConnectionManager>,
pub metrics: metrics::Metrics,
pub metrics: query_engine_execution::metrics::Metrics,
}

/// Validate the user configuration.
Expand All @@ -72,15 +72,12 @@ pub async fn validate_raw_configuration(
pub async fn create_state(
configuration: &Configuration,
metrics_registry: &mut prometheus::Registry,
) -> Result<State, connector::InitializationError> {
) -> Result<State, InitializationError> {
let mssql_pool = create_mssql_pool(&configuration.config)
.await
.map_err(|e| {
connector::InitializationError::Other(
InitializationError::UnableToCreateMSSQLPool(e).into(),
)
})?;
let metrics = metrics::initialise_metrics(metrics_registry).await?;
.map_err(InitializationError::UnableToCreateMSSQLPool)?;
let metrics = query_engine_execution::metrics::Metrics::initialize(metrics_registry)
.map_err(InitializationError::MetricsError)?;
Ok(State {
mssql_pool,
metrics,
Expand Down Expand Up @@ -361,8 +358,8 @@ fn get_column_info(
pub enum InitializationError {
#[error("unable to initialize mssql connection pool: {0}")]
UnableToCreateMSSQLPool(bb8_tiberius::Error),
#[error("error initializing Prometheus metrics: {0}")]
PrometheusError(prometheus::Error),
#[error("error initializing metrics: {0}")]
MetricsError(metrics::Error),
}

/// Collect all the types that can occur in the metadata. This is a bit circumstantial. A better
Expand Down
41 changes: 5 additions & 36 deletions crates/ndc-sqlserver/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ use async_trait::async_trait;
use ndc_sdk::connector;
use ndc_sdk::json_response::JsonResponse;
use ndc_sdk::models;
use query_engine_execution::execution;
use query_engine_translation::translation;

use super::configuration;
use super::query;
use super::schema;

#[derive(Clone, Default)]
Expand Down Expand Up @@ -59,7 +58,9 @@ impl connector::Connector for SQLServer {
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<Self::State, connector::InitializationError> {
configuration::create_state(configuration, metrics).await
configuration::create_state(configuration, metrics)
.await
.map_err(|err| connector::InitializationError::Other(err.into()))
}

/// Update any metrics from the state
Expand Down Expand Up @@ -155,39 +156,7 @@ impl connector::Connector for SQLServer {
state: &Self::State,
query_request: models::QueryRequest,
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
tracing::info!("{}", serde_json::to_string(&query_request).unwrap());
tracing::info!("{:?}", query_request);

// Compile the query.
let plan =
match translation::query::translate(&configuration.config.metadata, query_request) {
Ok(plan) => Ok(plan),
Err(err) => {
tracing::error!("{}", err);
match err {
translation::query::error::Error::NotSupported(_) => {
Err(connector::QueryError::UnsupportedOperation(err.to_string()))
}
_ => Err(connector::QueryError::InvalidRequest(err.to_string())),
}
}
}?;

// Execute the query.
let result = execution::mssql_execute(&state.mssql_pool, plan)
.await
.map_err(|err| match err {
execution::Error::Query(err) => {
tracing::error!("{}", err);
connector::QueryError::Other(err.into())
}
})?;

// assuming query succeeded, increment counter
state.metrics.query_total.inc();

// TODO: return raw JSON
Ok(JsonResponse::Value(result))
query::query(configuration, state, query_request).await
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ndc-sqlserver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod configuration;
pub mod connector;
pub mod metrics;
pub mod query;
pub mod schema;
58 changes: 0 additions & 58 deletions crates/ndc-sqlserver/src/metrics.rs

This file was deleted.

87 changes: 87 additions & 0 deletions crates/ndc-sqlserver/src/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//! Implement the `/query` endpoint to run a query against SQLServer.
//! See the Hasura
//! [Native Data Connector Specification](https://hasura.github.io/ndc-spec/specification/queries/index.html)
//! for further details.
use super::configuration;
use ndc_sdk::connector;
use ndc_sdk::json_response::JsonResponse;
use ndc_sdk::models;
use query_engine_execution::execution;
use query_engine_sql::sql;
use query_engine_translation::translation;
use tracing::{info_span, Instrument};

/// Execute a query
///
/// This function implements the [query endpoint](https://hasura.github.io/ndc-spec/specification/queries/index.html)
/// from the NDC specification.
pub async fn query(
configuration: &configuration::Configuration,
state: &configuration::State,
query_request: models::QueryRequest,
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
tracing::info!("{}", serde_json::to_string(&query_request).unwrap());
tracing::info!("{:?}", query_request);

let timer = state.metrics.time_query_total();
let result = async move {
// Plan the query
let plan = async { plan_query(configuration, state, query_request) }
.instrument(info_span!("Plan query"))
.await?;

// Execute the query.
let result = execute_query(state, plan)
.instrument(info_span!("Execute query"))
.await?;

// assuming query succeeded, increment counter
state.metrics.record_successful_query();

// TODO: return raw JSON
Ok(result)
}
.instrument(info_span!("Execute query"))
.await;

timer.complete_with(result)
}

// Compile the query.
fn plan_query(
configuration: &configuration::Configuration,
state: &configuration::State,
query_request: models::QueryRequest,
) -> Result<sql::execution_plan::ExecutionPlan, connector::QueryError> {
let timer = state.metrics.time_query_plan();
let result = translation::query::translate(&configuration.config.metadata, query_request)
.map_err(|err| {
tracing::error!("{}", err);
match err {
translation::query::error::Error::NotSupported(_) => {
connector::QueryError::UnsupportedOperation(err.to_string())
}
_ => connector::QueryError::InvalidRequest(err.to_string()),
}
});
timer.complete_with(result)
}

async fn execute_query(
state: &configuration::State,
plan: sql::execution_plan::ExecutionPlan,
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
execution::mssql_execute(&state.mssql_pool, &state.metrics, plan)
.await
.map(JsonResponse::Value)
.map_err(|err| match err {
execution::Error::Query(err) => {
tracing::error!("{}", err);
connector::QueryError::Other(err.into())
}
execution::Error::ConnectionPool(err) => {
tracing::error!("{}", err);
connector::QueryError::Other(err.into())
}
})
}
Loading

0 comments on commit fea9824

Please sign in to comment.