diff --git a/Cargo.lock b/Cargo.lock index 575b6e67..e68ec35d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -395,7 +395,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools", "log", "prettyplease", "proc-macro2", @@ -1058,6 +1058,37 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling 0.20.10", + "proc-macro2", + "quote", + "syn 2.0.89", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn 2.0.89", +] + [[package]] name = "derive_more" version = "0.99.18" @@ -1562,7 +1593,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ae8ab26ef7c7c3f7dfb9cc3982293d031d8e78c85d00ddfb704b5c35aeff7c8" dependencies = [ - "prost 0.13.3", + "prost", "prost-types", "tonic", ] @@ -2442,7 +2473,7 @@ dependencies = [ [[package]] name = "integrationos-domain" -version = "8.0.0" +version = "9.0.0" dependencies = [ "aes", "anyhow", @@ -2454,9 +2485,8 @@ dependencies = [ "bson", "chacha20poly1305", "chrono", - "crc32fast", "ctr", - "digest", + "derive_builder", "downcast-rs", "envconfig", "fake", @@ -2471,9 +2501,6 @@ dependencies = [ "js-sandbox-ios", "jsonpath_lib", "jsonwebtoken 8.3.0", - "k8s-openapi", - "kube", - "mockito", "mongodb", "napi", "napi-derive", @@ -2484,11 +2511,11 @@ dependencies = [ "opentelemetry_sdk", "percent-encoding", "pin-project", - "prost 0.12.6", + "prost", "rand", "reqwest", "schemars", - "secrecy", + "secrecy 0.10.3", "semver 1.0.23", "serde", "serde_json", @@ -2572,6 +2599,7 @@ version = "0.1.0" dependencies = [ "bson", "chrono", + "derive_builder", "futures", "handlebars", "http 1.1.0", @@ -2637,15 +2665,6 @@ dependencies = [ "serde", ] -[[package]] -name = "itertools" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.13.0" @@ -2847,7 +2866,7 @@ dependencies = [ "pem 3.0.4", "rustls 0.23.18", "rustls-pemfile 2.2.0", - "secrecy", + "secrecy 0.8.0", "serde", "serde_json", "serde_yaml", @@ -3560,23 +3579,23 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.26.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" dependencies = [ "futures-core", "futures-sink", "js-sys", - "once_cell", "pin-project-lite", "thiserror 1.0.69", + "tracing", ] [[package]] name = "opentelemetry-otlp" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" dependencies = [ "async-trait", "futures-core", @@ -3584,36 +3603,36 @@ dependencies = [ "opentelemetry", "opentelemetry-proto", "opentelemetry_sdk", - "prost 0.13.3", + "prost", "thiserror 1.0.69", "tokio", "tonic", + "tracing", ] [[package]] name = "opentelemetry-proto" -version = "0.26.1" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" dependencies = [ "opentelemetry", "opentelemetry_sdk", - "prost 0.13.3", + "prost", "tonic", ] [[package]] name = "opentelemetry_sdk" -version = "0.26.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" dependencies = [ "async-trait", "futures-channel", "futures-executor", "futures-util", "glob", - "once_cell", "opentelemetry", "percent-encoding", "rand", @@ -3621,6 +3640,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", + "tracing", ] [[package]] @@ -3960,45 +3980,22 @@ dependencies = [ [[package]] name = "prost" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" -dependencies = [ - "bytes", - "prost-derive 0.12.6", -] - -[[package]] -name = "prost" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec" dependencies = [ "bytes", - "prost-derive 0.13.3", + "prost-derive", ] [[package]] name = "prost-derive" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" -dependencies = [ - "anyhow", - "itertools 0.12.1", - "proc-macro2", - "quote", - "syn 2.0.89", -] - -[[package]] -name = "prost-derive" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools", "proc-macro2", "quote", "syn 2.0.89", @@ -4010,7 +4007,7 @@ version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" dependencies = [ - "prost 0.13.3", + "prost", ] [[package]] @@ -4780,6 +4777,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "secrecy" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" +dependencies = [ + "serde", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -5853,7 +5860,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost 0.13.3", + "prost", "rustls-pemfile 2.2.0", "socket2", "tokio", @@ -6008,9 +6015,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" dependencies = [ "js-sys", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 06a234be..047ab33d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ bson = "2.13.0" chrono = { version = "0.4.38", features = ["serde"] } convert_case = "0.6.0" dotenvy = "0.15.7" +derive_builder = "0.20.0" envconfig = "0.10.0" fake = { version = "2.10.0", features = [ "uuid", diff --git a/integrationos-api/src/logic/mod.rs b/integrationos-api/src/logic/mod.rs index 50ee0fe6..31d37429 100644 --- a/integrationos-api/src/logic/mod.rs +++ b/integrationos-api/src/logic/mod.rs @@ -9,7 +9,7 @@ use axum::{ }; use bson::doc; use http::{HeaderMap, HeaderValue}; -use integrationos_cache::local::connection_cache::ConnectionCacheArcStrHeaderKey; +use integrationos_cache::local::{ConnectionHeaderCache, LocalCacheExt}; use integrationos_domain::{ algebra::MongoStore, event_access::EventAccess, ApplicationError, Connection, IntegrationOSError, InternalError, OAuth, Store, Unit, @@ -42,7 +42,7 @@ pub mod schema_generator; pub mod secrets; pub mod transactions; pub mod unified; -pub mod utils; +pub mod utility; pub mod vault_connection; const INTEGRATION_OS_PASSTHROUGH_HEADER: &str = "x-pica-passthrough"; @@ -322,11 +322,11 @@ async fn get_connection( access: &EventAccess, connection_key: &HeaderValue, stores: &AppStores, - cache: &ConnectionCacheArcStrHeaderKey, + cache: &ConnectionHeaderCache, ) -> Result, IntegrationOSError> { let connection = cache .get_or_insert_with_filter( - (access.ownership.id.clone(), connection_key.clone()), + &(access.ownership.id.clone(), connection_key.clone()), stores.connection.clone(), doc! { "key": connection_key.to_str().map_err(|_| { @@ -335,6 +335,7 @@ async fn get_connection( "ownership.buildableId": access.ownership.id.as_ref(), "deleted": false }, + None, ) .await?; diff --git a/integrationos-api/src/logic/passthrough.rs b/integrationos-api/src/logic/passthrough.rs index 575c18d3..c38dce42 100644 --- a/integrationos-api/src/logic/passthrough.rs +++ b/integrationos-api/src/logic/passthrough.rs @@ -68,7 +68,7 @@ pub async fn passthrough_request( let model_execution_result = state .extractor_caller - .send_to_destination( + .dispatch_destination_request( Some(connection.clone()), &destination, headers, diff --git a/integrationos-api/src/logic/unified.rs b/integrationos-api/src/logic/unified.rs index a4df71a1..bda4cf1e 100644 --- a/integrationos-api/src/logic/unified.rs +++ b/integrationos-api/src/logic/unified.rs @@ -14,6 +14,7 @@ use integrationos_domain::{ encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, event_access::EventAccess, AccessKey, ApplicationError, Event, InternalError, }; +use integrationos_unified::domain::RequestCrudBuilder; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::{collections::HashMap, sync::Arc}; @@ -38,6 +39,7 @@ pub struct PathParams { pub async fn get_request( access: Extension>, + Extension(passthrough): Extension>, state: State>, Path(params): Path, headers: HeaderMap, @@ -52,6 +54,7 @@ pub async fn get_request( name: params.model.to_case(Case::Pascal).into(), action: CrudAction::GetOne, id: Some(params.id.into()), + passthrough: *passthrough, }, None, ) @@ -62,6 +65,7 @@ const META: &str = "meta"; pub async fn update_request( access: Extension>, + Extension(passthrough): Extension>, state: State>, Path(params): Path, headers: HeaderMap, @@ -77,6 +81,7 @@ pub async fn update_request( name: params.model.to_case(Case::Pascal).into(), action: CrudAction::Update, id: Some(params.id.into()), + passthrough: *passthrough, }, Some(body), ) @@ -85,6 +90,7 @@ pub async fn update_request( pub async fn upsert_request( access: Extension>, + Extension(passthrough): Extension>, state: State>, Path(model): Path, headers: HeaderMap, @@ -100,6 +106,7 @@ pub async fn upsert_request( name: model.to_case(Case::Pascal).into(), action: CrudAction::Upsert, id: None, + passthrough: *passthrough, }, Some(body), ) @@ -108,6 +115,7 @@ pub async fn upsert_request( pub async fn list_request( access: Extension>, + Extension(passthrough): Extension>, state: State>, Path(model): Path, headers: HeaderMap, @@ -122,6 +130,7 @@ pub async fn list_request( name: model.to_case(Case::Pascal).into(), action: CrudAction::GetMany, id: None, + passthrough: *passthrough, }, None, ) @@ -130,6 +139,7 @@ pub async fn list_request( pub async fn count_request( access: Extension>, + Extension(passthrough): Extension>, state: State>, Path(model): Path, headers: HeaderMap, @@ -144,6 +154,7 @@ pub async fn count_request( name: model.to_case(Case::Pascal).into(), action: CrudAction::GetCount, id: None, + passthrough: *passthrough, }, None, ) @@ -153,6 +164,7 @@ pub async fn count_request( pub async fn create_request( access: Extension>, state: State>, + Extension(passthrough): Extension>, Path(model): Path, headers: HeaderMap, query_params: Option>>, @@ -167,6 +179,7 @@ pub async fn create_request( name: model.to_case(Case::Pascal).into(), action: CrudAction::Create, id: None, + passthrough: *passthrough, }, Some(body), ) @@ -175,6 +188,7 @@ pub async fn create_request( pub async fn delete_request( access: Extension>, + Extension(passthrough): Extension>, state: State>, Path(params): Path, headers: HeaderMap, @@ -189,6 +203,7 @@ pub async fn delete_request( name: params.model.to_case(Case::Pascal).into(), action: CrudAction::Delete, id: Some(params.id.into()), + passthrough: *passthrough, }, None, ) @@ -223,12 +238,6 @@ pub async fn process_request( let Query(query_params) = query_params.unwrap_or_default(); - let include_passthrough = headers - .get(&state.config.headers.enable_passthrough_header) - .and_then(|v| v.to_str().ok()) - .map(|s| s == "true") - .unwrap_or_default(); - let access_key_header_value = headers.get(&state.config.headers.auth_header).cloned(); remove_event_headers(&mut headers, &state.config.headers); @@ -248,14 +257,22 @@ pub async fn process_request( let mut response = state .extractor_caller - .send_to_destination_unified( + .dispatch_unified_request( connection.clone(), action.clone(), - include_passthrough, state.config.environment, - headers, - query_params, - payload, + RequestCrudBuilder::default() + .headers(headers) + .query_params(query_params) + .body(payload) + .build() + .map_err(|e| { + error!("Error building request crud: {e}"); + InternalError::invalid_argument( + &format!("Error building request crud: {e}"), + None, + ) + })?, ) .await .inspect_err(|e| { @@ -278,7 +295,10 @@ pub async fn process_request( .collect::(); let (parts, body) = response.response.into_parts(); - let mut metadata = body.get(META).unwrap_or(&response.metadata).clone(); + let mut metadata = body + .get(META) + .unwrap_or(&response.metadata.as_value()) + .clone(); if let Some(Ok(encrypted_access_key)) = access_key_header_value.map(|v| v.to_str().map(|s| s.to_string())) diff --git a/integrationos-api/src/logic/utils.rs b/integrationos-api/src/logic/utility.rs similarity index 100% rename from integrationos-api/src/logic/utils.rs rename to integrationos-api/src/logic/utility.rs diff --git a/integrationos-api/src/middleware/header_auth.rs b/integrationos-api/src/middleware/header_auth.rs index eafac32c..954935fb 100644 --- a/integrationos-api/src/middleware/header_auth.rs +++ b/integrationos-api/src/middleware/header_auth.rs @@ -1,6 +1,7 @@ use crate::server::AppState; use axum::{body::Body, extract::State, middleware::Next, response::Response}; use http::Request; +use integrationos_cache::local::LocalCacheExt; use integrationos_domain::{ApplicationError, IntegrationOSError, InternalError}; use mongodb::bson::doc; use std::sync::Arc; @@ -44,6 +45,7 @@ pub async fn header_auth_middleware( "accessKey": key, "deleted": false }, + None, ) .await; diff --git a/integrationos-api/src/middleware/blocker.rs b/integrationos-api/src/middleware/header_blocker.rs similarity index 100% rename from integrationos-api/src/middleware/blocker.rs rename to integrationos-api/src/middleware/header_blocker.rs diff --git a/integrationos-api/src/middleware/header_passthrough.rs b/integrationos-api/src/middleware/header_passthrough.rs new file mode 100644 index 00000000..545026d3 --- /dev/null +++ b/integrationos-api/src/middleware/header_passthrough.rs @@ -0,0 +1,22 @@ +use crate::server::AppState; +use axum::{body::Body, extract::State, middleware::Next, response::Response}; +use http::Request; +use integrationos_domain::IntegrationOSError; +use std::sync::Arc; + +pub async fn header_passthrough_middleware( + State(state): State>, + mut req: Request, + next: Next, +) -> Result { + let headers = req.headers(); + let include_passthrough = headers + .get(&state.config.headers.enable_passthrough_header) + .and_then(|v| v.to_str().ok()) + .map(|s| s == "true") + .unwrap_or_default(); + + req.extensions_mut().insert(Arc::new(include_passthrough)); + + Ok(next.run(req).await) +} diff --git a/integrationos-api/src/middleware/mod.rs b/integrationos-api/src/middleware/mod.rs index 36d7ffa2..a5c712d1 100644 --- a/integrationos-api/src/middleware/mod.rs +++ b/integrationos-api/src/middleware/mod.rs @@ -1,7 +1,8 @@ -pub mod blocker; -pub mod extractor; pub mod header_auth; +pub mod header_blocker; +pub mod header_passthrough; pub mod jwt_auth; +pub mod rate_limiter; pub use header_auth::header_auth_middleware; pub use jwt_auth::jwt_auth_middleware; diff --git a/integrationos-api/src/middleware/extractor.rs b/integrationos-api/src/middleware/rate_limiter.rs similarity index 100% rename from integrationos-api/src/middleware/extractor.rs rename to integrationos-api/src/middleware/rate_limiter.rs diff --git a/integrationos-api/src/router/public.rs b/integrationos-api/src/router/public.rs index f6a0d6a4..dd7c98aa 100644 --- a/integrationos-api/src/router/public.rs +++ b/integrationos-api/src/router/public.rs @@ -4,7 +4,7 @@ use crate::{ connection_definition::{self, GetPublicConnectionDetailsRequest}, connection_model_schema, connection_oauth_definition, event_access::create_event_access_for_new_user, - openapi, read, schema_generator, utils, + openapi, read, schema_generator, utility, }, middleware::jwt_auth::{self, JwtState}, server::AppState, @@ -67,10 +67,10 @@ pub fn get_router(state: &Arc) -> Router> { jwt_auth::jwt_auth_middleware, )), ) - .route("/generate-id/:prefix", get(utils::generate_id)) + .route("/generate-id/:prefix", get(utility::generate_id)) .route("/openapi", get(openapi::get_openapi)) .route("/openapi/yaml", get(openapi::get_openapi_yaml)) - .route("/version", get(utils::get_version)) + .route("/version", get(utility::get_version)) .layer(from_fn(log_request_middleware)) .layer(TraceLayer::new_for_http()) } diff --git a/integrationos-api/src/router/secured_key.rs b/integrationos-api/src/router/secured_key.rs index 8154f81a..e23a0714 100644 --- a/integrationos-api/src/router/secured_key.rs +++ b/integrationos-api/src/router/secured_key.rs @@ -9,9 +9,10 @@ use crate::{ unified, vault_connection, }, middleware::{ - blocker::{handle_blocked_error, BlockInvalidHeaders}, - extractor::{rate_limit_middleware, RateLimiter}, header_auth, + header_blocker::{handle_blocked_error, BlockInvalidHeaders}, + header_passthrough, + rate_limiter::{rate_limit_middleware, RateLimiter}, }, server::AppState, }; @@ -72,6 +73,10 @@ pub async fn get_router(state: &Arc) -> Router> { state.clone(), header_auth::header_auth_middleware, )) + .layer(from_fn_with_state( + state.clone(), + header_passthrough::header_passthrough_middleware, + )) .layer(from_fn(log_request_middleware)) .layer(TraceLayer::new_for_http()) .layer(SetSensitiveRequestHeadersLayer::new(once( diff --git a/integrationos-api/src/server.rs b/integrationos-api/src/server.rs index a2353a20..8f8a476b 100644 --- a/integrationos-api/src/server.rs +++ b/integrationos-api/src/server.rs @@ -7,10 +7,8 @@ use crate::{ use anyhow::{anyhow, Context, Result}; use axum::Router; use integrationos_cache::local::{ - connection_cache::ConnectionCacheArcStrHeaderKey, - connection_definition_cache::ConnectionDefinitionCache, - connection_oauth_definition_cache::ConnectionOAuthDefinitionCache, - event_access_cache::EventAccessCache, + ConnectionDefinitionCache, ConnectionHeaderCache, ConnectionOAuthDefinitionCache, + EventAccessCache, }; use integrationos_domain::{ algebra::{DefaultTemplate, MongoStore}, @@ -69,7 +67,7 @@ pub struct AppState { pub config: ConnectionsConfig, pub connection_definitions_cache: ConnectionDefinitionCache, pub connection_oauth_definitions_cache: ConnectionOAuthDefinitionCache, - pub connections_cache: ConnectionCacheArcStrHeaderKey, + pub connections_cache: ConnectionHeaderCache, pub event_access_cache: EventAccessCache, pub event_tx: Sender, pub extractor_caller: UnifiedDestination, @@ -175,10 +173,8 @@ impl Server { let event_access_cache = EventAccessCache::new(config.cache_size, config.access_key_cache_ttl_secs); - let connections_cache = ConnectionCacheArcStrHeaderKey::create( - config.cache_size, - config.connection_cache_ttl_secs, - ); + let connections_cache = + ConnectionHeaderCache::new(config.cache_size, config.connection_cache_ttl_secs); let connection_definitions_cache = ConnectionDefinitionCache::new( config.cache_size, config.connection_definition_cache_ttl_secs, diff --git a/integrationos-cache/src/lib.rs b/integrationos-cache/src/lib.rs index 42b95356..1fc1939a 100644 --- a/integrationos-cache/src/lib.rs +++ b/integrationos-cache/src/lib.rs @@ -1,68 +1,2 @@ pub mod local; pub mod remote; - -use futures::Future; -use integrationos_domain::{ApplicationError, IntegrationOSError, MongoStore, Unit}; -use mongodb::bson::Document; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; -use std::hash::Hash; - -pub trait RemoteCacheExt { - fn get( - &self, - key: &str, - ) -> impl Future, IntegrationOSError>> + Send - where - T: for<'de> Deserialize<'de>; - fn set( - &self, - key: &str, - value: T, - expire: Option, - ) -> impl Future> + Send - where - T: Serialize + Send; - fn remove(&self, key: &str) -> impl Future> + Send; - fn clear(&self) -> impl Future> + Send; -} - -pub trait LocalCacheExt -where - K: Hash + Eq + Clone + Debug, - V: Clone + DeserializeOwned + Send + Sync + Unpin + Serialize + 'static, -{ - fn get_or_insert_with_filter( - &self, - key: &K, - store: MongoStore, - filter: Document, - ) -> impl Future> { - async move { - match self.get(key).await? { - Some(entry) => { - tracing::debug!("Cache hit for key: {:?}", key); - Ok(entry) - } - None => { - tracing::debug!("Cache miss for key: {:?}", key); - let value = store.get_one(filter).await?; - if let Some(value) = value { - self.set(key, &value).await?; - Ok(value) - } else { - tracing::warn!("Value with id {:?} not found", key); - Err(ApplicationError::not_found("Value not found", None)) - } - } - } - } - } - - fn get(&self, key: &K) -> impl Future, IntegrationOSError>>; - - fn set(&self, key: &K, value: &V) -> impl Future>; - - fn remove(&self, key: &K) -> impl Future>; -} diff --git a/integrationos-cache/src/local.rs b/integrationos-cache/src/local.rs new file mode 100644 index 00000000..2ce9db22 --- /dev/null +++ b/integrationos-cache/src/local.rs @@ -0,0 +1,158 @@ +use futures::Future; +use http::HeaderValue; +use integrationos_domain::connection_definition::ConnectionDefinition; +use integrationos_domain::connection_model_definition::ConnectionModelDefinition; +use integrationos_domain::connection_model_schema::ConnectionModelSchema; +use integrationos_domain::connection_oauth_definition::ConnectionOAuthDefinition; +use integrationos_domain::destination::Destination; +use integrationos_domain::event_access::EventAccess; +use integrationos_domain::{ + ApplicationError, Connection, Id, IntegrationOSError, MongoStore, Secret, Unit, +}; +use moka::future::Cache; +use mongodb::bson::Document; +use mongodb::options::FindOneOptions; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::hash::Hash; +use std::sync::Arc; +use std::time::Duration; + +pub trait LocalCacheExt +where + K: Hash + Eq + Clone + Debug, + V: Clone + DeserializeOwned + Send + Sync + Unpin + Serialize + 'static, +{ + fn get_or_insert_with_filter( + &self, + key: &K, + store: MongoStore, + filter: Document, + options: Option, + ) -> impl Future> { + async move { + match self.get(key).await? { + Some(entry) => { + tracing::debug!("Cache hit for key: {:?}", key); + Ok(entry) + } + None => { + tracing::debug!("Cache miss for key: {:?}", key); + let value = store + .collection + .find_one(filter) + .with_options(options) + .await?; + if let Some(value) = value { + self.insert(key, &value).await?; + Ok(value) + } else { + tracing::warn!("Value with id {:?} not found", key); + Err(ApplicationError::not_found("Value not found", None)) + } + } + } + } + } + + fn get_or_insert_with_fn( + &self, + key: &K, + fa: F, + ) -> impl Future> + where + F: FnOnce() -> Fut, + Fut: Future>, + { + async move { + match self.get(key).await? { + Some(entry) => { + tracing::debug!("Cache hit for key: {:?}", key); + Ok(entry) + } + None => { + let value = fa().await?; + self.insert(key, &value).await?; + Ok(value) + } + } + } + } + + fn get(&self, key: &K) -> impl Future, IntegrationOSError>>; + + fn insert(&self, key: &K, value: &V) -> impl Future>; + + fn remove(&self, key: &K) -> impl Future>; + + fn max_capacity(&self) -> u64; +} + +#[derive(Clone)] +pub struct GenericCache +where + K: Hash + Eq + Clone + Debug, + V: Clone + DeserializeOwned + Send + Sync + Unpin + Serialize + 'static, +{ + inner: Arc>, +} + +impl GenericCache +where + K: Hash + Eq + Clone + Debug + Sync + Send + 'static, + V: Clone + DeserializeOwned + Send + Sync + Unpin + Serialize + 'static, +{ + pub fn new(size: u64, ttl: u64) -> Self { + Self { + inner: Arc::new( + Cache::builder() + .max_capacity(size) + .time_to_live(Duration::from_secs(ttl)) + .build(), + ), + } + } +} + +impl LocalCacheExt for GenericCache +where + K: Hash + Eq + Clone + Debug + Sync + Send + 'static, + V: Clone + DeserializeOwned + Send + Sync + Unpin + Serialize + 'static, +{ + async fn get(&self, key: &K) -> Result, IntegrationOSError> { + let inner = self.inner.clone(); + Ok(inner.get(key).await) + } + + async fn insert(&self, key: &K, value: &V) -> Result { + let inner = self.inner.clone(); + inner.insert(key.clone(), value.clone()).await; + Ok(()) + } + + async fn remove(&self, key: &K) -> Result { + let inner = self.inner.clone(); + inner.remove(key).await; + Ok(()) + } + + fn max_capacity(&self) -> u64 { + self.inner.policy().max_capacity().unwrap_or_default() + } +} + +type ConnectionModelSchemaKey = (Arc, Arc); +type ConnectionHeaderKey = (Arc, HeaderValue); +type ConnectionKey = Arc; + +pub type EventAccessCache = GenericCache; +pub type SecretCache = GenericCache; +pub type ConnectionOAuthDefinitionCache = GenericCache; +pub type ConnectionModelSchemaCache = GenericCache; +pub type ConnectionModelDefinitionDestinationCache = + GenericCache; +pub type ConnectionModelDefinitionCacheIdKey = GenericCache; +pub type ConnectionDefinitionCache = GenericCache; +pub type ConnectionHeaderCache = GenericCache; +pub type ConnectionCache = GenericCache; diff --git a/integrationos-cache/src/local/connection_cache.rs b/integrationos-cache/src/local/connection_cache.rs deleted file mode 100644 index ef0d8016..00000000 --- a/integrationos-cache/src/local/connection_cache.rs +++ /dev/null @@ -1,65 +0,0 @@ -use crate::LocalCacheExt; -use http::HeaderValue; -use integrationos_domain::{Connection, IntegrationOSError, MongoStore, Unit}; -use moka::future::Cache; -use mongodb::bson::Document; -use std::fmt::Debug; -use std::hash::Hash; -use std::{sync::Arc, time::Duration}; - -#[derive(Clone)] -pub struct ConnectionCacheForKey { - inner: Arc>, -} - -impl ConnectionCacheForKey { - pub fn new(size: u64, ttl: u64) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(Duration::from_secs(ttl)) - .build(), - ), - } - } - - pub async fn get_or_insert_with_filter( - &self, - key: K, - store: MongoStore, - filter: Document, - ) -> Result { - self.inner - .get_or_insert_with_filter(&key, store, filter) - .await - } - - pub async fn get(&self, key: K) -> Result, IntegrationOSError> { - self.inner.get(&key).await - } - - pub async fn set(&self, key: K, value: &Connection) -> Result { - self.inner.set(&key, value).await - } - - pub async fn remove(&self, key: K) -> Result { - self.inner.remove(&key).await - } -} - -pub type ConnectionCacheArcStrKey = ConnectionCacheForKey>; - -impl ConnectionCacheArcStrKey { - pub fn create(size: u64, ttl: u64) -> ConnectionCacheForKey> { - ConnectionCacheForKey::new(size, ttl) - } -} - -pub type ConnectionCacheArcStrHeaderKey = ConnectionCacheForKey<(Arc, HeaderValue)>; - -impl ConnectionCacheArcStrHeaderKey { - pub fn create(size: u64, ttl: u64) -> ConnectionCacheForKey<(Arc, HeaderValue)> { - ConnectionCacheForKey::new(size, ttl) - } -} diff --git a/integrationos-cache/src/local/connection_definition_cache.rs b/integrationos-cache/src/local/connection_definition_cache.rs deleted file mode 100644 index 5f4336af..00000000 --- a/integrationos-cache/src/local/connection_definition_cache.rs +++ /dev/null @@ -1,53 +0,0 @@ -// pub type InMemoryCache = Arc>, Arc>>; -use crate::LocalCacheExt; -use integrationos_domain::{ - connection_definition::ConnectionDefinition, Id, IntegrationOSError, MongoStore, Unit, -}; -use moka::future::Cache; -use mongodb::bson::Document; -use std::{sync::Arc, time::Duration}; - -#[derive(Clone)] -pub struct ConnectionDefinitionCache { - inner: Arc>, -} - -impl ConnectionDefinitionCache { - pub fn new(size: u64, ttl: u64) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(Duration::from_secs(ttl)) - .build(), - ), - } - } - - pub async fn get_or_insert_with_filter( - &self, - key: &Id, - store: MongoStore, - filter: Document, - ) -> Result { - self.inner - .get_or_insert_with_filter(key, store, filter) - .await - } - - pub async fn get(&self, key: &Id) -> Result, IntegrationOSError> { - self.inner.get(key).await - } - - pub async fn set( - &self, - key: &Id, - value: &ConnectionDefinition, - ) -> Result { - self.inner.set(key, value).await - } - - pub async fn remove(&self, key: &Id) -> Result { - self.inner.remove(key).await - } -} diff --git a/integrationos-cache/src/local/connection_model_definition_cache.rs b/integrationos-cache/src/local/connection_model_definition_cache.rs deleted file mode 100644 index 86a9f870..00000000 --- a/integrationos-cache/src/local/connection_model_definition_cache.rs +++ /dev/null @@ -1,98 +0,0 @@ -use crate::LocalCacheExt; -use futures::Future; -use integrationos_domain::{ - connection_model_definition::ConnectionModelDefinition, destination::Destination, Id, - IntegrationOSError, MongoStore, Unit, -}; -use moka::future::Cache; -use mongodb::bson::Document; -use std::{fmt::Debug, hash::Hash, sync::Arc, time::Duration}; - -#[derive(Clone)] -pub struct ConnectionModelDefinitionCacheForKey< - K: Clone + Send + Sync + Eq + Hash + Debug + 'static, -> { - inner: Arc>, -} - -impl ConnectionModelDefinitionCacheForKey { - pub fn new(size: u64, ttl: u64) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(Duration::from_secs(ttl)) - .build(), - ), - } - } - - pub async fn get_or_insert_with_filter( - &self, - key: K, - store: MongoStore, - filter: Document, - ) -> Result { - self.inner - .get_or_insert_with_filter(&key, store, filter) - .await - } - - pub async fn get_or_insert_with_fn( - &self, - key: K, - fa: F, - ) -> Result - where - F: FnOnce() -> Fut, - Fut: Future>, - { - let result = self.inner.get(&key).await; - match result { - Ok(Some(value)) => Ok(value), - Ok(None) => { - let value = fa().await?; - self.inner.set(&key, &value).await?; - Ok(value) - } - Err(e) => Err(e), - } - } - - pub async fn get( - &self, - key: K, - ) -> Result, IntegrationOSError> { - self.inner.get(&key).await - } - - pub async fn set( - &self, - key: K, - value: &ConnectionModelDefinition, - ) -> Result { - self.inner.set(&key, value).await - } - - pub async fn remove(&self, key: K) -> Result { - self.inner.remove(&key).await - } -} - -#[derive(Clone)] -pub struct ConnectionModelDefinitionCacheIdKey; - -impl ConnectionModelDefinitionCacheIdKey { - pub fn create(size: u64, ttl: u64) -> ConnectionModelDefinitionCacheForKey { - ConnectionModelDefinitionCacheForKey::new(size, ttl) - } -} - -pub type ConnectionModelDefinitionDestinationKey = - ConnectionModelDefinitionCacheForKey; - -impl ConnectionModelDefinitionDestinationKey { - pub fn create(size: u64, ttl: u64) -> ConnectionModelDefinitionCacheForKey { - ConnectionModelDefinitionCacheForKey::new(size, ttl) - } -} diff --git a/integrationos-cache/src/local/connection_model_schema_cache.rs b/integrationos-cache/src/local/connection_model_schema_cache.rs deleted file mode 100644 index e5d19b2d..00000000 --- a/integrationos-cache/src/local/connection_model_schema_cache.rs +++ /dev/null @@ -1,77 +0,0 @@ -use crate::LocalCacheExt; -use integrationos_domain::{ - connection_model_schema::ConnectionModelSchema, ApplicationError, IntegrationOSError, - MongoStore, Unit, -}; -use moka::future::Cache; -use mongodb::{bson::Document, options::FindOneOptions}; -use std::{sync::Arc, time::Duration}; - -type ConnectionModelSchemaKey = (Arc, Arc); - -#[derive(Clone)] -pub struct ConnectionModelSchemaCache { - inner: Arc>, -} - -impl ConnectionModelSchemaCache { - pub fn new(size: u64, ttl: u64) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(Duration::from_secs(ttl)) - .build(), - ), - } - } - - pub async fn get_or_insert_with_filter( - &self, - key: &ConnectionModelSchemaKey, - store: MongoStore, - filter: Document, - options: Option, - ) -> Result { - match self.get(key).await? { - Some(entry) => { - tracing::debug!("Cache hit for key: {:?}", key); - Ok(entry) - } - None => { - tracing::debug!("Cache miss for key: {:?}", key); - let value = store - .collection - .find_one(filter) - .with_options(options) - .await?; - if let Some(value) = value { - self.set(key, &value).await?; - Ok(value) - } else { - tracing::warn!("Value with id {:?} not found", key); - Err(ApplicationError::not_found("Value not found", None)) - } - } - } - } - - pub async fn get( - &self, - key: &ConnectionModelSchemaKey, - ) -> Result, IntegrationOSError> { - self.inner.get(key).await - } - - pub async fn set( - &self, - key: &ConnectionModelSchemaKey, - value: &ConnectionModelSchema, - ) -> Result { - self.inner.set(key, value).await - } - - pub async fn remove(&self, key: &ConnectionModelSchemaKey) -> Result { - self.inner.remove(key).await - } -} diff --git a/integrationos-cache/src/local/connection_oauth_definition_cache.rs b/integrationos-cache/src/local/connection_oauth_definition_cache.rs deleted file mode 100644 index 4a527220..00000000 --- a/integrationos-cache/src/local/connection_oauth_definition_cache.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::LocalCacheExt; -use integrationos_domain::{ - connection_oauth_definition::ConnectionOAuthDefinition, Id, IntegrationOSError, MongoStore, - Unit, -}; -use moka::future::Cache; -use mongodb::bson::Document; -use std::{sync::Arc, time::Duration}; - -#[derive(Clone)] -pub struct ConnectionOAuthDefinitionCache { - inner: Arc>, -} - -impl ConnectionOAuthDefinitionCache { - pub fn new(size: u64, ttl: u64) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(Duration::from_secs(ttl)) - .build(), - ), - } - } - - pub async fn get_or_insert_with_filter( - &self, - key: &Id, - store: MongoStore, - filter: Document, - ) -> Result { - self.inner - .get_or_insert_with_filter(key, store, filter) - .await - } - - pub async fn get( - &self, - key: &Id, - ) -> Result, IntegrationOSError> { - self.inner.get(key).await - } - - pub async fn set( - &self, - key: &Id, - value: &ConnectionOAuthDefinition, - ) -> Result { - self.inner.set(key, value).await - } - - pub async fn remove(&self, key: &Id) -> Result { - self.inner.remove(key).await - } -} diff --git a/integrationos-cache/src/local/event_access_cache.rs b/integrationos-cache/src/local/event_access_cache.rs deleted file mode 100644 index d5c5f4a1..00000000 --- a/integrationos-cache/src/local/event_access_cache.rs +++ /dev/null @@ -1,51 +0,0 @@ -use crate::LocalCacheExt; -use http::HeaderValue; -use integrationos_domain::{event_access::EventAccess, IntegrationOSError, MongoStore, Unit}; -use moka::future::Cache; -use mongodb::bson::Document; -use std::{sync::Arc, time::Duration}; - -#[derive(Clone)] -pub struct EventAccessCache { - inner: Arc>, -} - -impl EventAccessCache { - pub fn new(size: u64, ttl: u64) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(Duration::from_secs(ttl)) - .build(), - ), - } - } - - pub async fn get_or_insert_with_filter( - &self, - key: &HeaderValue, - store: MongoStore, - filter: Document, - ) -> Result { - self.inner - .get_or_insert_with_filter(key, store, filter) - .await - } - - pub async fn get(&self, key: &HeaderValue) -> Result, IntegrationOSError> { - self.inner.get(key).await - } - - pub async fn set( - &self, - key: &HeaderValue, - value: &EventAccess, - ) -> Result { - self.inner.set(key, value).await - } - - pub async fn remove(&self, key: &HeaderValue) -> Result { - self.inner.remove(key).await - } -} diff --git a/integrationos-cache/src/local/mod.rs b/integrationos-cache/src/local/mod.rs deleted file mode 100644 index 0facee13..00000000 --- a/integrationos-cache/src/local/mod.rs +++ /dev/null @@ -1,123 +0,0 @@ -pub mod connection_cache; -pub mod connection_definition_cache; -pub mod connection_model_definition_cache; -pub mod connection_model_schema_cache; -pub mod connection_oauth_definition_cache; -pub mod event_access_cache; -pub mod secrets_cache; - -use crate::LocalCacheExt; -use integrationos_domain::{IntegrationOSError, Unit}; -use moka::future::Cache; -use serde::{de::DeserializeOwned, Serialize}; -use std::{fmt::Debug, hash::Hash, sync::Arc}; - -impl LocalCacheExt for Arc> -where - K: Hash + Eq + Clone + Debug + Send + Sync + 'static, - V: Clone + DeserializeOwned + Send + Sync + Unpin + Serialize + 'static, -{ - async fn get(&self, key: &K) -> Result, IntegrationOSError> { - match Cache::get(self, key).await { - Some(entry) => Ok(Some(entry)), - None => Ok(None), - } - } - - async fn set(&self, key: &K, value: &V) -> Result { - Cache::insert(self, key.clone(), value.clone()).await; - Ok(()) - } - - async fn remove(&self, key: &K) -> Result { - Cache::remove(self, key).await; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use fake::{Fake, Faker}; - use mongodb::bson::doc; - use serde::Deserialize; - use std::time::Duration; - - #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] - pub struct Element { - pub id: String, - pub value: String, - } - - pub struct TestCache { - inner: Arc>, - } - - impl TestCache { - pub fn new(size: u64, ttl: Duration) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(ttl) - .build(), - ), - } - } - - pub async fn get(&self, key: &String) -> Result, IntegrationOSError> { - self.inner.get(key).await - } - - pub async fn set(&self, key: &String, value: &Element) -> Result { - self.inner.set(key, value).await - } - - pub async fn remove(&self, key: &String) -> Result { - self.inner.remove(key).await - } - } - - #[tokio::test] - async fn test_local_cache() { - let cache = TestCache::new(10, Duration::from_secs(3)); - - let id = Faker.fake(); - let value = Faker.fake(); - let element = Element { id, value }; - - let different_id = Faker.fake(); - - let result = cache.get(&different_id).await.expect("get failed"); - assert_eq!(result, None); - - cache - .set(&different_id, &element) - .await - .expect("set failed"); - - let result = cache.get(&different_id).await.expect("get failed"); - assert_eq!(result, Some(element.clone())); - - cache.remove(&different_id).await.expect("remove failed"); - - let result = cache.get(&different_id).await.expect("get failed"); - assert_eq!(result, None); - - // Test expiry - let id = Faker.fake(); - let value = Faker.fake(); - let element = Element { id, value }; - - cache.set(&element.id, &element).await.expect("set failed"); - - let result = cache.get(&element.id).await.expect("get failed"); - assert_eq!(result, Some(element.clone())); - - // wait for three seconds - tokio::time::sleep(Duration::from_secs(5)).await; - - let result = cache.get(&element.id).await.expect("get failed"); - assert_eq!(result, None); - } -} diff --git a/integrationos-cache/src/local/secrets_cache.rs b/integrationos-cache/src/local/secrets_cache.rs deleted file mode 100644 index 9ca14cec..00000000 --- a/integrationos-cache/src/local/secrets_cache.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::LocalCacheExt; -use futures::Future; -use integrationos_domain::{Connection, IntegrationOSError, InternalError, MongoStore, Unit}; -use moka::future::Cache; -use mongodb::bson::Document; -use serde_json::Value; -use std::{sync::Arc, time::Duration}; - -#[derive(Clone)] -pub struct SecretCache { - inner: Arc>, -} - -impl SecretCache { - pub fn new(size: u64, ttl: u64) -> Self { - Self { - inner: Arc::new( - Cache::builder() - .max_capacity(size) - .time_to_live(Duration::from_secs(ttl)) - .build(), - ), - } - } - - pub fn max_capacity(&self) -> u64 { - self.inner.policy().max_capacity().unwrap_or_default() - } - - pub async fn get_or_insert_with_filter( - &self, - _: &Connection, - _: MongoStore, - _: Document, - ) -> Result { - Err(InternalError::key_not_found( - "The method you are trying to use is not implemented for this cache", - None, - )) - } - - pub async fn get_or_insert_with_fn( - &self, - key: Connection, - fa: F, - ) -> Result - where - F: FnOnce() -> Fut, - Fut: Future>, - { - let result = self.inner.get(&key).await; - match result { - Ok(Some(value)) => Ok(value), - Ok(None) => { - let value = fa().await?; - self.inner.set(&key, &value).await?; - Ok(value) - } - Err(e) => Err(e), - } - } - - pub async fn get(&self, key: &Connection) -> Result, IntegrationOSError> { - self.inner.get(key).await - } - - pub async fn set(&self, key: &Connection, value: &Value) -> Result { - self.inner.set(key, value).await - } - - pub async fn remove(&self, key: &Connection) -> Result { - self.inner.remove(key).await - } -} diff --git a/integrationos-database/src/service/init.rs b/integrationos-database/src/algebra/init.rs similarity index 100% rename from integrationos-database/src/service/init.rs rename to integrationos-database/src/algebra/init.rs diff --git a/integrationos-database/src/service/mod.rs b/integrationos-database/src/algebra/mod.rs similarity index 100% rename from integrationos-database/src/service/mod.rs rename to integrationos-database/src/algebra/mod.rs diff --git a/integrationos-database/src/service/storage.rs b/integrationos-database/src/algebra/storage.rs similarity index 100% rename from integrationos-database/src/service/storage.rs rename to integrationos-database/src/algebra/storage.rs diff --git a/integrationos-database/src/lib.rs b/integrationos-database/src/lib.rs index 0180c1cd..7651ee7e 100644 --- a/integrationos-database/src/lib.rs +++ b/integrationos-database/src/lib.rs @@ -1,5 +1,5 @@ +pub mod algebra; pub mod domain; pub mod logic; pub mod router; pub mod server; -pub mod service; diff --git a/integrationos-database/src/main.rs b/integrationos-database/src/main.rs index c46029bb..bc48efef 100644 --- a/integrationos-database/src/main.rs +++ b/integrationos-database/src/main.rs @@ -1,7 +1,7 @@ use anyhow::Result; use dotenvy::dotenv; use envconfig::Envconfig; -use integrationos_database::service::{ +use integrationos_database::algebra::{ init::{DatabaseInitializer, Initializer}, on_error_callback, }; diff --git a/integrationos-database/src/server.rs b/integrationos-database/src/server.rs index f968cda4..7a52391b 100644 --- a/integrationos-database/src/server.rs +++ b/integrationos-database/src/server.rs @@ -1,4 +1,4 @@ -use crate::{router, service::storage::Storage}; +use crate::{algebra::storage::Storage, router}; use anyhow::Result as AnyhowResult; use axum::Router; use integrationos_domain::database::DatabasePodConfig; diff --git a/integrationos-database/tests/context.rs b/integrationos-database/tests/context.rs index 41532d9d..38f8796d 100644 --- a/integrationos-database/tests/context.rs +++ b/integrationos-database/tests/context.rs @@ -1,7 +1,7 @@ use envconfig::Envconfig; use http::{Method, StatusCode}; -use integrationos_database::service::init::DatabaseInitializer; -use integrationos_database::service::init::Initializer; +use integrationos_database::algebra::init::DatabaseInitializer; +use integrationos_database::algebra::init::Initializer; use integrationos_domain::prefix::IdPrefix; use integrationos_domain::Id; use integrationos_domain::{database::DatabasePodConfig, IntegrationOSError, InternalError}; diff --git a/integrationos-domain/Cargo.toml b/integrationos-domain/Cargo.toml index 8682962c..5b5fecca 100644 --- a/integrationos-domain/Cargo.toml +++ b/integrationos-domain/Cargo.toml @@ -2,7 +2,7 @@ name = "integrationos-domain" description = "Shared library for IntegrationOS" license = "GPL-3.0" -version = "8.0.0" +version = "9.0.0" edition = "2021" repository = "https://github.com/integration-os/integrationos-domain" @@ -21,7 +21,7 @@ base64ct.workspace = true bson.workspace = true chrono.workspace = true ctr = "0.9.2" -digest = "0.10.7" +derive_builder.workspace = true downcast-rs = "1.2.1" envconfig.workspace = true fake = { workspace = true, features = [ @@ -46,15 +46,13 @@ indexmap = "2.6.0" js-sandbox-ios.workspace = true jsonpath_lib.workspace = true jsonwebtoken.workspace = true -kube.workspace = true -k8s-openapi = { workspace = true, features = ["latest"] } mongodb.workspace = true napi = { version = "2.16.13", default-features = false, features = ["napi4"] } napi-derive = "2.16.12" openapiv3.workspace = true percent-encoding = "2.3.1" pin-project = "1.1.7" -prost = "0.12.6" +prost = "0.13.4" rand.workspace = true reqwest = { workspace = true, features = ["json", "rustls-tls"] } semver = { workspace = true, features = ["serde"] } @@ -71,16 +69,14 @@ tracing-log = "0.2.0" tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing = { workspace = true, features = ["attributes"] } uuid = { workspace = true, features = ["v4"] } -crc32fast = "1.4.2" -secrecy = { version = "0.8.0", features = ["serde"] } +secrecy = { version = "0.10.3", features = ["serde"] } chacha20poly1305 = "0.10.1" hex = { version = "0.4.3", features = ["serde"] } -opentelemetry = { version = "0.26", features = ["trace"] } -opentelemetry-otlp = "0.26" -tracing-opentelemetry = "0.27" -opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "trace"] } +opentelemetry = { version = "0.27.1", features = ["trace"] } +opentelemetry-otlp = "0.27.0" +tracing-opentelemetry = "0.28.0" +opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio", "trace"] } [dev-dependencies] once_cell = "1.20.2" -mockito = "1.6.1" schemars = "0.8.21" diff --git a/integrationos-domain/src/algebra/crypto.rs b/integrationos-domain/src/algebra/crypto.rs index 4165619b..92547ffd 100644 --- a/integrationos-domain/src/algebra/crypto.rs +++ b/integrationos-domain/src/algebra/crypto.rs @@ -9,7 +9,6 @@ use google_cloud_kms::{ client::{Client, ClientConfig}, grpc::kms::v1::DecryptRequest, }; -use secrecy::ExposeSecret; use tracing::debug; #[async_trait] @@ -47,7 +46,7 @@ impl CryptoExt for IOSCrypto { impl IOSCrypto { pub fn new(config: SecretsConfig) -> Result { - let len = config.ios_crypto_secret.expose_secret().as_bytes().len(); + let len = config.ios_crypto_secret.len(); if len != 32 { return Err(InternalError::invalid_argument( @@ -58,7 +57,6 @@ impl IOSCrypto { let key: [u8; 32] = config .ios_crypto_secret - .expose_secret() .as_bytes() .iter() .take(32) diff --git a/integrationos-domain/src/algebra/hash.rs b/integrationos-domain/src/algebra/hash.rs index b2d536d1..91616cf6 100644 --- a/integrationos-domain/src/algebra/hash.rs +++ b/integrationos-domain/src/algebra/hash.rs @@ -7,21 +7,21 @@ pub trait HashExt { fn verify(&self, value: &str, hash: &str) -> bool; } -pub struct HashKecAlg; +pub struct HashKecAlgImpl; -impl HashKecAlg { +impl HashKecAlgImpl { pub fn new() -> Self { - HashKecAlg + HashKecAlgImpl } } -impl Default for HashKecAlg { +impl Default for HashKecAlgImpl { fn default() -> Self { Self::new() } } -impl HashExt for HashKecAlg { +impl HashExt for HashKecAlgImpl { fn hash(&self, value: &str) -> Result { let mut hasher = Keccak256::new(); hasher.update(value); @@ -29,7 +29,7 @@ impl HashExt for HashKecAlg { } fn verify(&self, value: &str, hash: &str) -> bool { - self.hash(value).ok().map_or(false, |h| h == hash) + self.hash(value).ok().is_some_and(|h| h == hash) } } @@ -40,7 +40,7 @@ impl TryFrom for HashedSecret { let value_str = serde_json::to_string(&value).map_err(|err| { InternalError::serialize_error(&format!("Failed to serialize value: {err}"), None) })?; - let hash = HashKecAlg::new().hash(&value_str)?; + let hash = HashKecAlgImpl::new().hash(&value_str)?; Ok(HashedSecret::new(hash)) } } @@ -61,7 +61,7 @@ mod test { } }); - let hash = HashKecAlg::new().hash(&value.to_string()).unwrap(); + let hash = HashKecAlgImpl::new().hash(&value.to_string()).unwrap(); assert_eq!( hash, @@ -80,9 +80,9 @@ mod test { } }); - let hash = HashKecAlg::new().hash(&value.to_string()).unwrap(); + let hash = HashKecAlgImpl::new().hash(&value.to_string()).unwrap(); - assert!(HashKecAlg::new().verify(&value.to_string(), &hash)); + assert!(HashKecAlgImpl::new().verify(&value.to_string(), &hash)); } #[test] diff --git a/integrationos-domain/src/algebra/json.rs b/integrationos-domain/src/algebra/json.rs new file mode 100644 index 00000000..c9972607 --- /dev/null +++ b/integrationos-domain/src/algebra/json.rs @@ -0,0 +1,45 @@ +use serde_json::Value; + +pub trait JsonExt { + fn drop_nulls(&self) -> Self; +} + +impl JsonExt for Value { + fn drop_nulls(&self) -> Value { + remove_nulls(self) + } +} + +fn remove_nulls(value: &Value) -> Value { + match value { + Value::Object(map) => { + let mut map = map.clone(); + + let keys_to_remove: Vec = map + .iter() + .filter(|(_, v)| v.is_null()) + .map(|(k, _)| k.clone()) + .collect(); + + for key in keys_to_remove { + map.remove(&key); + } + + for value in map.values_mut() { + *value = remove_nulls(value); + } + + Value::Object(map) + } + Value::Array(vec) => { + let mut vec = vec.clone(); + + for item in vec.iter_mut() { + *item = remove_nulls(item); + } + + Value::Array(vec) + } + _ => value.clone(), + } +} diff --git a/integrationos-domain/src/algebra/mod.rs b/integrationos-domain/src/algebra/mod.rs index 6fc85a71..3d0d99db 100644 --- a/integrationos-domain/src/algebra/mod.rs +++ b/integrationos-domain/src/algebra/mod.rs @@ -1,6 +1,7 @@ mod crypto; mod fetcher; mod hash; +mod json; mod oauth; mod pipeline; mod secret; @@ -12,6 +13,7 @@ mod timed; pub use crypto::*; pub use fetcher::*; pub use hash::*; +pub use json::*; pub use oauth::*; pub use pipeline::*; pub use secret::*; diff --git a/integrationos-domain/src/algebra/string.rs b/integrationos-domain/src/algebra/string.rs index d2e4da66..46b7d8d8 100644 --- a/integrationos-domain/src/algebra/string.rs +++ b/integrationos-domain/src/algebra/string.rs @@ -77,7 +77,7 @@ impl StringExt for String { } } -impl<'a> StringExt for &'a str { +impl StringExt for &str { fn capitalize(&self) -> String { self.to_string().capitalize() } diff --git a/integrationos-domain/src/domain/configuration/secrets.rs b/integrationos-domain/src/domain/configuration/secrets.rs index 8756e983..22b3f3fb 100644 --- a/integrationos-domain/src/domain/configuration/secrets.rs +++ b/integrationos-domain/src/domain/configuration/secrets.rs @@ -1,5 +1,4 @@ use envconfig::Envconfig; -use secrecy::SecretString; use std::fmt::{Display, Formatter, Result}; use strum::{AsRefStr, EnumString}; @@ -8,7 +7,6 @@ use strum::{AsRefStr, EnumString}; pub enum SecretServiceProvider { GoogleKms, IosKms, - // TODO: Implement LocalStorage } #[derive(Debug, Clone, Envconfig)] @@ -30,7 +28,7 @@ pub struct SecretsConfig { from = "IOS_CRYPTO_SECRET", default = "xTtUQejH8eSNmWP5rlnHLkOWkHeflivG" )] - pub ios_crypto_secret: SecretString, + pub ios_crypto_secret: String, } impl SecretsConfig { @@ -41,7 +39,7 @@ impl SecretsConfig { #[cfg(test)] pub fn with_secret(mut self, secret: String) -> Self { - self.ios_crypto_secret = SecretString::new(secret); + self.ios_crypto_secret = secret; self } @@ -60,7 +58,7 @@ impl Default for SecretsConfig { google_kms_location_id: "global".to_owned(), google_kms_key_ring_id: "secrets-service-local".to_owned(), google_kms_key_id: "secrets-service-local".to_owned(), - ios_crypto_secret: SecretString::new("xTtUQejH8eSNmWP5rlnHLkOWkHeflivG".to_owned()), + ios_crypto_secret: "xTtUQejH8eSNmWP5rlnHLkOWkHeflivG".to_owned(), } } } @@ -82,8 +80,6 @@ impl Display for SecretsConfig { #[cfg(test)] mod tests { - use secrecy::ExposeSecret; - use super::*; #[tokio::test] @@ -91,7 +87,7 @@ mod tests { let config = SecretsConfig::new(); assert_eq!( - config.ios_crypto_secret.expose_secret().as_str(), + config.ios_crypto_secret.as_str(), "xTtUQejH8eSNmWP5rlnHLkOWkHeflivG" ); assert_eq!(config.provider, SecretServiceProvider::GoogleKms); diff --git a/integrationos-domain/src/domain/connection/connection_model_definition.rs b/integrationos-domain/src/domain/connection/connection_model_definition.rs index 66698d28..3597c6fe 100644 --- a/integrationos-domain/src/domain/connection/connection_model_definition.rs +++ b/integrationos-domain/src/domain/connection/connection_model_definition.rs @@ -101,6 +101,14 @@ pub enum PlatformInfo { Api(ApiModelConfig), } +impl PlatformInfo { + pub fn config(&self) -> &ApiModelConfig { + match self { + PlatformInfo::Api(config) => config, + } + } +} + #[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize)] #[cfg_attr(feature = "dummy", derive(fake::Dummy))] #[serde(rename_all = "camelCase")] diff --git a/integrationos-domain/src/domain/error/axum_error.rs b/integrationos-domain/src/domain/error/axum_error.rs index 3ea240cf..c25710a8 100644 --- a/integrationos-domain/src/domain/error/axum_error.rs +++ b/integrationos-domain/src/domain/error/axum_error.rs @@ -11,7 +11,7 @@ impl IntoResponse for IntegrationOSError { } } -impl<'a> IntoResponse for &'a IntegrationOSError { +impl IntoResponse for &IntegrationOSError { fn into_response(self) -> Response { let body = self.to_owned().as_application().as_json(); diff --git a/integrationos-domain/src/domain/error/mod.rs b/integrationos-domain/src/domain/error/mod.rs index 90b24aae..f8fb8b6c 100644 --- a/integrationos-domain/src/domain/error/mod.rs +++ b/integrationos-domain/src/domain/error/mod.rs @@ -1,6 +1,7 @@ pub mod axum_error; use crate::prelude::StringExt; +use derive_builder::UninitializedFieldError; use http::StatusCode; use mongodb::error::WriteFailure; use serde::Serialize; @@ -949,6 +950,12 @@ pub enum IntegrationOSError { Application(ApplicationError), } +impl From for IntegrationOSError { + fn from(err: reqwest::Error) -> Self { + InternalError::io_err(&err.to_string(), None) + } +} + impl AsRef for IntegrationOSError { fn as_ref(&self) -> &str { match self { @@ -1340,6 +1347,12 @@ impl Display for IntegrationOSError { } } +impl From for IntegrationOSError { + fn from(ufe: UninitializedFieldError) -> Self { + InternalError::invalid_argument(&format!("Uninitialized field: {}", ufe.field_name()), None) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/integrationos-domain/src/domain/pipeline/destination.rs b/integrationos-domain/src/domain/pipeline/destination.rs index 9e4d1d7e..dc05eea2 100644 --- a/integrationos-domain/src/domain/pipeline/destination.rs +++ b/integrationos-domain/src/domain/pipeline/destination.rs @@ -19,6 +19,8 @@ pub enum Action { action: CrudAction, #[cfg_attr(feature = "dummy", dummy(default))] id: Option>, + #[serde(default)] + passthrough: bool, }, } @@ -36,6 +38,22 @@ impl Action { Action::Unified { action, .. } => Some(action), } } + + pub fn passthrough(&self) -> bool { + match self { + Action::Passthrough { .. } => true, + Action::Unified { passthrough, .. } => *passthrough, + } + } + + pub fn set_passthrough(mut self, value: bool) -> Self { + match &mut self { + Action::Passthrough { .. } => {} + Action::Unified { passthrough, .. } => *passthrough = value, + } + + self + } } #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] @@ -48,3 +66,43 @@ pub struct Destination { #[cfg_attr(feature = "dummy", dummy(expr = "String::new().into()"))] pub connection_key: Arc, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_set_passthrough() { + let action = Action::Unified { + name: "test".to_string().into(), + action: CrudAction::GetOne, + id: None, + passthrough: false, + }; + + assert!(!action.passthrough()); + + let action = action.set_passthrough(true); + + assert!(action.passthrough()); + } + + #[test] + fn test_destination_deserialize() { + let action = r#"{ + "platform": "ios", + "action": { + "Unified": { + "name": "test", + "action": "getOne", + "id": null + } + }, + "connectionKey": "test" + }"#; + + let destination: Destination = serde_json::from_str(action).unwrap(); + + assert!(!destination.action.passthrough()); + } +} diff --git a/integrationos-domain/src/domain/schema/common_model.rs b/integrationos-domain/src/domain/schema/common_model.rs index 66665971..aea594c6 100644 --- a/integrationos-domain/src/domain/schema/common_model.rs +++ b/integrationos-domain/src/domain/schema/common_model.rs @@ -744,12 +744,12 @@ impl CommonModel { /// * `cm_store` - The store for common models /// * `ce_store` - The store for common enums /// * `strategy` - The strategy to use for generating the type - pub async fn generate_as_expanded<'a>( + pub async fn generate_as_expanded( &self, lang: &Lang, cm_store: &MongoStore, ce_store: &MongoStore, - strategy: TypeGenerationStrategy<'a>, + strategy: TypeGenerationStrategy<'_>, ) -> String { match lang { Lang::Rust => self.as_rust_expanded(cm_store, ce_store, strategy).await, @@ -892,11 +892,11 @@ impl CommonModel { /// /// # Returns /// A string of all the enums and models recursively expanded in the specified language - async fn as_typescript_expanded<'a>( + async fn as_typescript_expanded( &self, cm_store: &MongoStore, ce_store: &MongoStore, - strategy: TypeGenerationStrategy<'a>, + strategy: TypeGenerationStrategy<'_>, ) -> String { let mut long_lived_visited_enums = HashSet::new(); let mut long_lived_visited_common_models = HashSet::new(); @@ -1002,11 +1002,11 @@ impl CommonModel { /// /// # Returns /// A string of all the enums and models recursively expanded in the specified language - async fn as_rust_expanded<'a>( + async fn as_rust_expanded( &self, cm_store: &MongoStore, ce_store: &MongoStore, - strategy: TypeGenerationStrategy<'a>, + strategy: TypeGenerationStrategy<'_>, ) -> String { let mut long_lived_visited_enums = HashSet::new(); let mut long_lived_visited_common_models = HashSet::new(); diff --git a/integrationos-domain/src/domain/secret/mod.rs b/integrationos-domain/src/domain/secret/mod.rs index d831c678..f1d3473a 100644 --- a/integrationos-domain/src/domain/secret/mod.rs +++ b/integrationos-domain/src/domain/secret/mod.rs @@ -95,7 +95,7 @@ impl Secret { } pub fn encrypted_secret(&self) -> SecretString { - SecretString::new(self.encrypted_secret.clone()) + SecretString::from(self.encrypted_secret.clone()) } } diff --git a/integrationos-domain/src/service/telemetry/mod.rs b/integrationos-domain/src/service/telemetry/mod.rs index e0a868a9..4d8701c7 100644 --- a/integrationos-domain/src/service/telemetry/mod.rs +++ b/integrationos-domain/src/service/telemetry/mod.rs @@ -5,9 +5,9 @@ use axum::middleware::Next; use axum::response::IntoResponse; use http::StatusCode; use opentelemetry::{global, trace::TracerProvider}; -use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::{SpanExporter, WithExportConfig}; use opentelemetry_sdk::runtime::Tokio; -use opentelemetry_sdk::trace::{BatchConfig, Config}; +use opentelemetry_sdk::trace::TracerProvider as OtelTracerProvider; use tracing::subscriber::set_global_default; use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; use tracing_log::LogTracer; @@ -39,22 +39,18 @@ where match otlp_url { Some(otlp_url) => { - let exporter = opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(otlp_url.clone()); + let exporter = SpanExporter::builder() + .with_tonic() + .with_endpoint(otlp_url.clone()) + .build() + .expect("Failed to create OTLP exporter"); - let provider = opentelemetry_otlp::new_pipeline() - .tracing() - .with_trace_config(Config::default().with_resource( - opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new( - "service.name", - name, - )]), - )) - .with_batch_config(BatchConfig::default()) - .with_exporter(exporter) - .install_batch(Tokio) - .expect("Failed to install tracing pipeline"); + let provider = OtelTracerProvider::builder() + .with_resource(opentelemetry_sdk::Resource::new(vec![ + opentelemetry::KeyValue::new("service.name", name), + ])) + .with_batch_exporter(exporter, Tokio) + .build(); global::set_tracer_provider(provider.clone()); let tracer = provider.tracer("tracing-otel-subscriber"); diff --git a/integrationos-event/src/mongo_control_data_store.rs b/integrationos-event/src/mongo_control_data_store.rs index 2f1d4b93..e305b883 100644 --- a/integrationos-event/src/mongo_control_data_store.rs +++ b/integrationos-event/src/mongo_control_data_store.rs @@ -355,7 +355,7 @@ impl ControlDataStore for MongoControlDataStore { ) -> Result { let response = self .destination_caller - .send_to_destination( + .dispatch_destination_request( None, &pipeline.destination, event.headers.clone(), diff --git a/integrationos-gateway/src/config.rs b/integrationos-gateway/src/config.rs index a5cc0179..111704c7 100644 --- a/integrationos-gateway/src/config.rs +++ b/integrationos-gateway/src/config.rs @@ -55,7 +55,6 @@ impl Default for Config { } #[cfg(test)] - mod tests { use super::*; diff --git a/integrationos-gateway/src/lib.rs b/integrationos-gateway/src/lib.rs index 35e80129..3e65fc58 100644 --- a/integrationos-gateway/src/lib.rs +++ b/integrationos-gateway/src/lib.rs @@ -2,4 +2,4 @@ pub mod config; pub mod finalizer; pub mod mock; pub mod server; -pub mod util; +pub mod utility; diff --git a/integrationos-gateway/src/server.rs b/integrationos-gateway/src/server.rs index 965a70d8..8b21878a 100644 --- a/integrationos-gateway/src/server.rs +++ b/integrationos-gateway/src/server.rs @@ -1,6 +1,6 @@ use crate::{ config::Config, finalizer::event::FinalizeEvent, mock::finalizer::MockFinalizer, - util::get_value_from_path, + utility::get_value_from_path, }; use anyhow::{anyhow, Result}; use axum::{ diff --git a/integrationos-gateway/src/util.rs b/integrationos-gateway/src/utility.rs similarity index 100% rename from integrationos-gateway/src/util.rs rename to integrationos-gateway/src/utility.rs diff --git a/integrationos-unified/Cargo.toml b/integrationos-unified/Cargo.toml index e8e39ae7..0a8bd50f 100644 --- a/integrationos-unified/Cargo.toml +++ b/integrationos-unified/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" jsonpath_lib.workspace = true bson.workspace = true chrono = { workspace = true, features = ["serde"] } +derive_builder.workspace = true integrationos-cache = { path = "../integrationos-cache" } integrationos-domain = { path = "../integrationos-domain" } futures.workspace = true diff --git a/integrationos-unified/src/algebra/jsruntime.rs b/integrationos-unified/src/algebra/jsruntime.rs new file mode 100644 index 00000000..fa7ace06 --- /dev/null +++ b/integrationos-unified/src/algebra/jsruntime.rs @@ -0,0 +1,109 @@ +use integrationos_domain::{ApplicationError, IntegrationOSError}; +use js_sandbox_ios::Script; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::cell::RefCell; +use std::fmt::Debug; + +thread_local! { + static JS_RUNTIME: RefCell