From 0f3892ddca7abedbef5c52d6dc79d57a6b1a9ef7 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Thu, 4 Jul 2024 21:18:58 +0530 Subject: [PATCH 1/6] Examples for fork join and subworkflows --- build.gradle | 2 +- .../samples/OrkesWorkersApplication.java | 34 +++- .../io/orkes/samples/workers/HelloWorld.java | 187 +++++++++++++++++- .../samples/workers/IntegrationsWorker.java | 34 ++++ src/main/resources/application.properties | 7 +- 5 files changed, 257 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/orkes/samples/workers/IntegrationsWorker.java diff --git a/build.gradle b/build.gradle index 26c9080..888ee49 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ plugins { ext { appVersion = '1.0-SNAPSHOT' versions = [ - orkesClient: '1.0.2', + orkesClient: '2.1.2', guava : '31.0.1-jre', im4java : '1.4.0', log4j : '2.17.1!!', diff --git a/src/main/java/io/orkes/samples/OrkesWorkersApplication.java b/src/main/java/io/orkes/samples/OrkesWorkersApplication.java index 7815cb1..4863806 100644 --- a/src/main/java/io/orkes/samples/OrkesWorkersApplication.java +++ b/src/main/java/io/orkes/samples/OrkesWorkersApplication.java @@ -1,11 +1,13 @@ package io.orkes.samples; import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; import io.orkes.conductor.client.ApiClient; import io.orkes.conductor.client.OrkesClients; import io.orkes.conductor.client.TaskClient; import io.orkes.conductor.client.WorkflowClient; import io.orkes.conductor.client.automator.TaskRunnerConfigurer; +import io.orkes.conductor.client.http.OrkesIntegrationClient; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.SpringApplication; @@ -64,6 +66,31 @@ public OrkesClients orkesClients() { return orkesClients; } + @Bean + public OrkesIntegrationClient orkesIntegrationClient() { + String rootUri = env.getProperty(CONDUCTOR_SERVER_URL); + String key = env.getProperty(CONDUCTOR_CLIENT_KEY_ID); + String secret = env.getProperty(CONDUCTOR_CLIENT_SECRET); + + if ("_CHANGE_ME_".equals(key) || "_CHANGE_ME_".equals(secret)) { + log.error("Please provide an application key id and secret"); + throw new RuntimeException("No Application Key"); + } + + ApiClient apiClient = null; + + log.info("Conductor Server URL: {}", rootUri); + if(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(secret)) { + log.info("Using Key and Secret to connect to the server"); + apiClient = new ApiClient(rootUri, key, secret); + } else { + log.info("setCredentialsIfPresent: Proceeding without client authentication"); + apiClient = new ApiClient(rootUri); + } + OrkesIntegrationClient orkesIntegrationClient = new OrkesIntegrationClient(apiClient); + return orkesIntegrationClient; + } + @Bean public TaskClient taskClient(OrkesClients orkesClients) { TaskClient taskClient = orkesClients.getTaskClient(); @@ -81,7 +108,7 @@ public TaskRunnerConfigurer taskRunnerConfigurer(List workersList, TaskC log.info("Starting workers : {}", workersList); TaskRunnerConfigurer runnerConfigurer = new TaskRunnerConfigurer .Builder(taskClient, workersList) - .withThreadCount(Math.max(1, workersList.size())) + .withThreadCount(Math.max(1, workersList.size() * 3)) .build(); runnerConfigurer.init(); return runnerConfigurer; @@ -118,4 +145,9 @@ private static void loadExternalConfig() throws IOException { }); } + @Bean + public WorkflowExecutor workflowExecutor(OrkesClients orkesClients) { + return new WorkflowExecutor(taskClient(orkesClients), workflowClient(orkesClients), orkesClients.getMetadataClient(), 100); + } + } \ No newline at end of file diff --git a/src/main/java/io/orkes/samples/workers/HelloWorld.java b/src/main/java/io/orkes/samples/workers/HelloWorld.java index 0fb1387..a299181 100644 --- a/src/main/java/io/orkes/samples/workers/HelloWorld.java +++ b/src/main/java/io/orkes/samples/workers/HelloWorld.java @@ -1,21 +1,204 @@ package io.orkes.samples.workers; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; +import com.netflix.conductor.sdk.workflow.def.tasks.*; +import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; +import io.orkes.conductor.client.WorkflowClient; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @Component +@Slf4j +@AllArgsConstructor public class HelloWorld implements Worker { + + private final WorkflowClient workflowClient; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final WorkflowExecutor executor; + @Override public String getTaskDefName() { - return "hello_world"; + return "quest_start_subworkflow"; } @Override public TaskResult execute(Task task) { + + System.out.println("Starting quest_start_subworkflow"); TaskResult result = new TaskResult(task); - result.addOutputData("hw_response", "Hello World!"); + try { + result.setOutputData(startQuestWorkflow()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } result.setStatus(TaskResult.Status.COMPLETED); return result; } + + public Map startQuestWorkflow() throws JsonProcessingException { + StartWorkflowRequest request = new StartWorkflowRequest(); + request.setName("dynamic_workflow"); + Map inputData = new HashMap<>(); + //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); + Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(), Object.class); + inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); + request.setInput(inputData); + + String workflowId = workflowClient.startWorkflow(request); + log.info("Workflow id: {}", workflowId); + return Map.of("workflowId", workflowId); + } + + + private WorkflowDef createDynamicSubworkflow() { + var workflow = new ConductorWorkflow<>(executor); + workflow.setName("All_quest_subworkflows_series"); + workflow.setVersion(1); + workflow.setOwnerEmail("saksham.solanki@orkes.io"); + workflow.setDescription("test"); + workflow.setVariables(Map.of()); + workflow.setDefaultInput(Map.of()); + workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY); + + // ---- Fork task def started + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[4][1]; + + //Below code is subworkflows in forked task + //ForkTask is having following structure [{}}, {}}] + //Enrichment level started + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("imdb_enrichment_workflow"); + Http httptask = new Http("imdb_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + SubWorkflow forkSubworkflow = new SubWorkflow("imdb_enrichment_subworkflow", conductorWorkflow); + forkSubworkflow.input("name","{workflow.input.name}"); + forkedTasks[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("empi_enrichment_workflow"); + httptask = new Http("empi_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("empi_enrichment_subworkflow", conductorWorkflow); + forkedTasks[1][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("mlcp_enrichment_workflow"); + httptask = new Http("mlcp_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("mlcp_enrichment_workflow", conductorWorkflow); + forkedTasks[2][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("ohc_enrichment_workflow"); + httptask = new Http("ohc_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("ohc_enrichment_subworkflow", conductorWorkflow); + forkedTasks[3][0] = forkSubworkflow; + + ForkJoin forkJoin = new ForkJoin("fork_enrichment", forkedTasks); + workflow.add(forkJoin); + // Enrichment level ended + + + // Translation Level Starts + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks1 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[2][1]; + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("Labos_Translation_workflow"); + httptask = new Http("IMDB_EMPI_Translations"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + httptask = new Http("LabOS_Translation"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + + forkSubworkflow = new SubWorkflow("LabOS_Translation_subworkflow", conductorWorkflow); + forkedTasks1[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("BFE_Translation_workflow"); + httptask = new Http("IMDB_Enrichment"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + httptask = new Http("BFE_Translation"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("BFE_Translation_subworkflow", conductorWorkflow); + forkedTasks1[1][0] = forkSubworkflow; + + forkJoin = new ForkJoin("fork_translation", forkedTasks1); + workflow.add(forkJoin); + //Translation Level Ended + + //Distribution level starts + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks2 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[3][1]; + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("BFE_Distributions"); + httptask = new Http("bfe_distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("BFE_Distributions_subworkflow", conductorWorkflow); + forkSubworkflow.input("name","{workflow.input.name}"); + forkedTasks2[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("ELabs_Distributions"); + httptask = new Http("ELabs_Distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("ELabs_Distributions_subworkflow", conductorWorkflow); + forkedTasks2[1][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("LabOs_Distributions"); + httptask = new Http("LabOs_Distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("LabOs_Distributions_subworkflow", conductorWorkflow); + forkedTasks2[2][0] = forkSubworkflow; + + forkJoin = new ForkJoin("fork_distribution", forkedTasks2); + workflow.add(forkJoin); + + // Distribution level Ended + + WorkflowDef workflowDef = workflow.toWorkflowDef(); + workflowDef.setOutputParameters(Map.of()); + workflowDef.setTimeoutSeconds(0); + workflowDef.setInputTemplate(Map.of()); + workflowDef.setSchemaVersion(2); + workflowDef.setInputParameters(List.of()); + + return workflowDef; + } + } diff --git a/src/main/java/io/orkes/samples/workers/IntegrationsWorker.java b/src/main/java/io/orkes/samples/workers/IntegrationsWorker.java new file mode 100644 index 0000000..a40d9ce --- /dev/null +++ b/src/main/java/io/orkes/samples/workers/IntegrationsWorker.java @@ -0,0 +1,34 @@ +package io.orkes.samples.workers; + +import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskResult; +import io.orkes.conductor.client.http.OrkesIntegrationClient; +import io.orkes.conductor.client.model.integration.Integration; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class IntegrationsWorker implements Worker { + + @Autowired + private OrkesIntegrationClient orkesIntegrationClient; + + @Override + public String getTaskDefName() { + return "integrations_worker"; + } + + @Override + public TaskResult execute(Task task) { + + TaskResult result = new TaskResult(task); + log.info("Worker integrations_worker is being started"); + result.setStatus(TaskResult.Status.COMPLETED); + Integration integration = orkesIntegrationClient.getIntegration("anthropic_saas"); + System.out.println("Integration is : " + integration); + return result; + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index c171895..cece382 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,4 +1,5 @@ -conductor.server.url=https://play.orkes.io/api +conductor.server.url=http://localhost:8080/api -conductor.security.client.key-id=_CHANGE_ME_ -conductor.security.client.secret=_CHANGE_ME_ \ No newline at end of file +conductor.security.client.key-id=96196359-6652-4a3b-9a08-8f2e3b779f0b +conductor.security.client.secret=gC98Y9KPYhLjZjN34Qe5ZbIFfanCtCH0IL8BIffWBMmjvaTy +server.port=7000 \ No newline at end of file From 158a36d0f89a678796c8670f8f332a0de5837d56 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Fri, 5 Jul 2024 19:56:55 +0530 Subject: [PATCH 2/6] dynamic workflow example --- .../workers/DynamicSubworkflowWorker.java | 207 ++++++++++++++++++ .../io/orkes/samples/workers/HelloWorld.java | 187 +--------------- src/main/resources/application.properties | 7 +- 3 files changed, 212 insertions(+), 189 deletions(-) create mode 100644 src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java diff --git a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java new file mode 100644 index 0000000..f8e2c70 --- /dev/null +++ b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java @@ -0,0 +1,207 @@ +package io.orkes.samples.workers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; +import com.netflix.conductor.sdk.workflow.def.tasks.*; +import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; +import io.orkes.conductor.client.WorkflowClient; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +@Slf4j +@AllArgsConstructor +public class DynamicSubworkflowWorker implements Worker { + + private final WorkflowClient workflowClient; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final WorkflowExecutor executor; + + @Override + public String getTaskDefName() { + return "dynamic_subworkflow_task"; + } + + /** + * This Worker will start 'dynamic_workflow' workflow and pass the subworkflow definitions using createDynamicSubworkflow() method + * @param task + * @return + */ + @Override + public TaskResult execute(Task task) { + System.out.println("Starting dynamic_subworkflow_task"); + TaskResult result = new TaskResult(task); + try { + result.setOutputData(startQuestWorkflow()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + result.setStatus(TaskResult.Status.COMPLETED); + return result; + } + + public Map startQuestWorkflow() throws JsonProcessingException { + StartWorkflowRequest request = new StartWorkflowRequest(); + request.setName("dynamic_workflow"); + Map inputData = new HashMap<>(); + //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); + Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(), Object.class); + inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); + request.setInput(inputData); + + String workflowId = workflowClient.startWorkflow(request); + log.info("Workflow id: {}", workflowId); + return Map.of("workflowId", workflowId); + } + + private WorkflowDef createDynamicSubworkflow() { + var workflow = new ConductorWorkflow<>(executor); + workflow.setName("dynamic_subworkflows_series"); + workflow.setVersion(1); + workflow.setOwnerEmail("saksham.solanki@orkes.io"); + workflow.setDescription("test"); + workflow.setVariables(Map.of()); + workflow.setDefaultInput(Map.of()); + workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY); + + // ---- Fork task def started + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[4][1]; + + //Below code is subworkflows in forked task + //ForkTask is having following structure [{}}, {}}] + //Enrichment level started + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("imdb_enrichment_workflow"); + Http httptask = new Http("imdb_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + SubWorkflow forkSubworkflow = new SubWorkflow("imdb_enrichment_subworkflow", conductorWorkflow); + forkSubworkflow.input("name","{workflow.input.name}"); + forkedTasks[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("empi_enrichment_workflow"); + httptask = new Http("empi_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("empi_enrichment_subworkflow", conductorWorkflow); + forkedTasks[1][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("mlcp_enrichment_workflow"); + httptask = new Http("mlcp_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("mlcp_enrichment_workflow", conductorWorkflow); + forkedTasks[2][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("ohc_enrichment_workflow"); + httptask = new Http("ohc_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("ohc_enrichment_subworkflow", conductorWorkflow); + forkedTasks[3][0] = forkSubworkflow; + + ForkJoin forkJoin = new ForkJoin("fork_enrichment", forkedTasks); + workflow.add(forkJoin); + // Enrichment level ended + + + // Translation Level Starts + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks1 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[2][1]; + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("Labos_Translation_workflow"); + httptask = new Http("IMDB_EMPI_Translations"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + httptask = new Http("LabOS_Translation"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + + forkSubworkflow = new SubWorkflow("LabOS_Translation_subworkflow", conductorWorkflow); + forkedTasks1[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("BFE_Translation_workflow"); + httptask = new Http("IMDB_Enrichment"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + httptask = new Http("BFE_Translation"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("BFE_Translation_subworkflow", conductorWorkflow); + forkedTasks1[1][0] = forkSubworkflow; + + forkJoin = new ForkJoin("fork_translation", forkedTasks1); + workflow.add(forkJoin); + //Translation Level Ended + + //Distribution level starts + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks2 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[3][1]; + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("BFE_Distributions"); + httptask = new Http("bfe_distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("BFE_Distributions_subworkflow", conductorWorkflow); + forkSubworkflow.input("name","{workflow.input.name}"); + forkedTasks2[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("ELabs_Distributions"); + httptask = new Http("ELabs_Distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("ELabs_Distributions_subworkflow", conductorWorkflow); + forkedTasks2[1][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("LabOs_Distributions"); + httptask = new Http("LabOs_Distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("LabOs_Distributions_subworkflow", conductorWorkflow); + forkedTasks2[2][0] = forkSubworkflow; + + forkJoin = new ForkJoin("fork_distribution", forkedTasks2); + workflow.add(forkJoin); + + // Distribution level Ended + + WorkflowDef workflowDef = workflow.toWorkflowDef(); + workflowDef.setOutputParameters(Map.of()); + workflowDef.setTimeoutSeconds(0); + workflowDef.setInputTemplate(Map.of()); + workflowDef.setSchemaVersion(2); + workflowDef.setInputParameters(List.of()); + + return workflowDef; + } +} diff --git a/src/main/java/io/orkes/samples/workers/HelloWorld.java b/src/main/java/io/orkes/samples/workers/HelloWorld.java index a299181..0fb1387 100644 --- a/src/main/java/io/orkes/samples/workers/HelloWorld.java +++ b/src/main/java/io/orkes/samples/workers/HelloWorld.java @@ -1,204 +1,21 @@ package io.orkes.samples.workers; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; -import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; -import com.netflix.conductor.sdk.workflow.def.tasks.*; -import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; -import io.orkes.conductor.client.WorkflowClient; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - @Component -@Slf4j -@AllArgsConstructor public class HelloWorld implements Worker { - - private final WorkflowClient workflowClient; - private final ObjectMapper objectMapper = new ObjectMapper(); - private final WorkflowExecutor executor; - @Override public String getTaskDefName() { - return "quest_start_subworkflow"; + return "hello_world"; } @Override public TaskResult execute(Task task) { - - System.out.println("Starting quest_start_subworkflow"); TaskResult result = new TaskResult(task); - try { - result.setOutputData(startQuestWorkflow()); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + result.addOutputData("hw_response", "Hello World!"); result.setStatus(TaskResult.Status.COMPLETED); return result; } - - public Map startQuestWorkflow() throws JsonProcessingException { - StartWorkflowRequest request = new StartWorkflowRequest(); - request.setName("dynamic_workflow"); - Map inputData = new HashMap<>(); - //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); - Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(), Object.class); - inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); - request.setInput(inputData); - - String workflowId = workflowClient.startWorkflow(request); - log.info("Workflow id: {}", workflowId); - return Map.of("workflowId", workflowId); - } - - - private WorkflowDef createDynamicSubworkflow() { - var workflow = new ConductorWorkflow<>(executor); - workflow.setName("All_quest_subworkflows_series"); - workflow.setVersion(1); - workflow.setOwnerEmail("saksham.solanki@orkes.io"); - workflow.setDescription("test"); - workflow.setVariables(Map.of()); - workflow.setDefaultInput(Map.of()); - workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY); - - // ---- Fork task def started - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[4][1]; - - //Below code is subworkflows in forked task - //ForkTask is having following structure [{}}, {}}] - //Enrichment level started - ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("imdb_enrichment_workflow"); - Http httptask = new Http("imdb_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - SubWorkflow forkSubworkflow = new SubWorkflow("imdb_enrichment_subworkflow", conductorWorkflow); - forkSubworkflow.input("name","{workflow.input.name}"); - forkedTasks[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("empi_enrichment_workflow"); - httptask = new Http("empi_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("empi_enrichment_subworkflow", conductorWorkflow); - forkedTasks[1][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("mlcp_enrichment_workflow"); - httptask = new Http("mlcp_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("mlcp_enrichment_workflow", conductorWorkflow); - forkedTasks[2][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("ohc_enrichment_workflow"); - httptask = new Http("ohc_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("ohc_enrichment_subworkflow", conductorWorkflow); - forkedTasks[3][0] = forkSubworkflow; - - ForkJoin forkJoin = new ForkJoin("fork_enrichment", forkedTasks); - workflow.add(forkJoin); - // Enrichment level ended - - - // Translation Level Starts - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks1 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[2][1]; - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("Labos_Translation_workflow"); - httptask = new Http("IMDB_EMPI_Translations"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - httptask = new Http("LabOS_Translation"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - - forkSubworkflow = new SubWorkflow("LabOS_Translation_subworkflow", conductorWorkflow); - forkedTasks1[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("BFE_Translation_workflow"); - httptask = new Http("IMDB_Enrichment"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - httptask = new Http("BFE_Translation"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("BFE_Translation_subworkflow", conductorWorkflow); - forkedTasks1[1][0] = forkSubworkflow; - - forkJoin = new ForkJoin("fork_translation", forkedTasks1); - workflow.add(forkJoin); - //Translation Level Ended - - //Distribution level starts - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks2 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[3][1]; - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("BFE_Distributions"); - httptask = new Http("bfe_distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("BFE_Distributions_subworkflow", conductorWorkflow); - forkSubworkflow.input("name","{workflow.input.name}"); - forkedTasks2[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("ELabs_Distributions"); - httptask = new Http("ELabs_Distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("ELabs_Distributions_subworkflow", conductorWorkflow); - forkedTasks2[1][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("LabOs_Distributions"); - httptask = new Http("LabOs_Distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("LabOs_Distributions_subworkflow", conductorWorkflow); - forkedTasks2[2][0] = forkSubworkflow; - - forkJoin = new ForkJoin("fork_distribution", forkedTasks2); - workflow.add(forkJoin); - - // Distribution level Ended - - WorkflowDef workflowDef = workflow.toWorkflowDef(); - workflowDef.setOutputParameters(Map.of()); - workflowDef.setTimeoutSeconds(0); - workflowDef.setInputTemplate(Map.of()); - workflowDef.setSchemaVersion(2); - workflowDef.setInputParameters(List.of()); - - return workflowDef; - } - } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index cece382..c171895 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,5 +1,4 @@ -conductor.server.url=http://localhost:8080/api +conductor.server.url=https://play.orkes.io/api -conductor.security.client.key-id=96196359-6652-4a3b-9a08-8f2e3b779f0b -conductor.security.client.secret=gC98Y9KPYhLjZjN34Qe5ZbIFfanCtCH0IL8BIffWBMmjvaTy -server.port=7000 \ No newline at end of file +conductor.security.client.key-id=_CHANGE_ME_ +conductor.security.client.secret=_CHANGE_ME_ \ No newline at end of file From 069350aad88af83345bcb9f6bd4de8ec3f76ab88 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Fri, 5 Jul 2024 19:58:42 +0530 Subject: [PATCH 3/6] minor fixes --- .../samples/OrkesWorkersApplication.java | 28 +-------------- .../samples/workers/IntegrationsWorker.java | 34 ------------------- 2 files changed, 1 insertion(+), 61 deletions(-) delete mode 100644 src/main/java/io/orkes/samples/workers/IntegrationsWorker.java diff --git a/src/main/java/io/orkes/samples/OrkesWorkersApplication.java b/src/main/java/io/orkes/samples/OrkesWorkersApplication.java index 4863806..96cda7a 100644 --- a/src/main/java/io/orkes/samples/OrkesWorkersApplication.java +++ b/src/main/java/io/orkes/samples/OrkesWorkersApplication.java @@ -7,7 +7,6 @@ import io.orkes.conductor.client.TaskClient; import io.orkes.conductor.client.WorkflowClient; import io.orkes.conductor.client.automator.TaskRunnerConfigurer; -import io.orkes.conductor.client.http.OrkesIntegrationClient; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.SpringApplication; @@ -66,31 +65,6 @@ public OrkesClients orkesClients() { return orkesClients; } - @Bean - public OrkesIntegrationClient orkesIntegrationClient() { - String rootUri = env.getProperty(CONDUCTOR_SERVER_URL); - String key = env.getProperty(CONDUCTOR_CLIENT_KEY_ID); - String secret = env.getProperty(CONDUCTOR_CLIENT_SECRET); - - if ("_CHANGE_ME_".equals(key) || "_CHANGE_ME_".equals(secret)) { - log.error("Please provide an application key id and secret"); - throw new RuntimeException("No Application Key"); - } - - ApiClient apiClient = null; - - log.info("Conductor Server URL: {}", rootUri); - if(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(secret)) { - log.info("Using Key and Secret to connect to the server"); - apiClient = new ApiClient(rootUri, key, secret); - } else { - log.info("setCredentialsIfPresent: Proceeding without client authentication"); - apiClient = new ApiClient(rootUri); - } - OrkesIntegrationClient orkesIntegrationClient = new OrkesIntegrationClient(apiClient); - return orkesIntegrationClient; - } - @Bean public TaskClient taskClient(OrkesClients orkesClients) { TaskClient taskClient = orkesClients.getTaskClient(); @@ -108,7 +82,7 @@ public TaskRunnerConfigurer taskRunnerConfigurer(List workersList, TaskC log.info("Starting workers : {}", workersList); TaskRunnerConfigurer runnerConfigurer = new TaskRunnerConfigurer .Builder(taskClient, workersList) - .withThreadCount(Math.max(1, workersList.size() * 3)) + .withThreadCount(Math.max(1, workersList.size())) .build(); runnerConfigurer.init(); return runnerConfigurer; diff --git a/src/main/java/io/orkes/samples/workers/IntegrationsWorker.java b/src/main/java/io/orkes/samples/workers/IntegrationsWorker.java deleted file mode 100644 index a40d9ce..0000000 --- a/src/main/java/io/orkes/samples/workers/IntegrationsWorker.java +++ /dev/null @@ -1,34 +0,0 @@ -package io.orkes.samples.workers; - -import com.netflix.conductor.client.worker.Worker; -import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.TaskResult; -import io.orkes.conductor.client.http.OrkesIntegrationClient; -import io.orkes.conductor.client.model.integration.Integration; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -public class IntegrationsWorker implements Worker { - - @Autowired - private OrkesIntegrationClient orkesIntegrationClient; - - @Override - public String getTaskDefName() { - return "integrations_worker"; - } - - @Override - public TaskResult execute(Task task) { - - TaskResult result = new TaskResult(task); - log.info("Worker integrations_worker is being started"); - result.setStatus(TaskResult.Status.COMPLETED); - Integration integration = orkesIntegrationClient.getIntegration("anthropic_saas"); - System.out.println("Integration is : " + integration); - return result; - } -} From 84c614a33e4facdca7f014ca5f845e3b7daf1c48 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Fri, 5 Jul 2024 20:02:34 +0530 Subject: [PATCH 4/6] minor fixes --- .../io/orkes/samples/workers/DynamicSubworkflowWorker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java index f8e2c70..69220c0 100644 --- a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java +++ b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java @@ -44,7 +44,7 @@ public TaskResult execute(Task task) { System.out.println("Starting dynamic_subworkflow_task"); TaskResult result = new TaskResult(task); try { - result.setOutputData(startQuestWorkflow()); + result.setOutputData(startExistingWorkflow()); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -52,7 +52,7 @@ public TaskResult execute(Task task) { return result; } - public Map startQuestWorkflow() throws JsonProcessingException { + public Map startExistingWorkflow() throws JsonProcessingException { StartWorkflowRequest request = new StartWorkflowRequest(); request.setName("dynamic_workflow"); Map inputData = new HashMap<>(); From 02fdd13df045c298b53c9714030d240359153a60 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Tue, 9 Jul 2024 19:22:58 +0530 Subject: [PATCH 5/6] Creating dynamic workflow using json mediation rules --- .../io/orkes/samples/models/Distribution.java | 16 ++ .../io/orkes/samples/models/Enrichment.java | 15 ++ .../orkes/samples/models/MediationRules.java | 12 ++ .../io/orkes/samples/models/Translation.java | 18 ++ .../workers/DynamicSubworkflowWorker.java | 202 +++++++----------- 5 files changed, 144 insertions(+), 119 deletions(-) create mode 100644 src/main/java/io/orkes/samples/models/Distribution.java create mode 100644 src/main/java/io/orkes/samples/models/Enrichment.java create mode 100644 src/main/java/io/orkes/samples/models/MediationRules.java create mode 100644 src/main/java/io/orkes/samples/models/Translation.java diff --git a/src/main/java/io/orkes/samples/models/Distribution.java b/src/main/java/io/orkes/samples/models/Distribution.java new file mode 100644 index 0000000..667354d --- /dev/null +++ b/src/main/java/io/orkes/samples/models/Distribution.java @@ -0,0 +1,16 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.Map; + +@Data +public class Distribution { + private String translation; + private String distributeTo; + private String sendToORB = "Y"; + private String taskType = "HTTP"; + private String natsWorkflowName; + private Integer natsWorkflowVersion; + private Map natsWorkflowInput; +} diff --git a/src/main/java/io/orkes/samples/models/Enrichment.java b/src/main/java/io/orkes/samples/models/Enrichment.java new file mode 100644 index 0000000..b92638f --- /dev/null +++ b/src/main/java/io/orkes/samples/models/Enrichment.java @@ -0,0 +1,15 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.Map; + +@Data +public class Enrichment { + private String enrichmentType; + private String sendToORB = "N"; + private String taskType = "HTTP"; + private String natsWorkflowName; + private Integer natsWorkflowVersion; + private Map natsWorkflowInput; +} diff --git a/src/main/java/io/orkes/samples/models/MediationRules.java b/src/main/java/io/orkes/samples/models/MediationRules.java new file mode 100644 index 0000000..1e949de --- /dev/null +++ b/src/main/java/io/orkes/samples/models/MediationRules.java @@ -0,0 +1,12 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.List; + +@Data +public class MediationRules { + private List enrichments; + private List translations; + private List distributions; +} diff --git a/src/main/java/io/orkes/samples/models/Translation.java b/src/main/java/io/orkes/samples/models/Translation.java new file mode 100644 index 0000000..4d124f9 --- /dev/null +++ b/src/main/java/io/orkes/samples/models/Translation.java @@ -0,0 +1,18 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +@Data +public class Translation { + private String name; + private List enrichments; + private String sendToORB = "N"; + private String taskType = "HTTP"; + private String natsWorkflowName; + private Integer natsWorkflowVersion; + private Map natsWorkflowInput; + +} diff --git a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java index 69220c0..f81dacf 100644 --- a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java +++ b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java @@ -11,6 +11,7 @@ import com.netflix.conductor.sdk.workflow.def.tasks.*; import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; import io.orkes.conductor.client.WorkflowClient; +import io.orkes.samples.models.MediationRules; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -31,7 +32,7 @@ public class DynamicSubworkflowWorker implements Worker { @Override public String getTaskDefName() { - return "dynamic_subworkflow_task"; + return "quest_start_subworkflow"; } /** @@ -41,10 +42,11 @@ public String getTaskDefName() { */ @Override public TaskResult execute(Task task) { - System.out.println("Starting dynamic_subworkflow_task"); + System.out.println("Starting quest_start_subworkflow task"); TaskResult result = new TaskResult(task); try { - result.setOutputData(startExistingWorkflow()); + MediationRules mediationRules = objectMapper.convertValue(task.getInputData().get("mediation_rules"), MediationRules.class); + result.setOutputData(startExistingWorkflow(mediationRules)); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -52,13 +54,14 @@ public TaskResult execute(Task task) { return result; } - public Map startExistingWorkflow() throws JsonProcessingException { + public Map startExistingWorkflow(MediationRules mediationRules) throws JsonProcessingException { StartWorkflowRequest request = new StartWorkflowRequest(); request.setName("dynamic_workflow"); Map inputData = new HashMap<>(); //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); - Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(), Object.class); + Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(mediationRules), Object.class); inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); + inputData.put("mediation_rules", objectMapper.convertValue(mediationRules, Object.class)); request.setInput(inputData); String workflowId = workflowClient.startWorkflow(request); @@ -66,7 +69,7 @@ public Map startExistingWorkflow() throws JsonProcessingExceptio return Map.of("workflowId", workflowId); } - private WorkflowDef createDynamicSubworkflow() { + private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) throws JsonProcessingException { var workflow = new ConductorWorkflow<>(executor); workflow.setName("dynamic_subworkflows_series"); workflow.setVersion(1); @@ -76,124 +79,85 @@ private WorkflowDef createDynamicSubworkflow() { workflow.setDefaultInput(Map.of()); workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY); - // ---- Fork task def started - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[4][1]; - - //Below code is subworkflows in forked task - //ForkTask is having following structure [{}}, {}}] - //Enrichment level started - ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("imdb_enrichment_workflow"); - Http httptask = new Http("imdb_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); + //ForkTask is having following structure [{}, {}] - SubWorkflow forkSubworkflow = new SubWorkflow("imdb_enrichment_subworkflow", conductorWorkflow); - forkSubworkflow.input("name","{workflow.input.name}"); - forkedTasks[0][0] = forkSubworkflow; + // --------------- Enrichment level started ------------------ + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] enrichmentForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getEnrichments().size()][1]; + for (int i = 0; i < mediationRules.getEnrichments().size(); i++) { + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_workflow"); - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("empi_enrichment_workflow"); - httptask = new Http("empi_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); + Http httptask = new Http(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); - forkSubworkflow = new SubWorkflow("empi_enrichment_subworkflow", conductorWorkflow); - forkedTasks[1][0] = forkSubworkflow; + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", mediationRules.getEnrichments().get(i).getNatsWorkflowName(), mediationRules.getEnrichments().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getEnrichments().get(i).getNatsWorkflowInput()); + Switch sendToORBEnrichmentSwitch = new Switch("send_to_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBEnrichmentSwitch); + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", conductorWorkflow); + forkSubworkflow.input("sendToORB", mediationRules.getEnrichments().get(i).getSendToORB()); + enrichmentForkTasks[i][0] = forkSubworkflow; + } + ForkJoin forkEnrichment = new ForkJoin("fork_enrichment", enrichmentForkTasks); + workflow.add(forkEnrichment); + // ------------- Enrichment level ended ---------------- + + + // -------------- Translation Level started ---------------- + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] translationForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getTranslations().size()][1]; + for (int i = 0; i < mediationRules.getTranslations().size(); i++) { + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", conductorWorkflow); + forkSubworkflow.input("sendToORB", mediationRules.getTranslations().get(i).getSendToORB()); + conductorWorkflow.setName(mediationRules.getTranslations().get(i).getName() + "_workflow"); + for (int j = 0; j < mediationRules.getTranslations().get(i).getEnrichments().size(); j++) { + Http httptask = new Http(mediationRules.getTranslations().get(i).getEnrichments().get(j)+ "_translations_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + String taskRef = mediationRules.getTranslations().get(i).getEnrichments().get(j) + "_subworkflow_ref"; + String outputExpression = "${" + taskRef + ".output.response}"; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result + forkSubworkflow.input(mediationRules.getTranslations().get(i).getEnrichments().get(j), outputExpression); + conductorWorkflow.add(httptask); + } + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", mediationRules.getTranslations().get(i).getNatsWorkflowName(), mediationRules.getTranslations().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getTranslations().get(i).getNatsWorkflowInput()); + Switch sendToORBTranslationSwitch = new Switch("send_to_" + mediationRules.getTranslations().get(i).getName() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBTranslationSwitch); + + translationForkTasks[i][0] = forkSubworkflow; + } + ForkJoin forkTranslation = new ForkJoin("fork_translation", translationForkTasks); + workflow.add(forkTranslation); + // ------------ Translation Level Ended ------------------- + + + // -------------- Distribution level started -------------------- + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] distributionForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getDistributions().size()][1]; + for (int i = 0; i < mediationRules.getDistributions().size(); i++) { + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName(mediationRules.getDistributions().get(i).getDistributeTo() + "_workflow"); + Http httptask = new Http(mediationRules.getDistributions().get(i).getDistributeTo() + "_distributions_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", mediationRules.getDistributions().get(i).getNatsWorkflowName(), mediationRules.getDistributions().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getDistributions().get(i).getNatsWorkflowInput()); + Switch sendToORBDistributionSwitch = new Switch("send_to_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBDistributionSwitch); + + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", conductorWorkflow); + forkSubworkflow.input("sendToORB", mediationRules.getDistributions().get(i).getSendToORB()); + String taskRef = mediationRules.getDistributions().get(i).getTranslation() + "_subworkflow_ref"; + String outputExpression = "${" + taskRef + ".output.response}"; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result + forkSubworkflow.input(mediationRules.getDistributions().get(i).getTranslation(), outputExpression); + forkSubworkflow.input("sink", "nats:nats-integ:subject"); + distributionForkTasks[i][0] = forkSubworkflow; + } + ForkJoin forkDistribution = new ForkJoin("fork_distribution", distributionForkTasks); + workflow.add(forkDistribution); + // ----------- Distribution level Ended -------------------- - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("mlcp_enrichment_workflow"); - httptask = new Http("mlcp_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("mlcp_enrichment_workflow", conductorWorkflow); - forkedTasks[2][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("ohc_enrichment_workflow"); - httptask = new Http("ohc_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("ohc_enrichment_subworkflow", conductorWorkflow); - forkedTasks[3][0] = forkSubworkflow; - - ForkJoin forkJoin = new ForkJoin("fork_enrichment", forkedTasks); - workflow.add(forkJoin); - // Enrichment level ended - - - // Translation Level Starts - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks1 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[2][1]; - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("Labos_Translation_workflow"); - httptask = new Http("IMDB_EMPI_Translations"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - httptask = new Http("LabOS_Translation"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - - forkSubworkflow = new SubWorkflow("LabOS_Translation_subworkflow", conductorWorkflow); - forkedTasks1[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("BFE_Translation_workflow"); - httptask = new Http("IMDB_Enrichment"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - httptask = new Http("BFE_Translation"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("BFE_Translation_subworkflow", conductorWorkflow); - forkedTasks1[1][0] = forkSubworkflow; - - forkJoin = new ForkJoin("fork_translation", forkedTasks1); - workflow.add(forkJoin); - //Translation Level Ended - - //Distribution level starts - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks2 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[3][1]; - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("BFE_Distributions"); - httptask = new Http("bfe_distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("BFE_Distributions_subworkflow", conductorWorkflow); - forkSubworkflow.input("name","{workflow.input.name}"); - forkedTasks2[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("ELabs_Distributions"); - httptask = new Http("ELabs_Distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("ELabs_Distributions_subworkflow", conductorWorkflow); - forkedTasks2[1][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("LabOs_Distributions"); - httptask = new Http("LabOs_Distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("LabOs_Distributions_subworkflow", conductorWorkflow); - forkedTasks2[2][0] = forkSubworkflow; - - forkJoin = new ForkJoin("fork_distribution", forkedTasks2); - workflow.add(forkJoin); - - // Distribution level Ended WorkflowDef workflowDef = workflow.toWorkflowDef(); workflowDef.setOutputParameters(Map.of()); From 20ffca3f39932c74c28094c292d5b17061af37ce Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Wed, 10 Jul 2024 12:40:58 +0530 Subject: [PATCH 6/6] fix order --- .../workers/DynamicSubworkflowWorker.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java index f81dacf..a5b49e5 100644 --- a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java +++ b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java @@ -61,7 +61,6 @@ public Map startExistingWorkflow(MediationRules mediationRules) //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(mediationRules), Object.class); inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); - inputData.put("mediation_rules", objectMapper.convertValue(mediationRules, Object.class)); request.setInput(inputData); String workflowId = workflowClient.startWorkflow(request); @@ -87,15 +86,15 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); conductorWorkflow.setName(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_workflow"); - Http httptask = new Http(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", mediationRules.getEnrichments().get(i).getNatsWorkflowName(), mediationRules.getEnrichments().get(i).getNatsWorkflowVersion()); natsSubworkflow.input(mediationRules.getEnrichments().get(i).getNatsWorkflowInput()); Switch sendToORBEnrichmentSwitch = new Switch("send_to_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); conductorWorkflow.add(sendToORBEnrichmentSwitch); + Http httptask = new Http(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", conductorWorkflow); forkSubworkflow.input("sendToORB", mediationRules.getEnrichments().get(i).getSendToORB()); enrichmentForkTasks[i][0] = forkSubworkflow; @@ -112,6 +111,11 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", conductorWorkflow); forkSubworkflow.input("sendToORB", mediationRules.getTranslations().get(i).getSendToORB()); conductorWorkflow.setName(mediationRules.getTranslations().get(i).getName() + "_workflow"); + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", mediationRules.getTranslations().get(i).getNatsWorkflowName(), mediationRules.getTranslations().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getTranslations().get(i).getNatsWorkflowInput()); + Switch sendToORBTranslationSwitch = new Switch("send_to_" + mediationRules.getTranslations().get(i).getName() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBTranslationSwitch); + for (int j = 0; j < mediationRules.getTranslations().get(i).getEnrichments().size(); j++) { Http httptask = new Http(mediationRules.getTranslations().get(i).getEnrichments().get(j)+ "_translations_workflow_task"); httptask.url("https://orkes-api-tester.orkesconductor.com/api"); @@ -120,10 +124,6 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro forkSubworkflow.input(mediationRules.getTranslations().get(i).getEnrichments().get(j), outputExpression); conductorWorkflow.add(httptask); } - SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", mediationRules.getTranslations().get(i).getNatsWorkflowName(), mediationRules.getTranslations().get(i).getNatsWorkflowVersion()); - natsSubworkflow.input(mediationRules.getTranslations().get(i).getNatsWorkflowInput()); - Switch sendToORBTranslationSwitch = new Switch("send_to_" + mediationRules.getTranslations().get(i).getName() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); - conductorWorkflow.add(sendToORBTranslationSwitch); translationForkTasks[i][0] = forkSubworkflow; } @@ -137,15 +137,16 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro for (int i = 0; i < mediationRules.getDistributions().size(); i++) { ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); conductorWorkflow.setName(mediationRules.getDistributions().get(i).getDistributeTo() + "_workflow"); - Http httptask = new Http(mediationRules.getDistributions().get(i).getDistributeTo() + "_distributions_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", mediationRules.getDistributions().get(i).getNatsWorkflowName(), mediationRules.getDistributions().get(i).getNatsWorkflowVersion()); natsSubworkflow.input(mediationRules.getDistributions().get(i).getNatsWorkflowInput()); Switch sendToORBDistributionSwitch = new Switch("send_to_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); conductorWorkflow.add(sendToORBDistributionSwitch); + Http httptask = new Http(mediationRules.getDistributions().get(i).getDistributeTo() + "_distributions_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", conductorWorkflow); forkSubworkflow.input("sendToORB", mediationRules.getDistributions().get(i).getSendToORB()); String taskRef = mediationRules.getDistributions().get(i).getTranslation() + "_subworkflow_ref";