Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
py committed Dec 2, 2024
1 parent b08ed14 commit 7a1f7e2
Show file tree
Hide file tree
Showing 23 changed files with 275 additions and 451 deletions.
3 changes: 2 additions & 1 deletion crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ async fn update(context: Context<impl Environment>) -> anyhow::Result<()> {
let existing_configuration =
configuration::parse_configuration(&context.context_path).await?;
let output =
configuration::introspect(&existing_configuration.clone(), &context.environment).await?;
configuration::introspect(&existing_configuration.clone(), &context.environment)
.await?;

// Check that the input file did not change since we started introspecting,
let input_again_before_write =
Expand Down
2 changes: 1 addition & 1 deletion crates/configuration/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ pub struct Configuration {
// pub provider_name: String,
pub region: String,
// pub mutations_version: Option<metadata::mutations::MutationsVersion>,
}
}
2 changes: 1 addition & 1 deletion crates/configuration/src/connection_settings.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Database connection settings.
use crate::values::{Secret, AccessKeyId, SecretAccessKey, Region};
use crate::values::{AccessKeyId, Region, Secret, SecretAccessKey};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

Expand Down
10 changes: 5 additions & 5 deletions crates/configuration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
pub mod version1;
pub mod connection_settings;
pub mod configuration;
pub mod connection_settings;
pub mod environment;
pub mod error;
mod values;
mod to_runtime_configuration;
mod values;
pub mod version1;

pub use configuration::Configuration;
pub use to_runtime_configuration::make_runtime_configuration;
pub use values::connection_info::{AccessKeyId, ProviderName, Region, SecretAccessKey};
pub use version1::{
introspect,
parse_configuration,
Expand All @@ -18,5 +20,3 @@ pub use version1::{
// PoolSettings,
ParsedConfiguration,
};
pub use values::connection_info::{AccessKeyId, Region, SecretAccessKey, ProviderName};
pub use to_runtime_configuration::make_runtime_configuration;
2 changes: 1 addition & 1 deletion crates/configuration/src/to_runtime_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::version1::ParsedConfiguration;
use crate::environment::Environment;
use crate::error::MakeRuntimeConfigurationError;
use crate::values::{Secret, AccessKeyId, Region, SecretAccessKey};
use crate::values::{AccessKeyId, Region, Secret, SecretAccessKey};
use query_engine_metadata::{self, metadata};
// use crate::VersionTag;

Expand Down
2 changes: 1 addition & 1 deletion crates/configuration/src/values/connection_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ impl From<&str> for Region {
fn from(value: &str) -> Self {
Self::from(value.to_string())
}
}
}
6 changes: 3 additions & 3 deletions crates/configuration/src/values/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod secret;
mod pool_settings;
pub mod connection_info;
mod pool_settings;
mod secret;

pub use connection_info::{AccessKeyId, Region, SecretAccessKey};
pub use secret::Secret;
pub use connection_info::{AccessKeyId, SecretAccessKey, Region};
144 changes: 76 additions & 68 deletions crates/configuration/src/version1.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Internal Configuration and state for our connector.
use crate::{connection_settings, AccessKeyId, SecretAccessKey};
use crate::environment::Environment;
use crate::error::WriteParsedConfigurationError;
use crate::values::Secret;
use crate::{connection_settings, AccessKeyId, SecretAccessKey};

use super::error::ParseConfigurationError;
use aws_sdk_dynamodb::types::KeyType;
Expand All @@ -16,15 +16,16 @@ use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use tokio::fs;

use query_engine_metadata::metadata::{self, database, ColumnInfo, Nullable, ProjectionTypeInfo, ScalarTypes, TablesInfo};
use query_engine_metadata::metadata::{
self, database, ColumnInfo, Nullable, ProjectionTypeInfo, ScalarTypes, TablesInfo,
};

const CURRENT_VERSION: u32 = 1;
pub const CONFIGURATION_FILENAME: &str = "configuration.json";
const CHARACTER_STRINGS: [&str; 3] = ["character", "text", "string"];
const UNICODE_CHARACTER_STRINGS: [&str; 3] = ["nchar", "ntext", "nvarchar"];
const CANNOT_COMPARE: [&str; 3] = ["text", "ntext", "image"];


/// Initial configuration, just enough to connect to a database and elaborate a full
/// 'Configuration'.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema)]
Expand Down Expand Up @@ -68,19 +69,25 @@ pub async fn introspect(
) -> anyhow::Result<ParsedConfiguration> {
let access_key_id = match &args.connection_settings.access_key_id {
AccessKeyId(Secret::Plain(value)) => Cow::Borrowed(value),
AccessKeyId(Secret::FromEnvironment { variable }) => Cow::Owned(environment.read(variable)?),
AccessKeyId(Secret::FromEnvironment { variable }) => {
Cow::Owned(environment.read(variable)?)
}
};
let secret_access_key = match &args.connection_settings.secret_access_key {
SecretAccessKey(Secret::Plain(value)) => Cow::Borrowed(value),
SecretAccessKey(Secret::FromEnvironment { variable }) => Cow::Owned(environment.read(variable)?),
SecretAccessKey(Secret::FromEnvironment { variable }) => {
Cow::Owned(environment.read(variable)?)
}
};
// let provider_name = match &args.connection_settings.provider_name {
// ProviderName(Secret::Plain(value)) => Cow::Borrowed(value),
// ProviderName(Secret::FromEnvironment { variable }) => Cow::Owned(environment.read(variable)?),
// };
let region = match &args.connection_settings.region {
crate::Region(Secret::Plain(value)) => Cow::Borrowed(value),
crate::Region(Secret::FromEnvironment { variable }) => Cow::Owned(environment.read(variable)?),
crate::Region(Secret::FromEnvironment { variable }) => {
Cow::Owned(environment.read(variable)?)
}
};
// let access_key_id = args.connection_settings.access_key_id.clone();
// let secret_access_key = args.connection_settings.secret_access_key.clone();
Expand All @@ -90,16 +97,16 @@ pub async fn introspect(
let credentials = aws_sdk_dynamodb::config::Credentials::new(
access_key_id.to_string(),
secret_access_key.to_string(),
None, // Optional session token
None, // Expiration (None for non-expiring)
"my-provider", // Provider name
None, // Optional session token
None, // Expiration (None for non-expiring)
"my-provider", // Provider name
);

// Configure AWS SDK with explicit credentials
let config = Config::builder()
.region(aws_config::Region::new(region.to_string()))
.credentials_provider(credentials)
.build();
.region(aws_config::Region::new(region.to_string()))
.credentials_provider(credentials)
.build();

// To use localhost url
// let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
Expand All @@ -114,13 +121,12 @@ pub async fn introspect(
let client = aws_sdk_dynamodb::Client::from_conf(config);
let tables_result = client.list_tables().send().await;
// dbg!(&tables_result);
let tables = tables_result.map_err(|_op| {
ParseConfigurationError::IoErrorButStringified(format!(
"Failed to list tables:",
// op.error_message.unwrap()
))
}).unwrap(); //TODO: handle error
// dbg!(&tables);
let tables = tables_result
.map_err(|_op| {
ParseConfigurationError::IoErrorButStringified("Failed to list tables:".to_string())
})
.unwrap(); //TODO: handle error
// dbg!(&tables);
let table_names = tables.table_names.unwrap_or_default();
let mut scalars_list: BTreeSet<ScalarTypeName> = BTreeSet::new();
let mut tables_info: BTreeMap<CollectionName, metadata::TableInfo> = BTreeMap::new();
Expand All @@ -147,7 +153,6 @@ pub async fn introspect(
// "NULL" => ScalarTypeName::new("Null".into()),
// "M" => ScalarTypeName::new("Object".into()),
// "L" => ScalarTypeName::new("Array".into()),

_ => ScalarTypeName::new("Any".into()),
};
scalars_list.insert(scalar_type_name.clone());
Expand All @@ -163,18 +168,13 @@ pub async fn introspect(

//get non key attributes
let result = client
.execute_statement()
.statement(
format!(
r#"select * from {}"#,
table_name
)
)
.set_parameters(None)
.set_limit(Some(20))
.send()
.await
.unwrap();
.execute_statement()
.statement(format!(r#"select * from {table_name}"#))
.set_parameters(None)
.set_limit(Some(20))
.send()
.await
.unwrap();

// let result = match row_1
// {
Expand All @@ -190,40 +190,34 @@ pub async fn introspect(
// dbg!(&result);

// let row = result.first().unwrap();
for item in result.items.unwrap().iter() {
for item in &result.items.unwrap() {
for (key, attribute_value) in item {
let column_name = FieldName::new(key.clone().into());
// dbg!(&column_name);
let column_type =
if attribute_value.is_s() {
let scalar_type_name = ScalarTypeName::new("String".into());
scalars_list.insert(scalar_type_name.clone());
metadata::Type::ScalarType(scalar_type_name)
}
else if attribute_value.is_n() {
let scalar_type_name = ScalarTypeName::new("Number".into());
scalars_list.insert(scalar_type_name.clone());
metadata::Type::ScalarType(scalar_type_name)
}
else if attribute_value.is_bool() {
let scalar_type_name = ScalarTypeName::new("Boolean".into());
scalars_list.insert(scalar_type_name.clone());
metadata::Type::ScalarType(scalar_type_name)
}
else {
metadata::Type::ScalarType(ScalarTypeName::new("Any".into()))
};
let column_type = if attribute_value.is_s() {
let scalar_type_name = ScalarTypeName::new("String".into());
scalars_list.insert(scalar_type_name.clone());
metadata::Type::ScalarType(scalar_type_name)
} else if attribute_value.is_n() {
let scalar_type_name = ScalarTypeName::new("Number".into());
scalars_list.insert(scalar_type_name.clone());
metadata::Type::ScalarType(scalar_type_name)
} else if attribute_value.is_bool() {
let scalar_type_name = ScalarTypeName::new("Boolean".into());
scalars_list.insert(scalar_type_name.clone());
metadata::Type::ScalarType(scalar_type_name)
} else {
metadata::Type::ScalarType(ScalarTypeName::new("Any".into()))
};
let column_info = ColumnInfo {
name: key.clone(),
r#type: column_type,
nullable: Nullable::Nullable,
description: None,
};
columns_info.insert(column_name, column_info);

}
}

}

//
let mut key_info: BTreeMap<KeyType, String> = BTreeMap::new();
Expand All @@ -239,7 +233,7 @@ pub async fn introspect(
let partition_key = key_info.get(&KeyType::Hash).unwrap();
let sort_key = key_info.get(&KeyType::Range).unwrap();

let mut gsi_indexes:BTreeMap<String, metadata::GlobalSecondaryIndexInfo> = BTreeMap::new();
let mut gsi_indexes: BTreeMap<String, metadata::GlobalSecondaryIndexInfo> = BTreeMap::new();
let gsis = table.global_secondary_indexes.unwrap();
for gsi in gsis {
let index_name = gsi.index_name.unwrap();
Expand All @@ -248,24 +242,38 @@ pub async fn introspect(
for key in index_keys {
let name = key.attribute_name;
let key_type = key.key_type;

if key_type == KeyType::Hash || key_type == KeyType::Range {
index_keys_info.insert(key_type, name);
}
}
let partition_key = index_keys_info.get(&KeyType::Hash).unwrap();
let sort_key: Option<String> = index_keys_info.get(&KeyType::Range).cloned();

let projection_type = gsi.projection.clone().unwrap().projection_type.unwrap().as_str().to_string();
let non_key_attributes = gsi.projection.unwrap().non_key_attributes.unwrap_or_default();
gsi_indexes.insert(index_name, metadata::GlobalSecondaryIndexInfo {
partition_key: partition_key.to_owned(),
sort_key,
projection_type: ProjectionTypeInfo {
projection_type,
non_key_attributes,
}
});
let projection_type = gsi
.projection
.clone()
.unwrap()
.projection_type
.unwrap()
.as_str()
.to_string();
let non_key_attributes = gsi
.projection
.unwrap()
.non_key_attributes
.unwrap_or_default();
gsi_indexes.insert(
index_name,
metadata::GlobalSecondaryIndexInfo {
partition_key: partition_key.to_owned(),
sort_key,
projection_type: ProjectionTypeInfo {
projection_type,
non_key_attributes,
},
},
);
}
let table_info = metadata::TableInfo {
table_name: table_name.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/ndc-dynamodb/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::process::ExitCode;

use ndc_dynamodb_configuration::environment::ProcessEnvironment;
use ndc_dynamodb::connector::DynamoDBSetup;
use ndc_dynamodb_configuration::environment::ProcessEnvironment;
use ndc_sdk::default_main::default_main_with;

#[tokio::main]
Expand Down
20 changes: 8 additions & 12 deletions crates/ndc-dynamodb/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use aws_sdk_dynamodb::Client;
/// For example, this function should check that the connector
/// is able to reach its data source over the network.
/// TODO
pub async fn health_check(
client: &Client,
) -> Result<(), ErrorResponse> {
pub async fn health_check(client: &Client) -> Result<(), ErrorResponse> {
// Query
// let mut rs = client
// .job()
Expand All @@ -24,18 +22,16 @@ pub async fn health_check(

let tables_result = client.list_tables().send().await;
let tables = tables_result.map_err(|_op| {
ndc_dynamodb_configuration::error::ParseConfigurationError::IoErrorButStringified(format!(
"Failed to list tables"
))
ndc_dynamodb_configuration::error::ParseConfigurationError::IoErrorButStringified(
"Failed to list tables".to_string(),
)
}); //TODO: handle error

match tables {
Ok(_res) => {
Ok(())
}
Err(_e) => {
Err(ErrorResponse::new_internal_with_details(serde_json::Value::Null))
}
Ok(_res) => Ok(()),
Err(_e) => Err(ErrorResponse::new_internal_with_details(
serde_json::Value::Null,
)),
}

// // silly check
Expand Down
1 change: 0 additions & 1 deletion crates/ndc-dynamodb/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ pub async fn get_schema(
collection_type: table_name.as_str().into(),
uniqueness_constraints: BTreeMap::new(),
foreign_keys: BTreeMap::new(),

})
.collect();

Expand Down
Loading

0 comments on commit 7a1f7e2

Please sign in to comment.