Skip to content

Commit

Permalink
Implement EventsBuffer class that can queue events
Browse files Browse the repository at this point in the history
The main feature of the structure is that subscription to even
updates can start in the certain point in past which will cause
all the past and future to be published via callback.
  • Loading branch information
p2004a committed Jan 9, 2025
1 parent 607d6ee commit 282c7c8
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 0 deletions.
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"node": "^22"
},
"dependencies": {
"@js-sdsl/deque": "^4.4.2",
"ajv": "^8.17.1",
"ajv-formats": "^3.0.1",
"pino": "^9.6.0",
Expand Down
174 changes: 174 additions & 0 deletions src/eventsBuffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { suite, test } from 'node:test';
import assert from 'node:assert/strict';
import { binarySearch, EventsBuffer } from './eventsBuffer.js';

suite('binary search', () => {
test('simple', () => {
assert.equal(
binarySearch(0, 10, (v) => v >= 5),
5,
);
assert.equal(
binarySearch(0, 11, (v) => v >= 5),
5,
);
assert.equal(
binarySearch(5, 11, (v) => v >= 7),
7,
);
});

test('edge cases', () => {
assert.equal(
binarySearch(10, 10, () => true),
10,
);
assert.equal(
binarySearch(10, 10, () => true),
10,
);
assert.equal(
binarySearch(0, 1, () => true),
0,
);
assert.equal(
binarySearch(0, 1, () => false),
1,
);
assert.equal(
binarySearch(0, 100, () => false),
100,
);
assert.equal(
binarySearch(0, 100, () => true),
0,
);
});

test('full range', () => {
for (let begin = 0; begin < 16; ++begin) {
for (let end = begin; end < 16; ++end) {
for (let val = begin; val <= end; ++val) {
assert.equal(
binarySearch(begin, end, (n) => n >= val),
val,
);
}
}
}
});
});

suite('EventsBuffer', async () => {
await test('base behavior', async (t) => {
t.mock.timers.enable({ apis: ['Date'] });
const eb: EventsBuffer<string> = new EventsBuffer(10 * 1000 * 1000);
const { promise, resolve } = Promise.withResolvers();
let eventsLeft = 4;
const cb = t.mock.fn(async (_t: number, _e: string) => {
--eventsLeft;
if (eventsLeft == 1) {
t.mock.timers.tick(1);
eb.push('d');
} else if (eventsLeft == 0) {
resolve(undefined);
}
});
eb.subscribe(0, cb);

t.mock.timers.tick(1);
eb.push('a');
t.mock.timers.tick(1);
eb.push('b');
eb.push('c');
await promise;

assert.equal(cb.mock.callCount(), 4);
assert.deepEqual(cb.mock.calls[0].arguments, [1000, 'a']);
assert.deepEqual(cb.mock.calls[1].arguments, [2000, 'b']);
assert.deepEqual(cb.mock.calls[2].arguments, [2001, 'c']);
assert.deepEqual(cb.mock.calls[3].arguments, [3000, 'd']);

assert.equal(eb.length, 4);

eb.unsubscribe();
});

await test("can't subscribe while subscribed", async (t) => {
t.mock.timers.enable({ apis: ['Date'] });
const eb: EventsBuffer<string> = new EventsBuffer(10 * 1000 * 1000);
eb.subscribe(0, async () => {});
assert.throws(
() => {
eb.subscribe(0, async () => {});
},
{
name: 'EventsBufferError',
type: 'callback_already_set',
},
);
});

await test("can't subscribe too much in the past", async (t) => {
const maxAge = 1000;
t.mock.timers.enable({ apis: ['Date'] });
const eb: EventsBuffer<string> = new EventsBuffer(maxAge);
t.mock.timers.tick(2);
assert.throws(
() => {
eb.subscribe(999, async () => {});
},
{
name: 'EventsBufferError',
type: 'too_far_in_the_past',
},
);
eb.subscribe(1001, async () => {});
});

await test("doesn't grow infinitely", async (t) => {
t.mock.timers.enable({ apis: ['Date'] });
t.mock.timers.setTime(99999);
const maxAge = 10000;
const droppingFrequency = 1; // Try to drop every single time.
const eb: EventsBuffer<string> = new EventsBuffer(maxAge, droppingFrequency);
eb.push('a');
t.mock.timers.tick(1);
eb.push('b');
t.mock.timers.tick(1);
eb.push('c');
assert.equal(eb.length, 3);

t.mock.timers.tick(1000);
eb.push('d');
assert.equal(eb.length, 1);
});

await test('subscribe in the past', async (t) => {
t.mock.timers.enable({ apis: ['Date'] });
const maxAge = 1000000000;
const eb: EventsBuffer<string> = new EventsBuffer(maxAge);
eb.push('a');
eb.push('b');
t.mock.timers.tick(1);
eb.push('c');
eb.push('d');
t.mock.timers.tick(1);
eb.push('e');

const { promise, resolve } = Promise.withResolvers();
let eventsLeft = 2;
const cb = t.mock.fn(async (_t: number, _e: string) => {
--eventsLeft;
if (eventsLeft == 0) {
resolve(undefined);
}
});
eb.subscribe(1000, cb);
await promise;

assert.equal(cb.mock.callCount(), 2);
assert.deepEqual(cb.mock.calls[0].arguments, [1001, 'd']);
assert.deepEqual(cb.mock.calls[1].arguments, [2000, 'e']);
});
});
169 changes: 169 additions & 0 deletions src/eventsBuffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { Deque } from '@js-sdsl/deque';

type EventCallback<T> = (time: number, ev: T) => Promise<void>;

/**
* Returns the first value in range [`from`; `to`) for which the given
* `predicate` returns `true`, or `to` when not found.
*
* Requirements:
* - The predicate must be monotonically increasing (from false to true) in
* the range.
* - `from` <= `to`.
*/
export function binarySearch(from: number, to: number, predicate: (n: number) => boolean): number {
let len = to - from;
while (len > 0) {
const off = (len / 2) | 0;
if (predicate(from + off)) {
len = off;
} else {
from += off + 1;
len -= off + 1;
}
}
return from;
}

/**
* Errors thrown from `EventsBuffer.subscribe`.
*/
export class EventsBufferError extends Error {
constructor(
public readonly type: 'callback_already_set' | 'too_far_in_the_past',
msg: string,
) {
super(msg);
this.name = 'EventsBufferError';
}
}

/**
* EventsBuffer accepts new events via the `push` method and queues them inside
* of internal memory queue to later republish them via a callback set via the
* `subscribe` method.
*
* Every event gets an unique monotonically increasing timestamp and the main
* feature of the structure is that subscription can start in the past which
* will cause all the past and future to be published via callback.
*
* The callback must return a promise that can't reject, and pusher awaits on
* which allows to provide backpressure.
*
* To stop the publishing one needs only to unsubscribe the callback.
*
* **WARNING**: This class uses time at **microsecond** level, not the language default
* which is millisecond.
*/
export class EventsBuffer<T> {
private callback: EventCallback<T> | null = null;
private pusherRunning: boolean = false;
private pusherEventsIdx: number = 0;
private lastDropTime: number = 0;
private events: Deque<{ time: number; event: T }> = new Deque();

/**
* Construct new EventsBuffer.
*
* @param maxAge Max age in microseconds for how long to keep the events.
* Subscription will fail if asked for events older then `now() - maxAge`.
* @param droppingFrequency Time in microseconds, for maximum interval of
* dropping old events from the queue. The value is to reduce overhead
* of removing old elements.
*/
constructor(
private maxAge: number,
private droppingFrequency: number = (maxAge / 10) | 0,
) {}

/**
* Subscribe to event updates since specified time.
*
* @param since Callback will receive all events with timestamp > since.
* Value needs to be in in microseconds.
* @param callback The callback to push new events to.
* @throws EventsBufferError if callback is already set of since is too high.
*/
subscribe(since: number, callback: EventCallback<T>) {
if (this.callback !== null) {
throw new EventsBufferError(
'callback_already_set',
'callback already set, unsubscribe first',
);
}
if (since < Date.now() * 1000 - this.maxAge) {
throw new EventsBufferError(
'too_far_in_the_past',
`since is too far in the past, max age is ${(this.maxAge / (1000 * 1000)) | 0}s`,
);
}

this.callback = callback;
const subStart =
binarySearch(
0,
this.events.length,
(n) => this.events.getElementByPos(n).time <= since,
) - 1;
if (subStart > 0) {
this.startPusher(subStart);
}
}

/**
* Remove the currently subscribed callback.
*/
unsubscribe() {
this.callback = null;
}

/**
* Add a new event to the buffer.
*
* @param event Event
*/
push(event: T) {
let time = Date.now() * 1000;
if (!this.events.empty() && this.events.front()!.time >= time) {
time = this.events.front()!.time + 1;
}
this.events.pushFront({ time, event });
this.pusherEventsIdx += 1; // Because pusher worker moves from back to front.
this.startPusher(0);
this.maybeDropOlderThen(time - this.maxAge);
}

/**
* Length is simply number of events still in the buffer.
*
* Useful primarily for tests.
*/
get length(): number {
return this.events.length;
}

private maybeDropOlderThen(after: number) {
if (after > this.lastDropTime + this.droppingFrequency) {
const pos = binarySearch(
0,
this.events.length,
(n) => this.events.getElementByPos(n).time <= after,
);
this.events.cut(pos - 1); // Looking at current implementation, it's O(1)
this.lastDropTime = after;
}
}

private startPusher(fromIdx: number) {
if (this.pusherRunning || !this.callback) return;
this.pusherEventsIdx = fromIdx;
this.pusherRunning = true;
process.nextTick(async () => {
while (this.pusherEventsIdx >= 0 && this.callback) {
const { time, event } = this.events.getElementByPos(this.pusherEventsIdx--);
await this.callback(time, event); // must not throw, if it does, crash is appropriate.
}
this.pusherRunning = false;
});
}
}

0 comments on commit 282c7c8

Please sign in to comment.