A TypeScript implementation of differential dataflow
Differential dataflow is a powerful data-parallel programming framework that enables incremental computations over changing input data. This implementation provides:
- Core differential dataflow operators (map, filter, join, reduce, etc.)
- Support for iterative computations
- Incremental updates with partially ordered versions
- Optional SQLite backend for state management and restartability
- Incremental Processing: Efficiently process changes to input data without recomputing everything
- Rich Operators: Supports common operations with a pipeline API:
  - `concat()`: Concatenate two streams
  - `consolidate()`: Consolidate the elements in the stream at each version
  - `count()`: Count elements by key
  - `distinct()`: Remove duplicates
  - `filter()`: Filter elements based on a predicate
  - `iterate()`: Perform iterative computations
  - `join()`: Join two streams
  - `map()`: Transform elements in a stream
  - `reduce()`: Aggregate values by key
  - `output()`: Output the messages of the stream
  - `pipe()`: Build a pipeline of operators, enabling reuse of operator combinations
- SQLite Integration: Optional SQLite backend for managing operator state
- Type Safety: Full TypeScript type safety and inference through the pipeline API
Install with npm:

```sh
npm install {TODO}
```
Here's a simple example that demonstrates the core concepts:
```ts
import { D2, map, filter, debug, MultiSet } from 'd2ts'
// Create a new D2 graph with initial frontier
const graph = new D2({ initialFrontier: 0 })
// Create an input stream
const input = graph.newInput<number>()
// Build a simple pipeline that:
// 1. Takes numbers as input
// 2. Adds 5 to each number
// 3. Filters to keep only even numbers
const output = input.pipe(
  map(x => x + 5),
  filter(x => x % 2 === 0),
  debug('output')
)
// Finalize the graph
graph.finalize()
// Send some data
input.sendData(0, new MultiSet([
  [1, 1],
  [2, 1],
  [3, 1]
]))
input.sendFrontier(1)
// Process the data
graph.run()
// Output will show:
// 6 (from 1 + 5)
// 8 (from 3 + 5)
```
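Because the operators are incremental, later changes to the input are processed without recomputing everything. Retracting a value is just sending it with a negative multiplicity; continuing the example above (a sketch using the same API calls):

```ts
// Retract the 3 sent at version 0 and add a 5
input.sendData(1, new MultiSet([
  [3, -1],
  [5, 1]
]))
input.sendFrontier(2)
graph.run()
// The pipeline emits only the resulting changes:
// a retraction of 8 (3 + 5) and an addition of 10 (5 + 5)
```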
Concatenates two input streams
```ts
const output = input.pipe(
  concat(other)
)
```
Consolidates the elements in the stream at each version; essentially, it ensures the output stream is at the latest known complete version. For example, if the updates `[[1, 1], [1, 1], [2, 1], [2, -1]]` all arrive at version 0, the consolidated output at that version is `[[1, 2]]`: multiplicities of equal values are summed, and entries whose multiplicities cancel to zero are dropped.
```ts
const output = input.pipe(
  consolidate()
)
```
Counts the number of elements in the stream by key
```ts
const output = input.pipe(
  map((data) => [data.somethingToKeyOn, data]),
  count()
)
```
Logs the stream's messages to the console; the name is used to identify the stream in the logs.
```ts
const output = input.pipe(
  debug('output')
)
```
Removes duplicate values from the stream
```ts
const output = input.pipe(
  distinct()
)
```
Filters the stream based on a predicate
```ts
const output = input.pipe(
  filter(x => x % 2 === 0)
)
```
Performs an iterative computation on the stream: the function you provide builds a sub-pipeline, and the operator applies it repeatedly, feeding each iteration's output back in as input, until the collection stops changing (reaches a fixed point).
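A sketch of what this can look like, modeled on the iterative example in the Materialize blog post this library is based on. The exact signature of `iterate()` (a function from stream to stream) is an assumption here, so treat this as illustrative rather than definitive:

```ts
import { D2, map, filter, concat, distinct, consolidate, iterate, debug, MultiSet } from 'd2ts'

const graph = new D2({ initialFrontier: 0 })
const input = graph.newInput<number>()

// Starting from each input number, repeatedly double it, keeping
// only values <= 50; iteration stops once nothing new is produced.
const output = input.pipe(
  iterate((stream) =>
    stream.pipe(
      map((x) => x * 2),
      concat(stream),
      filter((x) => x <= 50),
      map((x) => [x, null]), // key the values so distinct() can deduplicate
      distinct(),
      map(([x]) => x),
      consolidate()
    )
  ),
  debug('iterate')
)

graph.finalize()
input.sendData(0, new MultiSet([[1, 1]]))
input.sendFrontier(1)
graph.run()
// Expected to produce 1, 2, 4, 8, 16, 32 (each doubling of 1 up to 50)
```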
Joins two keyed streams. For each key present in both streams, the output contains that key paired with the values from the left and right streams. This is an inner join, so only elements with matching keys are included in the output.
```ts
const input = graph.newInput<[string, number]>()
const other = graph.newInput<[string, string]>()
const output = input.pipe(
  join(other)
)
```
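To sketch the data flow (the emitted element shape, `[key, [leftValue, rightValue]]`, is assumed from standard differential dataflow join semantics):

```ts
input.sendData(0, new MultiSet([[['alice', 1], 1]]))
other.sendData(0, new MultiSet([[['alice', 'admin'], 1]]))
input.sendFrontier(1)
other.sendFrontier(1)
graph.run()
// Expected to emit ['alice', [1, 'admin']] with multiplicity 1
```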
TODO: Add links to other joins when we have them
Transforms the elements of the stream using a function
```ts
const output = input.pipe(
  map(x => x + 5)
)
```
Outputs the messages of the stream
TODO: expand on the Message type and how it works
```ts
const output = input.pipe(
  output((message) => {
    console.log(message)
  })
)
```
Pipes the stream through a series of operators
```ts
const composedPipeline = pipe(
  map(x => x + 5),
  filter(x => x % 2 === 0),
  debug('output')
)
const output = input.pipe(
  composedPipeline
)
```

Or as a function:

```ts
const myPipe = (a: number, b: number) => pipe(
  map(x => x + a),
  filter(x => x % b === 0),
  debug('output')
)
const output = input.pipe(
  myPipe(5, 2)
)
```
Performs a reduce operation on the stream, grouped by key. The function `f` takes an array of `[value, multiplicity]` pairs and returns an array of `[result, multiplicity]` pairs.
```ts
// Count the number of elements in the stream by key
const output = input.pipe(
  map((data) => [data.somethingToKeyOn, data]),
  reduce((values) => {
    let count = 0
    for (const [_value, multiplicity] of values) {
      count += multiplicity
    }
    return [[count, 1]]
  })
)
```
For persistence and larger datasets, a number of operators are provided that persist to SQLite:
- `consolidate()`: Consolidates data into a single version
- `count()`: Counts the number of elements in a collection
- `distinct()`: Removes duplicates from a collection
- `join()`: Joins two collections
- `map()`: Transforms elements
- `reduce()`: Aggregates values by key
Each takes a SQLite database as its final argument.
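As a minimal sketch of how this fits together — the `d2ts/sqlite` import path and the use of a `better-sqlite3` database handle are assumptions here, so check the package exports for the exact API:

```ts
import Database from 'better-sqlite3'   // assumed peer dependency
import { D2, map } from 'd2ts'
import { reduce } from 'd2ts/sqlite'    // assumed entry point for the SQLite variants

const db = new Database('./state.sqlite') // operator state persists across restarts
const graph = new D2({ initialFrontier: 0 })
const input = graph.newInput<{ key: string; value: number }>()

const output = input.pipe(
  map((data) => [data.key, data.value]),
  // Same callback as the in-memory reduce(), with the database
  // passed as the final argument to back the operator's state.
  reduce((values) => {
    let sum = 0
    for (const [value, multiplicity] of values) {
      sum += value * multiplicity
    }
    return [[sum, 1]]
  }, db)
)
```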
The implementation is based on the one outlined in the Materialize blog post, with some TypeScript-specific adaptations, along with using a pipeline rather than a builder API pattern.
- Core data structures:
  - `MultiSet`: Represents collections with multiplicities
  - `Version`: Handles partially ordered versions
  - `Antichain`: Manages frontiers
  - `Index`: Stores versioned operator state
- Operators:
  - Base operator classes in `src/operators/`
  - SQLite variants in `src/sqlite/operators/`
- Graph execution:
  - Dataflow graph management in `src/graph.ts` and `src/D2.ts`
  - Message passing between operators
  - Frontier tracking and advancement