Skip to content

Commit

Permalink
Load metadata from stream instead of headers. (#31)
Browse files Browse the repository at this point in the history
* Handle base64 encoded content headers

* Parse the metadata from the stream instead of headers.

* Extract the right parameter for metadata.
  • Loading branch information
jmoseley authored Jul 20, 2024
1 parent b2783a4 commit 9305641
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 48 deletions.
4 changes: 2 additions & 2 deletions chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export class Chat {
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content]) => {
const messages: Message[] = [
{
role: "user",
Expand Down Expand Up @@ -296,7 +296,7 @@ export class Chat {
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content]) => {
this.messages.push(
{
role: "user",
Expand Down
84 changes: 42 additions & 42 deletions content.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ export type ContentListOptions = {
cortexName?: string;
};

export type ContentMetadata = {
id: string;
title: string;
version: number;
commands: ContentCommand[];
cortex: string;
createdAt: string;
userEmail: string;
status: ContentStatus;
publishedVersion: number | null;
};

export class Content {
get id() {
return this._id;
Expand Down Expand Up @@ -194,44 +206,37 @@ export class Content {
title,
prompt,
stream,
noContentInHeaders: true,
});
const reader = res.body!.getReader();
const decoder = new TextDecoder("utf-8");

const id: string = res.headers.get("id") || "";
const version: number = parseInt(res.headers.get("version") || "0");
const userEmail = res.headers.get("userEmail") || undefined;
const createdAt: string = res.headers.get("createdAt") || "";
const status: ContentStatus = res.headers.get("status") as ContentStatus;
const publishedVersion: number | undefined = numberOrUndefined(
res.headers.get("publishedVersion"),
);
const commands: ContentCommand[] = JSON.parse(
res.headers.get("commands") || "[]",
);

const readableStream = new Readable({
read() {},
});

const contentPromise = processStream(
const contentPromise = processStream<ContentMetadata>(
reader,
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content, metadata]) => {
if (!metadata) {
throw new Error("Metadata not found in stream");
}

return new Content(
client,
id,
title,
metadata.id,
metadata.title,
content,
commands,
version,
createdAt,
status,
cortex.name,
userEmail,
publishedVersion,
metadata.commands,
metadata.version,
metadata.createdAt,
metadata.status,
metadata.cortex,
metadata.userEmail,
metadata.publishedVersion || undefined,
);
});

Expand Down Expand Up @@ -317,38 +322,33 @@ export class Content {
const res = await this.apiClient.POST(`/content/${this._id}/refine`, {
prompt,
stream: true,
noContentInHeaders: true,
});
const reader = res.body!.getReader();
const decoder = new TextDecoder("utf-8");

const version: number = parseInt(res.headers.get("version") || "0");
const createdAt = res.headers.get("createdAt") || "";
const userEmail = res.headers.get("userEmail") || undefined;
const commands: ContentCommand[] = JSON.parse(
res.headers.get("commands") || "[]",
);
const status = res.headers.get("status") as ContentStatus;
const publishedVersion: number | undefined = numberOrUndefined(
res.headers.get("publishedVersion"),
);
this._version = version;
this._commands = commands;
this._createdAt = createdAt;
this._userEmail = userEmail;
this._status = status;
this._publishedVersion = publishedVersion;

const readableStream = new Readable({
read() {},
});

const contentPromise = processStream(
const contentPromise = processStream<ContentMetadata>(
reader,
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content, metadata]) => {
if (!metadata) {
throw new Error("Metadata not found in stream");
}

this._content = content;
this._version = metadata.version;
this._commands = metadata.commands;
this._createdAt = metadata.createdAt;
this._userEmail = metadata.userEmail;
this._status = metadata.status;
this._publishedVersion = metadata.publishedVersion || undefined;
this._title = metadata.title;
return this;
});

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"url": "https://github.com/cortexclick/cortex-sdk",
"type": "git"
},
"version": "0.0.4",
"version": "0.0.5",
"type": "module",
"main": "index.js",
"scripts": {
Expand Down
10 changes: 7 additions & 3 deletions utils/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { Readable } from "stream";

export async function processStream(
export async function processStream<Metadata extends Record<string, unknown>>(
reader: ReadableStreamDefaultReader<Uint8Array>,
decoder: TextDecoder,
contentStream: Readable,
statusStream?: Readable,
): Promise<string> {
): Promise<[string, Metadata | undefined]> {
let buffer = "";
let fullContent = "";
let isStatusStreamOpen = true;

let metadata: Metadata | undefined = undefined;

const processNextChunk = async (): Promise<void> => {
const { done, value } = await reader.read();
if (done) {
Expand Down Expand Up @@ -43,6 +45,8 @@ export async function processStream(
// t:s = status message
else if (json.messageType === "status" && statusStream) {
statusStream.push(line + "\n");
} else if (json.messageType === "metadata") {
metadata = json.data;
}
} catch (e) {
console.error("Error parsing JSON:", e);
Expand All @@ -57,5 +61,5 @@ export async function processStream(
contentStream.emit("error", error);
});

return fullContent;
return [fullContent, metadata];
}

0 comments on commit 9305641

Please sign in to comment.