Non-experimental aggregates #407

Merged 4 commits on Nov 9, 2023
204 changes: 104 additions & 100 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions nemo-cli/src/main.rs
@@ -117,13 +117,6 @@ fn run(mut cli: CliApp) -> Result<(), Error> {
     log::info!("Rules parsed");
     log::trace!("{:?}", program);

-    for atom in program.rules().iter().flat_map(|rule| rule.head()) {
-        if atom.aggregates().next().is_some() {
-            log::warn!("Program is using the experimental aggregates feature and currently depends on the internally chosen variable orders for predicates.",);
-            break;
-        }
-    }
-
     let parsed_fact = cli.trace_fact.map(parse_fact).transpose()?;

     if cli.write_all_idb_predicates {
1 change: 1 addition & 0 deletions nemo-physical/Cargo.toml
@@ -16,6 +16,7 @@ default = ["timing"]
 timing = ["dep:howlong"]

 [dependencies]
+enum_dispatch = "0.3.12"
 log = "0.4"
 bytesize = "1.2"
 thiserror = "1.0"
34 changes: 23 additions & 11 deletions nemo-physical/src/aggregates/operation.rs
@@ -3,9 +3,12 @@
 use crate::datatypes::DataTypeName;

 use super::processors::{
-    aggregate::Aggregate, count_aggregate::CountAggregateProcessor,
-    max_aggregate::MaxAggregateProcessor, min_aggregate::MinAggregateProcessor,
-    processor::AggregateProcessor, sum_aggregate::SumAggregateProcessor,
+    aggregate::Aggregate,
+    count_aggregate::CountAggregateProcessor,
+    max_aggregate::MaxAggregateProcessor,
+    min_aggregate::MinAggregateProcessor,
+    processor::{AggregateProcessor, AggregateProcessorT},
+    sum_aggregate::SumAggregateProcessor,
 };

 #[derive(Clone, Copy, Debug, PartialEq)]
@@ -23,14 +26,23 @@ pub enum AggregateOperation {

 impl AggregateOperation {
     /// Creates a new aggregate processor for the given aggregate operation.
-    /// TODO: This is currently implemented using dynamic dispatch, but this may change in the future.
-    pub fn create_processor<A: Aggregate>(&self) -> Box<dyn AggregateProcessor<A>> {
-        match self {
-            AggregateOperation::Count => Box::new(CountAggregateProcessor::new()),
-            AggregateOperation::Max => Box::new(MaxAggregateProcessor::new()),
-            AggregateOperation::Min => Box::new(MinAggregateProcessor::new()),
-            AggregateOperation::Sum => Box::new(SumAggregateProcessor::new()),
-        }
+    pub(crate) fn create_processor<A: Aggregate>(&self) -> AggregateProcessorT<A> {
+        let aggregate_processor: AggregateProcessorT<A> = match self {
+            AggregateOperation::Count => CountAggregateProcessor::new().into(),
+            AggregateOperation::Max => MaxAggregateProcessor::new().into(),
+            AggregateOperation::Min => MinAggregateProcessor::new().into(),
+            AggregateOperation::Sum => SumAggregateProcessor::new().into(),
+        };
+
+        aggregate_processor
     }

+    /// Returns whether the aggregate processor is invariant to being called with the same aggregated value multiple times in a row.
+    /// This function has to return the same value independent of the aggregated value type.
+    ///
+    /// If `true` is returned this allows for additional optimizations when creating the execution plan. In particular, peripheral variables (not group-by, aggregate or distinct variables) can be converted to distinct variables in an idempotent aggregate processor without changing the semantics of the aggregate.
+    pub fn idempotent<A: Aggregate>(&self) -> bool {
+        self.create_processor::<A>().idempotent()
+    }
+
     /// Returns whether the aggregate operation always produces an aggregate output column of the same type.
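The notion of idempotence in the `idempotent` doc comment can be illustrated with a small standalone sketch. It does not use Nemo's types: `Op`, `aggregate` and `dedup_consecutive` are hypothetical names introduced here, and plain `i64` values stand in for the aggregated column. An operation counts as idempotent in this sense if feeding it the same value several times in a row gives the same result as feeding it once, which holds mathematically for min and max but not for count and sum.

```rust
/// Hypothetical stand-in for the four aggregate operations
/// (not Nemo's `AggregateOperation`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Op {
    Count,
    Max,
    Min,
    Sum,
}

impl Op {
    /// True if repeating a value in a row cannot change the result.
    pub fn idempotent(self) -> bool {
        matches!(self, Op::Max | Op::Min)
    }

    /// Aggregates all values; returns `None` for an empty input.
    pub fn aggregate(self, values: &[i64]) -> Option<i64> {
        if values.is_empty() {
            return None;
        }
        Some(match self {
            Op::Count => values.len() as i64,
            Op::Max => *values.iter().max()?,
            Op::Min => *values.iter().min()?,
            Op::Sum => values.iter().sum::<i64>(),
        })
    }
}

/// Collapses runs of equal consecutive values, as if each value
/// had been passed to the processor only once in a row.
pub fn dedup_consecutive(values: &[i64]) -> Vec<i64> {
    let mut out = values.to_vec();
    out.dedup();
    out
}
```

For an input such as `[3, 3, 5, 5, 5]`, collapsing repeats leaves `[3, 5]`; max and min agree on both inputs, while count and sum do not. That difference is what would make it safe to treat extra columns as distinct columns only for idempotent operations.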
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/count_aggregate.rs
@@ -9,6 +9,7 @@ use super::{
     processor::{AggregateGroupProcessor, AggregateProcessor},
 };

+#[derive(Debug)]
 pub(crate) struct CountAggregateProcessor<A>
 where
     A: Aggregate,
@@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for CountAggregateProcessor<A> {
     }
 }

+#[derive(Debug)]
 pub(crate) struct CountAggregateGroupProcessor<A>
 where
     A: Aggregate,
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/max_aggregate.rs
@@ -9,6 +9,7 @@ use super::{
     processor::{AggregateGroupProcessor, AggregateProcessor},
 };

+#[derive(Debug)]
 pub(crate) struct MaxAggregateProcessor<A>
 where
     A: PartialEq + PartialOrd + 'static,
@@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for MaxAggregateProcessor<A> {
     }
 }

+#[derive(Debug)]
 pub(crate) struct MaxAggregateGroupProcessor<A>
 where
     A: Aggregate,
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/min_aggregate.rs
@@ -9,6 +9,7 @@ use super::{
     processor::{AggregateGroupProcessor, AggregateProcessor},
 };

+#[derive(Debug)]
 pub(crate) struct MinAggregateProcessor<A>
 where
     A: Aggregate,
@@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for MinAggregateProcessor<A> {
     }
 }

+#[derive(Debug)]
 pub(crate) struct MinAggregateGroupProcessor<A>
 where
     A: Aggregate,
37 changes: 33 additions & 4 deletions nemo-physical/src/aggregates/processors/processor.rs
@@ -2,22 +2,42 @@

 use crate::datatypes::StorageValueT;

-use super::aggregate::Aggregate;
+use super::{
+    aggregate::Aggregate,
+    count_aggregate::{CountAggregateGroupProcessor, CountAggregateProcessor},
+    max_aggregate::{MaxAggregateGroupProcessor, MaxAggregateProcessor},
+    min_aggregate::{MinAggregateGroupProcessor, MinAggregateProcessor},
+    sum_aggregate::{SumAggregateGroupProcessor, SumAggregateProcessor},
+};
+
+use enum_dispatch::enum_dispatch;

 /// Allows for aggregation of a column, by providing [`AggregateGroupProcessor`] for every group in the input trie scan.
-pub trait AggregateProcessor<A: Aggregate> {
+#[enum_dispatch]
+pub(crate) trait AggregateProcessor<A: Aggregate> {
     /// Returns whether the aggregate processor is invariant to being called with the same aggregated value multiple times in a row.
     /// This function has to return the same value independent of the aggregated value type.
     ///
-    /// If `true` is returned this allows for additional optimizations when creating the execution plan (e.g. not needing to reorder if the distinct variables are in the wrong variable order).
+    /// If `true` is returned this allows for additional optimizations when creating the execution plan. In particular, peripheral variables (not group-by, aggregate or distinct variables) can be converted to distinct variables in an idempotent aggregate processor without changing the semantics of the aggregate.
+    ///
+    /// See [`super::super::operation::AggregateOperation::idempotent`]
     fn idempotent(&self) -> bool;

     /// Creates a [`AggregateGroupProcessor`] for aggregating values with the same values in group-by columns.
     fn group(&self) -> Box<dyn AggregateGroupProcessor<A>>;
 }

+#[enum_dispatch(AggregateProcessor<A>)]
+#[derive(Debug)]
+pub(crate) enum AggregateProcessorT<A: Aggregate> {
+    Count(CountAggregateProcessor<A>),
+    Max(MaxAggregateProcessor<A>),
+    Min(MinAggregateProcessor<A>),
+    Sum(SumAggregateProcessor<A>),
+}
+
 /// Allows aggregation of multiple rows (all with the same group-by values) to produce a single aggregate value.
-pub trait AggregateGroupProcessor<A>
+pub(crate) trait AggregateGroupProcessor<A>
 where
     A: Aggregate,
 {
@@ -27,3 +47,12 @@ where
     /// Returns the resulting aggregated value of all the processed input values.
     fn finish(&self) -> Option<StorageValueT>;
 }
+
+#[enum_dispatch(AggregateGroupProcessor<A>)]
+#[derive(Debug)]
+pub(crate) enum AggregateGroupProcessorT<A: Aggregate> {
+    Count(CountAggregateGroupProcessor<A>),
+    Max(MaxAggregateGroupProcessor<A>),
+    Min(MinAggregateGroupProcessor<A>),
+    Sum(SumAggregateGroupProcessor<A>),
+}
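Replacing `Box<dyn AggregateProcessor<A>>` with the `AggregateProcessorT<A>` enum swaps dynamic dispatch for a `match` over a closed set of variants. The standalone sketch below writes out by hand roughly what the `enum_dispatch` attribute is understood to generate: a trait implementation on the enum that forwards to the active variant, plus `From` conversions so that `.into()` works. The names `Processor`, `CountP`, `MaxP`, `ProcessorT` and `create` are hypothetical and simplified. They are not Nemo's types, and the sketch does not depend on the `enum_dispatch` crate.

```rust
/// Simplified, hypothetical trait standing in for an aggregate processor.
pub trait Processor {
    fn idempotent(&self) -> bool;
    fn name(&self) -> &'static str;
}

#[derive(Debug, Default)]
pub struct CountP;

#[derive(Debug, Default)]
pub struct MaxP;

impl Processor for CountP {
    fn idempotent(&self) -> bool {
        false
    }
    fn name(&self) -> &'static str {
        "count"
    }
}

impl Processor for MaxP {
    fn idempotent(&self) -> bool {
        true
    }
    fn name(&self) -> &'static str {
        "max"
    }
}

/// Closed set of processors: one variant per concrete type.
#[derive(Debug)]
pub enum ProcessorT {
    Count(CountP),
    Max(MaxP),
}

// Hand-written forwarding impl: every method matches on the variant
// and calls the inner value directly, so no vtable is involved.
impl Processor for ProcessorT {
    fn idempotent(&self) -> bool {
        match self {
            ProcessorT::Count(p) => p.idempotent(),
            ProcessorT::Max(p) => p.idempotent(),
        }
    }
    fn name(&self) -> &'static str {
        match self {
            ProcessorT::Count(p) => p.name(),
            ProcessorT::Max(p) => p.name(),
        }
    }
}

impl From<CountP> for ProcessorT {
    fn from(p: CountP) -> Self {
        ProcessorT::Count(p)
    }
}

impl From<MaxP> for ProcessorT {
    fn from(p: MaxP) -> Self {
        ProcessorT::Max(p)
    }
}

/// Mirrors the shape of `create_processor`: pick a concrete processor
/// and convert it into the enum with `.into()`.
pub fn create(kind: &str) -> Option<ProcessorT> {
    match kind {
        "count" => Some(CountP.into()),
        "max" => Some(MaxP.into()),
        _ => None,
    }
}
```

The trade-off in this design is that the set of processors is fixed at compile time, which suits a crate-private trait such as the `pub(crate)` one in this diff. Adding a processor means adding a variant rather than implementing the trait elsewhere.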
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/sum_aggregate.rs
@@ -9,6 +9,7 @@ use super::{
     processor::{AggregateGroupProcessor, AggregateProcessor},
 };

+#[derive(Debug)]
 pub(crate) struct SumAggregateProcessor<A>
 where
     A: Aggregate,
@@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for SumAggregateProcessor<A> {
     }
 }

+#[derive(Debug)]
 pub(crate) struct SumAggregateGroupProcessor<A>
 where
     A: Aggregate,
16 changes: 10 additions & 6 deletions nemo-physical/src/tabular/operations/triescan_aggregate.rs
@@ -1,7 +1,10 @@
 use std::{cell::UnsafeCell, fmt::Debug};

 use crate::{
-    aggregates::{operation::AggregateOperation, processors::processor::AggregateGroupProcessor},
+    aggregates::{
+        operation::AggregateOperation,
+        processors::processor::{AggregateGroupProcessor, AggregateProcessor},
+    },
     columnar::traits::columnscan::ColumnScanT,
     datatypes::{Double, Float, StorageTypeName, StorageValueT},
     tabular::traits::{partial_trie_scan::PartialTrieScan, trie_scan::TrieScan},
@@ -22,7 +25,7 @@ enum AggregatedOutputValue {
 /// Describes which columns of the input trie scan will be group-by, distinct and aggregate columns and other information about the aggregation.
 #[derive(Debug, Clone, Copy)]
 pub struct AggregationInstructions {
-    /// Name of the aggregate operation, which determines the [`AggregateGroupProcessor`] that will be used
+    /// Type of the aggregate operation, which determines the aggregate processor that will be used
     pub aggregate_operation: AggregateOperation,
     /// Number of group-by columns
     ///
@@ -74,12 +77,13 @@ impl AggregationInstructions {
 /// [`TrieScan`] which performs an aggregate operation.
 ///
 /// Input columns:
-/// * Possibly group-by columns
-/// * A single aggregated column possibly mixed with additional distinct columns
+/// * Zero or more group-by columns, followed by
+/// * a single aggregated column possibly mixed with additional distinct columns, followed by
+/// * zero or more peripheral columns, that do not impact the result of the aggregate at all and are not used during aggregation.
 ///
 /// Output columns:
-/// * Group-by columns
-/// * Aggregate output column
+/// * Zero or more group-by columns, followed by
+/// * one aggregate output column
 #[derive(Debug)]
 pub struct TrieScanAggregate<T: TrieScan> {
     aggregated_input_column_storage_type: StorageTypeName,
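The input and output column layout in the `TrieScanAggregate` doc comment can be sketched on plain rows. This is not how `TrieScanAggregate` is implemented, since it operates on trie scans rather than materialized rows. The names `aggregate_sum` and `group_by` are hypothetical, distinct columns are left out for brevity, and a sum stands in for the aggregate operation. Each input row is assumed to hold the group-by columns first, then the aggregated column, then any peripheral columns.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Sums the aggregated column per group.
///
/// Assumed row layout: `group_by` group-by columns, one aggregated
/// column, then any number of peripheral columns.
/// Output layout: the group-by columns followed by one aggregate column.
pub fn aggregate_sum(rows: &[Vec<i64>], group_by: usize) -> Vec<Vec<i64>> {
    // Keep each (group-by, aggregated) prefix only once, so that
    // peripheral columns cannot influence the result. Rows too short
    // to contain the aggregated column are skipped.
    let prefixes: BTreeSet<&[i64]> = rows
        .iter()
        .filter(|row| row.len() > group_by)
        .map(|row| &row[..=group_by])
        .collect();

    let mut groups: BTreeMap<&[i64], i64> = BTreeMap::new();
    for prefix in prefixes {
        // `key` holds the group-by values, `value` the aggregated one.
        let (key, value) = prefix.split_at(group_by);
        *groups.entry(key).or_insert(0) += value[0];
    }

    groups
        .into_iter()
        .map(|(key, sum)| {
            let mut out = key.to_vec();
            out.push(sum);
            out
        })
        .collect()
}
```

With one group-by column, the rows `[1, 10, 100]`, `[1, 10, 200]` and `[1, 20, 100]` collapse to the single output row `[1, 30]`: the third column is peripheral and is dropped before summing. With zero group-by columns the output is a single row holding only the aggregate.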
2 changes: 1 addition & 1 deletion nemo-python/README.md
@@ -3,7 +3,7 @@
 This crate provide python bindings for the `nemo` crate.

 > **Note**
-> These bindings are currently in an experimental state and likely subject to change.
+> These bindings are currently in an experimental state and subject to change.

 ## Building
2 changes: 1 addition & 1 deletion nemo-wasm/README.md
@@ -3,7 +3,7 @@
 This crate provides a Web Assembly build and JavaScript/TypeScript bindings for the `nemo` crate.

 > **Note**
-> These bindings are currently in an experimental state and likely subject to change.
+> These bindings are currently in an experimental state and subject to change.

 ## Building
4 changes: 2 additions & 2 deletions nemo-wasm/src/lib.rs
@@ -164,8 +164,8 @@ struct SyncAccessHandleWriter(web_sys::FileSystemSyncAccessHandle);
 #[cfg(web_sys_unstable_apis)]
 impl std::io::Write for SyncAccessHandleWriter {
     fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        let mut buf: Vec<_> = buf.into();
-        let bytes_written = self.0.write_with_u8_array(&mut buf).map_err(|js_value| {
+        let buf: Vec<_> = buf.into();
+        let bytes_written = self.0.write_with_u8_array(&buf).map_err(|js_value| {
             std_io_error_from_js_value(
                 js_value,
                 "Error while writing to FileSystemSyncAccessHandle",
2 changes: 2 additions & 0 deletions nemo/src/execution/planning.rs
@@ -19,4 +19,6 @@ pub mod plan_util;

 pub mod negation;

+mod aggregates;
+
 pub mod arithmetic;