-
-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
192 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ | |
"cfworker", | ||
"codegen", | ||
"Deno", | ||
"denokv", | ||
"dereferenceable", | ||
"discoverability", | ||
"docloader", | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import { Temporal } from "@js-temporal/polyfill"; | ||
import { assertEquals, assertGreater } from "@std/assert"; | ||
import { delay } from "@std/async/delay"; | ||
import { DenoKvMessageQueue, DenoKvStore } from "./denokv.ts"; | ||
|
||
Deno.test("DenoKvStore", async (t) => { | ||
const kv = await Deno.openKv(":memory:"); | ||
const store = new DenoKvStore(kv); | ||
|
||
await t.step("get()", async () => { | ||
await kv.set(["foo", "bar"], "foobar"); | ||
assertEquals(await store.get(["foo", "bar"]), "foobar"); | ||
}); | ||
|
||
await t.step("set()", async () => { | ||
await store.set(["foo", "baz"], "baz"); | ||
assertEquals((await kv.get<string>(["foo", "baz"])).value, "baz"); | ||
}); | ||
|
||
await t.step("delete()", async () => { | ||
await store.delete(["foo", "baz"]); | ||
assertEquals((await kv.get<string>(["foo", "baz"])).value, null); | ||
}); | ||
|
||
kv.close(); | ||
}); | ||
|
||
Deno.test("DenoKvMessageQueue", async (t) => { | ||
const kv = await Deno.openKv(":memory:"); | ||
const mq = new DenoKvMessageQueue(kv); | ||
|
||
const messages: string[] = []; | ||
mq.listen((message: string) => { | ||
messages.push(message); | ||
}); | ||
|
||
await t.step("enqueue()", async () => { | ||
await mq.enqueue("Hello, world!"); | ||
}); | ||
|
||
await waitFor(() => messages.length > 0, 15_000); | ||
|
||
await t.step("listen()", () => { | ||
assertEquals(messages, ["Hello, world!"]); | ||
}); | ||
|
||
let started = 0; | ||
await t.step("enqueue() with delay", async () => { | ||
started = Date.now(); | ||
await mq.enqueue( | ||
"Delayed message", | ||
{ delay: Temporal.Duration.from({ seconds: 3 }) }, | ||
); | ||
}); | ||
|
||
await waitFor(() => messages.length > 1, 15_000); | ||
|
||
await t.step("listen() with delay", () => { | ||
assertEquals(messages, ["Hello, world!", "Delayed message"]); | ||
assertGreater(Date.now() - started, 3_000); | ||
}); | ||
|
||
kv.close(); | ||
}); | ||
|
||
async function waitFor( | ||
predicate: () => boolean, | ||
timeoutMs: number, | ||
): Promise<void> { | ||
const started = Date.now(); | ||
while (!predicate()) { | ||
await delay(500); | ||
if (Date.now() - started > timeoutMs) { | ||
throw new Error("Timeout"); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
/** | ||
* `KvStore` & `MessageQueue` adapters for Deno's KV store | ||
* ======================================================= | ||
* | ||
* This module provides `KvStore` and `MessageQueue` implementations that use | ||
* Deno's KV store. The `DenoKvStore` class implements the `KvStore` interface | ||
* using Deno's KV store, and the `DenoKvMessageQueue` class implements the | ||
* `MessageQueue` interface using Deno's KV store. | ||
* | ||
* @module | ||
* @since 0.5.0 | ||
*/ | ||
import type { KvKey, KvStore, KvStoreSetOptions } from "../federation/kv.ts"; | ||
import type { | ||
MessageQueue, | ||
MessageQueueEnqueueOptions, | ||
} from "../federation/mq.ts"; | ||
|
||
/** | ||
* Represents a key-value store implementation using Deno's KV store. | ||
*/ | ||
export class DenoKvStore implements KvStore { | ||
#kv: Deno.Kv; | ||
|
||
/** | ||
* Constructs a new {@link DenoKvStore} adapter with the given Deno KV store. | ||
* @param kv The Deno KV store to use. | ||
*/ | ||
constructor(kv: Deno.Kv) { | ||
this.#kv = kv; | ||
} | ||
|
||
/** | ||
* {@inheritDoc KvStore.set} | ||
*/ | ||
async get<T = unknown>(key: KvKey): Promise<T | undefined> { | ||
const entry = await this.#kv.get<T>(key); | ||
return entry == null || entry.value == null ? undefined : entry.value; | ||
} | ||
|
||
/** | ||
* {@inheritDoc KvStore.set} | ||
*/ | ||
async set( | ||
key: KvKey, | ||
value: unknown, | ||
options?: KvStoreSetOptions, | ||
): Promise<void> { | ||
await this.#kv.set( | ||
key, | ||
value, | ||
options?.ttl == null ? undefined : { | ||
expireIn: options.ttl.total("millisecond"), | ||
}, | ||
); | ||
} | ||
|
||
/** | ||
* {@inheritDoc KvStore.delete} | ||
*/ | ||
delete(key: KvKey): Promise<void> { | ||
return this.#kv.delete(key); | ||
} | ||
} | ||
|
||
/** | ||
* Represents a message queue adapter that uses Deno KV store. | ||
*/ | ||
export class DenoKvMessageQueue implements MessageQueue { | ||
#kv: Deno.Kv; | ||
|
||
/** | ||
* Constructs a new {@link DenoKvMessageQueue} adapter with the given Deno KV | ||
* store. | ||
* @param kv The Deno KV store to use. | ||
*/ | ||
constructor(kv: Deno.Kv) { | ||
this.#kv = kv; | ||
} | ||
|
||
/** | ||
* {@inheritDoc MessageQueue.enqueue} | ||
*/ | ||
async enqueue( | ||
// deno-lint-ignore no-explicit-any | ||
message: any, | ||
options?: MessageQueueEnqueueOptions | undefined, | ||
): Promise<void> { | ||
await this.#kv.enqueue( | ||
message, | ||
options?.delay == null ? undefined : { | ||
delay: options.delay.total("millisecond"), | ||
}, | ||
); | ||
} | ||
|
||
/** | ||
* {@inheritDoc MessageQueue.listen} | ||
*/ | ||
// deno-lint-ignore no-explicit-any | ||
listen(handler: (message: any) => void | Promise<void>): void { | ||
this.#kv.listenQueue(handler); | ||
} | ||
} |