Skip to content

Commit

Permalink
Add SQL aggregates ANY, SOME, EVERY and their COLL_ versions
Browse files Browse the repository at this point in the history
  • Loading branch information
alancai98 committed May 18, 2023
1 parent b1d164f commit 237826e
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 31 deletions.
129 changes: 111 additions & 18 deletions partiql-eval/src/eval/evaluable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ pub(crate) enum AggFunc {
Max(Max),
Min(Min),
Sum(Sum),
Any(Any),
Every(Every),
}

impl AggregateFunction for AggFunc {
Expand All @@ -327,6 +329,8 @@ impl AggregateFunction for AggFunc {
AggFunc::Max(v) => v.next_value(input_value, group),
AggFunc::Min(v) => v.next_value(input_value, group),
AggFunc::Sum(v) => v.next_value(input_value, group),
AggFunc::Any(v) => v.next_value(input_value, group),
AggFunc::Every(v) => v.next_value(input_value, group),
}
}

Expand All @@ -337,6 +341,8 @@ impl AggregateFunction for AggFunc {
AggFunc::Max(v) => v.compute(group),
AggFunc::Min(v) => v.compute(group),
AggFunc::Sum(v) => v.compute(group),
AggFunc::Any(v) => v.compute(group),
AggFunc::Every(v) => v.compute(group),
}
}
}
Expand Down Expand Up @@ -436,7 +442,7 @@ impl AggregateFunction for Avg {
}

fn compute(&self, group: &Tuple) -> Value {
match self.avgs.get(group).expect("Expect group to exist in avgs") {
match self.avgs.get(group).unwrap_or(&(0, Null)) {
(0, _) => Null,
(c, s) => s / &Value::Decimal(rust_decimal::Decimal::from(*c)),
}
Expand Down Expand Up @@ -483,11 +489,7 @@ impl AggregateFunction for Count {
}

fn compute(&self, group: &Tuple) -> Value {
Value::from(
self.counts
.get(group)
.expect("Expect group to exist in counts"),
)
Value::from(self.counts.get(group).unwrap_or(&0))
}
}

Expand Down Expand Up @@ -531,10 +533,7 @@ impl AggregateFunction for Max {
}

fn compute(&self, group: &Tuple) -> Value {
self.maxes
.get(group)
.expect("Expect group to exist in sums")
.clone()
self.maxes.get(group).unwrap_or(&Null).clone()
}
}

Expand Down Expand Up @@ -578,10 +577,7 @@ impl AggregateFunction for Min {
}

fn compute(&self, group: &Tuple) -> Value {
self.mins
.get(group)
.expect("Expect group to exist in mins")
.clone()
self.mins.get(group).unwrap_or(&Null).clone()
}
}

Expand Down Expand Up @@ -625,10 +621,107 @@ impl AggregateFunction for Sum {
}

fn compute(&self, group: &Tuple) -> Value {
self.sums
.get(group)
.expect("Expect group to exist in sums")
.clone()
self.sums.get(group).unwrap_or(&Null).clone()
}
}

/// Represents SQL's `ANY`/`SOME` aggregation function
#[derive(Debug)]
pub(crate) struct Any {
anys: HashMap<Tuple, Value>,
aggregator: AggFilterFn,
}

impl Any {
pub(crate) fn new_distinct() -> Self {
Any {
anys: HashMap::new(),
aggregator: AggFilterFn::Distinct(AggFilterDistinct::new()),
}
}

pub(crate) fn new_all() -> Self {
Any {
anys: HashMap::new(),
aggregator: AggFilterFn::default(),
}
}
}

impl AggregateFunction for Any {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
if !input_value.is_null_or_missing()
&& self.aggregator.filter_value(input_value.clone(), group)
{
match self.anys.get_mut(group) {
None => {
match input_value {
Boolean(_) => self.anys.insert(group.clone(), input_value.clone()),
_ => self.anys.insert(group.clone(), Missing),
};
}
Some(acc) => {
*acc = match (acc.clone(), input_value) {
(Boolean(l), Value::Boolean(r)) => Value::Boolean(l || *r),
(_, _) => Missing,
};
}
}
}
}

fn compute(&self, group: &Tuple) -> Value {
self.anys.get(group).unwrap_or(&Null).clone()
}
}

/// Represents SQL's `EVERY` aggregation function
#[derive(Debug)]
pub(crate) struct Every {
everys: HashMap<Tuple, Value>,
aggregator: AggFilterFn,
}

impl Every {
pub(crate) fn new_distinct() -> Self {
Every {
everys: HashMap::new(),
aggregator: AggFilterFn::Distinct(AggFilterDistinct::new()),
}
}

pub(crate) fn new_all() -> Self {
Every {
everys: HashMap::new(),
aggregator: AggFilterFn::default(),
}
}
}

impl AggregateFunction for Every {
fn next_value(&mut self, input_value: &Value, group: &Tuple) {
if !input_value.is_null_or_missing()
&& self.aggregator.filter_value(input_value.clone(), group)
{
match self.everys.get_mut(group) {
None => {
match input_value {
Boolean(_) => self.everys.insert(group.clone(), input_value.clone()),
_ => self.everys.insert(group.clone(), Missing),
};
}
Some(acc) => {
*acc = match (acc.clone(), input_value) {
(Boolean(l), Value::Boolean(r)) => Value::Boolean(l && *r),
(_, _) => Missing,
};
}
}
}
}

fn compute(&self, group: &Tuple) -> Value {
self.everys.get(group).unwrap_or(&Null).clone()
}
}

Expand Down
114 changes: 114 additions & 0 deletions partiql-eval/src/eval/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1397,3 +1397,117 @@ impl EvalExpr for EvalFnCollSum {
Cow::Owned(result)
}
}

/// Represents the `COLL_ANY`/`COLL_SOME` function, e.g. `COLL_ANY(DISTINCT [true, true, false])`.
#[derive(Debug)]
pub(crate) struct EvalFnCollAny {
pub(crate) setq: SetQuantifier,
pub(crate) elems: Box<dyn EvalExpr>,
}

#[inline]
#[track_caller]
fn coll_any(elems: Vec<&Value>) -> Value {
if elems.is_empty() {
Null
} else {
let mut any = false;
for e in elems {
match e {
Value::Boolean(b) => any = any || *b,
_ => return Missing,
}
}
Value::from(any)
}
}

impl EvalExpr for EvalFnCollAny {
fn evaluate<'a>(&'a self, bindings: &'a Tuple, ctx: &'a dyn EvalContext) -> Cow<'a, Value> {
let elems = self.elems.evaluate(bindings, ctx);
let result = match elems.borrow() {
Null => Null,
Value::List(l) => {
let l_nums: Vec<&Value> = match self.setq {
SetQuantifier::All => l.iter().filter(|&e| !e.is_null_or_missing()).collect(),
SetQuantifier::Distinct => l
.iter()
.filter(|&e| !e.is_null_or_missing())
.unique()
.collect(),
};
coll_any(l_nums)
}
Value::Bag(b) => {
let b_nums: Vec<&Value> = match self.setq {
SetQuantifier::All => b.iter().filter(|&e| !e.is_null_or_missing()).collect(),
SetQuantifier::Distinct => b
.iter()
.filter(|&e| !e.is_null_or_missing())
.unique()
.collect(),
};
coll_any(b_nums)
}
_ => Missing,
};
Cow::Owned(result)
}
}

/// Represents the `COLL_EVERY` function, e.g. `COLL_EVERY(DISTINCT [true, true, false])`.
#[derive(Debug)]
pub(crate) struct EvalFnCollEvery {
pub(crate) setq: SetQuantifier,
pub(crate) elems: Box<dyn EvalExpr>,
}

#[inline]
#[track_caller]
fn coll_every(elems: Vec<&Value>) -> Value {
if elems.is_empty() {
Null
} else {
let mut every = true;
for e in elems {
match e {
Value::Boolean(b) => every = every && *b,
_ => return Missing,
}
}
Value::from(every)
}
}

impl EvalExpr for EvalFnCollEvery {
fn evaluate<'a>(&'a self, bindings: &'a Tuple, ctx: &'a dyn EvalContext) -> Cow<'a, Value> {
let elems = self.elems.evaluate(bindings, ctx);
let result = match elems.borrow() {
Null => Null,
Value::List(l) => {
let l_nums: Vec<&Value> = match self.setq {
SetQuantifier::All => l.iter().filter(|&e| !e.is_null_or_missing()).collect(),
SetQuantifier::Distinct => l
.iter()
.filter(|&e| !e.is_null_or_missing())
.unique()
.collect(),
};
coll_every(l_nums)
}
Value::Bag(b) => {
let b_nums: Vec<&Value> = match self.setq {
SetQuantifier::All => b.iter().filter(|&e| !e.is_null_or_missing()).collect(),
SetQuantifier::Distinct => b
.iter()
.filter(|&e| !e.is_null_or_missing())
.unique()
.collect(),
};
coll_every(b_nums)
}
_ => Missing,
};
Cow::Owned(result)
}
}
46 changes: 36 additions & 10 deletions partiql-eval/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ use partiql_logical::{

use crate::eval;
use crate::eval::evaluable::{
Avg, Count, EvalGroupingStrategy, EvalJoinKind, EvalOrderBy, EvalOrderBySortCondition,
EvalOrderBySortSpec, EvalSubQueryExpr, Evaluable, Max, Min, Sum,
Any, Avg, Count, EvalGroupingStrategy, EvalJoinKind, EvalOrderBy, EvalOrderBySortCondition,
EvalOrderBySortSpec, EvalSubQueryExpr, Evaluable, Every, Max, Min, Sum,
};
use crate::eval::expr::pattern_match::like_to_re_pattern;
use crate::eval::expr::{
EvalBagExpr, EvalBetweenExpr, EvalBinOp, EvalBinOpExpr, EvalDynamicLookup, EvalExpr, EvalFnAbs,
EvalFnBitLength, EvalFnBtrim, EvalFnCardinality, EvalFnCharLength, EvalFnCollAvg,
EvalFnCollCount, EvalFnCollMax, EvalFnCollMin, EvalFnCollSum, EvalFnExists, EvalFnExtractDay,
EvalFnExtractHour, EvalFnExtractMinute, EvalFnExtractMonth, EvalFnExtractSecond,
EvalFnExtractTimezoneHour, EvalFnExtractTimezoneMinute, EvalFnExtractYear, EvalFnLower,
EvalFnLtrim, EvalFnModulus, EvalFnOctetLength, EvalFnOverlay, EvalFnPosition, EvalFnRtrim,
EvalFnSubstring, EvalFnUpper, EvalIsTypeExpr, EvalLikeMatch, EvalLikeNonStringNonLiteralMatch,
EvalListExpr, EvalLitExpr, EvalPath, EvalSearchedCaseExpr, EvalTupleExpr, EvalUnaryOp,
EvalUnaryOpExpr, EvalVarRef,
EvalFnBitLength, EvalFnBtrim, EvalFnCardinality, EvalFnCharLength, EvalFnCollAny,
EvalFnCollAvg, EvalFnCollCount, EvalFnCollEvery, EvalFnCollMax, EvalFnCollMin, EvalFnCollSum,
EvalFnExists, EvalFnExtractDay, EvalFnExtractHour, EvalFnExtractMinute, EvalFnExtractMonth,
EvalFnExtractSecond, EvalFnExtractTimezoneHour, EvalFnExtractTimezoneMinute, EvalFnExtractYear,
EvalFnLower, EvalFnLtrim, EvalFnModulus, EvalFnOctetLength, EvalFnOverlay, EvalFnPosition,
EvalFnRtrim, EvalFnSubstring, EvalFnUpper, EvalIsTypeExpr, EvalLikeMatch,
EvalLikeNonStringNonLiteralMatch, EvalListExpr, EvalLitExpr, EvalPath, EvalSearchedCaseExpr,
EvalTupleExpr, EvalUnaryOp, EvalUnaryOpExpr, EvalVarRef,
};
use crate::eval::EvalPlan;
use partiql_value::Value::Null;
Expand Down Expand Up @@ -178,6 +178,12 @@ impl EvaluatorPlanner {
(AggFunc::AggSum, logical::SetQuantifier::All) => {
eval::evaluable::AggFunc::Sum(Sum::new_all())
}
(AggFunc::AggAny, logical::SetQuantifier::All) => {
eval::evaluable::AggFunc::Any(Any::new_all())
}
(AggFunc::AggEvery, logical::SetQuantifier::All) => {
eval::evaluable::AggFunc::Every(Every::new_all())
}
(AggFunc::AggAvg, logical::SetQuantifier::Distinct) => {
eval::evaluable::AggFunc::Avg(Avg::new_distinct())
}
Expand All @@ -193,6 +199,12 @@ impl EvaluatorPlanner {
(AggFunc::AggSum, logical::SetQuantifier::Distinct) => {
eval::evaluable::AggFunc::Sum(Sum::new_distinct())
}
(AggFunc::AggAny, logical::SetQuantifier::Distinct) => {
eval::evaluable::AggFunc::Any(Any::new_distinct())
}
(AggFunc::AggEvery, logical::SetQuantifier::Distinct) => {
eval::evaluable::AggFunc::Every(Every::new_distinct())
}
};
eval::evaluable::AggregateExpression {
name: a_e.name.to_string(),
Expand Down Expand Up @@ -676,6 +688,20 @@ impl EvaluatorPlanner {
elems: args.pop().unwrap(),
})
}
CallName::CollAny(setq) => {
assert_eq!(args.len(), 1);
Box::new(EvalFnCollAny {
setq: plan_set_quantifier(setq),
elems: args.pop().unwrap(),
})
}
CallName::CollEvery(setq) => {
assert_eq!(args.len(), 1);
Box::new(EvalFnCollEvery {
setq: plan_set_quantifier(setq),
elems: args.pop().unwrap(),
})
}
}
}
}
Expand Down
Loading

0 comments on commit 237826e

Please sign in to comment.