Skip to content

Commit

Permalink
Merge pull request #407 from knowsys/feature/406-non-experimental-agg…
Browse files Browse the repository at this point in the history
…regates

Non-experimental aggregates
  • Loading branch information
rlwww authored Nov 9, 2023
2 parents 4a563b3 + 14bc398 commit 4eb59be
Show file tree
Hide file tree
Showing 28 changed files with 450 additions and 212 deletions.
204 changes: 104 additions & 100 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions nemo-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,6 @@ fn run(mut cli: CliApp) -> Result<(), Error> {
log::info!("Rules parsed");
log::trace!("{:?}", program);

for atom in program.rules().iter().flat_map(|rule| rule.head()) {
if atom.aggregates().next().is_some() {
log::warn!("Program is using the experimental aggregates feature and currently depends on the internally chosen variable orders for predicates.",);
break;
}
}

let parsed_fact = cli.trace_fact.map(parse_fact).transpose()?;

if cli.write_all_idb_predicates {
Expand Down
1 change: 1 addition & 0 deletions nemo-physical/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ default = ["timing"]
timing = ["dep:howlong"]

[dependencies]
enum_dispatch = "0.3.12"
log = "0.4"
bytesize = "1.2"
thiserror = "1.0"
Expand Down
34 changes: 23 additions & 11 deletions nemo-physical/src/aggregates/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
use crate::datatypes::DataTypeName;

use super::processors::{
aggregate::Aggregate, count_aggregate::CountAggregateProcessor,
max_aggregate::MaxAggregateProcessor, min_aggregate::MinAggregateProcessor,
processor::AggregateProcessor, sum_aggregate::SumAggregateProcessor,
aggregate::Aggregate,
count_aggregate::CountAggregateProcessor,
max_aggregate::MaxAggregateProcessor,
min_aggregate::MinAggregateProcessor,
processor::{AggregateProcessor, AggregateProcessorT},
sum_aggregate::SumAggregateProcessor,
};

#[derive(Clone, Copy, Debug, PartialEq)]
Expand All @@ -23,14 +26,23 @@ pub enum AggregateOperation {

impl AggregateOperation {
/// Creates a new aggregate processor for the given aggregate operation.
/// TODO: This is currently implemented using dynamic dispatch, but this may change in the future.
pub fn create_processor<A: Aggregate>(&self) -> Box<dyn AggregateProcessor<A>> {
match self {
AggregateOperation::Count => Box::new(CountAggregateProcessor::new()),
AggregateOperation::Max => Box::new(MaxAggregateProcessor::new()),
AggregateOperation::Min => Box::new(MinAggregateProcessor::new()),
AggregateOperation::Sum => Box::new(SumAggregateProcessor::new()),
}
pub(crate) fn create_processor<A: Aggregate>(&self) -> AggregateProcessorT<A> {
let aggregate_processor: AggregateProcessorT<A> = match self {
AggregateOperation::Count => CountAggregateProcessor::new().into(),
AggregateOperation::Max => MaxAggregateProcessor::new().into(),
AggregateOperation::Min => MinAggregateProcessor::new().into(),
AggregateOperation::Sum => SumAggregateProcessor::new().into(),
};

aggregate_processor
}

/// Returns whether the aggregate processor is invariant to being called with the same aggregated value multiple times in a row.
/// This function has to return the same value independent of the aggregated value type.
///
/// If `true` is returned this allows for additional optimizations when creating the execution plan. In particular, peripheral variables (not group-by, aggregate or distinct variables) can be converted to distinct variables in an idempotent aggregate processor without changing the semantics of the aggregate.
pub fn idempotent<A: Aggregate>(&self) -> bool {
self.create_processor::<A>().idempotent()
}

/// Returns whether the aggregate operation always produces an aggregate output column of the same type.
Expand Down
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/count_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::{
processor::{AggregateGroupProcessor, AggregateProcessor},
};

#[derive(Debug)]
pub(crate) struct CountAggregateProcessor<A>
where
A: Aggregate,
Expand All @@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for CountAggregateProcessor<A> {
}
}

#[derive(Debug)]
pub(crate) struct CountAggregateGroupProcessor<A>
where
A: Aggregate,
Expand Down
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/max_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::{
processor::{AggregateGroupProcessor, AggregateProcessor},
};

#[derive(Debug)]
pub(crate) struct MaxAggregateProcessor<A>
where
A: PartialEq + PartialOrd + 'static,
Expand All @@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for MaxAggregateProcessor<A> {
}
}

#[derive(Debug)]
pub(crate) struct MaxAggregateGroupProcessor<A>
where
A: Aggregate,
Expand Down
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/min_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::{
processor::{AggregateGroupProcessor, AggregateProcessor},
};

#[derive(Debug)]
pub(crate) struct MinAggregateProcessor<A>
where
A: Aggregate,
Expand All @@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for MinAggregateProcessor<A> {
}
}

#[derive(Debug)]
pub(crate) struct MinAggregateGroupProcessor<A>
where
A: Aggregate,
Expand Down
37 changes: 33 additions & 4 deletions nemo-physical/src/aggregates/processors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,42 @@
use crate::datatypes::StorageValueT;

use super::aggregate::Aggregate;
use super::{
aggregate::Aggregate,
count_aggregate::{CountAggregateGroupProcessor, CountAggregateProcessor},
max_aggregate::{MaxAggregateGroupProcessor, MaxAggregateProcessor},
min_aggregate::{MinAggregateGroupProcessor, MinAggregateProcessor},
sum_aggregate::{SumAggregateGroupProcessor, SumAggregateProcessor},
};

use enum_dispatch::enum_dispatch;

/// Allows for aggregation of a column, by providing [`AggregateGroupProcessor`] for every group in the input trie scan.
pub trait AggregateProcessor<A: Aggregate> {
#[enum_dispatch]
pub(crate) trait AggregateProcessor<A: Aggregate> {
/// Returns whether the aggregate processor is invariant to being called with the same aggregated value multiple times in a row.
/// This function has to return the same value independent of the aggregated value type.
///
/// If `true` is returned this allows for additional optimizations when creating the execution plan (e.g. not needing to reorder if the distinct variables are in the wrong variable order).
/// If `true` is returned this allows for additional optimizations when creating the execution plan. In particular, peripheral variables (not group-by, aggregate or distinct variables) can be converted to distinct variables in an idempotent aggregate processor without changing the semantics of the aggregate.
///
/// See [`super::super::operation::AggregateOperation::idempotent`]
fn idempotent(&self) -> bool;

/// Creates a [`AggregateGroupProcessor`] for aggregating values with the same values in group-by columns.
fn group(&self) -> Box<dyn AggregateGroupProcessor<A>>;
}

#[enum_dispatch(AggregateProcessor<A>)]
#[derive(Debug)]
pub(crate) enum AggregateProcessorT<A: Aggregate> {
Count(CountAggregateProcessor<A>),
Max(MaxAggregateProcessor<A>),
Min(MinAggregateProcessor<A>),
Sum(SumAggregateProcessor<A>),
}

/// Allows aggregation of multiple rows (all with the same group-by values) to produce a single aggregate value.
pub trait AggregateGroupProcessor<A>
pub(crate) trait AggregateGroupProcessor<A>
where
A: Aggregate,
{
Expand All @@ -27,3 +47,12 @@ where
/// Returns the resulting aggregated value of all the processed input values.
fn finish(&self) -> Option<StorageValueT>;
}

#[enum_dispatch(AggregateGroupProcessor<A>)]
#[derive(Debug)]
pub(crate) enum AggregateGroupProcessorT<A: Aggregate> {
Count(CountAggregateGroupProcessor<A>),
Max(MaxAggregateGroupProcessor<A>),
Min(MinAggregateGroupProcessor<A>),
Sum(SumAggregateGroupProcessor<A>),
}
2 changes: 2 additions & 0 deletions nemo-physical/src/aggregates/processors/sum_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::{
processor::{AggregateGroupProcessor, AggregateProcessor},
};

#[derive(Debug)]
pub(crate) struct SumAggregateProcessor<A>
where
A: Aggregate,
Expand All @@ -34,6 +35,7 @@ impl<A: Aggregate> AggregateProcessor<A> for SumAggregateProcessor<A> {
}
}

#[derive(Debug)]
pub(crate) struct SumAggregateGroupProcessor<A>
where
A: Aggregate,
Expand Down
16 changes: 10 additions & 6 deletions nemo-physical/src/tabular/operations/triescan_aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{cell::UnsafeCell, fmt::Debug};

use crate::{
aggregates::{operation::AggregateOperation, processors::processor::AggregateGroupProcessor},
aggregates::{
operation::AggregateOperation,
processors::processor::{AggregateGroupProcessor, AggregateProcessor},
},
columnar::traits::columnscan::ColumnScanT,
datatypes::{Double, Float, StorageTypeName, StorageValueT},
tabular::traits::{partial_trie_scan::PartialTrieScan, trie_scan::TrieScan},
Expand All @@ -22,7 +25,7 @@ enum AggregatedOutputValue {
/// Describes which columns of the input trie scan will be group-by, distinct and aggregate columns and other information about the aggregation.
#[derive(Debug, Clone, Copy)]
pub struct AggregationInstructions {
/// Name of the aggregate operation, which determines the [`AggregateGroupProcessor`] that will be used
/// Type of the aggregate operation, which determines the aggregate processor that will be used
pub aggregate_operation: AggregateOperation,
/// Number of group-by columns
///
Expand Down Expand Up @@ -74,12 +77,13 @@ impl AggregationInstructions {
/// [`TrieScan`] which performs an aggregate operation.
///
/// Input columns:
/// * Possibly group-by columns
/// * A single aggregated column possibly mixed with additional distinct columns
/// * Zero or more group-by columns, followed by
/// * a single aggregated column possibly mixed with additional distinct columns, followed by
/// * zero or more peripheral columns, that do not impact the result of the aggregate at all and are not used during aggregation.
///
/// Output columns:
/// * Group-by columns
/// * Aggregate output column
/// * Zero or more group-by columns, followed by
/// * one aggregate output column
#[derive(Debug)]
pub struct TrieScanAggregate<T: TrieScan> {
aggregated_input_column_storage_type: StorageTypeName,
Expand Down
2 changes: 1 addition & 1 deletion nemo-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This crate provide python bindings for the `nemo` crate.

> **Note**
> These bindings are currently in an experimental state and likely subject to change.
> These bindings are currently in an experimental state and subject to change.
## Building

Expand Down
2 changes: 1 addition & 1 deletion nemo-wasm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This crate provides a Web Assembly build and JavaScript/TypeScript bindings for the `nemo` crate.

> **Note**
> These bindings are currently in an experimental state and likely subject to change.
> These bindings are currently in an experimental state and subject to change.
## Building

Expand Down
4 changes: 2 additions & 2 deletions nemo-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ struct SyncAccessHandleWriter(web_sys::FileSystemSyncAccessHandle);
#[cfg(web_sys_unstable_apis)]
impl std::io::Write for SyncAccessHandleWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let mut buf: Vec<_> = buf.into();
let bytes_written = self.0.write_with_u8_array(&mut buf).map_err(|js_value| {
let buf: Vec<_> = buf.into();
let bytes_written = self.0.write_with_u8_array(&buf).map_err(|js_value| {
std_io_error_from_js_value(
js_value,
"Error while writing to FileSystemSyncAccessHandle",
Expand Down
2 changes: 2 additions & 0 deletions nemo/src/execution/planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ pub mod plan_util;

pub mod negation;

mod aggregates;

pub mod arithmetic;
Loading

0 comments on commit 4eb59be

Please sign in to comment.