diff --git a/codeblocks/codeblocks.json b/codeblocks/codeblocks.json index f9726e52..6482145a 100644 --- a/codeblocks/codeblocks.json +++ b/codeblocks/codeblocks.json @@ -1 +1 @@ -{"https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1":"{`export response=\\`curl -s -X POST $CONDUCTOR_SERVER_URL/token -H 'Content-Type:application/json' -d '{\n\t\"keyId\": \"'\"$KEY\"'\",\n\t\"keySecret\": \"'\"$SECRET\"'\"\n}'\\`\n\nif [[ \"$response\" != *'token'* ]]; then\n echo \"Unable to generate the auth header. Please check KEY, SECRET and CONDUCTOR_SERVER_URL variables\"\n echo \"Server response:\"\n echo $response\n exit 1\nfi\n\nexport token=\\`echo $response | cut -d '\"' -f4\\`\n\nfor FILE in main/resources/workflows/*;\n do\n echo \"Deploying @$FILE\";\n\n curl -X POST $CONDUCTOR_SERVER_URL/metadata/workflow?overwrite=true \\\n -H \"X-Authorization: $token\" \\\n -H \"accept: */*\" \\\n -H \"Content-Type: application/json\" \\\n -d @$FILE\n done\n`}","https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1-lines":"#L8-L32","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1":"{`\n /**\n * Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n */\n @WorkerTask(value = \"fraud-check\", threadCount = 5, pollingInterval = 200)\n public FraudCheckResult checkForFraudTask(DepositDetail depositDetail) {\n return fraudCheckService.checkForFraud(depositDetail);\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1-lines":"#L27-L35","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1":"{` // Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n [WorkerTask(taskType: \"fraud-check\", batchSize: 5, domain: null, pollIntervalMs: 200, workerId: \"workerId\")]\n public TaskResult FraudWorker(Task task)\n {\n var depositDetail = (DepositDetail)task.InputData[\"depositDetail\"];\n var fraudCheckResult = _fraudCheckService.CheckForFraud(depositDetail);\n var result = task.Completed();\n result.OutputData = Examples.Util.TypeUtil.GetDictionaryFromObject(fraudCheckResult);\n return result;\n }`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1-lines":"#L25-L34","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1":"{`const fraudCheckWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const { amount, accountId } = inputData;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n domain: \"fraud\", // Optional\n pollInterval: 100, // Optional can be specified in the TaskManager\n concurrency: 1, // Optional can be specified in the TaskManager\n};`}","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1-lines":"#L4-L17","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1":"{`export const fraudCheckWorker: ConductorWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const amount = inputData?.amount;\n const accountId = inputData?.accountId;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n domain: \"fraud\", // Optional\n pollInterval: 100, // Optional can be specified in the TaskManager\n concurrency: 2, // Optional can be specified in the TaskManager\n};`}","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1-lines":"#L5-L19","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2":"{`\n @WorkerTask(value = \"retrieve-deposit-batch\", threadCount = 5, pollingInterval = 200)\n public List retrieveDepositBatch(@InputParam(\"batchCount\") Integer batchCount) {\n if (batchCount == null) {\n batchCount = random.nextInt(5, 11);\n }\n batchCount = Math.min(100, batchCount); // Limit to 100 in playground\n List depositDetails = IntStream.range(0, batchCount)\n .mapToObj(i -> DepositDetail.builder()\n .accountId(\"acc-id-\" + i)\n .amount(BigDecimal.valueOf(i * 1500L)) // Create random amounts\n .build())\n .toList();\n log.info(\"Returning {} transactions\", depositDetails.size());\n return depositDetails;\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2-lines":"#L40-L56","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1":"{`\n StartWorkflowRequest request = new StartWorkflowRequest();\n request.setName(\"deposit_payment\");\n Map inputData = new HashMap<>();\n inputData.put(\"amount\", depositDetail.getAmount());\n inputData.put(\"accountId\", depositDetail.getAccountId());\n request.setInput(inputData);\n\n String workflowId = workflowClient.startWorkflow(request);\n log.info(\"Workflow id: {}\", workflowId);\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1-lines":"#L22-L32","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1":"{` var request = new StartWorkflowRequest\n {\n Name = WORKFLOW_NAME,\n Version = WORKFLOW_VERSION,\n Input = Examples.Util.TypeUtil.GetDictionaryFromObject(depositDetail)\n };\n var workflowId = _workflowClient.StartWorkflow(request);\n Console.WriteLine($\"Started deposit workflow id: {workflowId}\");\n return workflowId;`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1-lines":"#L23-L31","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1":"{` @PostMapping(value = \"/triggerDepositFlow\", produces = \"application/json\")\n public ResponseEntity> triggerDepositFlow(@RequestBody DepositDetail depositDetail) {\n log.info(\"Starting deposit flow for: {}\", depositDetail);\n return ResponseEntity.ok(workflowService.startDepositWorkflow(depositDetail));\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1-lines":"#L32-L37","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1":"{` @Override\n public TaskResult execute(Task task) {\n TaskResult taskResult = new TaskResult(task);\n if (!task.getInputData().containsKey(POLL_COUNTER)) {\n taskResult.addOutputData(\"message\", \"pollCounter param not found in input, will use default of \" + defaultPollCount + \" polls\");\n }\n\n int pollCounter = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_COUNTER, defaultPollCount)));\n int pollIntervalSeconds = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_INTERVAL_SECONDS, 5)));\n\n // Add these to the output for context\n taskResult.addOutputData(POLL_INTERVAL_SECONDS, pollIntervalSeconds + \" (this test task has a max limit of 10 seconds)\");\n taskResult.addOutputData(POLL_COUNTER, pollCounter + \" (this test task has a max limit of 10 iterations)\");\n\n // We can read current iteration from the task output as the data will be retained on the worker when polled\n int currentIteration = castToInt(taskResult.getOutputData().getOrDefault(CURRENT_ITERATION, 0));\n\n // Increment the current iteration and set to the task output\n taskResult.addOutputData(CURRENT_ITERATION, ++currentIteration);\n taskResult.addOutputData(\"updatedTime\", new Date().toString());\n\n // While condition is not met, keep task in progress\n if (currentIteration < pollCounter) {\n taskResult.setStatus(TaskResult.Status.IN_PROGRESS);\n // Set to configured seconds to callback, and you can set this to any value as per the requirements\n taskResult.setCallbackAfterSeconds(pollIntervalSeconds);\n return taskResult;\n }\n\n // Set task as completed now that the poll count condition is met\n taskResult.setStatus(TaskResult.Status.COMPLETED);\n return taskResult;\n }`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1-lines":"#L24-L56"} \ No newline at end of file +{"https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1":"{`export response=\\`curl -s -X POST $CONDUCTOR_SERVER_URL/token -H 'Content-Type:application/json' -d '{\n\t\"keyId\": \"'\"$CONDUCTOR_AUTH_KEY\"'\",\n\t\"keySecret\": \"'\"$CONDUCTOR_AUTH_SECRET\"'\"\n}'\\`\n\nif [[ \"$response\" != *'token'* ]]; then\n echo \"Unable to generate the auth header. Please check KEY, SECRET and CONDUCTOR_SERVER_URL variables\"\n echo \"Server response:\"\n echo $response\n exit 1\nfi\n\nexport token=\\`echo $response | cut -d '\"' -f4\\`\n\nfor FILE in main/resources/workflows/*;\n do\n echo \"Deploying @$FILE\";\n\n curl -X POST $CONDUCTOR_SERVER_URL/metadata/workflow?overwrite=true \\\n -H \"X-Authorization: $token\" \\\n -H \"accept: */*\" \\\n -H \"Content-Type: application/json\" \\\n -d @$FILE\n done\n`}","https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1-lines":"#L8-L32","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1":"{`\n /**\n * Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n */\n @WorkerTask(value = \"fraud-check\", threadCount = 5, pollingInterval = 200)\n public FraudCheckResult checkForFraudTask(DepositDetail depositDetail) {\n return fraudCheckService.checkForFraud(depositDetail);\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1-lines":"#L27-L35","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1":"{` // Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n [WorkerTask(taskType: \"fraud-check\", batchSize: 5, domain: null, pollIntervalMs: 200, workerId: \"workerId\")]\n public TaskResult FraudWorker(Task task)\n {\n var depositDetail = (DepositDetail)task.InputData[\"depositDetail\"];\n var fraudCheckResult = _fraudCheckService.CheckForFraud(depositDetail);\n var result = task.Completed();\n result.OutputData = Examples.Util.TypeUtil.GetDictionaryFromObject(fraudCheckResult);\n return result;\n }`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1-lines":"#L25-L34","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1":"{`const fraudCheckWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const { amount, accountId } = inputData;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n domain: \"fraud\", // Optional\n pollInterval: 100, // Optional can be specified in the TaskManager\n concurrency: 1, // Optional can be specified in the TaskManager\n};`}","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1-lines":"#L4-L17","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1":"{`export const fraudCheckWorker: ConductorWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const amount = inputData?.amount;\n const accountId = inputData?.accountId;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n domain: \"fraud\", // Optional\n pollInterval: 100, // Optional can be specified in the TaskManager\n concurrency: 2, // Optional can be specified in the TaskManager\n};`}","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1-lines":"#L5-L19","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2":"{`\n @WorkerTask(value = \"retrieve-deposit-batch\", threadCount = 5, pollingInterval = 200)\n public List retrieveDepositBatch(@InputParam(\"batchCount\") Integer batchCount) {\n if (batchCount == null) {\n batchCount = random.nextInt(5, 11);\n }\n batchCount = Math.min(100, batchCount); // Limit to 100 in playground\n List depositDetails = IntStream.range(0, batchCount)\n .mapToObj(i -> DepositDetail.builder()\n .accountId(\"acc-id-\" + i)\n .amount(BigDecimal.valueOf(i * 1500L)) // Create random amounts\n .build())\n .toList();\n log.info(\"Returning {} transactions\", depositDetails.size());\n return depositDetails;\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2-lines":"#L40-L56","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1":"{`\n StartWorkflowRequest request = new StartWorkflowRequest();\n request.setName(\"deposit_payment\");\n Map inputData = new HashMap<>();\n inputData.put(\"amount\", depositDetail.getAmount());\n inputData.put(\"accountId\", depositDetail.getAccountId());\n request.setInput(inputData);\n\n String workflowId = workflowClient.startWorkflow(request);\n log.info(\"Workflow id: {}\", workflowId);\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1-lines":"#L22-L32","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1":"{` var request = new StartWorkflowRequest\n {\n Name = WORKFLOW_NAME,\n Version = WORKFLOW_VERSION,\n Input = Examples.Util.TypeUtil.GetDictionaryFromObject(depositDetail)\n };\n var workflowId = _workflowClient.StartWorkflow(request);\n Console.WriteLine($\"Started deposit workflow id: {workflowId}\");\n return workflowId;`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1-lines":"#L23-L31","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1":"{` @PostMapping(value = \"/triggerDepositFlow\", produces = \"application/json\")\n public ResponseEntity> triggerDepositFlow(@RequestBody DepositDetail depositDetail) {\n log.info(\"Starting deposit flow for: {}\", depositDetail);\n return ResponseEntity.ok(workflowService.startDepositWorkflow(depositDetail));\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1-lines":"#L32-L37","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1":"{` @Override\n public TaskResult execute(Task task) {\n TaskResult taskResult = new TaskResult(task);\n if (!task.getInputData().containsKey(POLL_COUNTER)) {\n taskResult.addOutputData(\"message\", \"pollCounter param not found in input, will use default of \" + defaultPollCount + \" polls\");\n }\n\n int pollCounter = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_COUNTER, defaultPollCount)));\n int pollIntervalSeconds = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_INTERVAL_SECONDS, 5)));\n\n // Add these to the output for context\n taskResult.addOutputData(POLL_INTERVAL_SECONDS, pollIntervalSeconds + \" (this test task has a max limit of 10 seconds)\");\n taskResult.addOutputData(POLL_COUNTER, pollCounter + \" (this test task has a max limit of 10 iterations)\");\n\n // We can read current iteration from the task output as the data will be retained on the worker when polled\n int currentIteration = castToInt(taskResult.getOutputData().getOrDefault(CURRENT_ITERATION, 0));\n\n // Increment the current iteration and set to the task output\n taskResult.addOutputData(CURRENT_ITERATION, ++currentIteration);\n taskResult.addOutputData(\"updatedTime\", new Date().toString());\n\n // While condition is not met, keep task in progress\n if (currentIteration < pollCounter) {\n taskResult.setStatus(TaskResult.Status.IN_PROGRESS);\n // Set to configured seconds to callback, and you can set this to any value as per the requirements\n taskResult.setCallbackAfterSeconds(pollIntervalSeconds);\n return taskResult;\n }\n\n // Set task as completed now that the poll count condition is met\n taskResult.setStatus(TaskResult.Status.COMPLETED);\n return taskResult;\n }`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1-lines":"#L24-L56"} \ No newline at end of file diff --git a/docs/reference-docs/operators/dynamic-fork.md b/docs/reference-docs/operators/dynamic-fork.md index 5e1c0d94..c85d4f88 100644 --- a/docs/reference-docs/operators/dynamic-fork.md +++ b/docs/reference-docs/operators/dynamic-fork.md @@ -7,7 +7,7 @@ import TabItem from '@theme/TabItem'; # Dynamic Fork -The Dynamic Fork task is used to run tasks in parallel, with the forking behavior determined at run-time. This contrasts with the Fork/Join task, where the forking behavior is defined at workflow creation. Like the Fork/Join task, the Dynamic Fork task is followed by a join operation that waits on the forked tasks to finish before moving to the next task. This Join task collects the outputs from each forked task. +The Dynamic Fork task is used to run tasks in parallel, with the forking behavior (such as the number of forks) determined at run-time. This contrasts with the Fork/Join task, where the forking behavior is defined at workflow creation. Like the Fork/Join task, the Dynamic Fork task is followed by a join operation that waits on the forked tasks to finish before moving to the next task. This Join task collects the outputs from each forked task. Unlike the Fork/Join task, a Dynamic Fork task can only run one task per fork. A sub-workflow can be utilized if there is a need for multiple tasks per fork. @@ -20,9 +20,12 @@ There are two ways to run the Dynamic Fork task: Configure these parameters for the Dynamic Fork task. The input payload for the forked tasks should correspond with its expected input. For example, if the forked tasks are HTTP tasks, its input should include the method and URI. -### Configuration for running different tasks +**For the Fork task:** -The dynamic fork executes each task in the array specified by `dynamicForkTasksParam` with its corresponding input specified by `dynamicForkTasksInputParamName`. + + + +The dynamic fork executes each task in the array specified by `dynamicForkTasksParam` with its corresponding input specified by `dynamicForkTasksInputParamName`. The number of forks created at runtime depends on the array of tasks specified. | Parameter | Description | Required/ Optional | | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | @@ -31,20 +34,22 @@ The dynamic fork executes each task in the array specified by `dynamicForkTasksP | dynamicForkTasksInputParamName | The input parameter key whose value will be used to pass the required input parameters for each forked task. For example, "dynamicTasksInput", which will then be specified as an input parameter in the Dynamic Fork task. | Required. | | inputParameters. **dynamicTasksInput** | A map where the keys are the task reference names for each fork and the values are the input parameters that will be passed into its matching task. It can be [passed as a variable](https://orkes.io/content/developer-guides/passing-inputs-to-task-in-conductor). | Required. | + -### Configuration for running the same task — any task type + -The dynamic fork executes the task specified by `forkTaskName` for each element of `forkTaskInputs`. Configure these parameters to execute any task type except Sub Workflow tasks. +The dynamic fork executes the task specified by `forkTaskName` for each element of `forkTaskInputs`. The number of forks created at runtime depends on the number of `forkTaskInputs` sent. Configure these parameters to execute any task type except Sub Workflow tasks. | Parameter | Description | Required/ Optional | | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | | inputParamters. **forkTaskName** | The name of the task that will be executed in each fork. It can be [passed as a variable](https://orkes.io/content/developer-guides/passing-inputs-to-task-in-conductor). | Required. | | inputParameters. **forkTaskInputs** | An array of JSON inputs for each forked branch. The number of array elements determines the number of branches in the dynamic fork. It can be [passed as a variable](https://orkes.io/content/developer-guides/passing-inputs-to-task-in-conductor). | Required. | + -### Configuration for running the same task — Sub Workflow task + -The dynamic fork executes the workflow specified by `forkTaskWorkflow` and `forkTaskWorkflowVersion` for each element of `forkTaskInputs`. Configure these parameters to execute a Sub Workflow task. +The dynamic fork executes the workflow specified by `forkTaskWorkflow` and `forkTaskWorkflowVersion` for each element of `forkTaskInputs`. The number of forks created at runtime depends on the number of `forkTaskInputs` sent. Configure these parameters to execute a Sub Workflow task. | Parameter | Description | Required/ Optional | | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | @@ -52,11 +57,12 @@ The dynamic fork executes the workflow specified by `forkTaskWorkflow` and `fork | inputParamters. **forkTaskWorkflowVersion** | The version of the workflow to be executed. If unspecified, the latest version will be used. | Required. | | inputParameters. **forkTaskInputs** | An array of JSON inputs for each forked branch. The number of array elements determines the number of branches in the dynamic fork. It can be [passed as a variable](https://orkes.io/content/developer-guides/passing-inputs-to-task-in-conductor). | Required. | + + +The [Join](./join) task will run after the forked tasks. Configure the Join task as well to complete the fork-join operations. -### Configuration for Join task in dynamic forks - -The Join task will run after the dynamically forked tasks are completed. Configure the Join task as well to complete the fork-join operations. +**For the Join task:** | Parameter | Description | Required/ Optional | | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | @@ -73,6 +79,7 @@ This is the JSON schema for a Dynamic Fork task definition. ```json +// JSON schema for the Dynamic Fork task { "name": "fork_join_dynamic", "taskReferenceName": "fork_join_dynamic_ref", @@ -102,12 +109,23 @@ This is the JSON schema for a Dynamic Fork task definition. "dynamicForkTasksParam": "dynamicTasks", // input parameter key that will hold the task names to execute "dynamicForkTasksInputParamName": "dynamicTasksInput" // input parameter key that will hold the input parameters for each task } + + +// JSON schema for the Join task +{ + "name": "join", + "taskReferenceName": "join_ref", + "inputParameters": {}, + "type": "JOIN", + "joinOn": [] +} ``` ```json +// JSON schema for the Dynamic Fork task { "name": "fork_join_dynamic", "taskReferenceName": "fork_join_dynamic_ref", @@ -118,12 +136,21 @@ This is the JSON schema for a Dynamic Fork task definition. "type": "FORK_JOIN_DYNAMIC" } +// JSON schema for the Join task +{ + "name": "join", + "taskReferenceName": "join_ref", + "inputParameters": {}, + "type": "JOIN", + "joinOn": [] +} ``` ```json +// JSON schema for the Dynamic Fork task { "name": "fork_join_dynamic", "taskReferenceName": "fork_join_dynamic_ref", @@ -134,6 +161,15 @@ This is the JSON schema for a Dynamic Fork task definition. }, "type": "FORK_JOIN_DYNAMIC" } + +// JSON schema for the Join task +{ + "name": "join", + "taskReferenceName": "join_ref", + "inputParameters": {}, + "type": "JOIN", + "joinOn": [] +} ``` @@ -173,190 +209,143 @@ forkTaskName and forkTaskInputs will take precedence even if dynamicForkTasksPar Here are some examples for using the Dynamic Fork task. -
Using dynamicForkTaskParam and dynamicForkTasksInputParamName to run the same task - -```json -{ - "name": "dynamic", - "taskReferenceName": "dynamic_ref", - "inputParameters": { - "dynamicTasks": [ - { - "name":"image_convert_resize", - "taskReferenceName": "image_convert_resize_png_300x300_0" - // ... task definition - }, - { - "name":"image_convert_resize", - "taskReferenceName": "image_convert_resize_png_200x200_1" - // ... task definition - } - ], - "dynamicTasksInput": { - "image_convert_resize_png_300x300_0" : { - "outputWidth": 300, - "outputHeight": 300 - }, - "image_convert_resize_png_200x200_1" : { - "outputWidth": 200, - "outputHeight": 200 - } - } - }, - "type": "FORK_JOIN_DYNAMIC", - "dynamicForkTasksParam": "dynamicTasks", - "dynamicForkTasksInputParamName": "dynamicTasksInput" -} -``` -
+
Running different tasks -
Using dynamicForkTaskParam and dynamicForkTasksInputParamName to run sub-workflows +To run a different task per fork in a dynamic fork, you must use `dynamicForkTasksParam` and `dynamicForkTasksInputParamName`. Here is an example workflow of a Dynamic Fork task running different tasks. ```json +// workflow definition + { - "name": "dynamic", - "taskReferenceName": "dynamic_ref", - "inputParameters": { - "dynamicTasks": [ - { - "subWorkflowParam" : { - "name": "image_convert_resize_subworkflow", - "version": "1" + "name": "DynamicForkExample", + "description": "This workflow runs different tasks in a dynamic fork.", + "version": 1, + "tasks": [ + { + "name": "fork_join_dynamic", + "taskReferenceName": "fork_join_dynamic_ref", + "inputParameters": { + "dynamicTasks": [ + { + "name": "inline", + "taskReferenceName": "task1", + "type": "INLINE", + "inputParameters": { + "expression": "(function () {\n return $.input;\n})();", + "evaluatorType": "graaljs" + } }, - "type" : "SUB_WORKFLOW", - "taskReferenceName": "image_convert_resize_subworkflow_png_300x300_0", - // ... task definition - }, - { - "subWorkflowParam" : { - "name": :"image_convert_resize_subworkflow", - "version": "1" + { + "name": "http", + "taskReferenceName": "task2", + "type": "HTTP", + "inputParameters": { + "method": "GET", + "connectionTimeOut": 3000, + "readTimeOut": "3000", + "accept": "application/json", + "contentType": "application/json", + "encode": true + } }, - "type" : "SUB_WORKFLOW", - "taskReferenceName": "image_convert_resize_subworkflow_png_200x200_1", - // ... task definition - } - ], - "dynamicTasksInput": { - "image_convert_resize_png_300x300_0" : { - "outputWidth": 300, - "outputHeight": 300 - }, - "image_convert_resize_png_200x200_1" : { - "outputWidth": 200, - "outputHeight": 200 - } - } - }, - "type": "FORK_JOIN_DYNAMIC", - "dynamicForkTasksParam": "dynamicTasks", - "dynamicForkTasksInputParamName": "dynamicTasksInput" -} -``` -
- -
Using dynamicForkTaskParam and dynamicForkTasksInputParamName to run different tasks - -```json -{ - "name": "fork_join_dynamic", - "taskReferenceName": "fork_join_dynamic_ref", - "inputParameters": { - "dynamicTasks": [ - { - "name": "inline", - "taskReferenceName": "task1", - "type": "INLINE", - "inputParameters": { - "expression": "(function () {\n return $.input;\n})();", - "evaluatorType": "graaljs" - } - }, - { - "name": "http", - "taskReferenceName": "task2", - "type": "HTTP", - "inputParameters": { - "method": "GET", - "connectionTimeOut": 3000, - "readTimeOut": "3000", - "accept": "application/json", - "contentType": "application/json", - "encode": true + { + "name": "x_test_worker_0", + "taskReferenceName": "simple_ref", + "type": "SIMPLE" + } + ], + "dynamicTasksInput": { + "task1": { + "input": "one" + }, + "task2": { + "uri": "https://orkes-api-tester.orkesconductor.com/api" + }, + "task3": { + "input": { + "someKey": "someValue" + } + } } }, - { - "name": "x_test_worker_0", - "taskReferenceName": "simple_ref", - "type": "SIMPLE" - } - ], - "dynamicTasksInput": { - "task1": { - "input": "one" - }, - "task2": { - "uri": "https://orkes-api-tester.orkesconductor.com/api" - }, - "task3": { - "input": { - "someKey": "someValue" - } - } + "type": "FORK_JOIN_DYNAMIC", + "dynamicForkTasksParam": "dynamicTasks", + "dynamicForkTasksInputParamName": "dynamicTasksInput" + }, + { + "name": "join", + "taskReferenceName": "join_ref", + "inputParameters": {}, + "type": "JOIN", + "joinOn": [] } - }, - "type": "FORK_JOIN_DYNAMIC", - "dynamicForkTasksParam": "dynamicTasks", - "dynamicForkTasksInputParamName": "dynamicTasksInput" + ], + "inputParameters": [], + "outputParameters": {}, + "schemaVersion": 2 } ```
Running the same task — Simple task -In this example, the Dynamic Fork task runs `update_fruit_list_task` in parallel, based on the number of new fruits retrieved in the previous task. + +In this example workflow, the Dynamic Fork task runs a worker task called `update_fruit_list_task` in parallel. The task input takes from the workflow input, which contains the number of new fruits. ```json // workflow definition { "name": "dynamic_workflow_array_simple", - "description": "Dynamic workflow array - run simple task", + "description": "Update fruit list", + "version": 1, "tasks": [ { - "name": "dynamic_workflow_array_simple", - "taskReferenceName": "dynamic_workflow_array_simple_ref", + "name": "fork_join_dynamic", + "taskReferenceName": "fork_join_dynamic_ref", "inputParameters": { "forkTaskName": "update_fruit_list_task", - "forkTaskInputs": ${previousTask_ref.output} + "forkTaskInputs": "${workflow.input.fruits}" }, "type": "FORK_JOIN_DYNAMIC" }, { - "name": "dynamic_workflow_array_simple_join", - "taskReferenceName": "dynamic_workflow_array_simple_join_ref", - "type": "JOIN" + "name": "join", + "taskReferenceName": "join_ref", + "inputParameters": {}, + "type": "JOIN", + "joinOn": [] } - ] + ], + "inputParameters": [ + "fruits" + ], + "outputParameters": {}, + "schemaVersion": 2 } ``` -Here, `forkTaskInputs` is a variable array input that determines the number of forks. At run-time, the input payload may contain three JSON objects, which means there will be three forks: +Here, `forkTaskInputs` is a variable array input that determines the number of forks. At run-time, if the input payload for “fruits” contains three JSON objects, there will be three forks created: + ```json -[ - { - "fruit" : "apple", - "inventoryNo" : 1 - }, - { - "fruit" : "orange", - "inventoryNo" : 2 - }, - { - "fruit" : "kiwi", - "inventoryNo" : 3 - } -] +// workflow input payload + +{ + "fruits": [ + { + "inventoryNo": 5, + "fruit": "apple" + }, + { + "inventoryNo": 20, + "fruit": "orange" + }, + { + "inventoryNo": 3, + "fruit": "kiwi" + } + ] +} ``` During execution, Conductor will insert an additional parameter called `index` into each JSON object, which is used to represent its array index. @@ -371,19 +360,22 @@ During execution, Conductor will insert an additional parameter called `index` i } ``` -If simple values are used in `forkTaskInputs`, such as ["apple", "orange", "kiwi" ], Conductor will set each array element in a parameter called `input`, like so: +

UI screenshot of one input instance for the Dynamic Fork task at run-time

+ +If simple values are used in `forkTaskInputs`, such as `"fruits" = ["apple", "orange", "kiwi"]`, Conductor will set each array element in a parameter called `input`, like so: ```json // one input instance for the Dynamic Fork task at run-time { "input" : "apple", // input value - "__index" : 0 + "__index" : 0 // index of the element in the source array } ```
Running the same task — HTTP task + In this example, the dynamic fork runs HTTP tasks in parallel. The provided input in `forkTaskInputs` contains the typical payload expected in a HTTP task. ```json diff --git a/docs/reference-docs/operators/fork-join.md b/docs/reference-docs/operators/fork-join.md index 42658680..2117d92e 100644 --- a/docs/reference-docs/operators/fork-join.md +++ b/docs/reference-docs/operators/fork-join.md @@ -33,7 +33,6 @@ This is the JSON schema for a Fork/Join task definition. ```json // JSON schema for the Fork task - { "name": "fork", "taskReferenceName": "fork_ref", diff --git a/static/img/Task-References/dynamic_fork_task-fruit_example_input_UI.png b/static/img/Task-References/dynamic_fork_task-fruit_example_input_UI.png new file mode 100644 index 00000000..ee7a320e Binary files /dev/null and b/static/img/Task-References/dynamic_fork_task-fruit_example_input_UI.png differ