diff --git a/examples/ci/app/ci/constants.ts b/examples/ci/app/ci/constants.ts index e532ca2..d7dd79e 100644 --- a/examples/ci/app/ci/constants.ts +++ b/examples/ci/app/ci/constants.ts @@ -76,4 +76,31 @@ export const TEST_ROUTES: Pick[] = [ route: "call/workflow-with-failureUrl", waitForSeconds: 5 }, + + /** + * TEST LARGE PAYLOAD CASES + * + * disabled because they are unpredictable in CI. + * they are checked locally instead. + */ + // { + // route: "large-payload/call-result/workflow", + // waitForSeconds: 9 + // }, + // { + // route: "large-payload/error", + // waitForSeconds: 9 + // }, + // { + // route: "large-payload/initial", + // waitForSeconds: 9 + // }, + // { + // route: "large-payload/step-result", + // waitForSeconds: 6 + // }, + // { + // route: "large-payload/step-result-parallel", + // waitForSeconds: 12 + // }, ] \ No newline at end of file diff --git a/examples/ci/app/test-routes/call/constants.ts b/examples/ci/app/test-routes/call/constants.ts index dfb464f..e065a6c 100644 --- a/examples/ci/app/test-routes/call/constants.ts +++ b/examples/ci/app/test-routes/call/constants.ts @@ -1,3 +1,6 @@ export const FAILING_HEADER = "Fail-Header-Foo" export const FAILING_HEADER_VALUE = "fail-header-value-BAR" + +export const GET_HEADER = "Get-Header" +export const GET_HEADER_VALUE = "get-header-value-FOO" \ No newline at end of file diff --git a/examples/ci/app/test-routes/call/third-party/route.ts b/examples/ci/app/test-routes/call/third-party/route.ts index 5081fef..be6dd11 100644 --- a/examples/ci/app/test-routes/call/third-party/route.ts +++ b/examples/ci/app/test-routes/call/third-party/route.ts @@ -1,11 +1,16 @@ -import { FAILING_HEADER_VALUE, FAILING_HEADER } from "../constants"; +import { FAILING_HEADER_VALUE, FAILING_HEADER, GET_HEADER, GET_HEADER_VALUE } from "../constants"; const thirdPartyResult = "third-party-result"; export const GET = async (request: Request) => { return new Response( `called GET '${thirdPartyResult}' '${request.headers.get("get-header")}'`, - { status: 200 } + { + status: 200, + headers: { + [ GET_HEADER ]: GET_HEADER_VALUE + } + } ) } @@ -13,7 +18,7 @@ export const POST = async (request: Request) => { return new Response( `called POST '${thirdPartyResult}' '${request.headers.get("post-header")}' '${await request.text()}'`, - { status: 200 } + { status: 201 } ) } diff --git a/examples/ci/app/test-routes/call/workflow/route.ts b/examples/ci/app/test-routes/call/workflow/route.ts index 5fe48ee..fb42486 100644 --- a/examples/ci/app/test-routes/call/workflow/route.ts +++ b/examples/ci/app/test-routes/call/workflow/route.ts @@ -2,7 +2,7 @@ import { serve } from "@upstash/workflow/nextjs"; import { BASE_URL, TEST_ROUTE_PREFIX } from "app/ci/constants"; import { testServe, expect } from "app/ci/utils"; import { saveResult } from "app/ci/upstash/redis" -import { FAILING_HEADER, FAILING_HEADER_VALUE } from "../constants"; +import { FAILING_HEADER, FAILING_HEADER_VALUE, GET_HEADER, GET_HEADER_VALUE } from "../constants"; const testHeader = `test-header-foo` const headerValue = `header-foo` @@ -28,7 +28,7 @@ export const { POST, GET } = testServe( expect(context.headers.get(testHeader)!, headerValue) - const { body: postResult } = await context.call("post call", { + const { body: postResult, status: postStatus } = await context.call("post call", { url: thirdPartyEndpoint, method: "POST", body: "post-payload", @@ -37,18 +37,21 @@ export const { POST, GET } = testServe( // check payload after first step because we can't check above expect(input, payload); - + expect(postStatus, 201) + expect(postResult as string, "called POST 'third-party-result' 'post-header-value-x' '\"post-payload\"'" ); - + await context.sleep("sleep 1", 2); - - const { body: getResult } = await context.call("get call", { + + const { body: getResult, header: getHeaders, status: getStatus } = await context.call("get call", { url: thirdPartyEndpoint, headers: getHeader, }); - + + expect(getStatus, 200) + expect(getHeaders[GET_HEADER][0], GET_HEADER_VALUE) expect(getResult as string, "called GET 'third-party-result' 'get-header-value-x'"); const { body: patchResult, status, header } = await context.call("get call", { diff --git a/examples/ci/app/test-routes/large-payload/README.md b/examples/ci/app/test-routes/large-payload/README.md new file mode 100644 index 0000000..f3d26fe --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/README.md @@ -0,0 +1,8 @@ +This directory has a endpoints testing the lazy fetch functionality: +- `call-result`: endpoint called with context.call returns a large payload +- `error`: a large error is thrown. failureFunction is called with the initial body. +- `initial`: workflow is started with a large object +- `step-result`: a step returns a large result +- `step-result-parallel`: a parallel step returns a large result + +In `utils.ts`, you can find the large object used. \ No newline at end of file diff --git a/examples/ci/app/test-routes/large-payload/call-result/third-party/route.ts b/examples/ci/app/test-routes/large-payload/call-result/third-party/route.ts new file mode 100644 index 0000000..a2982af --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/call-result/third-party/route.ts @@ -0,0 +1,13 @@ +import { GET_HEADER, GET_HEADER_VALUE, largeObject } from "../../utils" + +export const GET = async () => { + return new Response( + largeObject, + { + status: 201, + headers: { + [ GET_HEADER ]: GET_HEADER_VALUE + } + } + ) +} \ No newline at end of file diff --git a/examples/ci/app/test-routes/large-payload/call-result/workflow/route.ts b/examples/ci/app/test-routes/large-payload/call-result/workflow/route.ts new file mode 100644 index 0000000..53f6017 --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/call-result/workflow/route.ts @@ -0,0 +1,54 @@ +import { serve } from "@upstash/workflow/nextjs"; +import { BASE_URL, TEST_ROUTE_PREFIX } from "app/ci/constants"; +import { testServe, expect } from "app/ci/utils"; +import { saveResult } from "app/ci/upstash/redis" +import { GET_HEADER, GET_HEADER_VALUE, largeObject, largeObjectLength } from "../../utils"; + +const header = `test-header-foo` +const headerValue = `header-bar` +const payload = "“unicode-quotes”" + +const thirdPartyEndpoint = `${TEST_ROUTE_PREFIX}/large-payload/call-result/third-party` + +export const { POST, GET } = testServe( + serve( + async (context) => { + const input = context.requestPayload; + + expect(context.headers.get(header)!, headerValue) + + const { body: result1, status, header: headers } = await context.call("get large bod", { + url: thirdPartyEndpoint, + method: "GET" + }) + + expect(input, payload); + + expect(status, 201) + expect(result1, largeObject) + expect(result1.length, largeObjectLength) + expect(headers[GET_HEADER][0], GET_HEADER_VALUE) + + const result2 = await context.run("step2", () => { + return result1.length + }); + + expect(result2, largeObjectLength); + + await saveResult( + context, + result2.toString() + ) + }, { + baseUrl: BASE_URL, + retries: 0 + } + ), { + expectedCallCount: 5, + expectedResult: largeObjectLength.toString(), + payload, + headers: { + [ header ]: headerValue + } + } +) diff --git a/examples/ci/app/test-routes/large-payload/error/route.ts b/examples/ci/app/test-routes/large-payload/error/route.ts new file mode 100644 index 0000000..5f5c4ce --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/error/route.ts @@ -0,0 +1,51 @@ +import { serve } from "@upstash/workflow/nextjs"; +import { BASE_URL } from "app/ci/constants"; +import { testServe, expect } from "app/ci/utils"; +import { saveResult } from "app/ci/upstash/redis" +import { largeObject } from "../utils"; +import { WorkflowContext } from "@upstash/workflow"; + +const header = `test-header-foo` +const headerValue = `header-bar` +const payload = "“unicode-quotes”" + +export const { POST, GET } = testServe( + serve( + async (context) => { + const input = context.requestPayload; + + expect(input, payload); + expect(context.headers.get(header)!, headerValue) + + const result1 = await context.run("step1", () => { + return input.length; + }); + + expect(result1, payload.length); + + await context.run("step2", () => { + throw new Error(largeObject) + }); + }, { + baseUrl: BASE_URL, + retries: 0, + async failureFunction({ context, failStatus, failResponse }) { + expect( failResponse, largeObject ) + expect( failStatus, 500 ) + expect( context.requestPayload as string, payload ) + + await saveResult( + context as WorkflowContext, + `super secret` + ) + }, + } + ), { + expectedCallCount: 4, + expectedResult: `super secret`, + payload, + headers: { + [ header ]: headerValue + } + } +) diff --git a/examples/ci/app/test-routes/large-payload/initial/route.ts b/examples/ci/app/test-routes/large-payload/initial/route.ts new file mode 100644 index 0000000..f423be8 --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/initial/route.ts @@ -0,0 +1,56 @@ +import { serve } from "@upstash/workflow/nextjs"; +import { BASE_URL } from "app/ci/constants"; +import { testServe, expect } from "app/ci/utils"; +import { saveResult } from "app/ci/upstash/redis" +import { largeObject } from "../utils"; +import { WorkflowContext } from "@upstash/workflow"; + +const header = `test-header-foo` +const headerValue = `header-bar` +const throws = "throwing-foo" + +export const { POST, GET } = testServe( + serve( + async (context) => { + const input = context.requestPayload; + + expect(input, largeObject); + expect(context.headers.get(header)!, headerValue) + + const result1 = await context.run("step1", () => { + return input.length; + }); + + expect(result1, largeObject.length); + + const result2 = await context.run("step2", () => { + return input + }); + + expect(result2, largeObject); + + await context.run("throws", () => { + throw new Error(throws) + }) + }, { + baseUrl: BASE_URL, + retries: 0, + async failureFunction({ context, failResponse }) { + expect(context.requestPayload as string, largeObject) + expect(failResponse, throws) + + await saveResult( + context as WorkflowContext, + throws + ) + }, + } + ), { + expectedCallCount: 5, + expectedResult: throws, + payload: largeObject, + headers: { + [ header ]: headerValue + } + } +) diff --git a/examples/ci/app/test-routes/large-payload/step-result-parallel/route.ts b/examples/ci/app/test-routes/large-payload/step-result-parallel/route.ts new file mode 100644 index 0000000..821cb11 --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/step-result-parallel/route.ts @@ -0,0 +1,59 @@ +import { serve } from "@upstash/workflow/nextjs"; +import { BASE_URL } from "app/ci/constants"; +import { testServe, expect } from "app/ci/utils"; +import { saveResult } from "app/ci/upstash/redis" +import { largeObject, largeObjectLength } from "../utils"; + +const header = `test-header-foo` +const headerValue = `header-bar` +const payload = "foo" + +export const { POST, GET } = testServe( + serve( + async (context) => { + const input = context.requestPayload; + + expect(input, payload); + expect(context.headers.get(header)!, headerValue) + + const [result1, largeResult1] = await Promise.all([ + context.run("step 1", () => undefined), + context.run("step 2 - large", () => { + return largeObject; + }) + ]) + + expect(typeof result1, "undefined"); + expect(typeof largeResult1, "string"); + expect(largeResult1.length, largeObjectLength); + expect(largeResult1, largeObject); + + const [largeResult2, result2] = await Promise.all([ + context.run("step 3 - large", () => { + return largeObject; + }), + context.run("step 4", () => undefined), + ]) + + expect(typeof result2, "undefined"); + expect(typeof largeResult2, "string"); + expect(largeResult2.length, largeObjectLength); + expect(largeResult2, largeObject); + + await saveResult( + context, + `${largeResult1.length} - ${largeResult2.length}` + ) + }, { + baseUrl: BASE_URL, + retries: 0, + } + ), { + expectedCallCount: 10, + expectedResult: `${largeObjectLength} - ${largeObjectLength}`, + payload, + headers: { + [ header ]: headerValue + } + } +) \ No newline at end of file diff --git a/examples/ci/app/test-routes/large-payload/step-result/route.ts b/examples/ci/app/test-routes/large-payload/step-result/route.ts new file mode 100644 index 0000000..13351ac --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/step-result/route.ts @@ -0,0 +1,49 @@ +import { serve } from "@upstash/workflow/nextjs"; +import { BASE_URL } from "app/ci/constants"; +import { testServe, expect } from "app/ci/utils"; +import { saveResult } from "app/ci/upstash/redis" +import { largeObject, largeObjectLength } from "../utils"; + +const header = `test-header-foo` +const headerValue = `header-bar` +const payload = "foo" + +export const { POST, GET } = testServe( + serve( + async (context) => { + const input = context.requestPayload; + + expect(input, payload); + expect(context.headers.get(header)!, headerValue) + + const result1 = await context.run("step1", () => { + return largeObject; + }); + + expect(result1, largeObject); + expect(typeof result1, "string"); + expect(result1.length, largeObjectLength); + + const result2 = await context.run("step2", () => { + return result1.length; + }); + + expect(result2, largeObjectLength); + + await saveResult( + context, + result2.toString() + ) + }, { + baseUrl: BASE_URL, + retries: 0 + } + ), { + expectedCallCount: 4, + expectedResult: largeObjectLength.toString(), + payload, + headers: { + [ header ]: headerValue + } + } +) \ No newline at end of file diff --git a/examples/ci/app/test-routes/large-payload/utils.ts b/examples/ci/app/test-routes/large-payload/utils.ts new file mode 100644 index 0000000..96898cb --- /dev/null +++ b/examples/ci/app/test-routes/large-payload/utils.ts @@ -0,0 +1,5 @@ +export const largeObjectLength = 5 * 1024 * 1024 +export const largeObject = "x".repeat(largeObjectLength); + +export const GET_HEADER = "Get-Header" +export const GET_HEADER_VALUE = "get-header-value-FOO" \ No newline at end of file diff --git a/examples/ci/app/test-routes/path/route.ts b/examples/ci/app/test-routes/path/route.ts index c74a9ff..8af6d13 100644 --- a/examples/ci/app/test-routes/path/route.ts +++ b/examples/ci/app/test-routes/path/route.ts @@ -47,4 +47,4 @@ export const { POST, GET } = testServe( [ header ]: headerValue } } -) \ No newline at end of file +) diff --git a/examples/ci/app/test-routes/sleepWithoutAwait/route.ts b/examples/ci/app/test-routes/sleepWithoutAwait/route.ts index 171b9b6..791c95f 100644 --- a/examples/ci/app/test-routes/sleepWithoutAwait/route.ts +++ b/examples/ci/app/test-routes/sleepWithoutAwait/route.ts @@ -12,20 +12,20 @@ type Invoice = { type Charge = { invoice: Invoice; success: boolean; + counter: number }; const header = `test-header-foo` const headerValue = `header-bar` const payload: Invoice = { date: 123, email: "my@mail.com", amount: 10 } -let counter = 0; -const attemptCharge = () => { +const attemptCharge = (counter: number) => { counter += 1; if (counter === 3) { counter = 0; - return true; + return { success: true, counter }; } - return false; + return { success: false, counter }; }; export const { POST, GET } = testServe( @@ -36,13 +36,19 @@ export const { POST, GET } = testServe( expect(typeof invoice, typeof payload); expect(JSON.stringify(invoice), JSON.stringify(payload)); + let charge: Charge = { + success: false, + counter: 0, + invoice + } + for (let index = 0; index < 3; index++) { - const charge = await context.run("attemptCharge", () => { - const success = attemptCharge(); - const charge: Charge = { invoice, success }; - return charge; + charge = await context.run("attemptCharge", () => { + const { success, counter } = attemptCharge(charge.counter); + const newCharge: Charge = { invoice, success, counter }; + return newCharge; }); - + if (charge.success) { const [updateDb, receipt, sleepResult] = await Promise.all([ context.run("updateDb", () => { @@ -82,4 +88,4 @@ export const { POST, GET } = testServe( [ header ]: headerValue } } -) \ No newline at end of file +) diff --git a/examples/ci/app/test-routes/wait-for-event/workflow/route.ts b/examples/ci/app/test-routes/wait-for-event/workflow/route.ts index 6edf6c5..fcf1a9a 100644 --- a/examples/ci/app/test-routes/wait-for-event/workflow/route.ts +++ b/examples/ci/app/test-routes/wait-for-event/workflow/route.ts @@ -116,4 +116,4 @@ export const { POST, GET } = testServe( [ header ]: headerValue } } -) \ No newline at end of file +) diff --git a/src/client/index.ts b/src/client/index.ts index 83f3907..269581a 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -30,6 +30,8 @@ export class Client { /** * Cancel an ongoing workflow * + * Returns true if workflow is canceled succesfully. Otherwise, throws error. + * * There are multiple ways you can cancel workflows: * - pass one or more workflow run ids to cancel them * - pass a workflow url to cancel all runs starting with this url diff --git a/src/client/utils.ts b/src/client/utils.ts index ff96501..4b735eb 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -1,5 +1,7 @@ import { Client } from "@upstash/qstash"; -import { NotifyResponse, Waiter } from "../types"; +import { NotifyResponse, RawStep, Waiter } from "../types"; +import { WorkflowLogger } from "../logger"; +import { WorkflowError } from "../error"; export const makeNotifyRequest = async ( requester: Client["http"], @@ -25,3 +27,63 @@ export const makeGetWaitersRequest = async ( })) as Required[]; return result; }; + +/** + * Returns true if workflow is canceled succesfully. Otherwise, throws error. + * + * @param requester client.http + * @param workflowRunId workflow to cancel + * @returns true if workflow is canceled + */ +export const makeCancelRequest = async (requester: Client["http"], workflowRunId: string) => { + (await requester.request({ + path: ["v2", "workflows", "runs", `${workflowRunId}?cancel=true`], + method: "DELETE", + parseResponseAsJson: false, + })) as undefined; + return true; +}; + +export const getSteps = async ( + requester: Client["http"], + workflowRunId: string, + messageId?: string, + debug?: WorkflowLogger +): Promise => { + try { + const steps = (await requester.request({ + path: ["v2", "workflows", "runs", workflowRunId], + parseResponseAsJson: true, + })) as RawStep[]; + + if (!messageId) { + await debug?.log("INFO", "ENDPOINT_START", { + message: + `Pulled ${steps.length} steps from QStash` + + `and returned them without filtering with messageId.`, + }); + return steps; + } else { + const index = steps.findIndex((item) => item.messageId === messageId); + + if (index === -1) { + // targetMessageId not found, return an empty array or handle it as needed + return []; + } + + const filteredSteps = steps.slice(0, index + 1); + await debug?.log("INFO", "ENDPOINT_START", { + message: + `Pulled ${steps.length} steps from QStash` + + ` and filtered them to ${filteredSteps.length} using messageId.`, + }); + return filteredSteps; + } + } catch (error) { + await debug?.log("ERROR", "ERROR", { + message: "failed while fetching steps.", + error: error, + }); + throw new WorkflowError(`Failed while pulling steps. ${error}`); + } +}; diff --git a/src/context/auto-executor.test.ts b/src/context/auto-executor.test.ts index 2d5194f..3362644 100644 --- a/src/context/auto-executor.test.ts +++ b/src/context/auto-executor.test.ts @@ -6,7 +6,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../ import { nanoid } from "../utils"; import { AutoExecutor } from "./auto-executor"; import type { Step } from "../types"; -import { QStashWorkflowAbort, QStashWorkflowError } from "../error"; +import { WorkflowAbort, WorkflowError } from "../error"; class SpyAutoExecutor extends AutoExecutor { public declare getParallelCallState; @@ -106,7 +106,7 @@ describe("auto-executor", () => { const throws = context.run("attemptCharge", () => { return { input: context.requestPayload, success: false }; }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -121,6 +121,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -198,7 +199,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), context.waitForEvent("waitEvent", "my-event", "5m"), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -214,6 +215,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "123s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -229,6 +231,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "10m", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -244,6 +247,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -259,6 +263,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -302,7 +307,7 @@ describe("auto-executor", () => { context.sleep("sleep for some time", 123), context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -317,6 +322,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -355,7 +361,7 @@ describe("auto-executor", () => { context.sleep("sleep for some time", 123), context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -370,6 +376,7 @@ describe("auto-executor", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -408,7 +415,7 @@ describe("auto-executor", () => { context.sleep("sleep for some time", 123), context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }, responseFields: { status: 200, @@ -476,7 +483,7 @@ describe("auto-executor", () => { return true; }); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step name. Expected 'wrongName', got 'attemptCharge' from the request" ) ); @@ -485,7 +492,7 @@ describe("auto-executor", () => { const context = getContext([initialStep, singleStep]); const throws = context.sleep("attemptCharge", 10); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step type. Expected 'SleepFor', got 'Run' from the request" ) ); @@ -502,7 +509,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step name. Expected 'wrongName', got 'sleep for some time' from the request" ) ); @@ -516,7 +523,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrow( - new QStashWorkflowError( + new WorkflowError( "Incompatible step type. Expected 'SleepUntil', got 'SleepFor' from the request" ) ); @@ -532,7 +539,7 @@ describe("auto-executor", () => { context.sleep("wrongName", 10), // wrong step name context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }); test("step type", () => { const context = getContext([initialStep, ...parallelSteps.slice(0, 3)]); @@ -542,7 +549,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep for some time", 10), // wrong step type context.sleepUntil("sleep until next day", 123_123), ]); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); }); }); @@ -556,7 +563,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrowError( - new QStashWorkflowError( + new WorkflowError( "Incompatible steps detected in parallel execution: Incompatible step name. Expected 'wrongName', got 'sleep for some time' from the request\n" + ' > Step Names from the request: ["sleep for some time","sleep until next day"]\n' + ' Step Types from the request: ["SleepFor","SleepUntil"]\n' + @@ -574,7 +581,7 @@ describe("auto-executor", () => { context.sleepUntil("sleep until next day", 123_123), ]); expect(throws).rejects.toThrowError( - new QStashWorkflowError( + new WorkflowError( "Incompatible steps detected in parallel execution: Incompatible step type. Expected 'SleepUntil', got 'SleepFor' from the request\n" + ' > Step Names from the request: ["sleep for some time","sleep until next day"]\n' + ' Step Types from the request: ["SleepFor","SleepUntil"]\n' + diff --git a/src/context/auto-executor.ts b/src/context/auto-executor.ts index d77f97a..db41df8 100644 --- a/src/context/auto-executor.ts +++ b/src/context/auto-executor.ts @@ -1,4 +1,4 @@ -import { QStashWorkflowAbort, QStashWorkflowError } from "../error"; +import { WorkflowAbort, WorkflowError } from "../error"; import type { WorkflowContext } from "./context"; import type { StepFunction, ParallelCallState, Step, WaitRequest } from "../types"; import { LazyCallStep, type BaseLazyStep } from "./steps"; @@ -39,14 +39,14 @@ export class AutoExecutor { * * If a function is already executing (this.executingStep), this * means that there is a nested step which is not allowed. In this - * case, addStep throws QStashWorkflowError. + * case, addStep throws WorkflowError. * * @param stepInfo step plan to add * @returns result of the step function */ public async addStep(stepInfo: BaseLazyStep) { if (this.executingStep) { - throw new QStashWorkflowError( + throw new WorkflowError( "A step can not be run inside another step." + ` Tried to run '${stepInfo.stepName}' inside '${this.executingStep}'` ); @@ -159,7 +159,7 @@ export class AutoExecutor { if (parallelCallState !== "first" && plannedParallelStepCount !== parallelSteps.length) { // user has added/removed a parallel step - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible number of parallel steps when call state was '${parallelCallState}'.` + ` Expected ${parallelSteps.length}, got ${plannedParallelStepCount} from the request.` ); @@ -194,7 +194,7 @@ export class AutoExecutor { */ const planStep = this.steps.at(-1); if (!planStep || planStep.targetStep === undefined) { - throw new QStashWorkflowError( + throw new WorkflowError( `There must be a last step and it should have targetStep larger than 0.` + `Received: ${JSON.stringify(planStep)}` ); @@ -218,10 +218,10 @@ export class AutoExecutor { ); await this.submitStepsToQStash([resultStep], [parallelStep]); } catch (error) { - if (error instanceof QStashWorkflowAbort) { + if (error instanceof WorkflowAbort) { throw error; } - throw new QStashWorkflowError( + throw new WorkflowError( `Error submitting steps to QStash in partial parallel step execution: ${error}` ); } @@ -236,7 +236,7 @@ export class AutoExecutor { * This call to the API should be discarded: no operations are to be made. Parallel steps which are still * running will finish and call QStash eventually. */ - throw new QStashWorkflowAbort("discarded parallel"); + throw new WorkflowAbort("discarded parallel"); } case "last": { /** @@ -313,7 +313,7 @@ export class AutoExecutor { private async submitStepsToQStash(steps: Step[], lazySteps: BaseLazyStep[]) { // if there are no steps, something went wrong. Raise exception if (steps.length === 0) { - throw new QStashWorkflowError( + throw new WorkflowError( `Unable to submit steps to QStash. Provided list is empty. Current step: ${this.stepCount}` ); } @@ -360,7 +360,7 @@ export class AutoExecutor { parseResponseAsJson: false, }); - throw new QStashWorkflowAbort(steps[0].stepName, steps[0]); + throw new WorkflowAbort(steps[0].stepName, steps[0]); } const result = await this.context.qstashClient.batchJSON( @@ -418,7 +418,7 @@ export class AutoExecutor { }); // if the steps are sent successfully, abort to stop the current request - throw new QStashWorkflowAbort(steps[0].stepName, steps[0]); + throw new WorkflowAbort(steps[0].stepName, steps[0]); } /** @@ -451,7 +451,7 @@ export class AutoExecutor { ) { return result[index] as TResult; } else { - throw new QStashWorkflowError( + throw new WorkflowError( `Unexpected parallel call result while executing step ${index}: '${result}'. Expected ${lazyStepList.length} many items` ); } @@ -468,7 +468,7 @@ export class AutoExecutor { * from the incoming request; compare the step names and types to make sure * that they are the same. * - * Raises `QStashWorkflowError` if there is a difference. + * Raises `WorkflowError` if there is a difference. * * @param lazyStep lazy step created during execution * @param stepFromRequest step parsed from incoming request @@ -476,14 +476,14 @@ export class AutoExecutor { const validateStep = (lazyStep: BaseLazyStep, stepFromRequest: Step): void => { // check step name if (lazyStep.stepName !== stepFromRequest.stepName) { - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible step name. Expected '${lazyStep.stepName}',` + ` got '${stepFromRequest.stepName}' from the request` ); } // check type name if (lazyStep.stepType !== stepFromRequest.stepType) { - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible step type. Expected '${lazyStep.stepType}',` + ` got '${stepFromRequest.stepType}' from the request` ); @@ -494,7 +494,7 @@ const validateStep = (lazyStep: BaseLazyStep, stepFromRequest: Step): void => { * validates that each lazy step and step from request has the same step * name and type using `validateStep` method. * - * If there is a difference, raises `QStashWorkflowError` with information + * If there is a difference, raises `WorkflowError` with information * about the difference. * * @param lazySteps list of lazy steps created during parallel execution @@ -506,12 +506,12 @@ const validateParallelSteps = (lazySteps: BaseLazyStep[], stepsFromRequest: Step validateStep(lazySteps[index], stepFromRequest); } } catch (error) { - if (error instanceof QStashWorkflowError) { + if (error instanceof WorkflowError) { const lazyStepNames = lazySteps.map((lazyStep) => lazyStep.stepName); const lazyStepTypes = lazySteps.map((lazyStep) => lazyStep.stepType); const requestStepNames = stepsFromRequest.map((step) => step.stepName); const requestStepTypes = stepsFromRequest.map((step) => step.stepType); - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible steps detected in parallel execution: ${error.message}` + `\n > Step Names from the request: ${JSON.stringify(requestStepNames)}` + `\n Step Types from the request: ${JSON.stringify(requestStepTypes)}` + diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 0244fb2..1772a77 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -4,7 +4,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../ import { WorkflowContext } from "./context"; import { Client } from "@upstash/qstash"; import { nanoid } from "../utils"; -import { QStashWorkflowError } from "../error"; +import { WorkflowAbort, WorkflowError } from "../error"; import { WORKFLOW_ID_HEADER, WORKFLOW_INIT_HEADER, @@ -33,7 +33,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner step' inside 'outer step'" ) ); @@ -55,7 +55,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner sleep' inside 'outer step'" ) ); @@ -77,7 +77,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner sleepUntil' inside 'outer step'" ) ); @@ -99,7 +99,7 @@ describe("context tests", () => { }); }; expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( "A step can not be run inside another step. Tried to run 'inner call' inside 'outer step'" ) ); @@ -138,6 +138,7 @@ describe("context tests", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "2", @@ -187,6 +188,7 @@ describe("context tests", () => { timeout: "20s", timeoutHeaders: { "Content-Type": ["application/json"], + "Upstash-Feature-Set": ["LazyFetch,InitialBody"], [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"], "Upstash-Retries": ["3"], "Upstash-Failure-Callback-Retries": ["3"], @@ -237,6 +239,7 @@ describe("context tests", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -252,6 +255,7 @@ describe("context tests", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -307,6 +311,7 @@ describe("context tests", () => { headers: { "content-type": "application/json", "upstash-callback": WORKFLOW_ENDPOINT, + "upstash-callback-feature-set": "LazyFetch,InitialBody", "upstash-callback-forward-upstash-workflow-callback": "true", "upstash-callback-forward-upstash-workflow-concurrent": "1", "upstash-callback-forward-upstash-workflow-contenttype": "application/json", @@ -319,7 +324,7 @@ describe("context tests", () => { "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, "upstash-failure-callback-retries": "3", - "upstash-feature-set": "WF_NoDelete", + "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-my-header": "my-value", "upstash-method": "PATCH", "upstash-retries": retries.toString(), @@ -369,6 +374,7 @@ describe("context tests", () => { headers: { "content-type": "application/json", "upstash-callback": WORKFLOW_ENDPOINT, + "upstash-callback-feature-set": "LazyFetch,InitialBody", "upstash-callback-forward-upstash-workflow-callback": "true", "upstash-callback-forward-upstash-workflow-concurrent": "1", "upstash-callback-forward-upstash-workflow-contenttype": "application/json", @@ -381,7 +387,7 @@ describe("context tests", () => { "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, "upstash-failure-callback-retries": "3", - "upstash-feature-set": "WF_NoDelete", + "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-my-header": "my-value", "upstash-method": "PATCH", "upstash-retries": "0", @@ -396,4 +402,26 @@ describe("context tests", () => { }); }); }); + + test("cancel should throw abort with cleanup: true", async () => { + const context = new WorkflowContext({ + qstashClient, + initialPayload: "my-payload", + steps: [], + url: WORKFLOW_ENDPOINT, + headers: new Headers() as Headers, + workflowRunId: "wfr-id", + }); + try { + await context.cancel(); + } catch (error) { + expect(error instanceof WorkflowAbort).toBeTrue(); + const _error = error as WorkflowAbort; + expect(_error.stepName).toBe("cancel"); + expect(_error.name).toBe("WorkflowAbort"); + expect(_error.cancelWorkflow).toBeTrue(); + return; + } + throw new Error("Test error: context.cancel should have thrown abort error."); + }); }); diff --git a/src/context/context.ts b/src/context/context.ts index e77a33b..2986f73 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -13,6 +13,7 @@ import { import type { HTTPMethods } from "@upstash/qstash"; import type { WorkflowLogger } from "../logger"; import { DEFAULT_RETRIES } from "../constants"; +import { WorkflowAbort } from "../error"; import type { Duration } from "../types"; /** @@ -117,10 +118,6 @@ export class WorkflowContext { * headers of the initial request */ public readonly headers: Headers; - /** - * initial payload as a raw string - */ - public readonly rawInitialPayload: string; /** * Map of environment variables and their values. * @@ -156,7 +153,6 @@ export class WorkflowContext { failureUrl, debug, initialPayload, - rawInitialPayload, env, retries, }: { @@ -168,7 +164,6 @@ export class WorkflowContext { failureUrl?: string; debug?: WorkflowLogger; initialPayload: TInitialPayload; - rawInitialPayload?: string; // optional for tests env?: Record; retries?: number; }) { @@ -179,7 +174,6 @@ export class WorkflowContext { this.failureUrl = failureUrl; this.headers = headers; this.requestPayload = initialPayload; - this.rawInitialPayload = rawInitialPayload ?? JSON.stringify(this.requestPayload); this.env = env ?? {}; this.retries = retries ?? DEFAULT_RETRIES; @@ -202,7 +196,7 @@ export class WorkflowContext { * const [result1, result2] = await Promise.all([ * context.run("step 1", () => { * return "result1" - * }) + * }), * context.run("step 2", async () => { * return await fetchResults() * }) @@ -225,6 +219,10 @@ export class WorkflowContext { /** * Stops the execution for the duration provided. * + * ```typescript + * await context.sleep('sleep1', 3) // wait for three seconds + * ``` + * * @param stepName * @param duration sleep duration in seconds * @returns undefined @@ -236,6 +234,10 @@ export class WorkflowContext { /** * Stops the execution until the date time provided. * + * ```typescript + * await context.sleepUntil('sleep1', Date.now() / 1000 + 3) // wait for three seconds + * ``` + * * @param stepName * @param datetime time to sleep until. Can be provided as a number (in unix seconds), * as a Date object or a string (passed to `new Date(datetimeString)`) @@ -262,7 +264,7 @@ export class WorkflowContext { * const { status, body } = await context.call( * "post call step", * { - * url: `https://www.some-endpoint.com/api`, + * url: "https://www.some-endpoint.com/api", * method: "POST", * body: "my-payload" * } @@ -358,6 +360,8 @@ export class WorkflowContext { * }) * ``` * + * Alternatively, you can use the `context.notify` method. + * * @param stepName * @param eventId event id to wake up the waiting workflow run * @param timeout timeout duration in seconds @@ -388,6 +392,27 @@ export class WorkflowContext { } } + /** + * Notify workflow runs waiting for an event + * + * ```ts + * const { eventId, eventData, notifyResponse } = await context.notify( + * "notify step", "event-id", "event-data" + * ); + * ``` + * + * Upon `context.notify`, the workflow runs waiting for the given eventId (context.waitForEvent) + * will receive the given event data and resume execution. + * + * The response includes the same eventId and eventData. Additionally, there is + * a notifyResponse field which contains a list of `Waiter` objects, each corresponding + * to a notified workflow run. + * + * @param stepName + * @param eventId event id to notify + * @param eventData event data to notify with + * @returns notify response which has event id, event data and list of waiters which were notified + */ public async notify( stepName: string, eventId: string, @@ -407,6 +432,17 @@ export class WorkflowContext { } } + /** + * Cancel the current workflow run + * + * Will throw WorkflowAbort to stop workflow execution. + * Shouldn't be inside try/catch. + */ + public async cancel() { + // throw an abort which will make the workflow cancel + throw new WorkflowAbort("cancel", undefined, true); + } + /** * Adds steps to the executor. Needed so that it can be overwritten in * DisabledWorkflowContext. diff --git a/src/error.ts b/src/error.ts index 2dfc106..dca6e7c 100644 --- a/src/error.ts +++ b/src/error.ts @@ -4,29 +4,41 @@ import type { FailureFunctionPayload, Step } from "./types"; /** * Error raised during Workflow execution */ -export class QStashWorkflowError extends QstashError { +export class WorkflowError extends QstashError { constructor(message: string) { super(message); - this.name = "QStashWorkflowError"; + this.name = "WorkflowError"; } } /** - * Raised when the workflow executes a function and aborts + * Raised when the workflow executes a function successfully + * and aborts to end the execution */ -export class QStashWorkflowAbort extends Error { +export class WorkflowAbort extends Error { public stepInfo?: Step; public stepName: string; + /** + * whether workflow is to be canceled on abort + */ + public cancelWorkflow: boolean; - constructor(stepName: string, stepInfo?: Step) { + /** + * + * @param stepName name of the aborting step + * @param stepInfo step information + * @param cancelWorkflow + */ + constructor(stepName: string, stepInfo?: Step, cancelWorkflow = false) { super( "This is an Upstash Workflow error thrown after a step executes. It is expected to be raised." + " Make sure that you await for each step. Also, if you are using try/catch blocks, you should not wrap context.run/sleep/sleepUntil/call methods with try/catch." + ` Aborting workflow after executing step '${stepName}'.` ); - this.name = "QStashWorkflowAbort"; + this.name = "WorkflowAbort"; this.stepName = stepName; this.stepInfo = stepInfo; + this.cancelWorkflow = cancelWorkflow; } } diff --git a/src/index.ts b/src/index.ts index f0fd609..7c868e2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,4 +3,4 @@ export * from "./context"; export * from "./types"; export * from "./logger"; export * from "./client"; -export { QStashWorkflowError, QStashWorkflowAbort } from "./error"; +export { WorkflowError, WorkflowAbort } from "./error"; diff --git a/src/integration.test.ts b/src/integration.test.ts index 90aa307..370d542 100644 --- a/src/integration.test.ts +++ b/src/integration.test.ts @@ -76,7 +76,7 @@ type Charge = { success: boolean; }; -class FinishState { +export class FinishState { public finished = false; public finish() { this.finished = true; @@ -755,4 +755,180 @@ describe.skip("live serve tests", () => { ); }); }); + + test( + "cancel workflow", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 3, + waitFor: 7000, + initialPayload: "my-payload", + finishState, + routeFunction: async (context) => { + const input = context.requestPayload; + expect(input).toBe("my-payload"); + + await context.sleep("sleep", 1); + + finishState.finish(); + await context.cancel(); + + throw new Error("shouldn't reach here"); + }, + }); + }, + { + timeout: 10_000, + } + ); + + describe.skip("lazy fetch", () => { + // create 5 mb payload. + // lazy fetch will become enabled for payloads larger than 3mb + const largeObject = "x".repeat(4 * 1024 * 1024); + + test( + "large payload", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 3, + waitFor: 7000, + initialPayload: largeObject, + finishState, + routeFunction: async (context) => { + const input = context.requestPayload; + + expect(input).toBe(largeObject); + + const result = await context.run("step1", async () => { + return "step-1-result"; + }); + expect(result).toBe("step-1-result"); + + finishState.finish(); + }, + }); + }, + { + timeout: 10_000, + } + ); + test( + "large parallel step response", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 11, + waitFor: 7000, + initialPayload: "my-payload", + finishState, + routeFunction: async (context) => { + const input = context.requestPayload; + + expect(input).toBe("my-payload"); + + const results = await Promise.all([ + context.run("step1", () => { + return largeObject; + }), + context.sleep("sleep1", 1), + context.run("step2", () => { + return largeObject; + }), + context.sleep("sleep2", 1), + ]); + + expect(results[0]).toBe(largeObject); + expect(results[1]).toBe(undefined); + expect(results[2]).toBe(largeObject); + expect(results[3]).toBe(undefined); + + await context.sleep("check", 1); + + finishState.finish(); + }, + }); + }, + { + timeout: 10_000, + } + ); + + test.skip( + "large error", + async () => { + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 3, + waitFor: 7000, + initialPayload: "my-payload", + finishState, + retries: 0, + routeFunction: async (context) => { + const input = context.requestPayload; + + expect(input).toBe("my-payload"); + + await context.run("step1", async () => { + throw new Error(largeObject); + }); + }, + failureFunction({ failResponse }) { + expect(failResponse).toBe(largeObject); + finishState.finish(); + }, + }); + }, + { + timeout: 10_000, + } + ); + + test( + "large call response", + async () => { + const thirdPartyServer = serve({ + async fetch() { + return new Response(largeObject, { status: 200 }); + }, + port: THIRD_PARTY_PORT, + }); + + const finishState = new FinishState(); + await testEndpoint({ + finalCount: 6, + waitFor: 9000, + initialPayload: "my-payload", + finishState, + routeFunction: async (context) => { + // sleeping to avoid checking input before the first step + await context.sleep("sleeping", 1); + + const input = context.requestPayload; + expect(input).toBe("my-payload"); + + const { status, body } = await context.call("call to large object", { + url: LOCAL_THIRD_PARTY_URL, + body: input, + method: "POST", + }); + + expect(status).toBe(200); + expect(body).toBe(largeObject); + + await context.sleep("sleep", 1); + + finishState.finish(); + }, + }); + + thirdPartyServer.stop(); + }, + { + timeout: 10_000, + } + ); + }); }); diff --git a/src/serve/authorization.test.ts b/src/serve/authorization.test.ts index 6ce4476..bc3c09f 100644 --- a/src/serve/authorization.test.ts +++ b/src/serve/authorization.test.ts @@ -4,7 +4,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../ import { WorkflowContext } from "../context"; import { Client } from "@upstash/qstash"; import { nanoid } from "../utils"; -import { QStashWorkflowAbort } from "../error"; +import { WorkflowAbort } from "../error"; import type { RouteFunction } from "../types"; import { DisabledWorkflowContext } from "./authorization"; @@ -27,7 +27,7 @@ describe("disabled workflow context", () => { const throws = disabledContext.run("run-step", () => { return 1; }); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -43,7 +43,7 @@ describe("disabled workflow context", () => { await mockQStashServer({ execute: () => { const throws = disabledContext.sleep("sleep-step", 1); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -59,7 +59,7 @@ describe("disabled workflow context", () => { await mockQStashServer({ execute: () => { const throws = disabledContext.sleepUntil("sleepUntil-step", 1); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -75,7 +75,7 @@ describe("disabled workflow context", () => { await mockQStashServer({ execute: () => { const throws = disabledContext.call("call-step", { url: "some-url" }); - expect(throws).rejects.toThrow(QStashWorkflowAbort); + expect(throws).rejects.toThrow(WorkflowAbort); called = true; }, responseFields: { @@ -86,6 +86,11 @@ describe("disabled workflow context", () => { }); expect(called).toBeTrue(); }); + + test("shouldn't throw on cancel", () => { + const dontThrow = disabledContext.cancel; + expect(dontThrow).not.toThrowError(); + }); }); describe("tryAuthentication", () => { @@ -184,7 +189,7 @@ describe("disabled workflow context", () => { const throws = context.run("step", async () => { return await Promise.resolve("result"); }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); called = true; }, responseFields: { @@ -207,6 +212,7 @@ describe("disabled workflow context", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "0", @@ -238,7 +244,7 @@ describe("disabled workflow context", () => { const throws = context.run("step", () => { return Promise.resolve("result"); }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); called = true; }, responseFields: { @@ -261,6 +267,7 @@ describe("disabled workflow context", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -292,7 +299,7 @@ describe("disabled workflow context", () => { const throws = context.run("step", () => { return "result"; }); - expect(throws).rejects.toThrowError(QStashWorkflowAbort); + expect(throws).rejects.toThrowError(WorkflowAbort); called = true; called = true; }, @@ -316,6 +323,7 @@ describe("disabled workflow context", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", diff --git a/src/serve/authorization.ts b/src/serve/authorization.ts index 6efd782..d87ffaf 100644 --- a/src/serve/authorization.ts +++ b/src/serve/authorization.ts @@ -1,13 +1,13 @@ import type { Err, Ok } from "neverthrow"; import { err, ok } from "neverthrow"; -import { QStashWorkflowAbort } from "../error"; +import { WorkflowAbort } from "../error"; import { RouteFunction } from "../types"; import { WorkflowContext } from "../context"; import { BaseLazyStep } from "../context/steps"; import { Client } from "@upstash/qstash"; /** - * Workflow context which throws QStashWorkflowAbort before running the steps. + * Workflow context which throws WorkflowAbort before running the steps. * * Used for making a dry run before running any steps to check authentication. * @@ -37,7 +37,7 @@ export class DisabledWorkflowContext< private static readonly disabledMessage = "disabled-qstash-worklfow-run"; /** - * overwrite the WorkflowContext.addStep method to always raise QStashWorkflowAbort + * overwrite the WorkflowContext.addStep method to always raise WorkflowAbort * error in order to stop the execution whenever we encounter a step. * * @param _step @@ -46,7 +46,14 @@ export class DisabledWorkflowContext< // eslint-disable-next-line @typescript-eslint/no-unused-vars _step: BaseLazyStep ): Promise { - throw new QStashWorkflowAbort(DisabledWorkflowContext.disabledMessage); + throw new WorkflowAbort(DisabledWorkflowContext.disabledMessage); + } + + /** + * overwrite cancel method to do nothing + */ + public async cancel() { + return; } /** @@ -75,7 +82,6 @@ export class DisabledWorkflowContext< url: context.url, failureUrl: context.failureUrl, initialPayload: context.requestPayload, - rawInitialPayload: context.rawInitialPayload, env: context.env, retries: context.retries, }); @@ -83,7 +89,7 @@ export class DisabledWorkflowContext< try { await routeFunction(disabledContext); } catch (error) { - if (error instanceof QStashWorkflowAbort && error.stepName === this.disabledMessage) { + if (error instanceof WorkflowAbort && error.stepName === this.disabledMessage) { return ok("step-found"); } return err(error as Error); diff --git a/src/serve/index.ts b/src/serve/index.ts index d05817f..670b407 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -1,3 +1,4 @@ +import { makeCancelRequest } from "../client/utils"; import { WorkflowContext } from "../context"; import { formatWorkflowError } from "../error"; import { WorkflowLogger } from "../logger"; @@ -79,6 +80,9 @@ export const serve = < const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestPayload, isFirstInvocation, + workflowRunId, + qstashClient.http, + request.headers.get("upstash-message-id")!, debug ); @@ -109,7 +113,6 @@ export const serve = < qstashClient, workflowRunId, initialPayload: initialPayloadParser(rawInitialPayload), - rawInitialPayload, headers: recreateUserHeaders(request.headers as Headers), steps, url: workflowUrl, @@ -159,6 +162,9 @@ export const serve = < onCleanup: async () => { await triggerWorkflowDelete(workflowContext, debug); }, + onCancel: async () => { + await makeCancelRequest(workflowContext.qstashClient.http, workflowRunId); + }, }); if (result.isErr()) { diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 6f5e4ef..496117f 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -133,6 +133,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-retries": "3", "upstash-failure-callback-retries": "3", @@ -160,6 +161,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -334,6 +336,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "3", @@ -379,6 +382,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -421,6 +425,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -467,6 +472,7 @@ describe("serve", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", @@ -569,7 +575,8 @@ describe("serve", () => { }, } ); - await endpoint(request); + const response = await endpoint(request); + expect(response.status).toBe(200); expect(called).toBeTrue(); }); @@ -595,6 +602,41 @@ describe("serve", () => { expect(receiver).toBeDefined(); }); + test("should call qstash to cancel workflow on context.cancel", async () => { + const request = getRequest(WORKFLOW_ENDPOINT, "wfr-foo", "my-payload", []); + let called = false; + let runs = false; + const { handler: endpoint } = serve( + async (context) => { + called = true; + await context.cancel(); + await context.run("wont run", () => { + runs = true; + }); + }, + { + qstashClient, + receiver: undefined, + verbose: true, + } + ); + + await mockQStashServer({ + execute: async () => { + const response = await endpoint(request); + expect(response.status).toBe(200); + }, + responseFields: { body: undefined, status: 200 }, + receivesRequest: { + method: "DELETE", + url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/wfr-foo?cancel=true`, + token, + }, + }); + expect(called).toBeTrue(); + expect(runs).toBeFalse(); + }); + test("should send waitForEvent", async () => { const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []); const { handler: endpoint } = serve( @@ -628,6 +670,7 @@ describe("serve", () => { timeout: "10d", timeoutHeaders: { "Content-Type": ["application/json"], + "Upstash-Feature-Set": ["LazyFetch,InitialBody"], "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], "Upstash-Retries": ["3"], "Upstash-Failure-Callback-Retries": ["3"], diff --git a/src/test-utils.ts b/src/test-utils.ts index 7f0828b..bc32020 100644 --- a/src/test-utils.ts +++ b/src/test-utils.ts @@ -83,7 +83,7 @@ export const mockQStashServer = async ({ } } catch (error) { if (error instanceof Error) { - console.error("Assertion error:", error.message); + console.error(error); return new Response(`assertion in mock QStash failed.`, { status: 400, }); diff --git a/src/workflow-parser.test.ts b/src/workflow-parser.test.ts index fd23311..740d50b 100644 --- a/src/workflow-parser.test.ts +++ b/src/workflow-parser.test.ts @@ -10,10 +10,16 @@ import { } from "./constants"; import { nanoid } from "./utils"; import type { RawStep, Step, WaitStepResponse, WorkflowServeOptions } from "./types"; -import { getRequest, WORKFLOW_ENDPOINT } from "./test-utils"; -import { formatWorkflowError, QStashWorkflowError } from "./error"; +import { + getRequest, + MOCK_QSTASH_SERVER_URL, + mockQStashServer, + WORKFLOW_ENDPOINT, +} from "./test-utils"; +import { formatWorkflowError, WorkflowError } from "./error"; import { Client } from "@upstash/qstash"; import { processOptions } from "./serve/options"; +import { FinishState } from "./integration.test"; describe("Workflow Parser", () => { describe("validateRequest", () => { @@ -52,7 +58,7 @@ describe("Workflow Parser", () => { }); const throws = () => validateRequest(request); - expect(throws).toThrow(new QStashWorkflowError("Couldn't get workflow id from header")); + expect(throws).toThrow(new WorkflowError("Couldn't get workflow id from header")); }); test("should throw when protocol version is incompatible", () => { @@ -65,7 +71,7 @@ describe("Workflow Parser", () => { const throws = () => validateRequest(request); expect(throws).toThrow( - new QStashWorkflowError( + new WorkflowError( `Incompatible workflow sdk protocol version.` + ` Expected ${WORKFLOW_PROTOCOL_VERSION}, got ${requestProtocol} from the request.` ) @@ -88,6 +94,10 @@ describe("Workflow Parser", () => { }); describe("parseRequest", () => { + const token = nanoid(); + const workflowRunId = nanoid(); + const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }); + test("should handle first invocation", async () => { const payload = { initial: "payload" }; const rawPayload = JSON.stringify(payload); @@ -95,28 +105,76 @@ describe("Workflow Parser", () => { body: rawPayload, }); + const finised = new FinishState(); const requestPayload = (await getPayload(request)) ?? ""; - const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( - requestPayload, - true - ); - - // payload isn't parsed - expect(typeof rawInitialPayload).toBe("string"); - expect(rawInitialPayload).toBe(rawPayload); - // steps are empty: - expect(steps).toEqual([]); - expect(isLastDuplicate).toBeFalse(); + await mockQStashServer({ + execute: async () => { + const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( + requestPayload, + true, + workflowRunId, + qstashClient.http + ); + + // payload isn't parsed + expect(typeof rawInitialPayload).toBe("string"); + expect(rawInitialPayload).toBe(rawPayload); + // steps are empty: + expect(steps).toEqual([]); + expect(isLastDuplicate).toBeFalse(); + finised.finish(); + }, + // shouldn't call get steps + receivesRequest: false, + responseFields: { + body: {}, + status: 200, + }, + }); + finised.check(); }); - test("should throw when not first invocation and body is missing", async () => { + test("should fetch steps when not first invocation and body is missing", async () => { + const payload = "my-payload"; const request = new Request(WORKFLOW_ENDPOINT); const requestPayload = (await getPayload(request)) ?? ""; - const throws = parseRequest(requestPayload, false); - expect(throws).rejects.toThrow( - new QStashWorkflowError("Only first call can have an empty body") - ); + const finised = new FinishState(); + + const responseBody: RawStep[] = [ + { + messageId: "msg-id", + body: btoa(JSON.stringify(payload)), + callType: "step", + }, + ]; + await mockQStashServer({ + execute: async () => { + const result = await parseRequest( + requestPayload, + false, + workflowRunId, + qstashClient.http + ); + expect(result.rawInitialPayload).toBe(JSON.stringify(payload)); + expect(result.steps.length).toBe(1); + expect(result.steps[0].out).toBe(JSON.stringify(payload)); + finised.finish(); + }, + // should call get steps + receivesRequest: { + headers: {}, + method: "GET", + token, + url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/${workflowRunId}`, + }, + responseFields: { + body: responseBody, + status: 200, + }, + }); + + finised.check(); }); test("should return steps and initial payload correctly", async () => { @@ -143,7 +201,9 @@ describe("Workflow Parser", () => { const requestPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestPayload, - false + false, + workflowRunId, + qstashClient.http ); // payload is not parsed @@ -216,7 +276,9 @@ describe("Workflow Parser", () => { const requestPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(reqiestInitialPayload); @@ -270,7 +332,9 @@ describe("Workflow Parser", () => { const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( JSON.stringify(payload), - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe("initial"); @@ -294,6 +358,10 @@ describe("Workflow Parser", () => { }); describe("parseRequest with duplicates", () => { + const token = nanoid(); + const workflowRunId = nanoid(); + const qstashClient = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }); + const requestPayload = "myPayload"; const initStep: Step = { stepId: 0, @@ -302,7 +370,6 @@ describe("Workflow Parser", () => { out: requestPayload, concurrent: 1, }; - const workflowId = "wfr-foo"; test("should ignore extra init steps", async () => { // prettier-ignore @@ -311,12 +378,14 @@ describe("Workflow Parser", () => { { stepId: 1, stepName: "retrySleep", stepType: "SleepFor", sleepFor: 1_000_000, concurrent: 1 }, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -339,12 +408,14 @@ describe("Workflow Parser", () => { { stepId: 0, stepName: "successStep2", stepType: "Run", concurrent: 2, targetStep: 5 }, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -372,12 +443,14 @@ describe("Workflow Parser", () => { { stepId: 0, stepName: "successStep2", stepType: "Run", concurrent: 2, targetStep: 5 }, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -407,12 +480,14 @@ describe("Workflow Parser", () => { { stepId: 5, stepName: "successStep2", stepType: "Run", out: "20", concurrent: 2 }, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -443,12 +518,14 @@ describe("Workflow Parser", () => { { stepId: 5, stepName: "successStep2", stepType: "Run", out: '"20"', concurrent: 2 }, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -475,12 +552,14 @@ describe("Workflow Parser", () => { { stepId: 2, stepName: "retrySleep", stepType: "SleepFor", sleepFor: 1_000_000, concurrent: 1 }, // duplicate ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -502,12 +581,14 @@ describe("Workflow Parser", () => { { stepId: 2, stepName: "retrySleep", stepType: "SleepFor", sleepFor: 1_000_000, concurrent: 1 }, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -541,12 +622,14 @@ describe("Workflow Parser", () => { { stepId: 5, stepName: "successStep2", stepType: "Run", out: '"20"', concurrent: 2 }, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -584,12 +667,14 @@ describe("Workflow Parser", () => { { stepId: 5, stepName: "successStep2", stepType: "Run", out: '"20"', concurrent: 2 }, ] - const request = getRequest(WORKFLOW_ENDPOINT, workflowId, requestPayload, requestSteps); + const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, requestPayload, requestSteps); const requestFromPayload = (await getPayload(request)) ?? ""; const { rawInitialPayload, steps, isLastDuplicate } = await parseRequest( requestFromPayload, - false + false, + workflowRunId, + qstashClient.http ); expect(rawInitialPayload).toBe(requestPayload); @@ -622,7 +707,7 @@ describe("Workflow Parser", () => { const body = { status: 201, header: { myHeader: "value" }, - body: btoa(JSON.stringify(formatWorkflowError(new QStashWorkflowError(failMessage)))), + body: btoa(JSON.stringify(formatWorkflowError(new WorkflowError(failMessage)))), url: WORKFLOW_ENDPOINT, sourceHeader: { Authorization: authorization, @@ -664,7 +749,7 @@ describe("Workflow Parser", () => { expect(result2.isOk() && result2.value === "not-failure-callback").toBeTrue(); }); - test("should throw QStashWorkflowError if header is set but function is not passed", async () => { + test("should throw WorkflowError if header is set but function is not passed", async () => { const request = new Request(WORKFLOW_ENDPOINT, { headers: { [WORKFLOW_FAILURE_HEADER]: "true", @@ -673,7 +758,7 @@ describe("Workflow Parser", () => { const result = await handleFailure(request, "", client, initialPayloadParser); expect(result.isErr()).toBeTrue(); - expect(result.isErr() && result.error.name).toBe(QStashWorkflowError.name); + expect(result.isErr() && result.error.name).toBe(WorkflowError.name); expect(result.isErr() && result.error.message).toBe( "Workflow endpoint is called to handle a failure," + " but a failureFunction is not provided in serve options." + diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index 22f7d40..3cd9b82 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -1,6 +1,6 @@ import type { Err, Ok } from "neverthrow"; import { err, ok } from "neverthrow"; -import { QStashWorkflowError } from "./error"; +import { WorkflowError } from "./error"; import { NO_CONCURRENCY, WORKFLOW_FAILURE_HEADER, @@ -20,6 +20,8 @@ import type { WorkflowLogger } from "./logger"; import { WorkflowContext } from "./context"; import { recreateUserHeaders } from "./workflow-requests"; import { decodeBase64, getWorkflowRunId } from "./utils"; +import { getSteps } from "./client/utils"; +import { Client } from "@upstash/qstash"; /** * Gets the request body. If that fails, returns undefined @@ -45,11 +47,11 @@ export const getPayload = async (request: Request) => { * When returning steps, we add the initial payload as initial step. This is to make it simpler * in the rest of the code. * - * @param rawPayload body of the request as a string as explained above + * @param rawSteps body of the request as a string as explained above * @returns intiial payload and list of steps */ -const parsePayload = async (rawPayload: string, debug?: WorkflowLogger) => { - const [encodedInitialPayload, ...encodedSteps] = JSON.parse(rawPayload) as RawStep[]; +const processRawSteps = async (rawSteps: RawStep[], debug?: WorkflowLogger) => { + const [encodedInitialPayload, ...encodedSteps] = rawSteps; // decode initial payload: const rawInitialPayload = decodeBase64(encodedInitialPayload.body); @@ -172,7 +174,7 @@ const checkIfLastOneIsDuplicate = async ( * Validates the incoming request checking the workflow protocol * version and whether it is the first invocation. * - * Raises `QStashWorkflowError` if: + * Raises `WorkflowError` if: * - it's not the first invocation and expected protocol version doesn't match * the request. * - it's not the first invocation but there is no workflow id in the headers. @@ -188,7 +190,7 @@ export const validateRequest = ( // if it's not the first invocation, verify that the workflow protocal version is correct if (!isFirstInvocation && versionHeader !== WORKFLOW_PROTOCOL_VERSION) { - throw new QStashWorkflowError( + throw new WorkflowError( `Incompatible workflow sdk protocol version. Expected ${WORKFLOW_PROTOCOL_VERSION},` + ` got ${versionHeader} from the request.` ); @@ -199,7 +201,7 @@ export const validateRequest = ( ? getWorkflowRunId() : (request.headers.get(WORKFLOW_ID_HEADER) ?? ""); if (workflowRunId.length === 0) { - throw new QStashWorkflowError("Couldn't get workflow id from header"); + throw new WorkflowError("Couldn't get workflow id from header"); } return { @@ -220,6 +222,9 @@ export const validateRequest = ( export const parseRequest = async ( requestPayload: string | undefined, isFirstInvocation: boolean, + workflowRunId: string, + requester: Client["http"], + messageId?: string, debug?: WorkflowLogger ): Promise<{ rawInitialPayload: string; @@ -234,11 +239,20 @@ export const parseRequest = async ( isLastDuplicate: false, }; } else { - // if not the first invocation, make sure that body is not empty and parse payload + let rawSteps: RawStep[]; + if (!requestPayload) { - throw new QStashWorkflowError("Only first call can have an empty body"); + await debug?.log( + "INFO", + "ENDPOINT_START", + "request payload is empty, steps will be fetched from QStash." + ); + rawSteps = await getSteps(requester, workflowRunId, messageId, debug); + } else { + rawSteps = JSON.parse(requestPayload) as RawStep[]; } - const { rawInitialPayload, steps } = await parsePayload(requestPayload, debug); + const { rawInitialPayload, steps } = await processRawSteps(rawSteps, debug); + const isLastDuplicate = await checkIfLastOneIsDuplicate(steps, debug); const deduplicatedSteps = deduplicateSteps(steps); @@ -255,7 +269,7 @@ export const parseRequest = async ( * attempts to call the failureFunction function. * * If the header is set but failureFunction is not passed, returns - * QStashWorkflowError. + * WorkflowError. * * @param request incoming request * @param failureFunction function to handle the failure @@ -276,7 +290,7 @@ export const handleFailure = async ( if (!failureFunction) { return err( - new QStashWorkflowError( + new WorkflowError( "Workflow endpoint is called to handle a failure," + " but a failureFunction is not provided in serve options." + " Either provide a failureUrl or a failureFunction." @@ -295,27 +309,19 @@ export const handleFailure = async ( sourceHeader: Record; sourceBody: string; workflowRunId: string; + sourceMessageId: string; }; const decodedBody = body ? decodeBase64(body) : "{}"; const errorPayload = JSON.parse(decodedBody) as FailureFunctionPayload; - // parse steps - const { - rawInitialPayload, - steps, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - isLastDuplicate: _isLastDuplicate, - } = await parseRequest(decodeBase64(sourceBody), false, debug); - // create context const workflowContext = new WorkflowContext({ qstashClient, workflowRunId, - initialPayload: initialPayloadParser(rawInitialPayload), - rawInitialPayload, + initialPayload: initialPayloadParser(decodeBase64(sourceBody)), headers: recreateUserHeaders(new Headers(sourceHeader) as Headers), - steps, + steps: [], url: url, failureUrl: url, debug, diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 4b5e09a..a882042 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -10,7 +10,7 @@ import { triggerRouteFunction, triggerWorkflowDelete, } from "./workflow-requests"; -import { QStashWorkflowAbort } from "./error"; +import { WorkflowAbort } from "./error"; import { WorkflowContext } from "./context"; import { Client } from "@upstash/qstash"; import type { Step, StepType } from "./types"; @@ -29,6 +29,7 @@ import { mockQStashServer, WORKFLOW_ENDPOINT, } from "./test-utils"; +import { FinishState } from "./integration.test"; describe("Workflow Requests", () => { test("triggerFirstInvocation", async () => { @@ -67,14 +68,17 @@ describe("Workflow Requests", () => { }); describe("triggerRouteFunction", () => { - test("should get step-finished when QStashWorkflowAbort is thrown", async () => { + test("should get step-finished when WorkflowAbort is thrown", async () => { const result = await triggerRouteFunction({ onStep: () => { - throw new QStashWorkflowAbort("name"); + throw new WorkflowAbort("name"); }, onCleanup: async () => { await Promise.resolve(); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk @@ -89,6 +93,9 @@ describe("Workflow Requests", () => { onCleanup: async () => { await Promise.resolve(); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isOk()).toBeTrue(); // @ts-expect-error value will be set since stepFinish isOk @@ -103,6 +110,9 @@ describe("Workflow Requests", () => { onCleanup: async () => { await Promise.resolve(); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isErr()).toBeTrue(); }); @@ -115,11 +125,48 @@ describe("Workflow Requests", () => { onCleanup: () => { throw new Error("Something went wrong!"); }, + onCancel: () => { + throw new Error("Something went wrong!"); + }, }); expect(result.isErr()).toBeTrue(); }); }); + test("should call onCancel if context.cancel is called", async () => { + const workflowRunId = nanoid(); + const token = "myToken"; + + const context = new WorkflowContext({ + qstashClient: new Client({ baseUrl: MOCK_SERVER_URL, token }), + workflowRunId: workflowRunId, + initialPayload: undefined, + headers: new Headers({}) as Headers, + steps: [], + url: WORKFLOW_ENDPOINT, + }); + + const finished = new FinishState(); + const result = await triggerRouteFunction({ + onStep: async () => { + await context.cancel(); + await context.run("shouldn't call", () => { + throw new Error("shouldn't call context.run"); + }); + }, + onCleanup: async () => { + throw new Error("shouldn't call"); + }, + onCancel: async () => { + finished.finish(); + }, + }); + finished.check(); + expect(result.isOk()).toBeTrue(); + // @ts-expect-error value will be set since result isOk + expect(result.value).toBe("workflow-finished"); + }); + test("should call publishJSON in triggerWorkflowDelete", async () => { const workflowRunId = nanoid(); const token = "myToken"; @@ -345,6 +392,7 @@ describe("Workflow Requests", () => { [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, }); expect(timeoutHeaders).toBeUndefined(); @@ -371,6 +419,7 @@ describe("Workflow Requests", () => { [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, }); expect(timeoutHeaders).toBeUndefined(); @@ -404,10 +453,11 @@ describe("Workflow Requests", () => { } ); expect(headers).toEqual({ - [WORKFLOW_FEATURE_HEADER]: "WF_NoDelete", [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "WF_NoDelete,InitialBody", + "Upstash-Callback-Feature-Set": "LazyFetch,InitialBody", "Upstash-Retries": "0", "Upstash-Callback": WORKFLOW_ENDPOINT, "Upstash-Callback-Forward-Upstash-Workflow-Callback": "true", @@ -440,6 +490,7 @@ describe("Workflow Requests", () => { [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, [`Upstash-Failure-Callback-Forward-${WORKFLOW_FAILURE_HEADER}`]: "true", "Upstash-Failure-Callback": failureUrl, @@ -466,6 +517,7 @@ describe("Workflow Requests", () => { "Upstash-Workflow-Init": "false", "Upstash-Workflow-RunId": workflowRunId, "Upstash-Workflow-Url": WORKFLOW_ENDPOINT, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", "Upstash-Forward-Upstash-Workflow-Sdk-Version": "1", "Upstash-Workflow-CallType": "step", }); @@ -473,6 +525,7 @@ describe("Workflow Requests", () => { "Upstash-Workflow-Init": ["false"], "Upstash-Workflow-RunId": [workflowRunId], "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], + [WORKFLOW_FEATURE_HEADER]: ["LazyFetch,InitialBody"], "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], "Upstash-Workflow-Runid": [workflowRunId], "Upstash-Workflow-CallType": ["step"], diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index 6e42d74..86543e5 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -1,6 +1,6 @@ import type { Err, Ok } from "neverthrow"; import { err, ok } from "neverthrow"; -import { QStashWorkflowAbort, QStashWorkflowError } from "./error"; +import { WorkflowAbort, WorkflowError } from "./error"; import type { WorkflowContext } from "./context"; import { DEFAULT_CONTENT_TYPE, @@ -22,6 +22,7 @@ import type { } from "./types"; import { StepTypes } from "./types"; import type { WorkflowLogger } from "./logger"; +import { getSteps } from "./client/utils"; export const triggerFirstInvocation = async ( workflowContext: WorkflowContext, @@ -63,19 +64,29 @@ export const triggerFirstInvocation = async ( export const triggerRouteFunction = async ({ onCleanup, onStep, + onCancel, }: { onStep: () => Promise; onCleanup: () => Promise; + onCancel: () => Promise; }): Promise | Err> => { try { - // When onStep completes successfully, it throws an exception named `QStashWorkflowAbort`, indicating that the step has been successfully executed. + // When onStep completes successfully, it throws an exception named `WorkflowAbort`, + // indicating that the step has been successfully executed. // This ensures that onCleanup is only called when no exception is thrown. await onStep(); await onCleanup(); return ok("workflow-finished"); } catch (error) { const error_ = error as Error; - return error_ instanceof QStashWorkflowAbort ? ok("step-finished") : err(error_); + if (!(error_ instanceof WorkflowAbort)) { + return err(error_); + } else if (error_.cancelWorkflow) { + await onCancel(); + return ok("workflow-finished"); + } else { + return ok("step-finished"); + } } }; @@ -152,7 +163,32 @@ export const handleThirdPartyCallResult = async ( > => { try { if (request.headers.get("Upstash-Workflow-Callback")) { - const callbackMessage = JSON.parse(requestPayload) as { + let callbackPayload: string; + if (requestPayload) { + callbackPayload = requestPayload; + } else { + const workflowRunId = request.headers.get("upstash-workflow-runid"); + const messageId = request.headers.get("upstash-message-id"); + + if (!workflowRunId) + throw new WorkflowError("workflow run id missing in context.call lazy fetch."); + if (!messageId) throw new WorkflowError("message id missing in context.call lazy fetch."); + + const steps = await getSteps(client.http, workflowRunId, messageId, debug); + const failingStep = steps.find((step) => step.messageId === messageId); + + if (!failingStep) + throw new WorkflowError( + "Failed to submit the context.call." + + (steps.length === 0 + ? "No steps found." + : `No step was found with matching messageId ${messageId} out of ${steps.length} steps.`) + ); + + callbackPayload = atob(failingStep.body); + } + + const callbackMessage = JSON.parse(callbackPayload) as { status: number; body: string; retried?: number; // only set after the first try @@ -256,9 +292,7 @@ export const handleThirdPartyCallResult = async ( } catch (error) { const isCallReturn = request.headers.get("Upstash-Workflow-Callback"); return err( - new QStashWorkflowError( - `Error when handling call return (isCallReturn=${isCallReturn}): ${error}` - ) + new WorkflowError(`Error when handling call return (isCallReturn=${isCallReturn}): ${error}`) ); } }; @@ -292,6 +326,7 @@ export const getHeaders = ( [WORKFLOW_INIT_HEADER]: initHeaderValue, [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: workflowUrl, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", }; if (!step?.callUrl) { @@ -309,7 +344,7 @@ export const getHeaders = ( // for call url, retry is 0 if (step?.callUrl) { baseHeaders["Upstash-Retries"] = callRetries?.toString() ?? "0"; - baseHeaders[WORKFLOW_FEATURE_HEADER] = "WF_NoDelete"; + baseHeaders[WORKFLOW_FEATURE_HEADER] = "WF_NoDelete,InitialBody"; // if some retries is set, use it in callback and failure callback if (retries) { @@ -354,6 +389,7 @@ export const getHeaders = ( "Upstash-Callback-Workflow-CallType": "fromCallback", "Upstash-Callback-Workflow-Init": "false", "Upstash-Callback-Workflow-Url": workflowUrl, + "Upstash-Callback-Feature-Set": "LazyFetch,InitialBody", "Upstash-Callback-Forward-Upstash-Workflow-Callback": "true", "Upstash-Callback-Forward-Upstash-Workflow-StepId": step.stepId.toString(), @@ -411,7 +447,7 @@ export const verifyRequest = async ( throw new Error("Signature in `Upstash-Signature` header is not valid"); } } catch (error) { - throw new QStashWorkflowError( + throw new WorkflowError( `Failed to verify that the Workflow request comes from QStash: ${error}\n\n` + "If signature is missing, trigger the workflow endpoint by publishing your request to QStash instead of calling it directly.\n\n" + "If you want to disable QStash Verification, you should clear env variables QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY"