diff --git a/build.gradle b/build.gradle index 26c9080..888ee49 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ plugins { ext { appVersion = '1.0-SNAPSHOT' versions = [ - orkesClient: '1.0.2', + orkesClient: '2.1.2', guava : '31.0.1-jre', im4java : '1.4.0', log4j : '2.17.1!!', diff --git a/src/main/java/io/orkes/samples/OrkesWorkersApplication.java b/src/main/java/io/orkes/samples/OrkesWorkersApplication.java index 7815cb1..96cda7a 100644 --- a/src/main/java/io/orkes/samples/OrkesWorkersApplication.java +++ b/src/main/java/io/orkes/samples/OrkesWorkersApplication.java @@ -1,6 +1,7 @@ package io.orkes.samples; import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; import io.orkes.conductor.client.ApiClient; import io.orkes.conductor.client.OrkesClients; import io.orkes.conductor.client.TaskClient; @@ -118,4 +119,9 @@ private static void loadExternalConfig() throws IOException { }); } + @Bean + public WorkflowExecutor workflowExecutor(OrkesClients orkesClients) { + return new WorkflowExecutor(taskClient(orkesClients), workflowClient(orkesClients), orkesClients.getMetadataClient(), 100); + } + } \ No newline at end of file diff --git a/src/main/java/io/orkes/samples/models/Distribution.java b/src/main/java/io/orkes/samples/models/Distribution.java new file mode 100644 index 0000000..667354d --- /dev/null +++ b/src/main/java/io/orkes/samples/models/Distribution.java @@ -0,0 +1,16 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.Map; + +@Data +public class Distribution { + private String translation; + private String distributeTo; + private String sendToORB = "Y"; + private String taskType = "HTTP"; + private String natsWorkflowName; + private Integer natsWorkflowVersion; + private Map natsWorkflowInput; +} diff --git a/src/main/java/io/orkes/samples/models/Enrichment.java b/src/main/java/io/orkes/samples/models/Enrichment.java new file mode 100644 index 0000000..b92638f --- /dev/null +++ b/src/main/java/io/orkes/samples/models/Enrichment.java @@ -0,0 +1,15 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.Map; + +@Data +public class Enrichment { + private String enrichmentType; + private String sendToORB = "N"; + private String taskType = "HTTP"; + private String natsWorkflowName; + private Integer natsWorkflowVersion; + private Map natsWorkflowInput; +} diff --git a/src/main/java/io/orkes/samples/models/MediationRules.java b/src/main/java/io/orkes/samples/models/MediationRules.java new file mode 100644 index 0000000..1e949de --- /dev/null +++ b/src/main/java/io/orkes/samples/models/MediationRules.java @@ -0,0 +1,12 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.List; + +@Data +public class MediationRules { + private List enrichments; + private List translations; + private List distributions; +} diff --git a/src/main/java/io/orkes/samples/models/Translation.java b/src/main/java/io/orkes/samples/models/Translation.java new file mode 100644 index 0000000..4d124f9 --- /dev/null +++ b/src/main/java/io/orkes/samples/models/Translation.java @@ -0,0 +1,18 @@ +package io.orkes.samples.models; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +@Data +public class Translation { + private String name; + private List enrichments; + private String sendToORB = "N"; + private String taskType = "HTTP"; + private String natsWorkflowName; + private Integer natsWorkflowVersion; + private Map natsWorkflowInput; + +} diff --git a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java new file mode 100644 index 0000000..a5b49e5 --- /dev/null +++ b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java @@ -0,0 +1,172 @@ +package io.orkes.samples.workers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; +import com.netflix.conductor.sdk.workflow.def.tasks.*; +import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; +import io.orkes.conductor.client.WorkflowClient; +import io.orkes.samples.models.MediationRules; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +@Slf4j +@AllArgsConstructor +public class DynamicSubworkflowWorker implements Worker { + + private final WorkflowClient workflowClient; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final WorkflowExecutor executor; + + @Override + public String getTaskDefName() { + return "quest_start_subworkflow"; + } + + /** + * This Worker will start 'dynamic_workflow' workflow and pass the subworkflow definitions using createDynamicSubworkflow() method + * @param task + * @return + */ + @Override + public TaskResult execute(Task task) { + System.out.println("Starting quest_start_subworkflow task"); + TaskResult result = new TaskResult(task); + try { + MediationRules mediationRules = objectMapper.convertValue(task.getInputData().get("mediation_rules"), MediationRules.class); + result.setOutputData(startExistingWorkflow(mediationRules)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + result.setStatus(TaskResult.Status.COMPLETED); + return result; + } + + public Map startExistingWorkflow(MediationRules mediationRules) throws JsonProcessingException { + StartWorkflowRequest request = new StartWorkflowRequest(); + request.setName("dynamic_workflow"); + Map inputData = new HashMap<>(); + //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); + Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(mediationRules), Object.class); + inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); + request.setInput(inputData); + + String workflowId = workflowClient.startWorkflow(request); + log.info("Workflow id: {}", workflowId); + return Map.of("workflowId", workflowId); + } + + private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) throws JsonProcessingException { + var workflow = new ConductorWorkflow<>(executor); + workflow.setName("dynamic_subworkflows_series"); + workflow.setVersion(1); + workflow.setOwnerEmail("saksham.solanki@orkes.io"); + workflow.setDescription("test"); + workflow.setVariables(Map.of()); + workflow.setDefaultInput(Map.of()); + workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY); + + //ForkTask is having following structure [{}, {}] + + // --------------- Enrichment level started ------------------ + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] enrichmentForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getEnrichments().size()][1]; + for (int i = 0; i < mediationRules.getEnrichments().size(); i++) { + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_workflow"); + + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", mediationRules.getEnrichments().get(i).getNatsWorkflowName(), mediationRules.getEnrichments().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getEnrichments().get(i).getNatsWorkflowInput()); + Switch sendToORBEnrichmentSwitch = new Switch("send_to_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBEnrichmentSwitch); + + Http httptask = new Http(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", conductorWorkflow); + forkSubworkflow.input("sendToORB", mediationRules.getEnrichments().get(i).getSendToORB()); + enrichmentForkTasks[i][0] = forkSubworkflow; + } + ForkJoin forkEnrichment = new ForkJoin("fork_enrichment", enrichmentForkTasks); + workflow.add(forkEnrichment); + // ------------- Enrichment level ended ---------------- + + + // -------------- Translation Level started ---------------- + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] translationForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getTranslations().size()][1]; + for (int i = 0; i < mediationRules.getTranslations().size(); i++) { + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", conductorWorkflow); + forkSubworkflow.input("sendToORB", mediationRules.getTranslations().get(i).getSendToORB()); + conductorWorkflow.setName(mediationRules.getTranslations().get(i).getName() + "_workflow"); + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", mediationRules.getTranslations().get(i).getNatsWorkflowName(), mediationRules.getTranslations().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getTranslations().get(i).getNatsWorkflowInput()); + Switch sendToORBTranslationSwitch = new Switch("send_to_" + mediationRules.getTranslations().get(i).getName() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBTranslationSwitch); + + for (int j = 0; j < mediationRules.getTranslations().get(i).getEnrichments().size(); j++) { + Http httptask = new Http(mediationRules.getTranslations().get(i).getEnrichments().get(j)+ "_translations_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + String taskRef = mediationRules.getTranslations().get(i).getEnrichments().get(j) + "_subworkflow_ref"; + String outputExpression = "${" + taskRef + ".output.response}"; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result + forkSubworkflow.input(mediationRules.getTranslations().get(i).getEnrichments().get(j), outputExpression); + conductorWorkflow.add(httptask); + } + + translationForkTasks[i][0] = forkSubworkflow; + } + ForkJoin forkTranslation = new ForkJoin("fork_translation", translationForkTasks); + workflow.add(forkTranslation); + // ------------ Translation Level Ended ------------------- + + + // -------------- Distribution level started -------------------- + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] distributionForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getDistributions().size()][1]; + for (int i = 0; i < mediationRules.getDistributions().size(); i++) { + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName(mediationRules.getDistributions().get(i).getDistributeTo() + "_workflow"); + + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", mediationRules.getDistributions().get(i).getNatsWorkflowName(), mediationRules.getDistributions().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getDistributions().get(i).getNatsWorkflowInput()); + Switch sendToORBDistributionSwitch = new Switch("send_to_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBDistributionSwitch); + + Http httptask = new Http(mediationRules.getDistributions().get(i).getDistributeTo() + "_distributions_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", conductorWorkflow); + forkSubworkflow.input("sendToORB", mediationRules.getDistributions().get(i).getSendToORB()); + String taskRef = mediationRules.getDistributions().get(i).getTranslation() + "_subworkflow_ref"; + String outputExpression = "${" + taskRef + ".output.response}"; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result + forkSubworkflow.input(mediationRules.getDistributions().get(i).getTranslation(), outputExpression); + forkSubworkflow.input("sink", "nats:nats-integ:subject"); + distributionForkTasks[i][0] = forkSubworkflow; + } + ForkJoin forkDistribution = new ForkJoin("fork_distribution", distributionForkTasks); + workflow.add(forkDistribution); + // ----------- Distribution level Ended -------------------- + + + WorkflowDef workflowDef = workflow.toWorkflowDef(); + workflowDef.setOutputParameters(Map.of()); + workflowDef.setTimeoutSeconds(0); + workflowDef.setInputTemplate(Map.of()); + workflowDef.setSchemaVersion(2); + workflowDef.setInputParameters(List.of()); + + return workflowDef; + } +}