From 321f05c3c2071138e63bacd8dcc447c6d1929d5c Mon Sep 17 00:00:00 2001 From: Kitson Kelly Date: Thu, 22 Feb 2024 16:23:28 +1100 Subject: [PATCH] feat: add setBlob and deleteBlob to batched atomic Closes #7 --- README.md | 11 ++++++ batched_atomic.test.ts | 37 +++++++++++++++++++ batched_atomic.ts | 83 +++++++++++++++++++++++++++++++++--------- blob.test.ts | 17 +++++++++ blob.ts | 66 ++------------------------------- blob_util.ts | 78 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 212 insertions(+), 80 deletions(-) create mode 100644 blob_util.ts diff --git a/README.md b/README.md index 264eeea..c20b471 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,17 @@ where currently only 10 operations operations can be part of a commit. Similar to `Deno.Kv#atomic()`, but will batch individual transactions across as many atomic operations as necessary. +There are two additional methods supported on batched atomics not supported by +Deno KV atomic transactions: + +- `.setBlob(key, value, options?)` - Allows setting of arbitrarily sized blob + values as part of an atomic transaction. The values can be a byte + `ReadableStream` or array buffer like. It will work around the constraints of + Deno KV value sizes by splitting the value across multiple keys. + +- `.deleteBlob(key)` - Allows deletion of all parts of a blob value as part of + an atomic transaction. + The `commit()` method will return a promise which resolves with an array of results based on how many batches the operations was broken up into. 
diff --git a/batched_atomic.test.ts b/batched_atomic.test.ts index a1a3a40..15b8d04 100644 --- a/batched_atomic.test.ts +++ b/batched_atomic.test.ts @@ -6,8 +6,10 @@ import { setup, teardown, } from "./_test_util.ts"; +import { keys } from "./keys.ts"; import { batchedAtomic } from "./batched_atomic.ts"; +import { set } from "./blob.ts"; Deno.test({ name: "batched atomic handles checks", @@ -61,3 +63,38 @@ Deno.test({ return teardown(); }, }); + +Deno.test({ + name: "batched atomic supports setting blobs", + async fn() { + const kv = await setup(); + const blob = new Uint8Array(65_536); + window.crypto.getRandomValues(blob); + const operation = batchedAtomic(kv); + operation.setBlob(["hello"], blob); + await operation.commit(); + const actual = await keys(kv, { prefix: ["hello"] }); + assertEquals(actual, [ + ["hello", "__kv_toolbox_blob__", 1], + ["hello", "__kv_toolbox_blob__", 2], + ]); + return teardown(); + }, +}); + +Deno.test({ + name: "batched atomic supports deleting blobs", + async fn() { + const kv = await setup(); + const blob = new Uint8Array(65_536); + window.crypto.getRandomValues(blob); + await set(kv, ["hello"], blob); + assertEquals((await keys(kv, { prefix: ["hello"] })).length, 2); + const operation = batchedAtomic(kv); + await operation + .deleteBlob(["hello"]) + .commit(); + assertEquals((await keys(kv, { prefix: ["hello"] })).length, 0); + return teardown(); + }, +}); diff --git a/batched_atomic.ts b/batched_atomic.ts index 321b4a4..335f31c 100644 --- a/batched_atomic.ts +++ b/batched_atomic.ts @@ -6,10 +6,23 @@ * @module */ +import { BLOB_KEY, setBlob } from "./blob_util.ts"; +import { keys } from "./keys.ts"; + /** The default batch size for atomic operations. 
*/ const BATCH_SIZE = 10; -type AtomicOperationKeys = keyof Deno.AtomicOperation; +interface KVToolboxAtomicOperation extends Deno.AtomicOperation { + deleteBlob(key: Deno.KvKey): this; + + setBlob( + key: Deno.KvKey, + value: ArrayBufferLike | ReadableStream, + options?: { expireIn?: number }, + ): this; +} + +type AtomicOperationKeys = keyof KVToolboxAtomicOperation; export class BatchedAtomicOperation { #batchSize: number; @@ -19,7 +32,7 @@ export class BatchedAtomicOperation { #enqueue( operation: Op, - args: Parameters, + args: Parameters, ): this { this.#queue.push([operation, args]); return this; @@ -98,6 +111,19 @@ export class BatchedAtomicOperation { return this.#enqueue("set", [key, value, options]); } + /** + * Add to the operation a mutation that sets a blob value in the store if all + * checks pass during the commit. The blob can be any array buffer like + * structure or a byte {@linkcode ReadableStream}. + */ + setBlob( + key: Deno.KvKey, + value: ArrayBufferLike | ReadableStream, + options?: { expireIn?: number }, + ): this { + return this.#enqueue("setBlob", [key, value, options]); + } + /** * Add to the operation a mutation that deletes the specified key if all * checks pass during the commit. @@ -106,6 +132,14 @@ export class BatchedAtomicOperation { return this.#enqueue("delete", [key]); } + /** + * Add to the operation a set of mutations to delete the specified parts of + * a blob value if all checks pass during the commit. + */ + deleteBlob(key: Deno.KvKey): this { + return this.#enqueue("deleteBlob", [key]); + } + /** * Add to the operation a mutation that enqueues a value into the queue if all * checks pass during the commit. 
@@ -144,23 +178,38 @@ export class BatchedAtomicOperation { while (this.#queue.length) { const [method, args] = this.#queue.shift()!; count++; - if (method === "check") { - hasCheck = true; - } - // deno-lint-ignore no-explicit-any - (operation[method] as any).apply(operation, args); - if (count >= this.#batchSize || !this.#queue.length) { - const rp = operation.commit(); - results.push(rp); - if (this.#queue.length) { - if (hasCheck) { - const result = await rp; - if (!result.ok) { - break; + if (method === "setBlob") { + const queue = this.#queue; + this.#queue = []; + const [key, value, options] = args; + const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] }); + await setBlob(this, key, value, items.length, options); + this.#queue = [...this.#queue, ...queue]; + } else if (method === "deleteBlob") { + const [key] = args; + const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] }); + for (const item of items) { + this.#queue.unshift(["delete", [item]]); + } + } else { + if (method === "check") { + hasCheck = true; + } + // deno-lint-ignore no-explicit-any + (operation[method] as any).apply(operation, args); + if (count >= this.#batchSize || !this.#queue.length) { + const rp = operation.commit(); + results.push(rp); + if (this.#queue.length) { + if (hasCheck) { + const result = await rp; + if (!result.ok) { + break; + } } + count = 0; + operation = this.#kv.atomic(); } - count = 0; - operation = this.#kv.atomic(); } } } diff --git a/blob.test.ts b/blob.test.ts index 0a2b398..c8a43e7 100644 --- a/blob.test.ts +++ b/blob.test.ts @@ -25,6 +25,23 @@ Deno.test({ }, }); +Deno.test({ + name: "set - sets a blob value as a stream", + async fn() { + const kv = await setup(); + const data = new Uint8Array(65_536); + window.crypto.getRandomValues(data); + const blob = new Blob([data]); + await set(kv, ["hello"], blob.stream()); + const actual = await keys(kv, { prefix: ["hello"] }); + assertEquals(actual, [ + ["hello", "__kv_toolbox_blob__", 1], + 
["hello", "__kv_toolbox_blob__", 2], + ]); + return teardown(); + }, +}); + Deno.test({ name: "set - replacing value sizes keys properly", async fn() { diff --git a/blob.ts b/blob.ts index 7d68f71..cec263c 100644 --- a/blob.ts +++ b/blob.ts @@ -8,15 +8,11 @@ * @module */ -import { - batchedAtomic, - type BatchedAtomicOperation, -} from "./batched_atomic.ts"; +import { batchedAtomic } from "./batched_atomic.ts"; +import { BLOB_KEY, CHUNK_SIZE, setBlob } from "./blob_util.ts"; import { keys } from "./keys.ts"; const BATCH_SIZE = 10; -const CHUNK_SIZE = 63_000; -const BLOB_KEY = "__kv_toolbox_blob__"; function asStream( kv: Deno.Kv, @@ -78,56 +74,6 @@ async function asUint8Array( return found ? value : null; } -function deleteKeys( - operation: BatchedAtomicOperation, - key: Deno.KvKey, - count: number, - length: number, -): BatchedAtomicOperation { - while (++count <= length) { - operation.delete([...key, BLOB_KEY, count]); - } - return operation; -} - -function writeArrayBuffer( - operation: BatchedAtomicOperation, - key: Deno.KvKey, - blob: ArrayBufferLike, - start = 0, - options?: { expireIn?: number }, -): [count: number, operation: BatchedAtomicOperation] { - const buffer = new Uint8Array(blob); - let offset = 0; - let count = start; - while (buffer.byteLength > offset) { - count++; - const chunk = buffer.subarray(offset, offset + CHUNK_SIZE); - operation.set([...key, BLOB_KEY, count], chunk, options); - offset += CHUNK_SIZE; - } - return [count, operation]; -} - -async function writeStream( - operation: BatchedAtomicOperation, - key: Deno.KvKey, - stream: ReadableStream, - options?: { expireIn?: number }, -): Promise<[count: number, operation: BatchedAtomicOperation]> { - let start = 0; - for await (const chunk of stream) { - [start, operation] = writeArrayBuffer( - operation, - key, - chunk, - start, - options, - ); - } - return [start, operation]; -} - /** Remove/delete a binary object from the store with a given key that has been * {@linkcode set}. 
* @@ -252,12 +198,6 @@ export async function set( ): Promise { const items = await keys(kv, { prefix: [...key, BLOB_KEY] }); let operation = batchedAtomic(kv); - let count; - if (blob instanceof ReadableStream) { - [count, operation] = await writeStream(operation, key, blob, options); - } else { - [count, operation] = writeArrayBuffer(operation, key, blob, 0, options); - } - operation = deleteKeys(operation, key, count, items.length); + operation = await setBlob(operation, key, blob, items.length, options); await operation.commit(); } diff --git a/blob_util.ts b/blob_util.ts new file mode 100644 index 0000000..50f83c7 --- /dev/null +++ b/blob_util.ts @@ -0,0 +1,78 @@ +/** + * This is an internal module which contains some of the blob writing + * functionality and is not part of the public API of kv-toolbox. + * + * @module + */ + +import { type BatchedAtomicOperation } from "./batched_atomic.ts"; + +export const BLOB_KEY = "__kv_toolbox_blob__"; +export const CHUNK_SIZE = 63_000; + +function deleteKeys( + operation: BatchedAtomicOperation, + key: Deno.KvKey, + count: number, + length: number, +): BatchedAtomicOperation { + while (++count <= length) { + operation.delete([...key, BLOB_KEY, count]); + } + return operation; +} + +function writeArrayBuffer( + operation: BatchedAtomicOperation, + key: Deno.KvKey, + blob: ArrayBufferLike, + start = 0, + options?: { expireIn?: number }, +): [count: number, operation: BatchedAtomicOperation] { + const buffer = new Uint8Array(blob); + let offset = 0; + let count = start; + while (buffer.byteLength > offset) { + count++; + const chunk = buffer.subarray(offset, offset + CHUNK_SIZE); + operation.set([...key, BLOB_KEY, count], chunk, options); + offset += CHUNK_SIZE; + } + return [count, operation]; +} + +async function writeStream( + operation: BatchedAtomicOperation, + key: Deno.KvKey, + stream: ReadableStream, + options?: { expireIn?: number }, +): Promise<[count: number, operation: BatchedAtomicOperation]> { + let start = 0; 
+ for await (const chunk of stream) { + [start, operation] = writeArrayBuffer( + operation, + key, + chunk, + start, + options, + ); + } + return [start, operation]; +} + +export async function setBlob( + operation: BatchedAtomicOperation, + key: Deno.KvKey, + blob: ArrayBufferLike | ReadableStream, + itemCount: number, + options?: { expireIn?: number }, +) { + let count; + if (blob instanceof ReadableStream) { + [count, operation] = await writeStream(operation, key, blob, options); + } else { + [count, operation] = writeArrayBuffer(operation, key, blob, 0, options); + } + operation = deleteKeys(operation, key, count, itemCount); + return operation; +}