From e9ecc87a4aca013a05f1eab4cc3033e7b9fc00f9 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Mon, 14 Oct 2024 22:58:25 +0300 Subject: [PATCH] fix: add wait/notify tests --- src/client/index.test.ts | 47 ++++++++++++++ src/client/index.ts | 35 +++++----- src/context/auto-executor.ts | 20 ++---- src/context/context.test.ts | 118 ++++++++++++++++++++++++++++++++++ src/context/context.ts | 14 ++-- src/context/steps.test.ts | 41 +++++++++++- src/context/steps.ts | 12 ++-- src/index.ts | 1 + src/integration.test.ts | 99 +++++++++++++++++++++++++++- src/serve/serve.test.ts | 23 +++++++ src/types.ts | 72 ++++++++++++--------- src/workflow-parser.test.ts | 66 ++++++++++++++++++- src/workflow-parser.ts | 8 +-- src/workflow-requests.test.ts | 85 ++++++++++++++++++------ src/workflow-requests.ts | 29 ++++----- 15 files changed, 555 insertions(+), 115 deletions(-) create mode 100644 src/client/index.test.ts diff --git a/src/client/index.test.ts b/src/client/index.test.ts new file mode 100644 index 0000000..f249312 --- /dev/null +++ b/src/client/index.test.ts @@ -0,0 +1,47 @@ +import { describe, test } from "bun:test"; +import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../test-utils"; +import { Client } from "."; +import { nanoid } from "../utils"; + +describe("workflow client", () => { + const token = nanoid(); + const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }); + + test("should send cancel", async () => { + const workflowRunId = `wfr-${nanoid()}`; + await mockQStashServer({ + execute: async () => { + await client.cancel({ workflowRunId }); + }, + responseFields: { + status: 200, + body: "msgId", + }, + receivesRequest: { + method: "DELETE", + url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/${workflowRunId}?cancel=true`, + token, + }, + }); + }); + + test("should send notify", async () => { + const eventId = `event-id-${nanoid()}`; + const notifyData = { data: `notify-data-${nanoid()}` }; + await mockQStashServer({ + execute: async () => { + await client.notify({ eventId, notifyBody: JSON.stringify(notifyData) }); + }, + responseFields: { + status: 200, + body: "msgId", + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/notify/${eventId}`, + token, + body: notifyData, + }, + }); + }); +}); diff --git a/src/client/index.ts b/src/client/index.ts index 0ba5aed..be30713 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -1,16 +1,16 @@ import { NotifyResponse } from "../types"; -import { Client } from "@upstash/qstash" +import { Client as QStashClient } from "@upstash/qstash"; -type ClientConfig = ConstructorParameters[0]; +type ClientConfig = ConstructorParameters[0]; -export class WorkflowClient { - private client: Client +export class Client { + private client: QStashClient; constructor(clientConfig: ClientConfig) { if (!clientConfig.baseUrl || !clientConfig.token) { console.warn("[Upstash Workflow] url or the token is not set. client will not work."); - }; - this.client = new Client(clientConfig) + } + this.client = new QStashClient(clientConfig); } /** @@ -19,7 +19,7 @@ export class WorkflowClient { * @param workflowRunId run id of the workflow to delete * @returns true if workflow is succesfully deleted. Otherwise throws QStashError */ - public async cancel(workflowRunId: string) { + public async cancel({ workflowRunId }: { workflowRunId: string }) { const result = (await this.client.http.request({ path: ["v2", "workflows", "runs", `${workflowRunId}?cancel=true`], method: "DELETE", @@ -29,19 +29,24 @@ export class WorkflowClient { } /** - * Notify waiting - * - * @param eventId - * @param notifyData + * Notify a workflow run waiting for an event + * + * @param eventId event id to notify + * @param notifyData data to provide to the workflow */ - public async notify(eventId: string, notifyData: string): Promise { - + public async notify({ + eventId, + notifyBody, + }: { + eventId: string; + notifyBody?: string; + }): Promise { const result = (await this.client.http.request({ path: ["v2", "notify", eventId], method: "POST", - body: notifyData + body: notifyBody, })) as NotifyResponse[]; - return result + return result; } } diff --git a/src/context/auto-executor.ts b/src/context/auto-executor.ts index b4bb414..e4b6bd9 100644 --- a/src/context/auto-executor.ts +++ b/src/context/auto-executor.ts @@ -281,7 +281,7 @@ export class AutoExecutor { initialStepCount: number ): ParallelCallState { const remainingSteps = this.steps.filter( - (step) => (step.targetStep ?? step.stepId) >= initialStepCount + (step) => (step.targetStep || step.stepId) >= initialStepCount ); if (remainingSteps.length === 0) { @@ -321,15 +321,9 @@ export class AutoExecutor { length: steps.length, steps, }); - - if (steps[0].waitEventId) { - if (steps.length !== 1) { - throw new QStashWorkflowError( - `Received an unexpected step array. If a step has waitEventId, it should be by itself. Received instead: ${steps}` - ) - } - const waitStep = steps[0] + if (steps[0].waitEventId && steps.length === 1) { + const waitStep = steps[0]; const { headers, timeoutHeaders } = getHeaders( "false", @@ -353,9 +347,9 @@ export class AutoExecutor { stepType: "Wait", stepName: waitStep.stepName, concurrent: waitStep.concurrent, - targetStep: waitStep.targetStep - } - } + targetStep: waitStep.targetStep, + }, + }; await this.context.qstashClient.http.request({ path: ["v2", "wait", waitStep.waitEventId], @@ -531,6 +525,6 @@ const validateParallelSteps = (lazySteps: BaseLazyStep[], stepsFromRequest: Step * @returns sorted steps */ const sortSteps = (steps: Step[]): Step[] => { - const getStepId = (step: Step) => step.targetStep ?? step.stepId; + const getStepId = (step: Step) => step.targetStep || step.stepId; return steps.toSorted((step, stepOther) => getStepId(step) - getStepId(stepOther)); }; diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 42750b5..5aedfb1 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -5,6 +5,12 @@ import { WorkflowContext } from "./context"; import { Client } from "@upstash/qstash"; import { nanoid } from "../utils"; import { QStashWorkflowError } from "../error"; +import { + WORKFLOW_ID_HEADER, + WORKFLOW_INIT_HEADER, + WORKFLOW_PROTOCOL_VERSION_HEADER, + WORKFLOW_URL_HEADER, +} from "../constants"; describe("context tests", () => { const token = nanoid(); @@ -144,4 +150,116 @@ describe("context tests", () => { }, }); }); + + describe("wait for event step", () => { + test("should send request to wait endpoint if there is a wait for event step", async () => { + const context = new WorkflowContext({ + qstashClient, + initialPayload: "my-payload", + steps: [], + url: WORKFLOW_ENDPOINT, + headers: new Headers() as Headers, + workflowRunId: "wfr-id", + }); + + const eventId = "my-event-id"; + await mockQStashServer({ + execute: () => { + const throws = () => context.waitForEvent("my-step", eventId, 20); + expect(throws).toThrowError("Aborting workflow after executing step 'my-step'."); + }, + responseFields: { + status: 200, + body: "msgId", + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/wait/${eventId}`, + token, + body: { + step: { + concurrent: 1, + stepId: 1, + stepName: "my-step", + stepType: "Wait", + }, + timeout: "20s", + timeoutHeaders: { + "Content-Type": ["application/json"], + [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"], + "Upstash-Retries": ["3"], + "Upstash-Workflow-CallType": ["step"], + [WORKFLOW_INIT_HEADER]: ["false"], + [WORKFLOW_ID_HEADER]: ["wfr-id"], + "Upstash-Workflow-Runid": ["wfr-id"], + [WORKFLOW_URL_HEADER]: [WORKFLOW_ENDPOINT], + }, + timeoutUrl: WORKFLOW_ENDPOINT, + url: WORKFLOW_ENDPOINT, + }, + }, + }); + }); + + test("should send request to batch endpoint if there is a parallel wait for event step", async () => { + const context = new WorkflowContext({ + qstashClient, + initialPayload: "my-payload", + steps: [], + url: WORKFLOW_ENDPOINT, + headers: new Headers() as Headers, + workflowRunId: "wfr-id", + }); + + const eventId = "my-event-id"; + await mockQStashServer({ + execute: () => { + const throws = () => + Promise.all([ + context.waitForEvent("my-wait-step", eventId, 20), + context.run("my-run-step", () => "foo"), + ]); + expect(throws).toThrowError("Aborting workflow after executing step 'my-wait-step'."); + }, + responseFields: { + status: 200, + body: "msgId", + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, + token, + body: [ + { + body: '{"stepId":0,"stepName":"my-wait-step","stepType":"Wait","waitEventId":"my-event-id","timeout":"20s","concurrent":2,"targetStep":1}', + destination: WORKFLOW_ENDPOINT, + headers: { + "content-type": "application/json", + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-method": "POST", + "upstash-retries": "3", + "upstash-workflow-calltype": "step", + "upstash-workflow-init": "false", + "upstash-workflow-runid": "wfr-id", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + }, + }, + { + body: '{"stepId":0,"stepName":"my-run-step","stepType":"Run","concurrent":2,"targetStep":2}', + destination: WORKFLOW_ENDPOINT, + headers: { + "content-type": "application/json", + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-method": "POST", + "upstash-retries": "3", + "upstash-workflow-init": "false", + "upstash-workflow-runid": "wfr-id", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + }, + }, + ], + }, + }); + }); + }); }); diff --git a/src/context/context.ts b/src/context/context.ts index 95ec283..8c09cea 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -2,7 +2,13 @@ import type { WaitStepResponse, WorkflowClient } from "../types"; import { type StepFunction, type Step } from "../types"; import { AutoExecutor } from "./auto-executor"; import type { BaseLazyStep } from "./steps"; -import { LazyCallStep, LazyFunctionStep, LazySleepStep, LazySleepUntilStep, LazyWaitStep } from "./steps"; +import { + LazyCallStep, + LazyFunctionStep, + LazySleepStep, + LazySleepUntilStep, + LazyWaitForEventStep, +} from "./steps"; import type { HTTPMethods } from "@upstash/qstash"; import type { WorkflowLogger } from "../logger"; import { DEFAULT_RETRIES } from "../constants"; @@ -292,15 +298,15 @@ export class WorkflowContext { public async waitForEvent( stepName: string, eventId: string, - timeout: string | number, + timeout: string | number ): Promise { const result = await this.addStep( - new LazyWaitStep( + new LazyWaitForEventStep( stepName, eventId, typeof timeout === "string" ? timeout : `${timeout}s` ) - ) + ); return result; } diff --git a/src/context/steps.test.ts b/src/context/steps.test.ts index a077ac1..cd3c30d 100644 --- a/src/context/steps.test.ts +++ b/src/context/steps.test.ts @@ -1,5 +1,11 @@ import { describe, test, expect } from "bun:test"; -import { LazyCallStep, LazyFunctionStep, LazySleepStep, LazySleepUntilStep } from "./steps"; +import { + LazyCallStep, + LazyFunctionStep, + LazySleepStep, + LazySleepUntilStep, + LazyWaitForEventStep, +} from "./steps"; import { nanoid } from "../utils"; import type { Step } from "../types"; @@ -141,4 +147,37 @@ describe("test steps", () => { }); }); }); + + describe("wait step", () => { + const eventId = "my-event-id"; + const timeout = "10s"; + const step = new LazyWaitForEventStep(stepName, eventId, timeout); + + test("should set correct fields", () => { + expect(step.stepName).toBe(stepName); + expect(step.stepType).toBe("Wait"); + }); + test("should create plan step", () => { + expect(step.getPlanStep(concurrent, targetStep)).toEqual({ + stepId: 0, + stepName, + stepType: "Wait", + waitEventId: eventId, + timeout, + concurrent, + targetStep, + }); + }); + + test("should create result step", async () => { + expect(await step.getResultStep(4, stepId)).toEqual({ + waitEventId: eventId, + timeout, + concurrent: 4, + stepId, + stepName, + stepType: "Wait", + }); + }); + }); }); diff --git a/src/context/steps.ts b/src/context/steps.ts index 127d28d..ad80cba 100644 --- a/src/context/steps.ts +++ b/src/context/steps.ts @@ -186,19 +186,19 @@ export class LazyCallStep extends BaseLazySt } } -export class LazyWaitStep extends BaseLazyStep { +export class LazyWaitForEventStep extends BaseLazyStep { private readonly eventId: string; private readonly timeout: string; - stepType: StepType = "Wait" + stepType: StepType = "Wait"; constructor( stepName: string, eventId: string, timeout: string // TODO: string format and accept number as smth ) { - super(stepName) + super(stepName); this.eventId = eventId; - this.timeout = timeout + this.timeout = timeout; } public getPlanStep(concurrent: number, targetStep: number): Step { @@ -209,8 +209,8 @@ export class LazyWaitStep extends BaseLazyStep { waitEventId: this.eventId, timeout: this.timeout, concurrent, - targetStep - } + targetStep, + }; } public async getResultStep(concurrent: number, stepId: number): Promise> { diff --git a/src/index.ts b/src/index.ts index 77cb4b9..f0fd609 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,4 +2,5 @@ export * from "./serve"; export * from "./context"; export * from "./types"; export * from "./logger"; +export * from "./client"; export { QStashWorkflowError, QStashWorkflowAbort } from "./error"; diff --git a/src/integration.test.ts b/src/integration.test.ts index 45fbc70..9f5abb5 100644 --- a/src/integration.test.ts +++ b/src/integration.test.ts @@ -50,9 +50,11 @@ import { serve } from "bun"; import { serve as workflowServe } from "../platforms/nextjs"; import { expect, test, describe } from "bun:test"; -import { Client } from "@upstash/qstash"; -import type { RouteFunction, WorkflowServeOptions } from "./types"; +import { Client as QStashClient } from "@upstash/qstash"; +import type { RouteFunction, WaitStepResponse, WorkflowServeOptions } from "./types"; import type { NextRequest } from "next/server"; +import { Client } from "./client"; +import { nanoid } from "./utils"; const WORKFLOW_PORT = "3000"; const THIRD_PARTY_PORT = "3001"; @@ -94,7 +96,7 @@ const attemptCharge = () => { return false; }; -const qstashClient = new Client({ +const qstashClient = new QStashClient({ baseUrl: process.env.MOCK_QSTASH_URL, token: process.env.MOCK_QSTASH_TOKEN ?? "", }); @@ -559,4 +561,95 @@ describe.skip("live serve tests", () => { timeout: 7000, } ); + + describe("wait for event", () => { + const runResult = "run-result"; + const testWaitEndpoint = async (expectedWaitResponse: WaitStepResponse, eventId: string) => { + const finishState = new FinishState(); + const payload = "my-payload"; + await testEndpoint({ + finalCount: 7, + waitFor: 15_000, + initialPayload: payload, + finishState, + routeFunction: async (context) => { + const input = context.requestPayload; + + expect(input).toBe(payload); + + const { notifyBody, timeout } = await context.waitForEvent( + "single wait for event", + eventId, + 1 + ); + expect(notifyBody).toBeUndefined(); + expect(timeout).toBeTrue(); + + const [runResponse, waitResponse] = await Promise.all([ + context.run("run-step", () => runResult), + context.waitForEvent("wait-event-step", eventId, 3), + ]); + expect(runResponse).toBe(runResult); + expect(waitResponse.notifyBody).toBe(expectedWaitResponse.notifyBody); + expect(waitResponse.timeout).toBe(expectedWaitResponse.timeout); + finishState.finish(); + }, + }); + }; + + test( + "should timeout correctly", + async () => { + const eventId = `my-event-id-${nanoid()}`; + await testWaitEndpoint( + { + notifyBody: undefined, + timeout: true, + }, + eventId + ); + }, + { timeout: 17_000 } + ); + + test( + "should notify correctly", + async () => { + const eventId = `my-event-id-${nanoid()}`; + const notifyBody = "notify-body"; + const workflowClient = new Client({ + baseUrl: process.env.MOCK_QSTASH_URL, + token: process.env.MOCK_QSTASH_TOKEN ?? "", + }); + + const notifyFinishState = new FinishState(); + async function retryUntilFalse(): Promise { + // wait to avoid notifying the first waitForEvent + await new Promise((resolve) => setTimeout(resolve, 3000)); + + while (true) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + const result = await workflowClient.notify({ eventId, notifyBody }); + if (result) { + expect(result[0].waiter.url).toBe(`http://localhost:${WORKFLOW_PORT}`); + notifyFinishState.finish(); + break; + } + } + } + retryUntilFalse(); + + await testWaitEndpoint( + { + notifyBody, + timeout: false, + }, + eventId + ); + + notifyFinishState.check(); + }, + { timeout: 17_000 } + ); + }); }); diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 03bbb90..0fb40ef 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -17,6 +17,7 @@ import { WORKFLOW_INIT_HEADER, WORKFLOW_PROTOCOL_VERSION_HEADER, } from "../constants"; +import { processOptions } from "./options"; const someWork = (input: string) => { return `processed '${input}'`; @@ -559,4 +560,26 @@ describe("serve", () => { await endpoint(request); expect(called).toBeTrue(); }); + + test("should not initialize verifier if keys are not set", () => { + const { receiver } = processOptions({ + env: { + QSTASH_URL: MOCK_QSTASH_SERVER_URL, + QSTASH_TOKEN: "mock-token", + }, + }); + expect(receiver).toBeUndefined(); + }); + + test("should initialize verifier if keys are set", () => { + const { receiver } = processOptions({ + env: { + QSTASH_URL: MOCK_QSTASH_SERVER_URL, + QSTASH_TOKEN: "mock-token", + QSTASH_CURRENT_SIGNING_KEY: "key-1", + QSTASH_NEXT_SIGNING_KEY: "key-2", + }, + }); + expect(receiver).toBeDefined(); + }); }); diff --git a/src/types.ts b/src/types.ts index 05140c8..cc65061 100644 --- a/src/types.ts +++ b/src/types.ts @@ -23,7 +23,15 @@ export type WorkflowReceiver = { verify: InstanceType["verify"]; }; -export const StepTypes = ["Initial", "Run", "SleepFor", "SleepUntil", "Call", "Wait", "Notify"] as const; +export const StepTypes = [ + "Initial", + "Run", + "SleepFor", + "SleepUntil", + "Call", + "Wait", + "Notify", +] as const; export type StepType = (typeof StepTypes)[number]; type ThirdPartyCallFields = { @@ -47,14 +55,14 @@ type ThirdPartyCallFields = { type WaitFields = { waitEventId: string; - timeout: string, + timeout: string; waitTimeout?: boolean; -} +}; type NotifyFields = { notifyEventId?: string; notifyBody?: string; -} +}; export type Step = { /** @@ -94,9 +102,9 @@ export type Step = { * set to the target step. */ targetStep?: number; -} & (ThirdPartyCallFields | { [P in keyof ThirdPartyCallFields]?: never }) -& (WaitFields | { [P in keyof WaitFields]?: never }) -& (NotifyFields | { [P in keyof NotifyFields]?: never }); +} & (ThirdPartyCallFields | { [P in keyof ThirdPartyCallFields]?: never }) & + (WaitFields | { [P in keyof WaitFields]?: never }) & + (NotifyFields | { [P in keyof NotifyFields]?: never }); export type RawStep = { messageId: string; @@ -228,36 +236,36 @@ export type FailureFunctionPayload = { export type RequiredExceptFields = Omit, K> & Partial>; export type WaitResult = { - result: TResult, - timeout: boolean -} + result: TResult; + timeout: boolean; +}; export type Waiter = { - url: string - deadline: number - headers: Record - timeoutUrl: string - timeoutBody: unknown - timeoutHeaders: Record -} + url: string; + deadline: number; + headers: Record; + timeoutUrl: string; + timeoutBody: unknown; + timeoutHeaders: Record; +}; export type NotifyResponse = { - waiter: Waiter - messageId: string - deduplicated: boolean - error: string -} + waiter: Waiter; + messageId: string; + deduplicated: boolean; + error: string; +}; export type WaitRequest = { - url: string, - timeout: string, - timeoutBody?: string, - timeoutUrl?: string - timeoutHeaders?: Record - step: Step -} + url: string; + timeout: string; + timeoutBody?: string; + timeoutUrl?: string; + timeoutHeaders?: Record; + step: Step; +}; export type WaitStepResponse = { - timeout: boolean, - notifyBody: unknown -} \ No newline at end of file + timeout: boolean; + notifyBody: unknown; +}; diff --git a/src/workflow-parser.test.ts b/src/workflow-parser.test.ts index 3799315..ef01e35 100644 --- a/src/workflow-parser.test.ts +++ b/src/workflow-parser.test.ts @@ -9,7 +9,7 @@ import { WORKFLOW_PROTOCOL_VERSION_HEADER, } from "./constants"; import { nanoid } from "./utils"; -import type { RawStep, Step, WorkflowServeOptions } from "./types"; +import type { RawStep, Step, WaitStepResponse, WorkflowServeOptions } from "./types"; import { getRequest, WORKFLOW_ENDPOINT } from "./test-utils"; import { formatWorkflowError, QStashWorkflowError } from "./error"; import { Client } from "@upstash/qstash"; @@ -226,6 +226,70 @@ describe("Workflow Parser", () => { expect(steps[1].stepId).toBe(remainingStepId); expect(isLastDuplicate).toBeFalse(); }); + + test("should overwrite the out field of wait step", async () => { + const timeoutStep: Step = { + stepId: 1, + stepName: "wait-step-name-1", + stepType: "Wait", + out: undefined, + waitTimeout: true, + waitEventId: "wait-event-1", + concurrent: 1, + timeout: "1s", + }; + const notifyStep: Step = { + stepId: 2, + stepName: "wait-step-name-2", + stepType: "Wait", + out: "notify-data", + waitTimeout: false, + waitEventId: "wait-event-2", + concurrent: 1, + timeout: "2s", + }; + + const payload: RawStep[] = [ + { + messageId: "msgId", + body: btoa("initial"), + callType: "step", + }, + { + messageId: "msgId", + body: btoa(JSON.stringify(timeoutStep)), + callType: "step", + }, + { + messageId: "msgId", + body: btoa(JSON.stringify(notifyStep)), + callType: "step", + }, + ]; + + const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( + JSON.stringify(payload), + false + ); + + expect(rawInitialPayload).toBe("initial"); + + expect(steps[0].stepType).toBe("Initial"); + expect(steps[1].stepType).toBe("Wait"); + expect(steps[2].stepType).toBe("Wait"); + + const timeoutResponse: WaitStepResponse = { + notifyBody: undefined, + timeout: true, + }; + expect(steps[1].out).toEqual(timeoutResponse); + + const notifyResponse: WaitStepResponse = { + notifyBody: "notify-data", + timeout: false, + }; + expect(steps[2].out).toEqual(notifyResponse); + }); }); describe("parseRequest with duplicates", () => { diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index 18414d0..0dea61a 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -66,15 +66,15 @@ const parsePayload = (rawPayload: string) => { // decode & parse other steps: const otherSteps = stepsToDecode.map((rawStep) => { - const step = JSON.parse(decodeBase64(rawStep.body)) as Step + const step = JSON.parse(decodeBase64(rawStep.body)) as Step; // if event is a wait event, overwrite the out with WaitStepResponse: if (step.waitEventId) { const newOut: WaitStepResponse = { notifyBody: step.out, - timeout: step.waitTimeout ?? false - } - step.out = newOut + timeout: step.waitTimeout ?? false, + }; + step.out = newOut; } return step; diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 13ca15c..c8ad071 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -334,13 +334,14 @@ describe("Workflow Requests", () => { describe("getHeaders", () => { const workflowRunId = nanoid(); test("should create headers without step passed", () => { - const { headers } = getHeaders("true", workflowRunId, WORKFLOW_ENDPOINT); + const { headers, timeoutHeaders } = getHeaders("true", workflowRunId, WORKFLOW_ENDPOINT); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, }); + expect(timeoutHeaders).toBeUndefined(); }); test("should create headers with a result step", () => { @@ -348,18 +349,25 @@ describe("Workflow Requests", () => { const stepName = "some step"; const stepType: StepType = "Run"; - const { headers } = getHeaders("false", workflowRunId, WORKFLOW_ENDPOINT, undefined, { - stepId, - stepName, - stepType: stepType, - concurrent: 1, - }); + const { headers, timeoutHeaders } = getHeaders( + "false", + workflowRunId, + WORKFLOW_ENDPOINT, + undefined, + { + stepId, + stepName, + stepType: stepType, + concurrent: 1, + } + ); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, }); + expect(timeoutHeaders).toBeUndefined(); }); test("should create headers with a call step", () => { @@ -373,16 +381,22 @@ describe("Workflow Requests", () => { }; const callBody = undefined; - const { headers } = getHeaders("false", workflowRunId, WORKFLOW_ENDPOINT, undefined, { - stepId, - stepName, - stepType: stepType, - concurrent: 1, - callUrl, - callMethod, - callHeaders, - callBody, - }); + const { headers, timeoutHeaders } = getHeaders( + "false", + workflowRunId, + WORKFLOW_ENDPOINT, + undefined, + { + stepId, + stepName, + stepType: stepType, + concurrent: 1, + callUrl, + callMethod, + callHeaders, + callBody, + } + ); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, @@ -403,11 +417,12 @@ describe("Workflow Requests", () => { "Upstash-Forward-my-custom-header": "my-custom-header-value", "Upstash-Workflow-CallType": "toCallback", }); + expect(timeoutHeaders).toBeUndefined(); }); test("should include failure header", () => { const failureUrl = "https://my-failure-endpoint.com"; - const { headers } = getHeaders( + const { headers, timeoutHeaders } = getHeaders( "true", workflowRunId, WORKFLOW_ENDPOINT, @@ -423,6 +438,40 @@ describe("Workflow Requests", () => { [`Upstash-Failure-Callback-Forward-${WORKFLOW_FAILURE_HEADER}`]: "true", "Upstash-Failure-Callback": failureUrl, }); + expect(timeoutHeaders).toBeUndefined(); + }); + + test("should return timeout headers for wait step", () => { + const { headers, timeoutHeaders } = getHeaders( + "false", + workflowRunId, + WORKFLOW_ENDPOINT, + undefined, + { + stepId: 1, + stepName: "waiting-step-name", + stepType: "Wait", + concurrent: 1, + waitEventId: "wait event id", + timeout: "20s", + } + ); + expect(headers).toEqual({ + "Upstash-Workflow-Init": "false", + "Upstash-Workflow-RunId": workflowRunId, + "Upstash-Workflow-Url": WORKFLOW_ENDPOINT, + "Upstash-Forward-Upstash-Workflow-Sdk-Version": "1", + "Upstash-Workflow-CallType": "step", + }); + expect(timeoutHeaders).toEqual({ + "Upstash-Workflow-Init": ["false"], + "Upstash-Workflow-RunId": [workflowRunId], + "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], + "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], + "Upstash-Workflow-Runid": [workflowRunId], + "Upstash-Workflow-CallType": ["step"], + "Content-Type": ["application/json"], + }); }); }); }); diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index 3398e41..47b3508 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -246,9 +246,9 @@ export const handleThirdPartyCallResult = async ( }; export type HeadersResponse = { - headers: Record, - timeoutHeaders?: Record -} + headers: Record; + timeoutHeaders?: Record; +}; /** * Gets headers for calling QStash @@ -299,11 +299,8 @@ export const getHeaders = ( } } - const contentType = ( - userHeaders - ? userHeaders.get("Content-Type") - : undefined - ) ?? DEFAULT_CONTENT_TYPE; + const contentType = + (userHeaders ? userHeaders.get("Content-Type") : undefined) ?? DEFAULT_CONTENT_TYPE; if (step?.callHeaders) { const forwardedHeaders = Object.fromEntries( @@ -330,12 +327,11 @@ export const getHeaders = ( "Upstash-Callback-Forward-Upstash-Workflow-Concurrent": step.concurrent.toString(), "Upstash-Callback-Forward-Upstash-Workflow-ContentType": contentType, "Upstash-Workflow-CallType": "toCallback", - } + }, }; } if (step?.waitEventId) { - return { headers: { ...baseHeaders, @@ -344,22 +340,19 @@ export const getHeaders = ( timeoutHeaders: { // to include user headers: ...Object.fromEntries( - Object.entries(baseHeaders).map(([header, value]) => [ - header, - [value], - ]) + Object.entries(baseHeaders).map(([header, value]) => [header, [value]]) ), // note: using WORKFLOW_ID_HEADER doesn't work, because Runid -> RunId: "Upstash-Workflow-Runid": [workflowRunId], [WORKFLOW_INIT_HEADER]: ["false"], [WORKFLOW_URL_HEADER]: [workflowUrl], "Upstash-Workflow-CallType": ["step"], - "Content-Type": [contentType] - } - } + "Content-Type": [contentType], + }, + }; } - return { headers: baseHeaders}; + return { headers: baseHeaders }; }; export const verifyRequest = async (