Skip to content

Commit

Permalink
feat: add setBlob and deleteBlob to batched atomic
Browse files Browse the repository at this point in the history
Closes #7
  • Loading branch information
kitsonk committed Feb 22, 2024
1 parent cff4d3d commit 321f05c
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 80 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ where currently only 10 operations operations can be part of a commit.
Similar to `Deno.Kv#atomic()`, but will batch individual transactions across as
many atomic operations as necessary.

There are two additional methods supported on batched atomics not supported by
Deno KV atomic transactions:

- `.setBlob(key, value, options?)` - Allows setting of arbitrarily size blob
values as part of an atomic transaction. The values can be a byte
`ReadableStream` or array buffer like. It will work around the constraints of
Deno KV value sizes by splitting the value across multiple keys.

- `.deleteBlob(key)` - Allows deletion of all parts of a blob value as part of
an atomic transaction.

The `commit()` method will return a promise which resolves with an array of
results based on how many batches the operations was broken up into.

Expand Down
37 changes: 37 additions & 0 deletions batched_atomic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import {
setup,
teardown,
} from "./_test_util.ts";
import { keys } from "./keys.ts";

import { batchedAtomic } from "./batched_atomic.ts";
import { set } from "./blob.ts";

Deno.test({
name: "batched atomic handles checks",
Expand Down Expand Up @@ -61,3 +63,38 @@ Deno.test({
return teardown();
},
});

Deno.test({
name: "batched atomic supports setting blobs",
async fn() {
const kv = await setup();
const blob = new Uint8Array(65_536);
window.crypto.getRandomValues(blob);
const operation = batchedAtomic(kv);
operation.setBlob(["hello"], blob);
await operation.commit();
const actual = await keys(kv, { prefix: ["hello"] });
assertEquals(actual, [
["hello", "__kv_toolbox_blob__", 1],
["hello", "__kv_toolbox_blob__", 2],
]);
return teardown();
},
});

Deno.test({
name: "batched atomic supports deleting blobs",
async fn() {
const kv = await setup();
const blob = new Uint8Array(65_536);
window.crypto.getRandomValues(blob);
await set(kv, ["hello"], blob);
assertEquals((await keys(kv, { prefix: ["hello"] })).length, 2);
const operation = batchedAtomic(kv);
await operation
.deleteBlob(["hello"])
.commit();
assertEquals((await keys(kv, { prefix: ["hello"] })).length, 0);
return teardown();
},
});
83 changes: 66 additions & 17 deletions batched_atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,23 @@
* @module
*/

import { BLOB_KEY, setBlob } from "./blob_util.ts";
import { keys } from "./keys.ts";

/** The default batch size for atomic operations. */
const BATCH_SIZE = 10;

type AtomicOperationKeys = keyof Deno.AtomicOperation;
interface KVToolboxAtomicOperation extends Deno.AtomicOperation {
deleteBlob(key: Deno.KvKey): this;

setBlob(
key: Deno.KvKey,
value: ArrayBufferLike | ReadableStream<Uint8Array>,
options?: { expireIn?: number },
): this;
}

type AtomicOperationKeys = keyof KVToolboxAtomicOperation;

export class BatchedAtomicOperation {
#batchSize: number;
Expand All @@ -19,7 +32,7 @@ export class BatchedAtomicOperation {

#enqueue<Op extends AtomicOperationKeys>(
operation: Op,
args: Parameters<Deno.AtomicOperation[Op]>,
args: Parameters<KVToolboxAtomicOperation[Op]>,
): this {
this.#queue.push([operation, args]);
return this;
Expand Down Expand Up @@ -98,6 +111,19 @@ export class BatchedAtomicOperation {
return this.#enqueue("set", [key, value, options]);
}

/**
* Add to the operation a mutation that sets a blob value in the store if all
* checks pass during the commit. The blob can be any array buffer like
* structure or a byte {@linkcode ReadableStream}.
*/
setBlob(
key: Deno.KvKey,
value: ArrayBufferLike | ReadableStream<Uint8Array>,
options?: { expireIn?: number },
): this {
return this.#enqueue("setBlob", [key, value, options]);
}

/**
* Add to the operation a mutation that deletes the specified key if all
* checks pass during the commit.
Expand All @@ -106,6 +132,14 @@ export class BatchedAtomicOperation {
return this.#enqueue("delete", [key]);
}

/**
* Add to the operation a set of mutations to delete the specified parts of
* a blob value if all checks pass during the commit.
*/
deleteBlob(key: Deno.KvKey): this {
return this.#enqueue("deleteBlob", [key]);
}

/**
* Add to the operation a mutation that enqueues a value into the queue if all
* checks pass during the commit.
Expand Down Expand Up @@ -144,23 +178,38 @@ export class BatchedAtomicOperation {
while (this.#queue.length) {
const [method, args] = this.#queue.shift()!;
count++;
if (method === "check") {
hasCheck = true;
}
// deno-lint-ignore no-explicit-any
(operation[method] as any).apply(operation, args);
if (count >= this.#batchSize || !this.#queue.length) {
const rp = operation.commit();
results.push(rp);
if (this.#queue.length) {
if (hasCheck) {
const result = await rp;
if (!result.ok) {
break;
if (method === "setBlob") {
const queue = this.#queue;
this.#queue = [];
const [key, value, options] = args;
const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] });
await setBlob(this, key, value, items.length, options);
this.#queue = [...this.#queue, ...queue];
} else if (method === "deleteBlob") {
const [key] = args;
const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] });
for (const item of items) {
this.#queue.unshift(["delete", [item]]);
}
} else {
if (method === "check") {
hasCheck = true;
}
// deno-lint-ignore no-explicit-any
(operation[method] as any).apply(operation, args);
if (count >= this.#batchSize || !this.#queue.length) {
const rp = operation.commit();
results.push(rp);
if (this.#queue.length) {
if (hasCheck) {
const result = await rp;
if (!result.ok) {
break;
}
}
count = 0;
operation = this.#kv.atomic();
}
count = 0;
operation = this.#kv.atomic();
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions blob.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ Deno.test({
},
});

Deno.test({
name: "set - sets a blob value as a stream",
async fn() {
const kv = await setup();
const data = new Uint8Array(65_536);
window.crypto.getRandomValues(data);
const blob = new Blob([data]);
await set(kv, ["hello"], blob.stream());
const actual = await keys(kv, { prefix: ["hello"] });
assertEquals(actual, [
["hello", "__kv_toolbox_blob__", 1],
["hello", "__kv_toolbox_blob__", 2],
]);
return teardown();
},
});

Deno.test({
name: "set - replacing value sizes keys properly",
async fn() {
Expand Down
66 changes: 3 additions & 63 deletions blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@
* @module
*/

import {
batchedAtomic,
type BatchedAtomicOperation,
} from "./batched_atomic.ts";
import { batchedAtomic } from "./batched_atomic.ts";
import { BLOB_KEY, CHUNK_SIZE, setBlob } from "./blob_util.ts";
import { keys } from "./keys.ts";

const BATCH_SIZE = 10;
const CHUNK_SIZE = 63_000;
const BLOB_KEY = "__kv_toolbox_blob__";

function asStream(
kv: Deno.Kv,
Expand Down Expand Up @@ -78,56 +74,6 @@ async function asUint8Array(
return found ? value : null;
}

function deleteKeys(
operation: BatchedAtomicOperation,
key: Deno.KvKey,
count: number,
length: number,
): BatchedAtomicOperation {
while (++count <= length) {
operation.delete([...key, BLOB_KEY, count]);
}
return operation;
}

function writeArrayBuffer(
operation: BatchedAtomicOperation,
key: Deno.KvKey,
blob: ArrayBufferLike,
start = 0,
options?: { expireIn?: number },
): [count: number, operation: BatchedAtomicOperation] {
const buffer = new Uint8Array(blob);
let offset = 0;
let count = start;
while (buffer.byteLength > offset) {
count++;
const chunk = buffer.subarray(offset, offset + CHUNK_SIZE);
operation.set([...key, BLOB_KEY, count], chunk, options);
offset += CHUNK_SIZE;
}
return [count, operation];
}

async function writeStream(
operation: BatchedAtomicOperation,
key: Deno.KvKey,
stream: ReadableStream<Uint8Array>,
options?: { expireIn?: number },
): Promise<[count: number, operation: BatchedAtomicOperation]> {
let start = 0;
for await (const chunk of stream) {
[start, operation] = writeArrayBuffer(
operation,
key,
chunk,
start,
options,
);
}
return [start, operation];
}

/** Remove/delete a binary object from the store with a given key that has been
* {@linkcode set}.
*
Expand Down Expand Up @@ -252,12 +198,6 @@ export async function set(
): Promise<void> {
const items = await keys(kv, { prefix: [...key, BLOB_KEY] });
let operation = batchedAtomic(kv);
let count;
if (blob instanceof ReadableStream) {
[count, operation] = await writeStream(operation, key, blob, options);
} else {
[count, operation] = writeArrayBuffer(operation, key, blob, 0, options);
}
operation = deleteKeys(operation, key, count, items.length);
operation = await setBlob(operation, key, blob, items.length, options);
await operation.commit();
}
78 changes: 78 additions & 0 deletions blob_util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* This is an internal module which contains some of the blob writing
* functionality and is not part of the public API of kv-toolbox.
*
* @module
*/

import { type BatchedAtomicOperation } from "./batched_atomic.ts";

export const BLOB_KEY = "__kv_toolbox_blob__";
export const CHUNK_SIZE = 63_000;

function deleteKeys(
operation: BatchedAtomicOperation,
key: Deno.KvKey,
count: number,
length: number,
): BatchedAtomicOperation {
while (++count <= length) {
operation.delete([...key, BLOB_KEY, count]);
}
return operation;
}

function writeArrayBuffer(
operation: BatchedAtomicOperation,
key: Deno.KvKey,
blob: ArrayBufferLike,
start = 0,
options?: { expireIn?: number },
): [count: number, operation: BatchedAtomicOperation] {
const buffer = new Uint8Array(blob);
let offset = 0;
let count = start;
while (buffer.byteLength > offset) {
count++;
const chunk = buffer.subarray(offset, offset + CHUNK_SIZE);
operation.set([...key, BLOB_KEY, count], chunk, options);
offset += CHUNK_SIZE;
}
return [count, operation];
}

async function writeStream(
operation: BatchedAtomicOperation,
key: Deno.KvKey,
stream: ReadableStream<Uint8Array>,
options?: { expireIn?: number },
): Promise<[count: number, operation: BatchedAtomicOperation]> {
let start = 0;
for await (const chunk of stream) {
[start, operation] = writeArrayBuffer(
operation,
key,
chunk,
start,
options,
);
}
return [start, operation];
}

export async function setBlob(
operation: BatchedAtomicOperation,
key: Deno.KvKey,
blob: ArrayBufferLike | ReadableStream<Uint8Array>,
itemCount: number,
options?: { expireIn?: number },
) {
let count;
if (blob instanceof ReadableStream) {
[count, operation] = await writeStream(operation, key, blob, options);
} else {
[count, operation] = writeArrayBuffer(operation, key, blob, 0, options);
}
operation = deleteKeys(operation, key, count, itemCount);
return operation;
}

0 comments on commit 321f05c

Please sign in to comment.