Skip to content

Commit

Permalink
refactor(hydro_lang)!: rename timestamp to atomic and provide batchin…
Browse files Browse the repository at this point in the history
…g shortcuts
  • Loading branch information
shadaj committed Jan 29, 2025
1 parent dc51d94 commit 8f94f14
Show file tree
Hide file tree
Showing 28 changed files with 275 additions and 359 deletions.
4 changes: 2 additions & 2 deletions docs/docs/hydro/live-collections/bounded-unbounded.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ In some cases, you may need to convert between bounded and unbounded collections
# let tick = process.tick();
# let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let input: Stream<_, _, Bounded> = // ...
# unsafe { numbers.timestamped(&tick).tick_batch() };
# unsafe { numbers.tick_batch(&tick) };
let unbounded: Stream<_, _, Unbounded> = input.into();
```

Expand All @@ -43,6 +43,6 @@ let unbounded_input = // ...
# process.source_iter(q!(vec![1, 2, 3, 4]));
let tick = process.tick();
let batch: Stream<_, _, Bounded> = unsafe {
unbounded_input.timestamped(&tick).tick_batch()
unbounded_input.tick_batch(&tick)
};
```
2 changes: 1 addition & 1 deletion docs/docs/hydro/ticks-atomicity/atomicity.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
sidebar_position: 3
---

# Atomicity and Timestamps
# Atomicity
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.
Expand Down
2 changes: 1 addition & 1 deletion hydro_lang/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ syn = { version = "2.0.46", features = [ "parsing", "extra-traits", "visit-mut"
tokio = { version = "1.29.0", features = [ "full" ] }
toml = { version = "0.8.0", optional = true }
trybuild-internals-api = { version = "1.0.99", optional = true }
ctor = "0.2"

[build-dependencies]
stageleft_tool = { path = "../stageleft_tool", version = "^0.5.0" }

[dev-dependencies]
async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] }
hydro_deploy = { path = "../hydro_deploy/core", version = "^0.11.0" }
ctor = "0.2"
insta = "1.39"
tokio-test = "0.4.4"
trybuild = "1"
8 changes: 1 addition & 7 deletions hydro_lang/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use optional::Optional;

pub mod location;
pub use location::cluster::CLUSTER_SELF_ID;
pub use location::{Cluster, ClusterId, ExternalProcess, Location, Process, Tick, Timestamped};
pub use location::{Atomic, Cluster, ClusterId, ExternalProcess, Location, Process, Tick};

#[cfg(feature = "build")]
pub mod deploy;
Expand All @@ -46,12 +46,6 @@ mod staging_util;
#[cfg(feature = "deploy")]
pub mod test_util;

#[ctor::ctor]
fn add_private_reexports() {
stageleft::add_private_reexport(vec!["tokio", "time", "instant"], vec!["tokio", "time"]);
stageleft::add_private_reexport(vec!["bytes", "bytes"], vec!["bytes"]);
}

#[stageleft::runtime]
#[cfg(test)]
mod test_init {
Expand Down
2 changes: 1 addition & 1 deletion hydro_lang/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod can_send;
pub use can_send::CanSend;

pub mod tick;
pub use tick::{NoTick, Tick, Timestamped};
pub use tick::{Atomic, NoTick, Tick};

#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum LocationId {
Expand Down
33 changes: 15 additions & 18 deletions hydro_lang/src/location/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ impl<T> NoTick for Process<'_, T> {}
impl<T> NoTick for Cluster<'_, T> {}

#[sealed]
pub trait NoTimestamp {}
pub trait NoAtomic {}
#[sealed]
impl<T> NoTimestamp for Process<'_, T> {}
impl<T> NoAtomic for Process<'_, T> {}
#[sealed]
impl<T> NoTimestamp for Cluster<'_, T> {}
impl<T> NoAtomic for Cluster<'_, T> {}
#[sealed]
impl<'a, L: Location<'a>> NoTimestamp for Tick<L> {}
impl<'a, L: Location<'a>> NoAtomic for Tick<L> {}

#[derive(Clone)]
pub struct Timestamped<L> {
pub struct Atomic<L> {
pub(crate) tick: Tick<L>,
}

impl<'a, L: Location<'a>> Location<'a> for Timestamped<L> {
impl<'a, L: Location<'a>> Location<'a> for Atomic<L> {
type Root = L::Root;

fn root(&self) -> Self::Root {
Expand All @@ -55,7 +55,7 @@ impl<'a, L: Location<'a>> Location<'a> for Timestamped<L> {
}

#[sealed]
impl<L> NoTick for Timestamped<L> {}
impl<L> NoTick for Atomic<L> {}

/// Marks the stream as being inside the single global clock domain.
#[derive(Clone)]
Expand Down Expand Up @@ -102,19 +102,18 @@ impl<'a, L: Location<'a>> Tick<L> {
batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
) -> Stream<(), Self, Bounded>
where
L: NoTick + NoTimestamp,
L: NoTick + NoAtomic,
{
let out = self
.l
.spin()
.flat_map_ordered(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.timestamped(self);
.map(q!(|_| ()));

unsafe {
// SAFETY: at runtime, `spin` produces a single value per tick,
// so each batch is guaranteed to be the same size.
out.tick_batch()
out.tick_batch(self)
}
}

Expand All @@ -123,11 +122,11 @@ impl<'a, L: Location<'a>> Tick<L> {
e: impl QuotedWithContext<'a, T, L>,
) -> Singleton<T, Self, Bounded>
where
L: NoTick,
L: NoTick + NoAtomic,
{
unsafe {
// SAFETY: a top-level singleton produces the same value each tick
self.outer().singleton(e).timestamped(self).latest_tick()
self.outer().singleton(e).latest_tick(self)
}
}

Expand All @@ -136,7 +135,7 @@ impl<'a, L: Location<'a>> Tick<L> {
e: impl QuotedWithContext<'a, T, Tick<L>>,
) -> Optional<T, Self, Bounded>
where
L: NoTick,
L: NoTick + NoAtomic,
{
let e_arr = q!([e]);
let e = e_arr.splice_untyped_ctx(self);
Expand Down Expand Up @@ -185,9 +184,7 @@ impl<'a, L: Location<'a>> Tick<L> {
)
}

pub fn forward_ref_timestamped<
S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped<L>>,
>(
pub fn forward_ref_atomic<S: CycleCollection<'a, ForwardRefMarker, Location = Atomic<L>>>(
&self,
) -> (ForwardRef<'a, S>, S) {
let next_id = {
Expand All @@ -214,7 +211,7 @@ impl<'a, L: Location<'a>> Tick<L> {
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, Timestamped { tick: self.clone() }),
S::create_source(ident, Atomic { tick: self.clone() }),
)
}

Expand Down
76 changes: 50 additions & 26 deletions hydro_lang/src/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use syn::parse_quote;
use crate::builder::FLOW_USED_MESSAGE;
use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
use crate::location::tick::{NoTimestamp, Timestamped};
use crate::location::tick::{Atomic, NoAtomic};
use crate::location::{check_matching_location, LocationId, NoTick};
use crate::singleton::ZipResult;
use crate::stream::NoOrder;
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let optional = tick.optional_first_tick(q!(1));
/// optional.map(q!(|v| v + 1)).all_ticks().drop_timestamp()
/// optional.map(q!(|v| v + 1)).all_ticks()
/// # }, |mut stream| async move {
/// // 2
/// # assert_eq!(stream.next().await.unwrap(), 2);
Expand Down Expand Up @@ -475,10 +475,10 @@ impl<'a, T, L: Location<'a>> Optional<T, L, Bounded> {
}
}

impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, Timestamped<L>, B> {
/// Given a tick, returns a optional value corresponding to a snapshot of the optional
/// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
/// relevant data that contributed to the snapshot at tick `t`.
impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, Atomic<L>, B> {
/// Returns an optional value corresponding to the latest snapshot of the optional
/// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
/// at least all relevant data that contributed to the snapshot at tick `t`.
///
/// # Safety
/// Because this picks a snapshot of a optional whose value is continuously changing,
Expand All @@ -494,17 +494,26 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, Timestamped<L>, B> {
)
}

pub fn drop_timestamp(self) -> Optional<T, L, B> {
pub fn end_atomic(self) -> Optional<T, L, B> {
Optional::new(self.location.tick.l, self.ir_node.into_inner())
}
}

impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, L, B> {
pub fn timestamped(self, tick: &Tick<L>) -> Optional<T, Timestamped<L>, B> {
Optional::new(
Timestamped { tick: tick.clone() },
self.ir_node.into_inner(),
)
impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B> Optional<T, L, B> {
pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
}

/// Given a tick, returns a optional value corresponding to a snapshot of the optional
/// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
/// relevant data that contributed to the snapshot at tick `t`.
///
/// # Safety
/// Because this picks a snapshot of a optional whose value is continuously changing,
/// the output optional has a non-deterministic value since the snapshot can be at an
/// arbitrary point in time.
pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
unsafe { self.atomic(tick).latest_tick() }
}

/// Eagerly samples the optional as fast as possible, returning a stream of snapshots
Expand All @@ -519,10 +528,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, L, B> {

unsafe {
// SAFETY: source of intentional non-determinism
self.timestamped(&tick)
.latest_tick()
.all_ticks()
.drop_timestamp()
self.latest_tick(&tick).all_ticks()
}
}

Expand All @@ -540,7 +546,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, L, B> {
interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
) -> Stream<T, L, Unbounded>
where
L: NoTimestamp,
L: NoAtomic,
{
let samples = unsafe {
// SAFETY: source of intentional non-determinism
Expand All @@ -550,19 +556,27 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, L, B> {

unsafe {
// SAFETY: source of intentional non-determinism
self.timestamped(&tick)
.latest_tick()
.continue_if(samples.timestamped(&tick).tick_batch().first())
self.latest_tick(&tick)
.continue_if(samples.tick_batch(&tick).first())
.all_ticks()
.drop_timestamp()
}
}
}

impl<'a, T, L: Location<'a>> Optional<T, Tick<L>, Bounded> {
pub fn all_ticks(self) -> Stream<T, Timestamped<L>, Unbounded> {
pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
Stream::new(
Timestamped {
self.location.outer().clone(),
HydroNode::Persist {
inner: Box::new(self.ir_node.into_inner()),
metadata: self.location.new_node_metadata::<T>(),
},
)
}

pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
Stream::new(
Atomic {
tick: self.location.clone(),
},
HydroNode::Persist {
Expand All @@ -572,9 +586,19 @@ impl<'a, T, L: Location<'a>> Optional<T, Tick<L>, Bounded> {
)
}

pub fn latest(self) -> Optional<T, Timestamped<L>, Unbounded> {
pub fn latest(self) -> Optional<T, L, Unbounded> {
Optional::new(
self.location.outer().clone(),
HydroNode::Persist {
inner: Box::new(self.ir_node.into_inner()),
metadata: self.location.new_node_metadata::<T>(),
},
)
}

pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
Optional::new(
Timestamped {
Atomic {
tick: self.location.clone(),
},
HydroNode::Persist {
Expand Down
8 changes: 1 addition & 7 deletions hydro_lang/src/rewrites/persist_pullup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,7 @@ mod tests {
let process = flow.process::<()>();

let tick = process.tick();
let before_tee = unsafe {
process
.source_iter(q!(0..10))
.timestamped(&tick)
.tick_batch()
.persist()
};
let before_tee = unsafe { process.source_iter(q!(0..10)).tick_batch(&tick).persist() };

before_tee
.clone()
Expand Down
3 changes: 1 addition & 2 deletions hydro_lang/src/rewrites/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ mod tests {
process
.source_iter(q!(vec![]))
.map(q!(|string: String| (string, ())))
.timestamped(&tick)
.tick_batch()
.tick_batch(&tick)
}
.fold_keyed(q!(|| 0), counter_func)
.all_ticks()
Expand Down
Loading

0 comments on commit 8f94f14

Please sign in to comment.