Skip to content

Commit

Permalink
fix: add wait/notify tests
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda committed Oct 14, 2024
1 parent 8d18bbb commit e9ecc87
Show file tree
Hide file tree
Showing 15 changed files with 555 additions and 115 deletions.
47 changes: 47 additions & 0 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { describe, test } from "bun:test";
import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../test-utils";
import { Client } from ".";
import { nanoid } from "../utils";

describe("workflow client", () => {
const token = nanoid();
const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });

test("should send cancel", async () => {
const workflowRunId = `wfr-${nanoid()}`;
await mockQStashServer({
execute: async () => {
await client.cancel({ workflowRunId });
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "DELETE",
url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/runs/${workflowRunId}?cancel=true`,
token,
},
});
});

test("should send notify", async () => {
const eventId = `event-id-${nanoid()}`;
const notifyData = { data: `notify-data-${nanoid()}` };
await mockQStashServer({
execute: async () => {
await client.notify({ eventId, notifyBody: JSON.stringify(notifyData) });
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/notify/${eventId}`,
token,
body: notifyData,
},
});
});
});
35 changes: 20 additions & 15 deletions src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { NotifyResponse } from "../types";
import { Client } from "@upstash/qstash"
import { Client as QStashClient } from "@upstash/qstash";

type ClientConfig = ConstructorParameters<typeof Client>[0];
type ClientConfig = ConstructorParameters<typeof QStashClient>[0];

export class WorkflowClient {
private client: Client
export class Client {
private client: QStashClient;

constructor(clientConfig: ClientConfig) {
if (!clientConfig.baseUrl || !clientConfig.token) {
console.warn("[Upstash Workflow] url or the token is not set. client will not work.");
};
this.client = new Client(clientConfig)
}
this.client = new QStashClient(clientConfig);
}

/**
Expand All @@ -19,7 +19,7 @@ export class WorkflowClient {
* @param workflowRunId run id of the workflow to delete
* @returns true if workflow is succesfully deleted. Otherwise throws QStashError
*/
public async cancel(workflowRunId: string) {
public async cancel({ workflowRunId }: { workflowRunId: string }) {
const result = (await this.client.http.request({
path: ["v2", "workflows", "runs", `${workflowRunId}?cancel=true`],
method: "DELETE",
Expand All @@ -29,19 +29,24 @@ export class WorkflowClient {
}

/**
* Notify waiting
*
* @param eventId
* @param notifyData
* Notify a workflow run waiting for an event
*
* @param eventId event id to notify
* @param notifyData data to provide to the workflow
*/
public async notify(eventId: string, notifyData: string): Promise<NotifyResponse[]> {

public async notify({
eventId,
notifyBody,
}: {
eventId: string;
notifyBody?: string;
}): Promise<NotifyResponse[]> {
const result = (await this.client.http.request({
path: ["v2", "notify", eventId],
method: "POST",
body: notifyData
body: notifyBody,
})) as NotifyResponse[];

return result
return result;
}
}
20 changes: 7 additions & 13 deletions src/context/auto-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ export class AutoExecutor {
initialStepCount: number
): ParallelCallState {
const remainingSteps = this.steps.filter(
(step) => (step.targetStep ?? step.stepId) >= initialStepCount
(step) => (step.targetStep || step.stepId) >= initialStepCount
);

if (remainingSteps.length === 0) {
Expand Down Expand Up @@ -321,15 +321,9 @@ export class AutoExecutor {
length: steps.length,
steps,
});

if (steps[0].waitEventId) {
if (steps.length !== 1) {
throw new QStashWorkflowError(
`Received an unexpected step array. If a step has waitEventId, it should be by itself. Received instead: ${steps}`
)
}

const waitStep = steps[0]
if (steps[0].waitEventId && steps.length === 1) {
const waitStep = steps[0];

const { headers, timeoutHeaders } = getHeaders(
"false",
Expand All @@ -353,9 +347,9 @@ export class AutoExecutor {
stepType: "Wait",
stepName: waitStep.stepName,
concurrent: waitStep.concurrent,
targetStep: waitStep.targetStep
}
}
targetStep: waitStep.targetStep,
},
};

await this.context.qstashClient.http.request({
path: ["v2", "wait", waitStep.waitEventId],
Expand Down Expand Up @@ -531,6 +525,6 @@ const validateParallelSteps = (lazySteps: BaseLazyStep[], stepsFromRequest: Step
* @returns sorted steps
*/
const sortSteps = (steps: Step[]): Step[] => {
const getStepId = (step: Step) => step.targetStep ?? step.stepId;
const getStepId = (step: Step) => step.targetStep || step.stepId;
return steps.toSorted((step, stepOther) => getStepId(step) - getStepId(stepOther));
};
118 changes: 118 additions & 0 deletions src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import { WorkflowContext } from "./context";
import { Client } from "@upstash/qstash";
import { nanoid } from "../utils";
import { QStashWorkflowError } from "../error";
import {
WORKFLOW_ID_HEADER,
WORKFLOW_INIT_HEADER,
WORKFLOW_PROTOCOL_VERSION_HEADER,
WORKFLOW_URL_HEADER,
} from "../constants";

describe("context tests", () => {
const token = nanoid();
Expand Down Expand Up @@ -144,4 +150,116 @@ describe("context tests", () => {
},
});
});

describe("wait for event step", () => {
test("should send request to wait endpoint if there is a wait for event step", async () => {
const context = new WorkflowContext({
qstashClient,
initialPayload: "my-payload",
steps: [],
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
});

const eventId = "my-event-id";
await mockQStashServer({
execute: () => {
const throws = () => context.waitForEvent("my-step", eventId, 20);
expect(throws).toThrowError("Aborting workflow after executing step 'my-step'.");
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/wait/${eventId}`,
token,
body: {
step: {
concurrent: 1,
stepId: 1,
stepName: "my-step",
stepType: "Wait",
},
timeout: "20s",
timeoutHeaders: {
"Content-Type": ["application/json"],
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"],
"Upstash-Retries": ["3"],
"Upstash-Workflow-CallType": ["step"],
[WORKFLOW_INIT_HEADER]: ["false"],
[WORKFLOW_ID_HEADER]: ["wfr-id"],
"Upstash-Workflow-Runid": ["wfr-id"],
[WORKFLOW_URL_HEADER]: [WORKFLOW_ENDPOINT],
},
timeoutUrl: WORKFLOW_ENDPOINT,
url: WORKFLOW_ENDPOINT,
},
},
});
});

test("should send request to batch endpoint if there is a parallel wait for event step", async () => {
const context = new WorkflowContext({
qstashClient,
initialPayload: "my-payload",
steps: [],
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
});

const eventId = "my-event-id";
await mockQStashServer({
execute: () => {
const throws = () =>
Promise.all([
context.waitForEvent("my-wait-step", eventId, 20),
context.run("my-run-step", () => "foo"),
]);
expect(throws).toThrowError("Aborting workflow after executing step 'my-wait-step'.");
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`,
token,
body: [
{
body: '{"stepId":0,"stepName":"my-wait-step","stepType":"Wait","waitEventId":"my-event-id","timeout":"20s","concurrent":2,"targetStep":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
"upstash-workflow-calltype": "step",
"upstash-workflow-init": "false",
"upstash-workflow-runid": "wfr-id",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
},
},
{
body: '{"stepId":0,"stepName":"my-run-step","stepType":"Run","concurrent":2,"targetStep":2}',
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
"upstash-workflow-init": "false",
"upstash-workflow-runid": "wfr-id",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
},
},
],
},
});
});
});
});
14 changes: 10 additions & 4 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import type { WaitStepResponse, WorkflowClient } from "../types";
import { type StepFunction, type Step } from "../types";
import { AutoExecutor } from "./auto-executor";
import type { BaseLazyStep } from "./steps";
import { LazyCallStep, LazyFunctionStep, LazySleepStep, LazySleepUntilStep, LazyWaitStep } from "./steps";
import {
LazyCallStep,
LazyFunctionStep,
LazySleepStep,
LazySleepUntilStep,
LazyWaitForEventStep,
} from "./steps";
import type { HTTPMethods } from "@upstash/qstash";
import type { WorkflowLogger } from "../logger";
import { DEFAULT_RETRIES } from "../constants";
Expand Down Expand Up @@ -292,15 +298,15 @@ export class WorkflowContext<TInitialPayload = unknown> {
public async waitForEvent(
stepName: string,
eventId: string,
timeout: string | number,
timeout: string | number
): Promise<WaitStepResponse> {
const result = await this.addStep(
new LazyWaitStep(
new LazyWaitForEventStep(
stepName,
eventId,
typeof timeout === "string" ? timeout : `${timeout}s`
)
)
);

return result;
}
Expand Down
41 changes: 40 additions & 1 deletion src/context/steps.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { describe, test, expect } from "bun:test";
import { LazyCallStep, LazyFunctionStep, LazySleepStep, LazySleepUntilStep } from "./steps";
import {
LazyCallStep,
LazyFunctionStep,
LazySleepStep,
LazySleepUntilStep,
LazyWaitForEventStep,
} from "./steps";
import { nanoid } from "../utils";
import type { Step } from "../types";

Expand Down Expand Up @@ -141,4 +147,37 @@ describe("test steps", () => {
});
});
});

describe("wait step", () => {
const eventId = "my-event-id";
const timeout = "10s";
const step = new LazyWaitForEventStep(stepName, eventId, timeout);

test("should set correct fields", () => {
expect(step.stepName).toBe(stepName);
expect(step.stepType).toBe("Wait");
});
test("should create plan step", () => {
expect(step.getPlanStep(concurrent, targetStep)).toEqual({
stepId: 0,
stepName,
stepType: "Wait",
waitEventId: eventId,
timeout,
concurrent,
targetStep,
});
});

test("should create result step", async () => {
expect(await step.getResultStep(4, stepId)).toEqual({
waitEventId: eventId,
timeout,
concurrent: 4,
stepId,
stepName,
stepType: "Wait",
});
});
});
});
Loading

0 comments on commit e9ecc87

Please sign in to comment.