Skip to content

Commit

Permalink
Rename CardinalityOne -> LastWriteWins (part of #85)
Browse files Browse the repository at this point in the history
  • Loading branch information
niko committed Sep 14, 2019
1 parent 3b01029 commit 91479b3
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 109 deletions.
8 changes: 4 additions & 4 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use differential_dataflow::operators::Threshold;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::AsCollection;

use crate::operators::CardinalityOne;
use crate::operators::LastWriteWins;
use crate::{Aid, Error, Rewind, TxData, Value};
use crate::{AttributeConfig, IndexDirection, InputSemantics, QuerySupport};
use crate::{RelationConfig, RelationHandle};
Expand Down Expand Up @@ -118,7 +118,7 @@ where
} else {
let tuples = match config.input_semantics {
InputSemantics::Raw => pairs.as_collection(),
InputSemantics::CardinalityOne => pairs.as_collection().cardinality_one(),
InputSemantics::LastWriteWins => pairs.as_collection().last_write_wins(),
// Ensure that redundant (e,v) pairs don't cause
// misleading proposals during joining.
InputSemantics::CardinalityMany => pairs.as_collection().distinct(),
Expand All @@ -143,9 +143,9 @@ where
);
}

// CardinalityOne is a special case, because count,
// LastWriteWins is a special case, because count,
// propose, and validate are all essentially the same.
if config.input_semantics != InputSemantics::CardinalityOne {
if config.input_semantics != InputSemantics::LastWriteWins {
// Count traces are only required for use in
// worst-case optimal joins.
if config.query_support == QuerySupport::AdaptiveWCO {
Expand Down
8 changes: 6 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,12 @@ pub enum InputSemantics {
/// No special semantics enforced. Source is responsible for
/// everything.
Raw,
/// Only a single value per eid is allowed at any given timestamp.
CardinalityOne,
/// Only the last input for each eid is kept.
LastWriteWins,
// @TODO
// /// Only the first input for each eid is kept, all subsequent ones
// /// ignored.
// FirstWriteWins,
/// Multiple different values for any given eid are allowed, but
/// (e,v) pairs are enforced to be distinct.
CardinalityMany,
Expand Down
104 changes: 104 additions & 0 deletions src/operators/last_write_wins.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Operator enforcing last-write-wins semantics for each eid.
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::aggregation::StateMachine;
use timely::dataflow::operators::{generic::operator::Operator, Map};
use timely::dataflow::Scope;

use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arrange, Arranged};
use differential_dataflow::trace::{cursor::Cursor, BatchReader};
use differential_dataflow::{AsCollection, Collection};

use crate::{TraceValHandle, Value};

/// Provides the `last_write_wins` method.
/// Provides the `last_write_wins` method.
///
/// Implemented for collections of `(eid, value)` pairs; the output is a
/// collection with the same shape in which at most one value per eid is
/// live at any time.
pub trait LastWriteWins<S: Scope> {
/// Ensures that only a single value per eid exists within an
/// attribute, by retracting any previous values upon new
/// updates. Therefore this stream does not expect explicit
/// retractions.
///
/// A new addition for an eid causes the operator itself to emit a
/// retraction of that eid's previous value alongside the new value
/// (see the `state_machine` stage of the implementation).
fn last_write_wins(&self) -> Collection<S, (Value, Value), isize>;
}

impl<S> LastWriteWins<S> for Collection<S, (Value, Value), isize>
where
S: Scope,
S::Timestamp: Lattice + Ord,
{
// Pipeline: (1) arrange the (e, v) updates into a value trace, (2) flatten
// the arranged batches back into (tuple, time, diff) triples sorted by
// time, (3) run a per-eid state machine that retracts the previous value
// whenever a new one arrives.
fn last_write_wins(&self) -> Collection<S, (Value, Value), isize> {
use differential_dataflow::hashable::Hashable;

// Arrange by eid so that updates arrive grouped per key in batches.
let arranged: Arranged<S, TraceValHandle<Value, Value, S::Timestamp, isize>> =
self.arrange();

arranged
.stream
// NOTE(review): the operator name "AsCollection" looks like copied
// boilerplate; a more descriptive name would aid dataflow logging.
.unary(Pipeline, "AsCollection", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for wrapper in data.iter() {
let batch = &wrapper;
// Walk the batch cursor: for each key, collect every
// (value, time, diff) update into `tuples`.
let mut cursor = batch.cursor();
while let Some(key) = cursor.get_key(batch) {
let mut tuples = Vec::new();
while let Some(val) = cursor.get_val(batch) {
cursor.map_times(batch, |time, diff| {
tuples.push((
(key.clone(), val.clone()),
time.clone(),
diff.clone(),
));
});
cursor.step_val(batch);
}

// Emit this key's updates in timestamp order, so the
// downstream state machine sees writes in the order
// they happened. NOTE(review): this sorts only within
// a single batch; ordering across batches presumably
// relies on batch delivery order — confirm.
tuples.sort_by_key(|(_, ref t, _)| t.clone());
session.give_iterator(tuples.drain(..));

cursor.step_key(batch);
}
}
});
}
})
// Re-key by eid so `state_machine` routes all updates for one eid
// to the same worker/state cell.
.map(
|((e, next_v), t, diff): ((Value, Value), S::Timestamp, isize)| {
(e, (next_v, t, diff))
},
)
// Per-eid state `v: Option<Value>` holds the current live value.
// The closure returns (done, outputs): `done == true` drops the
// state for this key.
.state_machine(
|e, (next_v, t, diff), v| {
match v {
None => {
// First update for this eid must be an addition; a
// retraction of an unknown key is a caller bug.
assert!(
diff > 0,
"Received a retraction of a new key on a LastWriteWins attribute"
);
*v = Some(next_v.clone());
(false, vec![((e.clone(), next_v), t, 1)])
}
Some(old_v) => {
let old_v = old_v.clone();
if diff > 0 {
// New write: retract the old value and install
// the new one at the same timestamp.
*v = Some(next_v.clone());
(
false,
vec![
((e.clone(), old_v), t.clone(), -1),
((e.clone(), next_v), t, 1),
],
)
} else {
// Retraction received. Can clean up state.
// NOTE(review): the retraction emits `old_v`
// rather than `next_v` — i.e. whatever value is
// currently live is retracted, regardless of the
// value named in the input retraction.
(true, vec![((e.clone(), old_v), t, -1)])
}
}
}
},
|e| e.hashed(),
)
.as_collection()
}
}
103 changes: 2 additions & 101 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,105 +1,6 @@
//! Extension traits for `Stream` implementing various
//! declarative-specific operators.
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::aggregation::StateMachine;
use timely::dataflow::operators::{generic::operator::Operator, Map};
use timely::dataflow::Scope;
mod last_write_wins;

use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arrange, Arranged};
use differential_dataflow::trace::{cursor::Cursor, BatchReader};
use differential_dataflow::{AsCollection, Collection};

use crate::{TraceValHandle, Value};

/// Provides the `cardinality_one` method.
/// Provides the `cardinality_one` method.
///
/// Implemented for collections of `(eid, value)` pairs; the output keeps
/// at most one live value per eid at any time. (Per the commit title,
/// this trait is being renamed to `LastWriteWins`.)
pub trait CardinalityOne<S: Scope> {
/// Ensures that only a single value per eid exists within an
/// attribute, by retracting any previous values upon new
/// updates. Therefore this stream does not expect explicit
/// retractions.
///
/// A new addition for an eid causes the operator itself to emit a
/// retraction of that eid's previous value alongside the new value.
fn cardinality_one(&self) -> Collection<S, (Value, Value), isize>;
}

impl<S> CardinalityOne<S> for Collection<S, (Value, Value), isize>
where
S: Scope,
S::Timestamp: Lattice + Ord,
{
// Pipeline: (1) arrange the (e, v) updates into a value trace, (2) flatten
// the arranged batches back into (tuple, time, diff) triples sorted by
// time, (3) run a per-eid state machine that retracts the previous value
// whenever a new one arrives.
fn cardinality_one(&self) -> Collection<S, (Value, Value), isize> {
use differential_dataflow::hashable::Hashable;

// Arrange by eid so that updates arrive grouped per key in batches.
let arranged: Arranged<S, TraceValHandle<Value, Value, S::Timestamp, isize>> =
self.arrange();

arranged
.stream
// NOTE(review): the operator name "AsCollection" looks like copied
// boilerplate; a more descriptive name would aid dataflow logging.
.unary(Pipeline, "AsCollection", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for wrapper in data.iter() {
let batch = &wrapper;
// Walk the batch cursor: for each key, collect every
// (value, time, diff) update into `tuples`.
let mut cursor = batch.cursor();
while let Some(key) = cursor.get_key(batch) {
let mut tuples = Vec::new();
while let Some(val) = cursor.get_val(batch) {
cursor.map_times(batch, |time, diff| {
tuples.push((
(key.clone(), val.clone()),
time.clone(),
diff.clone(),
));
});
cursor.step_val(batch);
}

// Emit this key's updates in timestamp order, so the
// downstream state machine sees writes in the order
// they happened. NOTE(review): this sorts only within
// a single batch; ordering across batches presumably
// relies on batch delivery order — confirm.
tuples.sort_by_key(|(_, ref t, _)| t.clone());
session.give_iterator(tuples.drain(..));

cursor.step_key(batch);
}
}
});
}
})
// Re-key by eid so `state_machine` routes all updates for one eid
// to the same worker/state cell.
.map(
|((e, next_v), t, diff): ((Value, Value), S::Timestamp, isize)| {
(e, (next_v, t, diff))
},
)
// Per-eid state `v: Option<Value>` holds the current live value.
// The closure returns (done, outputs): `done == true` drops the
// state for this key.
.state_machine(
|e, (next_v, t, diff), v| {
match v {
None => {
// First update for this eid must be an addition; a
// retraction of an unknown key is a caller bug.
assert!(
diff > 0,
"Received a retraction of a new key on a CardinalityOne attribute"
);
*v = Some(next_v.clone());
(false, vec![((e.clone(), next_v), t, 1)])
}
Some(old_v) => {
let old_v = old_v.clone();
if diff > 0 {
// New write: retract the old value and install
// the new one at the same timestamp.
*v = Some(next_v.clone());
(
false,
vec![
((e.clone(), old_v), t.clone(), -1),
((e.clone(), next_v), t, 1),
],
)
} else {
// Retraction received. Can clean up state.
// NOTE(review): the retraction emits `old_v`
// rather than `next_v` — i.e. whatever value is
// currently live is retracted, regardless of the
// value named in the input retraction.
(true, vec![((e.clone(), old_v), t, -1)])
}
}
}
},
|e| e.hashed(),
)
.as_collection()
}
}
pub use last_write_wins::LastWriteWins;
4 changes: 2 additions & 2 deletions tests/cardinality_one.rs → tests/input_semantics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Run for Vec<Case<u64>> {
for tx in case.transactions.iter() {
for datum in tx {
deps.entry(datum.2.clone()).or_insert_with(|| {
AttributeConfig::tx_time(InputSemantics::CardinalityOne)
AttributeConfig::tx_time(InputSemantics::LastWriteWins)
});
}
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Run for Vec<Case<Pair<Duration, u64>>> {
for datum in tx {
deps.entry(datum.2.clone())
.or_insert_with(|| AttributeConfig {
input_semantics: InputSemantics::CardinalityOne,
input_semantics: InputSemantics::LastWriteWins,
trace_slack: Some(Time::Bi(Duration::from_secs(0), 1)),
..Default::default()
});
Expand Down

0 comments on commit 91479b3

Please sign in to comment.