Skip to content

Commit

Permalink
docs(hydro_lang): initial website docs on core Hydro concepts
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Jan 23, 2025
1 parent 496ac2b commit 06eb91c
Show file tree
Hide file tree
Showing 25 changed files with 306 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions docs/docs/hydro/correctness.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
sidebar_position: 3
---

# Safety and Correctness
Just like Rust's type system helps you avoid memory safety bugs, Hydro helps you ensure **distributed safety**. Hydro's type systems helps you avoid many kinds of distributed systems bugs, including:
- Non-determinism due to message delays (which reorder arrival) or retries (which result in duplicates)
- See [Live Collections / Eventual Determinism](./live-collections/determinism.md)
- Using mismatched serialization and deserialization formats across services
- See [Locations and Networking](./locations/index.md)
- Misusing node identifiers across logically independent clusters of machines
- See [Locations / Clusters](./locations/clusters.md)
- Relying on non-determinstic clocks for batching events
- See [Ticks and Atomicity / Batching and Emitting Streams](./ticks-atomicity/batching-and-emitting.md)

These safety guarantees are surfaced through the Rust type system, so you can catch these bugs at compile time rather than in production. And when it is necessary to bypass these checks for advanced distributed logic, you can use the same `unsafe` keyword as in Rust as an escape hatch.
8 changes: 8 additions & 0 deletions docs/docs/hydro/live-collections/_category_.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"label": "Live Collections",
"position": 4,
"link": {
"type": "doc",
"id": "hydro/live-collections/index"
}
}
48 changes: 48 additions & 0 deletions docs/docs/hydro/live-collections/bounded-unbounded.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
---
sidebar_position: 0
---

# Bounded and Unbounded Types
Although live collections can be continually updated, some collection types also support **termination**, after which no additional changes can be made. For example, a live collection created by reading integers from an in-memory `Vec` will become terminated once all the elements of the `Vec` have been loaded. But other live collections, such as one being updated by the network, may never become terminated.

In Hydro, certain APIs are restricted to only work on collections that are **guaranteed to terminate** (**bounded** collections). All live collections in Hydro have a type parameter (typically named `B`), which tracks whether the collection is bounded (has the type `Bounded`) or unbounded (has the type `Unbounded`). These types are used in the signature of many Hydro APIs to ensure that the API is only called on the appropriate type of collection.

## Converting Boundedness
In some cases, you may need to convert between bounded and unbounded collections. Converting from a bounded collection **to an unbounded collection** is always allowed and safe, since it relaxes the guarantees on the collection. This can be done by calling `.into()` on the collection.

```rust,no_run
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# let flow = FlowBuilder::new();
# let process = flow.process::<()>();
# let tick = process.tick();
# let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
let input: Stream<_, _, Bounded> = // ...
# unsafe { numbers.timestamped(&tick).tick_batch() };
let unbounded: Stream<_, _, Unbounded> = input.into();
```

```rust,no_run
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# let flow = FlowBuilder::new();
# let process = flow.process::<()>();
# let tick = process.tick();
let input: Singleton<_, _, Bounded> = tick.singleton(q!(0));
let unbounded: Singleton<_, _, Unbounded> = input.into();
```

Converting from an unbounded collection **to a bounded collection**, however is more complex. This requires cutting off the unbounded collection at a specific point in time, which may not be possible to do deterministically. For example, the most common way to convert an unbounded `Stream` to a bounded one is to batch its elements non-deterministically using `.tick_batch()`.

```rust,no_run
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# let flow = FlowBuilder::new();
# let process = flow.process::<()>();
let unbounded_input = // ...
# process.source_iter(q!(vec![1, 2, 3, 4]));
let tick = process.tick();
let batch: Stream<_, _, Bounded> = unsafe {
unbounded_input.timestamped(&tick).tick_batch()
};
```
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
---
sidebar_position: 3
sidebar_position: 1
---

# Consistency and Safety
A key feature of Hydro is its integration with the Rust type system to highlight possible sources of inconsistent distributed behavior due to sources of non-determinism such as batching, timeouts, and message reordering. In this section, we'll walk through the consistency guarantees in Hydro and how to use the **`unsafe`** keyword as an escape hatch when introducing sources of non-determinism.
# Eventual Determinism
Most programs are strong guarantees on **determinism**, the property that when provided the same inputs, the outputs of the program are always the same. Even when the inputs and outputs are live collections, we can focus on the _eventual_ state of the collection (as if we froze the input and waited until the output stops changing).

:::info

Our consistency and safety model is based on the POPL'25 paper [Flo: A Semantic Foundation for Progressive Stream Processing](https://arxiv.org/abs/2411.08274), which covers the formal details and proofs underlying this system.

:::

## Eventual Determinism
Hydro provides strong guarantees on **determinism**, the property that when provided the same inputs, the outputs of the program are always the same. Even when the inputs and outputs are streaming, we can use this property by looking at the **aggregate collection** (i.e. the result of collecting the elements of the stream into a finite collection). This makes it easy to build composable blocks of code without having to worry about runtime behavior such as batching or network delays.

Because Hydro programs can involve network delay, we guarantee **eventual determinism**: given a set of streaming inputs which have arrived, the outputs of the program (which continuously change as inputs arrive) will **eventually** have the same _aggregate_ value.

Again, by focusing on the _aggregate_ value rather than individual outputs, Hydro programs can involve concepts such as retractions (for incremental computation) while still guaranteeing determinism because the _resolved_ output (after processing retractions) will eventually be the same.
Hydro thus guarantees **eventual determinism**: given a set of streaming inputs which have arrived, the outputs of the program will **eventually** have the same _final_ value. This makes it easy to build composable blocks of code without having to worry about runtime behavior such as batching or network delays.

:::note

Expand Down Expand Up @@ -54,9 +49,9 @@ use std::fmt::Debug;
use std::time::Duration;

/// ...
///
///
/// # Safety
/// This function will non-deterministically print elements
/// This function will non-deterministically print elements
/// from the stream according to a timer.
unsafe fn print_samples<T: Debug, L>(
stream: Stream<T, Process<L>, Unbounded>
Expand Down
4 changes: 4 additions & 0 deletions docs/docs/hydro/live-collections/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Live Collections
Traditional programs (like those in Rust) typically manipulate **collections** of data elements, such as those stored in a `Vec` or `HashMap`. These collections are **fixed** in the sense that any transformations applied to them such as `map` are immediately executed on a snapshot of the collection. This means that the output will not be updated when the input collection is modified.

In Hydro, programs instead work with **live collections** which are expected to dynamically change over time as new elements are added or removed (in response to API requests, streaming ingestion, etc). Applying a transformation like `map` to a live collection results in another live collection that will dynamically change over time.
10 changes: 10 additions & 0 deletions docs/docs/hydro/live-collections/singletons-optionals.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 3
---

# Singletons and Optionals
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
10 changes: 10 additions & 0 deletions docs/docs/hydro/live-collections/streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 2
---

# Streams
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
8 changes: 8 additions & 0 deletions docs/docs/hydro/locations/_category_.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"label": "Locations and Networking",
"position": 5,
"link": {
"type": "doc",
"id": "hydro/locations/index"
}
}
10 changes: 10 additions & 0 deletions docs/docs/hydro/locations/clusters.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 1
---

# Clusters
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
10 changes: 10 additions & 0 deletions docs/docs/hydro/locations/external-clients.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 2
---

# External Clients
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
41 changes: 41 additions & 0 deletions docs/docs/hydro/locations/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Locations and Networking
Hydro is a **global**, **distributed** programming model. This means that the data and computation in a Hydro program can be spread across multiple machines, data centers, and even continents. To achieve this, Hydro uses the concept of **locations** to keep track of _where_ data is stored and computation is executed.

Each live collection type (`Stream`, `Singleton`, etc.) has a type parameter `L` which will always be a type that implements the `Location` trait (e.g. `Process` and `Cluster`, documented in this section). Most Hydro APIs that transform live collections will emit a new live collection with the same location type as the input, and APIs that consume multiple live collections will require them all to have the same location type.

To create distributed programs, live collections can be sent over the network using a variety of APIs. For example, `Stream`s can be sent from a process to another process using `.send_bincode(&loc2)` (which uses [bincode](https://docs.rs/bincode/latest/bincode/) as a serialization format). The sections for each location type discuss the networking APIs in further detail.

## Creating Locations
Locations can be created by calling the appropriate method on the global `FlowBuilder` (e.g. `flow.process()` or `flow.cluster()`). These methods will return a handle to the location that can be used to create live collections and run computations.

:::caution

It is possible to create **different** locations that still have the same type, for example:

```rust
# use hydro_lang::*;
let flow = FlowBuilder::new();
let process1: Process<()> = flow.process::<()>();
let process2: Process<()> = flow.process::<()>();

assert_ne!(process1, process2);
# let _ = flow.compile_no_network::<deploy::MultiGraph>();
```

These locations will not be unified and may be deployed to separate machines. When deploying a Hydro program, additional runtime checks will be performed to ensure that input locations match.

```rust
# use hydro_lang::*;
let flow = FlowBuilder::new();
let process1: Process<()> = flow.process::<()>();
let process2: Process<()> = flow.process::<()>();

# test_util::assert_panics_with_message(|| {
process1.source_iter(q!([1, 2, 3]))
.cross_product(process2.source_iter(q!([1, 2, 3])));
// PANIC: assertion `left == right` failed: locations do not match
# }, "assertion `left == right` failed: locations do not match");
# let _ = flow.compile_no_network::<deploy::MultiGraph>();
```

:::
10 changes: 10 additions & 0 deletions docs/docs/hydro/locations/processes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 0
---

# Processes
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
2 changes: 1 addition & 1 deletion docs/docs/hydro/stageleft.mdx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Stageleft
sidebar_position: 4
sidebar_position: 7
---

import StageleftDocs from '../../../stageleft/README.md'
Expand Down
8 changes: 8 additions & 0 deletions docs/docs/hydro/ticks-atomicity/_category_.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"label": "Ticks and Atomicity",
"position": 6,
"link": {
"type": "doc",
"id": "hydro/ticks-atomicity/index"
}
}
10 changes: 10 additions & 0 deletions docs/docs/hydro/ticks-atomicity/atomicity.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 3
---

# Atomicity and Timestamps
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
10 changes: 10 additions & 0 deletions docs/docs/hydro/ticks-atomicity/batching-and-emitting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 1
---

# Batching and Emitting Streams
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
10 changes: 10 additions & 0 deletions docs/docs/hydro/ticks-atomicity/execution-model.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 0
---

# The Tick Execution Model
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
12 changes: 12 additions & 0 deletions docs/docs/hydro/ticks-atomicity/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Ticks and Atomicity
By default, all live collections in Hydro are transformed **asynchronously**, which means that there may be arbitrary delays between when a live collection is updated and when downstream transformations see the updates. This is because Hydro is designed to work in a distributed setting where messages may be delayed. But for some programs, it is necessary to define local iterative loops where transformations are applied atomically; this is achieved with **ticks**.

## Loops
In some programs, you may want to process batches or snapshots of a live collection in an iterative manner. For example, in a map-reduce program, it may be helpful to compute aggregations on small local batches of data before sending those intermediate results to a reducer.

To create such iterative loops, Hydro provides the concept of **ticks**. A **tick** captures the body of an infinite loop running locally to the machine (importantly, this means that ticks define a **logical time** which is not comparable across machines). Ticks are non-deterministically generated, so batching data into ticks is an **unsafe** operation that requires special attention.

## Atomicity
In other programs, it is necessary to define an atomic section where a set of transformations are guaranteed to be executed **all at once**. For example, in a transaction processing program, it is important that the transaction is applied **before** an acknowledgment is sent to the client.

In Hydro, this can be achieved by placing the transaction and acknowledgment in the same atomic **tick**. Hydro guarantees that all the outputs of a tick will be computed before any are released. Importantly, atomic ticks cannot span several locations, since that would require a locking mechanism that has significant performance implications.
10 changes: 10 additions & 0 deletions docs/docs/hydro/ticks-atomicity/stateful-loops.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sidebar_position: 2
---

# Stateful Loops
:::caution

The Hydro documentation is currently under active development! This page is a placeholder for future content.

:::
22 changes: 18 additions & 4 deletions hydro_lang/src/location/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;

use proc_macro2::{Span, TokenStream};
Expand All @@ -18,12 +19,17 @@ pub struct Cluster<'a, C> {
pub(crate) _phantom: Invariant<'a, C>,
}

pub trait IsCluster {
type Tag;
impl<C> Debug for Cluster<'_, C> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cluster({})", self.id)
}
}

impl<C> IsCluster for Cluster<'_, C> {
type Tag = C;
impl<C> Eq for Cluster<'_, C> {}
impl<C> PartialEq for Cluster<'_, C> {
fn eq(&self, other: &Self) -> bool {
self.id == other.id && self.flow_state.as_ptr() == other.flow_state.as_ptr()
}
}

impl<'a, C> Cluster<'a, C> {
Expand Down Expand Up @@ -102,6 +108,14 @@ impl<'a, C: 'a, Ctx> FreeVariableWithContext<Ctx> for ClusterIds<'a, C> {

impl<'a, C, Ctx> QuotedWithContext<'a, &'a Vec<ClusterId<C>>, Ctx> for ClusterIds<'a, C> {}

pub trait IsCluster {
type Tag;
}

impl<C> IsCluster for Cluster<'_, C> {
type Tag = C;
}

/// A free variable representing the cluster's own ID. When spliced in
/// a quoted snippet that will run on a cluster, this turns into a [`ClusterId`].
pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
Expand Down
14 changes: 14 additions & 0 deletions hydro_lang/src/location/process.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;

use super::{Location, LocationId};
Expand All @@ -10,6 +11,19 @@ pub struct Process<'a, P = ()> {
pub(crate) _phantom: Invariant<'a, P>,
}

impl<P> Debug for Process<'_, P> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Process({})", self.id)
}
}

impl<P> Eq for Process<'_, P> {}
impl<P> PartialEq for Process<'_, P> {
fn eq(&self, other: &Self) -> bool {
self.id == other.id && self.flow_state.as_ptr() == other.flow_state.as_ptr()
}
}

impl<P> Clone for Process<'_, P> {
fn clone(&self) -> Self {
Process {
Expand Down
10 changes: 10 additions & 0 deletions hydro_lang/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ pub struct Stream<T, L, B, Order = TotalOrder> {
_phantom: PhantomData<(T, L, B, Order)>,
}

impl<'a, T, L: Location<'a>, O> From<Stream<T, L, Bounded, O>> for Stream<T, L, Unbounded, O> {
fn from(stream: Stream<T, L, Bounded, O>) -> Stream<T, L, Unbounded, O> {
Stream {
location: stream.location,
ir_node: stream.ir_node,
_phantom: PhantomData,
}
}
}

impl<'a, T, L: Location<'a>, B> From<Stream<T, L, B, TotalOrder>> for Stream<T, L, B, NoOrder> {
fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder> {
Stream {
Expand Down
Loading

0 comments on commit 06eb91c

Please sign in to comment.