diff --git a/platforms/astro.ts b/platforms/astro.ts index 72f6bcb..4fd3bbc 100644 --- a/platforms/astro.ts +++ b/platforms/astro.ts @@ -1,7 +1,8 @@ import type { APIContext, APIRoute } from "astro"; -import type { PublicServeOptions, WorkflowContext } from "../src"; +import { PublicServeOptions, WorkflowContext } from "../src"; import { serveBase } from "../src/serve"; +import { SDK_TELEMETRY } from "../src/constants"; export function serve( routeFunction: ( @@ -13,6 +14,13 @@ export function serve( const POST: APIRoute = (apiContext) => { const { handler } = serveBase( (workflowContext) => routeFunction(workflowContext, apiContext), + { + sdk: SDK_TELEMETRY, + framework: "astro", + runtime: process.versions.bun + ? `bun@${process.versions.bun}/node@${process.version}` + : `node@${process.version}`, + }, options ); diff --git a/platforms/cloudflare.ts b/platforms/cloudflare.ts index e5c9f5d..9061bbf 100644 --- a/platforms/cloudflare.ts +++ b/platforms/cloudflare.ts @@ -1,4 +1,5 @@ import type { PublicServeOptions, RouteFunction } from "../src"; +import { SDK_TELEMETRY } from "../src/constants"; import { serveBase } from "../src/serve"; export type WorkflowBindings = { @@ -62,10 +63,17 @@ export const serve = ( ): { fetch: (...args: PagesHandlerArgs | WorkersHandlerArgs) => Promise } => { const fetch = async (...args: PagesHandlerArgs | WorkersHandlerArgs) => { const { request, env } = getArgs(args); - const { handler: serveHandler } = serveBase(routeFunction, { - env, - ...options, - }); + const { handler: serveHandler } = serveBase( + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "cloudflare", + }, + { + env, + ...options, + } + ); return await serveHandler(request); }; return { fetch }; diff --git a/platforms/express.ts b/platforms/express.ts index 518bc14..a7a3781 100644 --- a/platforms/express.ts +++ b/platforms/express.ts @@ -1,4 +1,5 @@ import type { WorkflowServeOptions, RouteFunction } from "../src"; +import { SDK_TELEMETRY } from "../src/constants"; import { serveBase } from "../src/serve"; import { Request as ExpressRequest, @@ -43,7 +44,14 @@ export function serve( // create handler const { handler: serveHandler } = serveBase( - (workflowContext) => routeFunction(workflowContext), + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "express", + runtime: process.versions.bun + ? `bun@${process.versions.bun}/node@${process.version}` + : `node@${process.version}`, + }, { ...options, useJSONContent: true, diff --git a/platforms/h3.ts b/platforms/h3.ts index f647d09..ec0b570 100644 --- a/platforms/h3.ts +++ b/platforms/h3.ts @@ -3,6 +3,7 @@ import { defineEventHandler, readRawBody } from "h3"; import type { PublicServeOptions, RouteFunction } from "../src"; import { serveBase } from "../src/serve"; import type { IncomingHttpHeaders } from "node:http"; +import { SDK_TELEMETRY } from "../src/constants"; function transformHeaders(headers: IncomingHttpHeaders): [string, string][] { const formattedHeaders = Object.entries(headers).map(([key, value]) => [ @@ -37,7 +38,17 @@ export const serve = ( method: "POST", }); - const { handler: serveHandler } = serveBase(routeFunction, options); + const { handler: serveHandler } = serveBase( + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "h3", + runtime: process.versions.bun + ? `bun@${process.versions.bun}/node@${process.version}` + : `node@${process.version}`, + }, + options + ); return await serveHandler(request); }); diff --git a/platforms/hono.ts b/platforms/hono.ts index e9d5413..a0ab5bd 100644 --- a/platforms/hono.ts +++ b/platforms/hono.ts @@ -2,6 +2,7 @@ import type { Context } from "hono"; import type { PublicServeOptions, RouteFunction } from "../src"; import { serveBase } from "../src/serve"; import { Variables } from "hono/types"; +import { SDK_TELEMETRY } from "../src/constants"; export type WorkflowBindings = { QSTASH_TOKEN: string; @@ -32,12 +33,19 @@ export const serve = < const environment = context.env; const request = context.req.raw; - const { handler: serveHandler } = serveBase(routeFunction, { - // when hono is used without cf workers, it sends a DebugHTTPServer - // object in `context.env`. don't pass env if this is the case: - env: "QSTASH_TOKEN" in environment ? environment : undefined, - ...options, - }); + const { handler: serveHandler } = serveBase( + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "hono", + }, + { + // when hono is used without cf workers, it sends a DebugHTTPServer + // object in `context.env`. don't pass env if this is the case: + env: "QSTASH_TOKEN" in environment ? environment : undefined, + ...options, + } + ); return await serveHandler(request); }; return handler; diff --git a/platforms/nextjs.ts b/platforms/nextjs.ts index 362826b..8869cb3 100644 --- a/platforms/nextjs.ts +++ b/platforms/nextjs.ts @@ -4,6 +4,7 @@ import type { NextApiHandler, NextApiRequest, NextApiResponse } from "next"; import type { RouteFunction, PublicServeOptions } from "../src"; import { serveBase } from "../src/serve"; +import { SDK_TELEMETRY } from "../src/constants"; /** * Serve method to serve a Upstash Workflow in a Nextjs project @@ -20,6 +21,11 @@ export const serve = ( ): { POST: (request: Request) => Promise } => { const { handler: serveHandler } = serveBase( routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "nextjs", + runtime: `node@${process.version}`, + }, options ); @@ -34,7 +40,17 @@ export const servePagesRouter = ( routeFunction: RouteFunction, options?: PublicServeOptions ): { handler: NextApiHandler } => { - const { handler: serveHandler } = serveBase(routeFunction, options); + const { handler: serveHandler } = serveBase( + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "nextjs-pages", + runtime: process.versions.bun + ? `bun@${process.versions.bun}/node@${process.version}` + : `node@${process.version}`, + }, + options + ); const handler = async (request_: NextApiRequest, res: NextApiResponse) => { if (request_.method?.toUpperCase() !== "POST") { diff --git a/platforms/solidjs.ts b/platforms/solidjs.ts index 9da7706..6382fc8 100644 --- a/platforms/solidjs.ts +++ b/platforms/solidjs.ts @@ -2,6 +2,7 @@ import type { APIEvent } from "@solidjs/start/server"; import type { PublicServeOptions, RouteFunction } from "../src"; import { serveBase } from "../src/serve"; +import { SDK_TELEMETRY } from "../src/constants"; /** * Serve method to serve a Upstash Workflow in a Nextjs project @@ -28,7 +29,17 @@ export const serve = ( } // create serve handler - const { handler: serveHandler } = serveBase(routeFunction, options); + const { handler: serveHandler } = serveBase( + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "solidjs", + runtime: process.versions.bun + ? `bun@${process.versions.bun}/node@${process.version}` + : `node@${process.version}`, + }, + options + ); return await serveHandler(event.request); }; diff --git a/platforms/svelte.ts b/platforms/svelte.ts index 9aa7b34..8490336 100644 --- a/platforms/svelte.ts +++ b/platforms/svelte.ts @@ -2,6 +2,7 @@ import type { RequestHandler } from "@sveltejs/kit"; import type { PublicServeOptions, RouteFunction } from "../src"; import { serveBase } from "../src/serve"; +import { SDK_TELEMETRY } from "../src/constants"; /** * Serve method to serve a Upstash Workflow in a Nextjs project @@ -19,10 +20,17 @@ export const serve = ( } ): { POST: RequestHandler } => { const handler: RequestHandler = async ({ request }) => { - const { handler: serveHandler } = serveBase(routeFunction, { - ...options, - useJSONContent: true, - }); + const { handler: serveHandler } = serveBase( + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "svelte", + }, + { + ...options, + useJSONContent: true, + } + ); return await serveHandler(request); }; diff --git a/src/client/index.ts b/src/client/index.ts index 6beea9c..2e19a2e 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -4,7 +4,6 @@ import { makeGetWaitersRequest, makeNotifyRequest } from "./utils"; import { getWorkflowRunId } from "../utils"; import { triggerFirstInvocation } from "../workflow-requests"; import { WorkflowContext } from "../context"; -import { DEFAULT_RETRIES } from "../constants"; type ClientConfig = ConstructorParameters[0]; @@ -214,8 +213,13 @@ export class Client { steps: [], url, workflowRunId: finalWorkflowRunId, + retries, + telemetry: undefined, // can't know workflow telemetry here + }); + const result = await triggerFirstInvocation({ + workflowContext: context, + telemetry: undefined, // can't know workflow telemetry here }); - const result = await triggerFirstInvocation(context, retries ?? DEFAULT_RETRIES); if (result.isOk()) { return { workflowRunId: finalWorkflowRunId }; } else { diff --git a/src/constants.ts b/src/constants.ts index f4a7ebc..7b8e1d4 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -1,3 +1,5 @@ +import { Telemetry } from "./types"; + export const WORKFLOW_ID_HEADER = "Upstash-Workflow-RunId"; export const WORKFLOW_INIT_HEADER = "Upstash-Workflow-Init"; export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url"; @@ -12,3 +14,15 @@ export const DEFAULT_CONTENT_TYPE = "application/json"; export const NO_CONCURRENCY = 1; export const NOT_SET = "not-set"; export const DEFAULT_RETRIES = 3; + +export const VERSION = "v0.2.3"; +export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`; + +export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const; +export const TELEMETRY_HEADER_FRAMEWORK = "Upstash-Telemetry-Framework" as const; +export const TELEMETRY_HEADER_RUNTIME = "Upstash-Telemetry-Runtime" as const; + +export const MOCK_TELEMETRY: Telemetry = { + framework: "mock", + sdk: "mock", +}; diff --git a/src/context/auto-executor.ts b/src/context/auto-executor.ts index 0534e9e..cfc88c1 100644 --- a/src/context/auto-executor.ts +++ b/src/context/auto-executor.ts @@ -1,6 +1,6 @@ import { WorkflowAbort, WorkflowError } from "../error"; import type { WorkflowContext } from "./context"; -import type { StepFunction, ParallelCallState, Step, WaitRequest } from "../types"; +import type { StepFunction, ParallelCallState, Step, WaitRequest, Telemetry } from "../types"; import { LazyCallStep, type BaseLazyStep } from "./steps"; import { getHeaders } from "../workflow-requests"; import type { WorkflowLogger } from "../logger"; @@ -15,15 +15,24 @@ export class AutoExecutor { private readonly nonPlanStepCount: number; private readonly steps: Step[]; private indexInCurrentList = 0; + private telemetry?: Telemetry; + public stepCount = 0; public planStepCount = 0; protected executingStep: string | false = false; - constructor(context: WorkflowContext, steps: Step[], debug?: WorkflowLogger) { + constructor( + context: WorkflowContext, + steps: Step[], + telemetry?: Telemetry, + debug?: WorkflowLogger + ) { this.context = context; - this.debug = debug; this.steps = steps; + this.telemetry = telemetry; + this.debug = debug; + this.nonPlanStepCount = this.steps.filter((step) => !step.targetStep).length; } @@ -323,18 +332,21 @@ export class AutoExecutor { steps, }); + // must check length to be 1, otherwise was the if would return + // true for plan steps. if (steps[0].waitEventId && steps.length === 1) { const waitStep = steps[0]; - const { headers, timeoutHeaders } = getHeaders( - "false", - this.context.workflowRunId, - this.context.url, - this.context.headers, - waitStep, - this.context.failureUrl, - this.context.retries - ); + const { headers, timeoutHeaders } = getHeaders({ + initHeaderValue: "false", + workflowRunId: this.context.workflowRunId, + workflowUrl: this.context.url, + userHeaders: this.context.headers, + step: waitStep, + failureUrl: this.context.failureUrl, + retries: this.context.retries, + telemetry: this.telemetry, + }); // call wait const waitBody: WaitRequest = { @@ -366,17 +378,18 @@ export class AutoExecutor { const result = await this.context.qstashClient.batchJSON( steps.map((singleStep, index) => { const lazyStep = lazySteps[index]; - const { headers } = getHeaders( - "false", - this.context.workflowRunId, - this.context.url, - this.context.headers, - singleStep, - this.context.failureUrl, - this.context.retries, - lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined, - lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined - ); + const { headers } = getHeaders({ + initHeaderValue: "false", + workflowRunId: this.context.workflowRunId, + workflowUrl: this.context.url, + userHeaders: this.context.headers, + step: singleStep, + failureUrl: this.context.failureUrl, + retries: this.context.retries, + callRetries: lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined, + callTimeout: lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined, + telemetry: this.telemetry, + }); // if the step is a single step execution or a plan step, we can add sleep headers const willWait = singleStep.concurrent === NO_CONCURRENCY || singleStep.stepId === 0; diff --git a/src/context/context.ts b/src/context/context.ts index 974cc1a..31b8349 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -2,6 +2,7 @@ import type { CallResponse, CallSettings, NotifyStepResponse, + Telemetry, WaitEventOptions, WaitStepResponse, WorkflowClient, @@ -162,6 +163,7 @@ export class WorkflowContext { initialPayload, env, retries, + telemetry, }: { qstashClient: WorkflowClient; workflowRunId: string; @@ -173,6 +175,7 @@ export class WorkflowContext { initialPayload: TInitialPayload; env?: Record; retries?: number; + telemetry?: Telemetry; }) { this.qstashClient = qstashClient; this.workflowRunId = workflowRunId; @@ -184,7 +187,7 @@ export class WorkflowContext { this.env = env ?? {}; this.retries = retries ?? DEFAULT_RETRIES; - this.executor = new AutoExecutor(this, this.steps, debug); + this.executor = new AutoExecutor(this, this.steps, telemetry, debug); } /** diff --git a/src/serve/index.ts b/src/serve/index.ts index af8473f..a544218 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -1,8 +1,9 @@ import { makeCancelRequest } from "../client/utils"; +import { SDK_TELEMETRY } from "../constants"; import { WorkflowContext } from "../context"; import { formatWorkflowError } from "../error"; import { WorkflowLogger } from "../logger"; -import { RouteFunction, WorkflowServeOptions } from "../types"; +import { RouteFunction, Telemetry, WorkflowServeOptions } from "../types"; import { getPayload, handleFailure, parseRequest, validateRequest } from "../workflow-parser"; import { handleThirdPartyCallResult, @@ -33,6 +34,7 @@ export const serveBase = < TResponse extends Response = Response, >( routeFunction: RouteFunction, + telemetry?: Telemetry, options?: WorkflowServeOptions ): { handler: (request: TRequest) => Promise } => { // Prepares options with defaults if they are not provided. @@ -49,7 +51,9 @@ export const serveBase = < env, retries, useJSONContent, + disableTelemetry, } = processOptions(options); + telemetry = disableTelemetry ? undefined : telemetry; const debug = WorkflowLogger.getLogger(verbose); /** @@ -107,7 +111,10 @@ export const serveBase = < qstashClient, initialPayloadParser, routeFunction, - failureFunction + failureFunction, + env, + retries, + debug ); if (failureCheck.isErr()) { // unexpected error during handleFailure @@ -130,6 +137,7 @@ export const serveBase = < debug, env, retries, + telemetry, }); // attempt running routeFunction until the first step @@ -152,15 +160,16 @@ export const serveBase = < } // check if request is a third party call result - const callReturnCheck = await handleThirdPartyCallResult( + const callReturnCheck = await handleThirdPartyCallResult({ request, - rawInitialPayload, - qstashClient, + requestPayload: rawInitialPayload, + client: qstashClient, workflowUrl, - workflowFailureUrl, + failureUrl: workflowFailureUrl, retries, - debug - ); + telemetry, + debug, + }); if (callReturnCheck.isErr()) { // error while checking await debug?.log("ERROR", "SUBMIT_THIRD_PARTY_RESULT", { @@ -170,7 +179,7 @@ export const serveBase = < } else if (callReturnCheck.value === "continue-workflow") { // request is not third party call. Continue workflow as usual const result = isFirstInvocation - ? await triggerFirstInvocation(workflowContext, retries, useJSONContent, debug) + ? await triggerFirstInvocation({ workflowContext, useJSONContent, telemetry, debug }) : await triggerRouteFunction({ onStep: async () => routeFunction(workflowContext), onCleanup: async () => { @@ -229,5 +238,12 @@ export const serve = < routeFunction: RouteFunction, options?: Omit, "useJSONContent"> ): { handler: (request: TRequest) => Promise } => { - return serveBase(routeFunction, options); + return serveBase( + routeFunction, + { + sdk: SDK_TELEMETRY, + framework: "unknown", + }, + options + ); }; diff --git a/src/serve/options.ts b/src/serve/options.ts index a49dc43..855f96a 100644 --- a/src/serve/options.ts +++ b/src/serve/options.ts @@ -82,6 +82,7 @@ export const processOptions = { qstashClient, verbose: true, receiver: undefined, + disableTelemetry: true, } ); @@ -114,13 +115,41 @@ describe("serve", () => { ]; await driveWorkflow({ - execute: async (initialPayload, steps) => { - const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, initialPayload, steps); + execute: async (initialPayload, steps, first) => { + const request = first + ? new Request(WORKFLOW_ENDPOINT, { + body: JSON.stringify(initialPayload), + method: "POST", + }) + : getRequest(WORKFLOW_ENDPOINT, workflowRunId, initialPayload, steps); + const response = await endpoint(request); expect(response.status).toBe(200); }, initialPayload, iterations: [ + { + stepsToAdd: [], + responseFields: { + body: { messageId: "some-message-id" }, + status: 200, + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/https://requestcatcher.com/api`, + token, + body: initialPayload, + headers: { + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-retries": "3", + "upstash-failure-callback-retries": "3", + "upstash-method": "POST", + "upstash-workflow-init": "true", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + }, + }, + }, { stepsToAdd: [], responseFields: { @@ -349,6 +378,9 @@ describe("serve", () => { "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-foo", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": "unknown", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3", }, body: '{"stepId":3,"stepName":"step 3","stepType":"Run","out":"\\"combined results: result 1,result 2\\"","concurrent":1}', }, @@ -396,6 +428,9 @@ describe("serve", () => { "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": "unknown", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3", }, body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', }, @@ -441,6 +476,9 @@ describe("serve", () => { "upstash-workflow-url": WORKFLOW_ENDPOINT, "upstash-failure-callback": myFailureEndpoint, "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": "unknown", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3", }, body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', }, @@ -488,6 +526,9 @@ describe("serve", () => { "upstash-workflow-url": WORKFLOW_ENDPOINT, "upstash-failure-callback": WORKFLOW_ENDPOINT, "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": "unknown", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3", }, body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', }, @@ -686,6 +727,9 @@ describe("serve", () => { "Upstash-Workflow-RunId": ["wfr-bar"], "Upstash-Workflow-Runid": ["wfr-bar"], "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], + "Upstash-Telemetry-Framework": ["unknown"], + "Upstash-Telemetry-Runtime": ["unknown"], + "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.3"], }, timeoutUrl: WORKFLOW_ENDPOINT, url: WORKFLOW_ENDPOINT, @@ -905,4 +949,71 @@ describe("serve", () => { }); }); }); + + test("should forward client headers", async () => { + const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []); + let called = false; + const myFailureFunction: WorkflowServeOptions["failureFunction"] = async () => { + return; + }; + + const header = "test-header"; + const headerValue = `test-header-value-${nanoid()}`; + const qstashClient = new Client({ + baseUrl: MOCK_QSTASH_SERVER_URL, + token, + headers: { + [header]: headerValue, + }, + }); + + const { handler: endpoint } = serve( + async (context) => { + await context.sleep("sleep-step", 1); + }, + { + qstashClient, + receiver: undefined, + failureFunction: myFailureFunction, + } + ); + await mockQStashServer({ + execute: async () => { + const response = await endpoint(request); + expect(response.status).toBe(200); + called = true; + }, + responseFields: { body: { messageId: "some-message-id" }, status: 200 }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, + token, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-delay": "1s", + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-method": "POST", + "upstash-retries": "3", + "upstash-failure-callback-retries": "3", + "upstash-workflow-init": "false", + "upstash-workflow-runid": "wfr-bar", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-failure-callback": WORKFLOW_ENDPOINT, + "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", + "upstash-forward-test-header": headerValue, + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": "unknown", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3", + }, + body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', + }, + ], + }, + }); + expect(called).toBeTrue(); + }); }); diff --git a/src/test-utils.ts b/src/test-utils.ts index ad41af1..a8336a2 100644 --- a/src/test-utils.ts +++ b/src/test-utils.ts @@ -53,7 +53,7 @@ export const mockQStashServer = async ({ if (!receivesRequest) { return new Response("assertion in mock QStash failed. fetch shouldn't have been called.", { - status: 400, + status: 500, }); } const { method, url, token, body } = receivesRequest; @@ -85,7 +85,7 @@ export const mockQStashServer = async ({ if (error instanceof Error) { console.error(error); return new Response(`assertion in mock QStash failed.`, { - status: 400, + status: 500, }); } } @@ -120,18 +120,20 @@ export const driveWorkflow = async ({ initialPayload, iterations, }: { - execute: (intialPayload: unknown, steps: Step[]) => Promise; + execute: (intialPayload: unknown, steps: Step[], first: boolean) => Promise; initialPayload: unknown; iterations: IterationTape; }) => { const steps: Step[] = []; + let counter = 0; for (const { stepsToAdd, responseFields, receivesRequest } of iterations) { steps.push(...stepsToAdd); await mockQStashServer({ - execute: async () => execute(initialPayload, steps), + execute: async () => execute(initialPayload, steps, counter === 0), responseFields, receivesRequest, }); + counter++; } }; diff --git a/src/types.ts b/src/types.ts index 027f060..8c11a39 100644 --- a/src/types.ts +++ b/src/types.ts @@ -227,6 +227,27 @@ export type WorkflowServeOptions< * Not part of the public API. Only available in serveBase, which is not exported. */ useJSONContent?: boolean; + /** + * By default, Workflow SDK sends telemetry about SDK version, framework or runtime. + * + * Set `disableTelemetry` to disable this behavior. + */ + disableTelemetry?: boolean; +}; + +export type Telemetry = { + /** + * sdk version + */ + sdk: string; + /** + * platform (such as nextjs/cloudflare) + */ + framework: string; + /** + * node version + */ + runtime?: string; }; export type PublicServeOptions< @@ -337,3 +358,70 @@ export type CallSettings = { retries?: number; timeout?: Duration | number; }; + +export type HeaderParams = { + /** + * whether the request is a first invocation request. + */ + initHeaderValue: "true" | "false"; + /** + * run id of the workflow + */ + workflowRunId: string; + /** + * url where the workflow is hosted + */ + workflowUrl: string; + /** + * user headers which will be forwarded in the request + */ + userHeaders?: Headers; + /** + * failure url to call incase of failure + */ + failureUrl?: WorkflowServeOptions["failureUrl"]; + /** + * retry setting of requests except context.call + */ + retries?: number; + /** + * telemetry to include in timeoutHeaders. + * + * Only needed/used when the step is a waitForEvent step + */ + telemetry?: Telemetry; +} & ( + | { + /** + * step to generate headers for + */ + step: Step; + /** + * number of retries in context.call + */ + callRetries?: number; + /** + * timeout duration in context.call + */ + callTimeout?: number | Duration; + } + | { + /** + * step not passed. Either first invocation or simply getting headers for + * third party callack. + */ + step?: never; + /** + * number of retries in context.call + * + * set to never because this is not a context.call step + */ + callRetries?: never; + /** + * timeout duration in context.call + * + * set to never because this is not a context.call step + */ + callTimeout?: never; + } +); diff --git a/src/workflow-parser.test.ts b/src/workflow-parser.test.ts index 5206d01..bd4a8b1 100644 --- a/src/workflow-parser.test.ts +++ b/src/workflow-parser.test.ts @@ -753,7 +753,16 @@ describe("Workflow Parser", () => { }; // no failureFunction - const result1 = await handleFailure(request, "", client, initialPayloadParser, routeFunction); + const result1 = await handleFailure( + request, + "", + client, + initialPayloadParser, + routeFunction, + undefined, + {}, + 3 + ); expect(result1.isOk()).toBeTrue(); expect(result1.isOk() && result1.value === "not-failure-callback").toBeTrue(); @@ -764,7 +773,9 @@ describe("Workflow Parser", () => { client, initialPayloadParser, routeFunction, - failureFunction + failureFunction, + {}, + 0 ); expect(result2.isOk()).toBeTrue(); expect(result2.isOk() && result2.value === "not-failure-callback").toBeTrue(); @@ -789,7 +800,10 @@ describe("Workflow Parser", () => { "", client, initialPayloadParser, - routeFunction + routeFunction, + undefined, + {}, + 0 ); expect(result.isErr()).toBeTrue(); expect(result.isErr() && result.error.name).toBe(WorkflowError.name); @@ -817,7 +831,9 @@ describe("Workflow Parser", () => { client, initialPayloadParser, routeFunction, - failureFunction + failureFunction, + {}, + 3 ); expect(result.isErr()).toBeTrue(); expect(result.isErr() && result.error.message).toBe("my-error"); @@ -847,7 +863,9 @@ describe("Workflow Parser", () => { client, initialPayloadParser, routeFunction, - failureFunction + failureFunction, + {}, + 0 ); expect(result.isOk()).toBeTrue(); expect(result.isOk() && result.value).toBe("is-failure-callback"); @@ -867,7 +885,9 @@ describe("Workflow Parser", () => { client, initialPayloadParser, routeFunctionWithoutSteps, - failureFunction + failureFunction, + {}, + 3 ); expect(result.isErr()); diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index 329de5a..52a1e11 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -304,7 +304,9 @@ export const handleFailure = async ( WorkflowServeOptions >["initialPayloadParser"], routeFunction: RouteFunction, - failureFunction?: WorkflowServeOptions["failureFunction"], + failureFunction: WorkflowServeOptions["failureFunction"], + env: WorkflowServeOptions["env"], + retries: WorkflowServeOptions["retries"], debug?: WorkflowLogger ): Promise | Err> => { if (request.headers.get(WORKFLOW_FAILURE_HEADER) !== "true") { @@ -350,6 +352,9 @@ export const handleFailure = async ( url: url, failureUrl: url, debug, + env, + retries, + telemetry: undefined, // not going to make requests in authentication check }); // attempt running routeFunction until the first step diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index e0d7fa3..4735d58 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -46,11 +46,12 @@ describe("Workflow Requests", () => { headers: new Headers({}) as Headers, steps: [], url: WORKFLOW_ENDPOINT, + retries: 0, }); await mockQStashServer({ execute: async () => { - const result = await triggerFirstInvocation(context, 0); + const result = await triggerFirstInvocation({ workflowContext: context }); expect(result.isOk()).toBeTrue(); }, responseFields: { @@ -270,14 +271,18 @@ describe("Workflow Requests", () => { // create mock server and run the code await mockQStashServer({ execute: async () => { - const result = await handleThirdPartyCallResult( + const result = await handleThirdPartyCallResult({ request, - await request.text(), + requestPayload: await request.text(), client, - WORKFLOW_ENDPOINT, - WORKFLOW_ENDPOINT, - 2 - ); + workflowUrl: WORKFLOW_ENDPOINT, + failureUrl: WORKFLOW_ENDPOINT, + retries: 2, + telemetry: { + framework: "some-platform", + sdk: "some-sdk", + }, + }); expect(result.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk expect(result.value).toBe("is-call-return"); @@ -343,14 +348,18 @@ describe("Workflow Requests", () => { }); const spy = spyOn(client, "publishJSON"); - const result = await handleThirdPartyCallResult( + const result = await handleThirdPartyCallResult({ request, - await request.text(), + requestPayload: await request.text(), client, - WORKFLOW_ENDPOINT, - WORKFLOW_ENDPOINT, - 3 - ); + workflowUrl: WORKFLOW_ENDPOINT, + failureUrl: WORKFLOW_ENDPOINT, + retries: 3, + telemetry: { + framework: "some-platform", + sdk: "some-sdk", + }, + }); expect(result.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk expect(result.value).toBe("call-will-retry"); @@ -393,28 +402,36 @@ describe("Workflow Requests", () => { }); const spy = spyOn(client, "publishJSON"); - const initialResult = await handleThirdPartyCallResult( - initialRequest, - await initialRequest.text(), + const initialResult = await handleThirdPartyCallResult({ + request: initialRequest, + requestPayload: await initialRequest.text(), client, - WORKFLOW_ENDPOINT, - WORKFLOW_ENDPOINT, - 5 - ); + workflowUrl: WORKFLOW_ENDPOINT, + failureUrl: WORKFLOW_ENDPOINT, + retries: 5, + telemetry: { + framework: "some-platform", + sdk: "some-sdk", + }, + }); expect(initialResult.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk expect(initialResult.value).toBe("continue-workflow"); expect(spy).toHaveBeenCalledTimes(0); // second call - const result = await handleThirdPartyCallResult( - workflowRequest, - await workflowRequest.text(), + const result = await handleThirdPartyCallResult({ + request: workflowRequest, + requestPayload: await workflowRequest.text(), client, - WORKFLOW_ENDPOINT, - WORKFLOW_ENDPOINT, - 0 - ); + workflowUrl: WORKFLOW_ENDPOINT, + failureUrl: WORKFLOW_ENDPOINT, + retries: 0, + telemetry: { + framework: "some-platform", + sdk: "some-sdk", + }, + }); expect(result.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk expect(result.value).toBe("continue-workflow"); @@ -425,7 +442,11 @@ describe("Workflow Requests", () => { describe("getHeaders", () => { const workflowRunId = nanoid(); test("should create headers without step passed", () => { - const { headers, timeoutHeaders } = getHeaders("true", workflowRunId, WORKFLOW_ENDPOINT); + const { headers, timeoutHeaders } = getHeaders({ + initHeaderValue: "true", + workflowRunId, + workflowUrl: WORKFLOW_ENDPOINT, + }); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, @@ -441,18 +462,17 @@ describe("Workflow Requests", () => { const stepName = "some step"; const stepType: StepType = "Run"; - const { headers, timeoutHeaders } = getHeaders( - "false", + const { headers, timeoutHeaders } = getHeaders({ + initHeaderValue: "false", workflowRunId, - WORKFLOW_ENDPOINT, - undefined, - { + workflowUrl: WORKFLOW_ENDPOINT, + step: { stepId, stepName, stepType: stepType, concurrent: 1, - } - ); + }, + }); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, @@ -474,12 +494,11 @@ describe("Workflow Requests", () => { }; const callBody = undefined; - const { headers, timeoutHeaders } = getHeaders( - "false", + const { headers, timeoutHeaders } = getHeaders({ + initHeaderValue: "false", workflowRunId, - WORKFLOW_ENDPOINT, - undefined, - { + workflowUrl: WORKFLOW_ENDPOINT, + step: { stepId, stepName, stepType: stepType, @@ -488,8 +507,8 @@ describe("Workflow Requests", () => { callMethod, callHeaders, callBody, - } - ); + }, + }); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, @@ -516,14 +535,13 @@ describe("Workflow Requests", () => { test("should include failure header", () => { const failureUrl = "https://my-failure-endpoint.com"; - const { headers, timeoutHeaders } = getHeaders( - "true", + const { headers, timeoutHeaders } = getHeaders({ + initHeaderValue: "true", workflowRunId, - WORKFLOW_ENDPOINT, - new Headers() as Headers, - undefined, - failureUrl - ); + workflowUrl: WORKFLOW_ENDPOINT, + userHeaders: new Headers() as Headers, + failureUrl, + }); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, @@ -537,20 +555,19 @@ describe("Workflow Requests", () => { }); test("should return timeout headers for wait step", () => { - const { headers, timeoutHeaders } = getHeaders( - "false", + const { headers, timeoutHeaders } = getHeaders({ + initHeaderValue: "false", workflowRunId, - WORKFLOW_ENDPOINT, - undefined, - { + workflowUrl: WORKFLOW_ENDPOINT, + step: { stepId: 1, stepName: "waiting-step-name", stepType: "Wait", concurrent: 1, waitEventId: "wait event id", timeout: "20s", - } - ); + }, + }); expect(headers).toEqual({ "Upstash-Workflow-Init": "false", "Upstash-Workflow-RunId": workflowRunId, @@ -596,7 +613,7 @@ describe("Workflow Requests", () => { url: WORKFLOW_ENDPOINT, }); - await triggerFirstInvocation(context, 3); + await triggerFirstInvocation({ workflowContext: context }); const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" }); const spy = spyOn(debug, "log"); @@ -639,7 +656,11 @@ describe("Workflow Requests", () => { const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" }); const spy = spyOn(debug, "log"); - await triggerFirstInvocation(context, 3, false, debug); + await triggerFirstInvocation({ + workflowContext: context, + useJSONContent: false, + debug, + }); expect(spy).toHaveBeenCalledTimes(1); await workflowClient.cancel({ ids: [workflowRunId] }); @@ -690,7 +711,11 @@ describe("Workflow Requests", () => { const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" }); const spy = spyOn(debug, "log"); - await triggerFirstInvocation(context, 3, false, debug); + await triggerFirstInvocation({ + workflowContext: context, + useJSONContent: false, + debug, + }); expect(spy).toHaveBeenCalledTimes(1); await workflowClient.cancel({ ids: [workflowRunId] }); @@ -740,14 +765,31 @@ describe("Workflow Requests", () => { const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" }); const spy = spyOn(debug, "log"); - const resultOne = await triggerFirstInvocation(context, 3, false, debug); + const resultOne = await triggerFirstInvocation({ + workflowContext: context, + useJSONContent: false, + debug, + }); expect(resultOne.isOk()).toBeTrue(); // @ts-expect-error value will exist because of isOk expect(resultOne.value).toBe("success"); expect(spy).toHaveBeenCalledTimes(1); - const resultTwo = await triggerFirstInvocation(context, 0, false, debug); + const noRetryContext = new WorkflowContext({ + qstashClient, + workflowRunId: workflowRunId, + initialPayload: undefined, + headers: new Headers({}) as Headers, + steps: [], + url: WORKFLOW_ENDPOINT, + retries: 0, + }); + const resultTwo = await triggerFirstInvocation({ + workflowContext: noRetryContext, + useJSONContent: false, + debug, + }); expect(resultTwo.isOk()).toBeTrue(); // @ts-expect-error value will exist because of isOk expect(resultTwo.value).toBe("workflow-run-already-exists"); @@ -771,6 +813,9 @@ describe("Workflow Requests", () => { const deleteResult = await triggerWorkflowDelete(context, debug); expect(deleteResult).toEqual({ deleted: true }); + + const deleteResultSecond = await triggerWorkflowDelete(noRetryContext, debug); + expect(deleteResultSecond).toEqual({ deleted: false }); }, { timeout: 10000, diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index 3f91661..7ac3341 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -4,6 +4,9 @@ import { WorkflowAbort, WorkflowError } from "./error"; import type { WorkflowContext } from "./context"; import { DEFAULT_CONTENT_TYPE, + TELEMETRY_HEADER_FRAMEWORK, + TELEMETRY_HEADER_RUNTIME, + TELEMETRY_HEADER_SDK, WORKFLOW_FAILURE_HEADER, WORKFLOW_FEATURE_HEADER, WORKFLOW_ID_HEADER, @@ -14,9 +17,10 @@ import { } from "./constants"; import type { CallResponse, - Duration, + HeaderParams, Step, StepType, + Telemetry, WorkflowClient, WorkflowReceiver, WorkflowServeOptions, @@ -26,21 +30,26 @@ import type { WorkflowLogger } from "./logger"; import { QstashError } from "@upstash/qstash"; import { getSteps } from "./client/utils"; -export const triggerFirstInvocation = async ( - workflowContext: WorkflowContext, - retries: number, - useJSONContent?: boolean, - debug?: WorkflowLogger -): Promise | Err> => { - const { headers } = getHeaders( - "true", - workflowContext.workflowRunId, - workflowContext.url, - workflowContext.headers, - undefined, - workflowContext.failureUrl, - retries - ); +export const triggerFirstInvocation = async ({ + workflowContext, + useJSONContent, + telemetry, + debug, +}: { + workflowContext: WorkflowContext; + useJSONContent?: boolean; + telemetry?: Telemetry; + debug?: WorkflowLogger; +}): Promise | Err> => { + const { headers } = getHeaders({ + initHeaderValue: "true", + workflowRunId: workflowContext.workflowRunId, + workflowUrl: workflowContext.url, + userHeaders: workflowContext.headers, + failureUrl: workflowContext.failureUrl, + retries: workflowContext.retries, + telemetry, + }); if (useJSONContent) { headers["content-type"] = "application/json"; @@ -207,15 +216,25 @@ export const recreateUserHeaders = (headers: Headers): Headers => { * @param client QStash client * @returns */ -export const handleThirdPartyCallResult = async ( - request: Request, - requestPayload: string, - client: WorkflowClient, - workflowUrl: string, - failureUrl: WorkflowServeOptions["failureUrl"], - retries: number, - debug?: WorkflowLogger -): Promise< +export const handleThirdPartyCallResult = async ({ + request, + requestPayload, + client, + workflowUrl, + failureUrl, + retries, + telemetry, + debug, +}: { + request: Request; + requestPayload: string; + client: WorkflowClient; + workflowUrl: string; + failureUrl: WorkflowServeOptions["failureUrl"]; + retries: number; + telemetry?: Telemetry; + debug?: WorkflowLogger; +}): Promise< | Ok<"is-call-return" | "continue-workflow" | "call-will-retry" | "workflow-ended", never> | Err > => { @@ -311,15 +330,15 @@ export const handleThirdPartyCallResult = async ( } const userHeaders = recreateUserHeaders(request.headers as Headers); - const { headers: requestHeaders } = getHeaders( - "false", + const { headers: requestHeaders } = getHeaders({ + initHeaderValue: "false", workflowRunId, workflowUrl, userHeaders, - undefined, failureUrl, - retries - ); + retries, + telemetry, + }); const callResponse: CallResponse = { status: callbackMessage.status, @@ -368,32 +387,39 @@ export type HeadersResponse = { timeoutHeaders?: Record; }; +export const getTelemetryHeaders = (telemetry: Telemetry) => { + return { + [TELEMETRY_HEADER_SDK]: telemetry.sdk, + [TELEMETRY_HEADER_FRAMEWORK]: telemetry.framework, + [TELEMETRY_HEADER_RUNTIME]: telemetry.runtime ?? "unknown", + }; +}; + /** * Gets headers for calling QStash * - * @param initHeaderValue Whether the invocation should create a new workflow - * @param workflowRunId id of the workflow - * @param workflowUrl url of the workflow endpoint - * @param step step to get headers for. If the step is a third party call step, more - * headers are added. + * See HeaderParams for more details about parameters. + * * @returns headers to submit */ -export const getHeaders = ( - initHeaderValue: "true" | "false", - workflowRunId: string, - workflowUrl: string, - userHeaders?: Headers, - step?: Step, - failureUrl?: WorkflowServeOptions["failureUrl"], - retries?: number, - callRetries?: number, - callTimeout?: number | Duration -): HeadersResponse => { +export const getHeaders = ({ + initHeaderValue, + workflowRunId, + workflowUrl, + userHeaders, + failureUrl, + retries, + step, + callRetries, + callTimeout, + telemetry, +}: HeaderParams): HeadersResponse => { const baseHeaders: Record = { [WORKFLOW_INIT_HEADER]: initHeaderValue, [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: workflowUrl, [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", + ...(telemetry ? getTelemetryHeaders(telemetry) : {}), }; if (!step?.callUrl) { @@ -483,6 +509,15 @@ export const getHeaders = ( ...Object.fromEntries( Object.entries(baseHeaders).map(([header, value]) => [header, [value]]) ), + // to include telemetry headers: + ...(telemetry + ? Object.fromEntries( + Object.entries(getTelemetryHeaders(telemetry)).map(([header, value]) => [ + header, + [value], + ]) + ) + : {}), // note: using WORKFLOW_ID_HEADER doesn't work, because Runid -> RunId: "Upstash-Workflow-Runid": [workflowRunId], [WORKFLOW_INIT_HEADER]: ["false"],