diff --git a/Cargo.lock b/Cargo.lock index ea9a62b28..38ec5b900 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1663,6 +1663,7 @@ dependencies = [ "sanitise-file-name", "test-log", "thiserror", + "tokio", ] [[package]] @@ -1697,6 +1698,7 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rand", + "reqwest", "rio_turtle", "rio_xml", "test-log", @@ -3104,7 +3106,6 @@ dependencies = [ "bytes", "libc", "mio", - "num_cpus", "pin-project-lite", "socket2", "windows-sys", diff --git a/nemo-cli/src/main.rs b/nemo-cli/src/main.rs index 989d86da3..33053ef7e 100644 --- a/nemo-cli/src/main.rs +++ b/nemo-cli/src/main.rs @@ -28,7 +28,7 @@ use colored::Colorize; use nemo::{ error::{Error, ReadingError}, execution::{DefaultExecutionEngine, ExecutionEngine}, - io::{input_manager::ResourceProviders, parser::parse_program, RecordWriter}, + io::{parser::parse_program, resource_providers::ResourceProviders, RecordWriter}, meta::{timing::TimedDisplay, TimedCode}, model::OutputPredicateSelection, }; diff --git a/nemo-physical/Cargo.toml b/nemo-physical/Cargo.toml index 6be857cc5..dcee77494 100644 --- a/nemo-physical/Cargo.toml +++ b/nemo-physical/Cargo.toml @@ -27,6 +27,7 @@ linked-hash-map = "0.5.6" howlong = { version = "0.1", optional = true } rio_turtle = "0.8.4" rio_xml = "0.8.4" +reqwest = "0.11.18" [dev-dependencies] arbitrary = { version = "1", features = ["derive"] } diff --git a/nemo-physical/src/error.rs b/nemo-physical/src/error.rs index 71c7c2044..784746a28 100644 --- a/nemo-physical/src/error.rs +++ b/nemo-physical/src/error.rs @@ -51,6 +51,9 @@ pub enum ReadingError { /// Error in Rio's RDF/XML parser #[error(transparent)] RioXML(#[from] rio_xml::RdfXmlError), + /// Error in Requwest's HTTP handler + #[error(transparent)] + HTTPTransfer(#[from] reqwest::Error), } /// Error-Collection for all the possible Errors occurring in this crate diff --git a/nemo-python/src/lib.rs b/nemo-python/src/lib.rs index f5e92afff..635cd7cf5 100644 --- a/nemo-python/src/lib.rs +++ b/nemo-python/src/lib.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, fs::read_to_string}; use nemo::{ datatypes::{DataValueT, Double, Float}, execution::ExecutionEngine, - io::{input_manager::ResourceProviders, OutputFileManager, RecordWriter}, + io::{resource_providers::ResourceProviders, OutputFileManager, RecordWriter}, }; use pyo3::{create_exception, prelude::*}; diff --git a/nemo-wasm/src/lib.rs b/nemo-wasm/src/lib.rs index 651024a93..8163881cd 100644 --- a/nemo-wasm/src/lib.rs +++ b/nemo-wasm/src/lib.rs @@ -7,9 +7,8 @@ use js_sys::Set; use js_sys::Uint8Array; use nemo::execution::ExecutionEngine; -use nemo::io::input_manager::ResourceProvider; -use nemo::io::input_manager::ResourceProviders; use nemo::io::parser::parse_program; +use nemo::io::resource_providers::{ResourceProvider, ResourceProviders}; use nemo_physical::datatypes::DataValueT; use nemo_physical::table_reader::Resource; use wasm_bindgen::prelude::wasm_bindgen; diff --git a/nemo/Cargo.toml b/nemo/Cargo.toml index 10f90ea02..18d3f7242 100644 --- a/nemo/Cargo.toml +++ b/nemo/Cargo.toml @@ -10,18 +10,16 @@ readme = "README.md" repository.workspace = true [features] -# TODO: Add "http-resource-provider" default = ["timing"] # Allows building for web assembly environments # Enables the "js" feature of the "getrandom" crate # This feature cannot be used together with the "timing" feature, because the "howlong" crate does not support web assembly environments -# This feature cannot be used together with the "http-resource-provider" feature, because the "reqwest" crate does not support web assembly environments with the "blocking" feature js = ["getrandom/js"] -http-resource-provider = ["dep:reqwest"] no-prefixed-string-dictionary = ["nemo-physical/no-prefixed-string-dictionary"] timing = ["nemo-physical/timing"] [dependencies] +nemo-physical = { path = "../nemo-physical", default-features = false } log = "0.4" nom = "7.1.1" macros = { path = "../libs/macros" } @@ -32,15 +30,14 @@ thiserror = "1.0" flate2 = "1" sanitise-file-name = "1.0.0" nom_locate = { version = "4.1.0", features = [ "runtime-dispatch-simd" ] } -getrandom = {version = "0.2.9", default-features = false} -reqwest = { version = "0.11.18", features = ["blocking"], optional = true } - -nemo-physical = { path = "../nemo-physical", default-features = false } +getrandom = { version = "0.2.9", default-features = false } path-slash = "0.2.1" rio_api = "0.8.4" rio_turtle = "0.8.4" rio_xml = "0.8.4" oxiri = "0.2.2" +tokio = { version = "1.29.1", features = [ "rt" ] } +reqwest = { version = "0.11.18" } [dev-dependencies] env_logger = "*" diff --git a/nemo/src/api.rs b/nemo/src/api.rs index 9f16cced4..f980e7f83 100644 --- a/nemo/src/api.rs +++ b/nemo/src/api.rs @@ -30,8 +30,8 @@ use crate::{ error::{Error, ReadingError}, execution::{DefaultExecutionEngine, ExecutionEngine}, io::{ - input_manager::ResourceProviders, parser::{all_input_consumed, RuleParser}, + resource_providers::ResourceProviders, OutputFileManager, RecordWriter, }, model::Identifier, diff --git a/nemo/src/execution/execution_engine.rs b/nemo/src/execution/execution_engine.rs index 367abcae5..46f4d1ba4 100644 --- a/nemo/src/execution/execution_engine.rs +++ b/nemo/src/execution/execution_engine.rs @@ -10,7 +10,7 @@ use nemo_physical::{ use crate::{ error::Error, - io::input_manager::{InputManager, ResourceProviders}, + io::{input_manager::InputManager, resource_providers::ResourceProviders}, model::{chase_model::ChaseProgram, Identifier, Program, TermOperation}, program_analysis::analysis::ProgramAnalysis, table_manager::TableManager, diff --git a/nemo/src/io.rs b/nemo/src/io.rs index 6e97cfaff..a7eadcf1b 100644 --- a/nemo/src/io.rs +++ b/nemo/src/io.rs @@ -9,7 +9,9 @@ pub mod input_manager; pub mod output_file_manager; pub mod parser; pub mod rdf_triples; +pub mod resource_providers; +pub use input_manager::InputManager; pub use output_file_manager::OutputFileManager; use nemo_physical::dictionary::value_serializer::TrieSerializer; diff --git a/nemo/src/io/dsv.rs b/nemo/src/io/dsv.rs index d987f35be..2af9c1f4d 100644 --- a/nemo/src/io/dsv.rs +++ b/nemo/src/io/dsv.rs @@ -16,7 +16,7 @@ //! ## Logical layer //! ``` //! # use nemo_physical::table_reader::TableReader; -//! # use nemo::{types::LogicalTypeEnum, io::{input_manager::ResourceProviders, dsv::DSVReader}}; +//! # use nemo::{types::LogicalTypeEnum, io::{resource_providers::ResourceProviders, dsv::DSVReader}}; //! # let file_path = String::from("../resources/doc/examples/city_population.csv"); //! let csv_reader = DSVReader::csv( //! ResourceProviders::default(), @@ -38,7 +38,7 @@ //! ``` //! # use nemo_physical::table_reader::TableReader; //! # -//! # use nemo::{types::LogicalTypeEnum, io::{input_manager::ResourceProviders, dsv::DSVReader}}; +//! # use nemo::{types::LogicalTypeEnum, io::{resource_providers::ResourceProviders, dsv::DSVReader}}; //! # use std::cell::RefCell; //! # use nemo_physical::builder_proxy::{ //! # PhysicalBuilderProxyEnum, PhysicalColumnBuilderProxy, PhysicalStringColumnBuilderProxy @@ -80,7 +80,7 @@ use crate::builder_proxy::LogicalColumnBuilderProxy; use crate::error::{Error, ReadingError}; use crate::types::LogicalTypeEnum; -use super::input_manager::ResourceProviders; +use super::resource_providers::ResourceProviders; /// A reader object for reading [DSV](https://en.wikipedia.org/wiki/Delimiter-separated_values) (delimiter separated values) files. /// diff --git a/nemo/src/io/input_manager.rs b/nemo/src/io/input_manager.rs index d2339a5de..3fd6dae58 100644 --- a/nemo/src/io/input_manager.rs +++ b/nemo/src/io/input_manager.rs @@ -1,21 +1,12 @@ //! Management of resource providers, handling of decompression and resolution of resources to readers. -use std::{io::Read, path::PathBuf, rc::Rc}; +use nemo_physical::management::database::TableSource; -use std::fs::File; - -use flate2::read::GzDecoder; -use nemo_physical::{ - error::ReadingError, management::database::TableSource, table_reader::Resource, -}; -use path_slash::PathBufExt; - -use crate::{error::Error, model::DataSource, types::LogicalTypeEnum}; - -use super::{ - dsv::DSVReader, - parser::{all_input_consumed, iri::iri}, - rdf_triples::RDFTriplesReader, +use crate::{ + error::Error, + io::{dsv::DSVReader, rdf_triples::RDFTriplesReader, resource_providers::ResourceProviders}, + model::DataSource, + types::LogicalTypeEnum, }; /// Manages everything related to resolving the inputs of a Nemo program. @@ -25,153 +16,9 @@ pub struct InputManager { resource_providers: ResourceProviders, } -/// A list of [`ResourceProvider`] sorted by decreasing priority. -/// -/// This allows resolving a given resource, which may occur in a Nemo program, -/// to a reader (which return the actual by of e.g. a referenced file). -/// -/// The list of [`ResourceProviders`] can be customized by users of the Rust nemo crate. -#[derive(Debug, Clone)] -pub struct ResourceProviders(Rc>>); - -impl ResourceProviders { - /// Construct using a list of [`ResourceProvider`]s - pub fn from(r: Vec>) -> Self { - Self(Rc::new(r)) - } - - /// Returns instance which is unable to resolve any resources. - pub fn empty() -> Self { - Self(Rc::new(vec![])) - } - - /// Resolves a resource. - /// - /// First checks if the resource can be opened as gzip, otherwise opens the file directly. - pub fn open_resource( - &self, - resource: &Resource, - try_gzip: bool, - ) -> Result, ReadingError> { - for resource_provider in self.0.iter() { - if let Some(reader) = resource_provider.open_resource(resource)? { - if !try_gzip { - return Ok(reader); - } - - // Try opening with gzip - let gz_reader = GzDecoder::new(reader); - - if gz_reader.header().is_some() { - return Ok(Box::new(gz_reader)); - } else { - // Try again without gzip, otherwise go to next provider - if let Some(reader) = resource_provider.open_resource(resource)? { - return Ok(reader); - }; - } - } - } - - Err(ReadingError::ResourceNotProvided { - resource: resource.clone(), - }) - } -} - -impl Default for ResourceProviders { - fn default() -> Self { - Self(Rc::new(vec![ - Box::::default(), - Box::::default(), - ])) - } -} - -/// Allows resolving resources to readers. -/// -/// This allows specifying how to resolve a resource independent of how the file format is going to be parsed. -pub trait ResourceProvider: std::fmt::Debug { - /// Resolve and open a resource. - /// - /// This function may be called multiple times in a row, e.g. when testing if a file can be opened using gzip. - /// - /// The implementation can decide wether ir wants to handle the given resource, otherwise it can return `None`, and the next `ResourceProvider` will be consulted. - fn open_resource(&self, resource: &Resource) -> Result>, ReadingError>; -} - -fn is_iri(resource: &Resource) -> bool { - all_input_consumed(iri)(resource).is_ok() -} - -/// Resolves resources from the OS-provided file system. -/// -/// Handles `file:` IRIs and non-IRI, (possibly relative) file paths. -#[derive(Debug, Clone, Copy, Default)] -pub struct FileResourceProvider {} - -impl ResourceProvider for FileResourceProvider { - fn open_resource(&self, resource: &Resource) -> Result>, ReadingError> { - // Try to parse as file IRI - let path = if is_iri(resource) { - if resource.starts_with("file://") { - // File URI. We only support local files, i.e., URIs - // where the host part is either empty or `localhost`. - - let path = resource - .strip_prefix("file://localhost") - .or_else(|| resource.strip_prefix("file://")) - .ok_or_else(|| ReadingError::InvalidFileUri(resource.to_string()))?; - PathBuf::from_slash(path) - } else { - // Non-file IRI, file resource provider is not responsible - return Ok(None); - } - } else { - // Not a valid URI, interpret as path directly - PathBuf::from(resource) - }; - - let file = File::open(path)?; - Ok(Some(Box::new(file))) - } -} - -/// Resolves resources using HTTP or HTTPS. -/// -/// Handles `http:` and `https:` IRIs. -#[derive(Debug, Clone, Copy, Default)] -pub struct HTTPResourceProvider {} - -impl ResourceProvider for HTTPResourceProvider { - fn open_resource(&self, resource: &Resource) -> Result>, ReadingError> { - if !is_iri(resource) { - return Ok(None); - } - - if !(resource.starts_with("http:") || resource.starts_with("https:")) { - // Non-http IRI, resource provider is not responsible - return Ok(None); - } - - #[cfg(not(feature = "http-resource-provider"))] - { - panic!("Using the HTTPResourceProvider requires the http-resource-provider feature") - } - #[cfg(feature = "http-resource-provider")] - { - let response = reqwest::blocking::get(resource).unwrap(); - Ok(Some(Box::new(response))) - } - } -} - -// See https://doc.rust-lang.org/cargo/reference/features.html#mutually-exclusive-features -#[cfg(all(feature = "js", feature = "http-resource-provider"))] -compile_error!("feature \"js\" and feature \"http-resource-provider\" cannot be enabled at the same time, because the \"reqwest\" crate does not support web assembly environments with the \"blocking\" feature"); - impl InputManager { - #[allow(missing_docs)] + /// Create a new [input manager][InputManager] from the given + /// [resource providers][ResourceProviders]. pub fn new(resource_providers: ResourceProviders) -> Self { Self { resource_providers } } diff --git a/nemo/src/io/rdf_triples.rs b/nemo/src/io/rdf_triples.rs index 051ac22b7..fad1511ae 100644 --- a/nemo/src/io/rdf_triples.rs +++ b/nemo/src/io/rdf_triples.rs @@ -13,7 +13,7 @@ use rio_xml::RdfXmlParser; use crate::types::LogicalTypeEnum; -use super::input_manager::ResourceProviders; +use super::resource_providers::ResourceProviders; /// A [`TableReader`] for RDF 1.1 files containing triples. #[derive(Debug, Clone)] diff --git a/nemo/src/io/resource_providers.rs b/nemo/src/io/resource_providers.rs new file mode 100644 index 000000000..b02125625 --- /dev/null +++ b/nemo/src/io/resource_providers.rs @@ -0,0 +1,92 @@ +//! Resource providers for external resources that can be used in reasoning. + +use std::{io::Read, rc::Rc}; + +use flate2::read::GzDecoder; + +use crate::io::parser::{all_input_consumed, iri::iri}; +use nemo_physical::{error::ReadingError, table_reader::Resource}; + +/// A resource provider for files. +pub mod file; +/// A resource provider for HTTP(s) requests. +pub mod http; + +fn is_iri(resource: &Resource) -> bool { + all_input_consumed(iri)(resource).is_ok() +} + +/// Allows resolving resources to readers. +/// +/// This allows specifying how to resolve a resource independent of how the file format is going to be parsed. +pub trait ResourceProvider: std::fmt::Debug { + /// Resolve and open a resource. + /// + /// This function may be called multiple times in a row, e.g. when testing if a file can be opened using gzip. + /// + /// The implementation can decide wether ir wants to handle the given resource, otherwise it can return `None`, and the next `ResourceProvider` will be consulted. + fn open_resource(&self, resource: &Resource) -> Result>, ReadingError>; +} + +/// A list of [`ResourceProvider`] sorted by decreasing priority. +/// +/// This allows resolving a given resource, which may occur in a Nemo program, +/// to a reader (which return the actual by of e.g. a referenced file). +/// +/// The list of [`ResourceProviders`] can be customized by users of the Rust nemo crate. +#[derive(Debug, Clone)] +pub struct ResourceProviders(Rc>>); + +impl ResourceProviders { + /// Construct using a list of [`ResourceProvider`]s + pub fn from(r: Vec>) -> Self { + Self(Rc::new(r)) + } + + /// Returns instance which is unable to resolve any resources. + pub fn empty() -> Self { + Self(Rc::new(vec![])) + } + + /// Resolves a resource. + /// + /// First checks if the resource can be opened as gzip, otherwise opens the file directly. + pub fn open_resource( + &self, + resource: &Resource, + try_gzip: bool, + ) -> Result, ReadingError> { + for resource_provider in self.0.iter() { + if let Some(reader) = resource_provider.open_resource(resource)? { + if !try_gzip { + return Ok(reader); + } + + // Try opening with gzip + let gz_reader = GzDecoder::new(reader); + + if gz_reader.header().is_some() { + return Ok(Box::new(gz_reader)); + } else { + // Try again without gzip, otherwise go to next provider + if let Some(reader) = resource_provider.open_resource(resource)? { + return Ok(reader); + }; + } + } + } + + Err(ReadingError::ResourceNotProvided { + resource: resource.clone(), + }) + } +} + +impl Default for ResourceProviders { + fn default() -> Self { + Self(Rc::new(vec![ + Box::::default(), + Box::::default(), + ])) + } +} diff --git a/nemo/src/io/resource_providers/file.rs b/nemo/src/io/resource_providers/file.rs new file mode 100644 index 000000000..4fa15bd3a --- /dev/null +++ b/nemo/src/io/resource_providers/file.rs @@ -0,0 +1,39 @@ +use std::{fs::File, io::Read, path::PathBuf}; + +use nemo_physical::{error::ReadingError, table_reader::Resource}; +use path_slash::PathBufExt; + +use super::{is_iri, ResourceProvider}; + +/// Resolves resources from the OS-provided file system. +/// +/// Handles `file:` IRIs and non-IRI, (possibly relative) file paths. +#[derive(Debug, Clone, Copy, Default)] +pub struct FileResourceProvider {} + +impl ResourceProvider for FileResourceProvider { + fn open_resource(&self, resource: &Resource) -> Result>, ReadingError> { + // Try to parse as file IRI + let path = if is_iri(resource) { + if resource.starts_with("file://") { + // File URI. We only support local files, i.e., URIs + // where the host part is either empty or `localhost`. + + let path = resource + .strip_prefix("file://localhost") + .or_else(|| resource.strip_prefix("file://")) + .ok_or_else(|| ReadingError::InvalidFileUri(resource.to_string()))?; + PathBuf::from_slash(path) + } else { + // Non-file IRI, file resource provider is not responsible + return Ok(None); + } + } else { + // Not a valid URI, interpret as path directly + PathBuf::from(resource) + }; + + let file = File::open(path)?; + Ok(Some(Box::new(file))) + } +} diff --git a/nemo/src/io/resource_providers/http.rs b/nemo/src/io/resource_providers/http.rs new file mode 100644 index 000000000..0a4cb2fa5 --- /dev/null +++ b/nemo/src/io/resource_providers/http.rs @@ -0,0 +1,69 @@ +use std::io::Read; + +use nemo_physical::{error::ReadingError, table_reader::Resource}; + +use super::{is_iri, ResourceProvider}; + +/// Resolves resources using HTTP or HTTPS. +/// +/// Handles `http:` and `https:` IRIs. +#[derive(Debug, Clone, Copy, Default)] +pub struct HTTPResourceProvider {} + +/// The result of fetching a resource via HTTP. +#[derive(Debug, Clone)] +pub struct HTTPResource { + /// IRI that this resource was fetched from + url: Resource, + /// Content of the resource + content: String, +} + +impl HTTPResource { + /// Return the IRI this resource was fetched from. + pub fn url(&self) -> &Resource { + &self.url + } + + /// Return the content of this resource. + pub fn content(&self) -> &String { + &self.content + } +} + +impl Read for HTTPResource { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.content.as_bytes().read(buf) + } +} + +impl HTTPResourceProvider { + async fn get(url: &Resource) -> Result { + let response = reqwest::get(url).await?; + let content = response.text().await?; + + Ok(HTTPResource { + url: url.to_string(), + content, + }) + } +} + +impl ResourceProvider for HTTPResourceProvider { + fn open_resource(&self, resource: &Resource) -> Result>, ReadingError> { + if !is_iri(resource) { + return Ok(None); + } + + if !(resource.starts_with("http:") || resource.starts_with("https:")) { + // Non-http IRI, resource provider is not responsible + return Ok(None); + } + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + let response = rt.block_on(Self::get(resource))?; + Ok(Some(Box::new(response))) + } +}