feat: Improve the streaming API (#41)
## Proposed changes

Improve the streaming API:

* Assume the stream is always an `AsyncIterableIterator<string>`
* The implementation only needs to return the stream
* The component wrapper either exposes the token stream or resolves to the collected string, depending on the `props.stream` value (see the sketch after this list).
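
A minimal sketch of the resulting usage, based on the updated `examples/streaming` code (the `Echo` component and its props are invented for illustration):

```tsx
import { gsx, Streamable } from "gensx";

interface EchoProps {
  prompt: string;
}

// The implementation just returns or yields the stream of string chunks.
const Echo = gsx.StreamComponent<EchoProps>(async function* ({ prompt }) {
  for (const word of prompt.split(" ")) {
    yield `${word} `;
  }
});

async function main() {
  // Without `stream`, execute resolves to the fully collected string.
  const text = await gsx.execute<string>(<Echo prompt="hello streaming world" />);
  console.log(text);

  // With `stream={true}`, the result is a Streamable
  // (an AsyncIterableIterator<string>) that can be iterated directly.
  const stream = await gsx.execute<Streamable>(
    <Echo stream={true} prompt="hello streaming world" />,
  );
  for await (const token of stream) {
    process.stdout.write(token);
  }
}

main().catch(console.error);
```

Either consumer works against the same component; only the `stream` prop changes.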
jmoseley authored Jan 2, 2025
1 parent eb1a6e8 commit d47ebf4
Showing 20 changed files with 295 additions and 273 deletions.
8 changes: 6 additions & 2 deletions .vscode/settings.json
@@ -3,5 +3,9 @@
"source.fixAll.eslint": "always",
"source.removeUnusedImports": "always"
},
"cSpell.words": ["gensx", "jsxs"]
}
"cSpell.words": [
"gensx",
"jsxs",
"Streamable"
]
}
2 changes: 1 addition & 1 deletion examples/blogWriter/README.md
@@ -15,7 +15,7 @@ This example demonstrates how to use GenSX to create an AI-powered blog writing
pnpm install

# Run the example
pnpm start
pnpm run run
```

The example will generate a blog post based on the prompt "Write a blog post about the future of AI". You can modify the prompt in `index.tsx` to generate different content.
2 changes: 1 addition & 1 deletion examples/hackerNewsAnalyzer/README.md
@@ -16,7 +16,7 @@ This example demonstrates how to use GenSX to create a workflow that analyzes Ha
pnpm install

# Run the example
pnpm start
pnpm run run
```

The example will:
49 changes: 5 additions & 44 deletions examples/hackerNewsAnalyzer/llm.ts
@@ -18,9 +18,8 @@ export interface LLMConfig {
retryDelay?: number;
}

export interface StreamResult<T> {
value: Promise<T>;
stream: () => AsyncIterator<string>;
export interface StreamResult {
stream: () => AsyncIterableIterator<string>;
}

class LLMError extends Error {
@@ -43,9 +42,7 @@ export function createLLMService(config: LLMConfig) {
} = config;

// Chat with streaming support
async function chatStream(
messages: ChatMessage[],
): Promise<StreamResult<string>> {
async function chatStream(messages: ChatMessage[]): Promise<StreamResult> {
// Create a single streaming request
const response = await openai.chat.completions.create({
model,
@@ -55,45 +52,10 @@
stream: true,
});

// Split the stream into two
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const [stream1, stream2] = response.tee();

// Create a promise that will resolve with the full text
let fullText = "";
const {
promise: valuePromise,
resolve: resolveValue,
reject: rejectValue,
} = (() => {
let resolve: (value: string) => void;
let reject: (error: Error) => void;
const promise = new Promise<string>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve: resolve!, reject: reject! };
})();

// Accumulate the full text in the background using stream1
(async () => {
try {
for await (const chunk of stream1) {
const content = chunk.choices[0]?.delta?.content ?? "";
if (content) {
fullText += content;
}
}
resolveValue(fullText);
} catch (e) {
rejectValue(e instanceof Error ? e : new Error(String(e)));
}
})().catch(rejectValue); // Handle floating promise

// Create a stream generator function that yields chunks immediately from stream2
const getStream = async function* () {
try {
for await (const chunk of stream2) {
for await (const chunk of response) {
const content = chunk.choices[0]?.delta?.content ?? "";
if (content) {
yield content;
@@ -106,7 +68,6 @@

return {
stream: getStream,
value: valuePromise,
};
}

@@ -156,7 +117,7 @@ export function createLLMService(config: LLMConfig) {
}

// Complete with streaming support
async function completeStream(prompt: string): Promise<StreamResult<string>> {
async function completeStream(prompt: string): Promise<StreamResult> {
return chatStream([{ role: "user", content: prompt }]);
}

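With the `value` promise gone from `StreamResult`, a caller that still wants the full completion text can collect it from the stream itself. A rough sketch (the `completeToString` helper is hypothetical and assumed to sit next to `llm.ts`):

```ts
import { createLLMService } from "./llm.js";

// Accumulate the streamed chunks into a single string.
async function completeToString(
  llm: ReturnType<typeof createLLMService>,
  prompt: string,
): Promise<string> {
  const result = await llm.completeStream(prompt);
  let fullText = "";
  for await (const chunk of result.stream()) {
    fullText += chunk;
  }
  return fullText;
}
```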
2 changes: 1 addition & 1 deletion examples/streaming/README.md
@@ -16,7 +16,7 @@ This example demonstrates how to use GenSX's streaming capabilities with LLM res
pnpm install

# Run the example
pnpm start
pnpm run run
```

The example will run two versions of the same prompt:
7 changes: 2 additions & 5 deletions examples/streaming/chatCompletion.tsx
@@ -11,14 +11,11 @@ interface ChatCompletionProps {
prompt: string;
}

export const ChatCompletion = gsx.StreamComponent<ChatCompletionProps, string>(
export const ChatCompletion = gsx.StreamComponent<ChatCompletionProps>(
async ({ prompt }) => {
// Use the LLM service's streaming API
const result = await llm.completeStream(prompt);

return {
stream: () => result.stream(),
value: result.value,
};
return result.stream();
},
);
47 changes: 38 additions & 9 deletions examples/streaming/index.tsx
@@ -1,8 +1,8 @@
import { gsx, Streamable } from "gensx";
import { setTimeout } from "timers/promises";

import { ChatCompletion } from "./chatCompletion.js";

// Example 3: Streaming vs non-streaming chat completion
async function runStreamingWithChildrenExample() {
const prompt =
"Write a 250 word story about an AI that discovers the meaning of friendship through a series of small interactions with humans. Be concise but meaningful.";
@@ -21,11 +21,9 @@ async function runStreamingWithChildrenExample() {
console.log("\n📝 Streaming version (processing tokens as they arrive):");
await gsx.execute(
<ChatCompletion stream={true} prompt={prompt}>
{async (response: Streamable<string>) => {
{async (response: Streamable) => {
// Print tokens as they arrive
for await (const token of {
[Symbol.asyncIterator]: () => response.stream(),
}) {
for await (const token of response) {
process.stdout.write(token);
}
process.stdout.write("\n");
@@ -48,23 +46,54 @@ async function runStreamingExample() {
console.log("✅ Complete response:", finalResult);

console.log("\n📝 Streaming version (processing tokens as they arrive):");
const response = await gsx.execute<Streamable<string>>(
const response = await gsx.execute<Streamable>(
<ChatCompletion stream={true} prompt={prompt} />,
);

for await (const token of {
[Symbol.asyncIterator]: () => response.stream(),
}) {
for await (const token of response) {
process.stdout.write(token);
}
process.stdout.write("\n");
console.log("✅ Streaming complete");
}

const GeneratorComponent = gsx.StreamComponent<{
foo: string;
iterations: number;
}>(async function* ({ foo, iterations }) {
await setTimeout(10);
for (let i = 1; i < iterations + 1; i++) {
console.log("🔥 GeneratorComponent", i);
yield `${i}: ${foo.repeat(i)}\n`;
await setTimeout(10);
}
});

async function streamingGeneratorExample() {
console.log("⚡️ StreamingGeneratorExample - return result from generator");
const response1 = await gsx.execute<string>(
<GeneratorComponent foo="bar" iterations={10} />,
);
console.log(`✅ Streaming complete:\n====\n${response1}====`);
console.log("⚡️ StreamingGeneratorExample - process generator result");
await gsx.execute<string>(
<GeneratorComponent stream={true} foo="bar" iterations={10}>
{async (response: Streamable) => {
for await (const token of response) {
process.stdout.write(token);
}
process.stdout.write("\n");
console.log("✅ Streaming complete");
}}
</GeneratorComponent>,
);
}

// Main function to run examples
async function main() {
await runStreamingWithChildrenExample();
await runStreamingExample();
await streamingGeneratorExample();
}

main().catch(console.error);
49 changes: 5 additions & 44 deletions examples/streaming/llm.ts
@@ -18,9 +18,8 @@ export interface LLMConfig {
retryDelay?: number;
}

export interface StreamResult<T> {
value: Promise<T>;
stream: () => AsyncIterator<string>;
export interface StreamResult {
stream: () => AsyncIterableIterator<string>;
}

class LLMError extends Error {
@@ -43,9 +42,7 @@ export function createLLMService(config: LLMConfig) {
} = config;

// Chat with streaming support
async function chatStream(
messages: ChatMessage[],
): Promise<StreamResult<string>> {
async function chatStream(messages: ChatMessage[]): Promise<StreamResult> {
// Create a single streaming request
const response = await openai.chat.completions.create({
model,
@@ -55,45 +52,10 @@
stream: true,
});

// Split the stream into two
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const [stream1, stream2] = response.tee();

// Create a promise that will resolve with the full text
let fullText = "";
const {
promise: valuePromise,
resolve: resolveValue,
reject: rejectValue,
} = (() => {
let resolve: (value: string) => void;
let reject: (error: Error) => void;
const promise = new Promise<string>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve: resolve!, reject: reject! };
})();

// Accumulate the full text in the background using stream1
(async () => {
try {
for await (const chunk of stream1) {
const content = chunk.choices[0]?.delta?.content ?? "";
if (content) {
fullText += content;
}
}
resolveValue(fullText);
} catch (e) {
rejectValue(e instanceof Error ? e : new Error(String(e)));
}
})().catch(rejectValue); // Handle floating promise

// Create a stream generator function that yields chunks immediately from stream2
const getStream = async function* () {
try {
for await (const chunk of stream2) {
for await (const chunk of response) {
const content = chunk.choices[0]?.delta?.content ?? "";
if (content) {
yield content;
@@ -106,7 +68,6 @@

return {
stream: getStream,
value: valuePromise,
};
}

@@ -156,7 +117,7 @@ export function createLLMService(config: LLMConfig) {
}

// Complete with streaming support
async function completeStream(prompt: string): Promise<StreamResult<string>> {
async function completeStream(prompt: string): Promise<StreamResult> {
return chatStream([{ role: "user", content: prompt }]);
}

2 changes: 1 addition & 1 deletion package.json
@@ -6,7 +6,7 @@
"build": "turbo run build --filter=\"./packages/*\"",
"build:examples": "turbo run build --filter=\"./examples/*\"",
"build:all": "turbo run build",
"dev": "turbo run dev",
"dev": "turbo run dev --filter=\"./packages/*\"",
"lint": "turbo run lint --filter=\"./packages/*\"",
"lint:examples": "turbo run lint --filter=\"./examples/*\"",
"lint:all": "turbo run lint",
3 changes: 2 additions & 1 deletion packages/gensx-openai/tsconfig.json
@@ -5,6 +5,7 @@
"lib": [
"ES2020"
],
"resolveJsonModule": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true,
@@ -21,7 +22,7 @@
"gensx": [
"../gensx/src"
]
},
}
},
"exclude": [
"node_modules",
19 changes: 19 additions & 0 deletions packages/gensx-openai/vitest.config.ts
@@ -1,7 +1,26 @@
// import swc from "unplugin-swc";
import * as path from "path";

import { loadEnv } from "vite";
import { defineConfig } from "vitest/config";

import tsconfig from "./tsconfig.json";

// Create an alias object from the paths in tsconfig.json
const alias = Object.fromEntries(
// For each path in tsconfig.json
Object.entries(tsconfig.compilerOptions.paths).map(([key, [value]]) => [
// Remove the "/*" from the key and resolve the path
key.replace("/*", ""),
// Remove the "/*" from the value Resolve the relative path
path.resolve(__dirname, value.replace("/*", "")),
]),
);

export default defineConfig({
resolve: {
alias,
},
test: {
root: "./",
globals: true,