From 8a282ae3e40adc3282930c267b6669131df2de47 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 3 Feb 2025 10:37:04 +0100 Subject: [PATCH 1/4] Add/fix type annotation relevant to subworkflow_progress method --- lib/galaxy/model/__init__.py | 20 +++++++++---------- lib/galaxy/workflow/run_request.py | 2 ++ test/unit/workflows/test_workflow_progress.py | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index d04f2e062b6c..4e878384b13d 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -7911,12 +7911,12 @@ class Workflow(Base, Dictifiable, RepresentById): has_cycles: Mapped[Optional[bool]] has_errors: Mapped[Optional[bool]] reports_config: Mapped[Optional[bytes]] = mapped_column(JSONType) - creator_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType) + creator_metadata: Mapped[Optional[List[Dict[str, Any]]]] = mapped_column(JSONType) license: Mapped[Optional[str]] = mapped_column(TEXT) source_metadata: Mapped[Optional[Dict[str, str]]] = mapped_column(JSONType) uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType) - steps = relationship( + steps: Mapped[List["WorkflowStep"]] = relationship( "WorkflowStep", back_populates="workflow", primaryjoin=(lambda: Workflow.id == WorkflowStep.workflow_id), @@ -7967,7 +7967,7 @@ def to_dict(self, view="collection", value_mapper=None): return rval @property - def steps_by_id(self): + def steps_by_id(self) -> Dict[int, "WorkflowStep"]: steps = {} for step in self.steps: step_id = step.id @@ -8142,7 +8142,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime): back_populates="workflow_step", ) post_job_actions = relationship("PostJobAction", back_populates="workflow_step", cascade_backrefs=False) - inputs = relationship("WorkflowStepInput", back_populates="workflow_step") + inputs: Mapped[List["WorkflowStepInput"]] = relationship("WorkflowStepInput", back_populates="workflow_step") workflow_outputs: Mapped[List["WorkflowOutput"]] = relationship( back_populates="workflow_step", cascade_backrefs=False ) @@ -8486,16 +8486,16 @@ class WorkflowStepConnection(Base, RepresentById): output_name: Mapped[Optional[str]] = mapped_column(TEXT) input_subworkflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"), index=True) - input_step_input = relationship( + input_step_input: Mapped["WorkflowStepInput"] = relationship( "WorkflowStepInput", back_populates="connections", cascade="all", primaryjoin=(lambda: WorkflowStepConnection.input_step_input_id == WorkflowStepInput.id), ) - input_subworkflow_step = relationship( + input_subworkflow_step: Mapped[Optional["WorkflowStep"]] = relationship( "WorkflowStep", primaryjoin=(lambda: WorkflowStepConnection.input_subworkflow_step_id == WorkflowStep.id) ) - output_step = relationship( + output_step: Mapped["WorkflowStep"] = relationship( "WorkflowStep", back_populates="output_connections", cascade="all", @@ -8519,7 +8519,7 @@ def input_name(self): @property def input_step(self) -> Optional[WorkflowStep]: - return self.input_step_input and self.input_step_input.workflow_step + return self.input_step_input.workflow_step @property def input_step_id(self): @@ -8736,7 +8736,7 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl order_by=lambda: WorkflowInvocationStep.order_index, cascade_backrefs=False, ) - workflow = relationship("Workflow") + workflow: Mapped[Workflow] = relationship("Workflow") output_dataset_collections = relationship( "WorkflowInvocationOutputDatasetCollectionAssociation", back_populates="workflow_invocation", @@ -9659,7 +9659,7 @@ class WorkflowRequestStepState(Base, Dictifiable, Serializable): ForeignKey("workflow_invocation.id", onupdate="CASCADE", ondelete="CASCADE"), index=True ) workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id")) - value: Mapped[Optional[bytes]] = mapped_column(MutableJSONType) + value: Mapped[Optional[Dict[str, Any]]] = mapped_column(MutableJSONType) workflow_step: Mapped[Optional["WorkflowStep"]] = relationship() workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="step_states") diff --git a/lib/galaxy/workflow/run_request.py b/lib/galaxy/workflow/run_request.py index 94b47feeec31..b448050e66eb 100644 --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -378,6 +378,7 @@ def build_workflow_run_configs( step = steps_by_id[key] if step.type == "parameter_input": module_injector.inject(step) + assert step.module input_param = step.module.get_runtime_inputs(step.module)["input"] try: input_param.validate(input_dict, trans=trans) @@ -546,6 +547,7 @@ def add_parameter(name: str, value: str, type: WorkflowRequestInputParameter.typ for step in workflow.steps: steps_by_id[step.id] = step assert step.module + assert step.state serializable_runtime_state = step.module.encode_runtime_state(step, step.state) step_state = WorkflowRequestStepState() diff --git a/test/unit/workflows/test_workflow_progress.py b/test/unit/workflows/test_workflow_progress.py index d0d2f9902e79..5a63ca3f87c1 100644 --- a/test/unit/workflows/test_workflow_progress.py +++ b/test/unit/workflows/test_workflow_progress.py @@ -100,7 +100,7 @@ def _set_previous_progress(self, outputs): workflow_invocation_step_state = model.WorkflowRequestStepState() workflow_invocation_step_state.workflow_step_id = step_id - workflow_invocation_step_state.value = cast(bytes, True) + workflow_invocation_step_state.value = {"my_param": True} self.invocation.step_states.append(workflow_invocation_step_state) def _step(self, index): From 105aead513704748ac4a236952bdf79abdb75c44 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 3 Feb 2025 11:56:31 +0100 Subject: [PATCH 2/4] Raise appropriate exception when required subworkflow inputs are not connected --- lib/galaxy/workflow/run.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 6d52a7f6a0cf..0b774aef579c 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -704,7 +704,8 @@ def subworkflow_progress( for input_subworkflow_step in subworkflow.input_steps: connection_found = False subworkflow_step_id = input_subworkflow_step.id - for input_connection in step.input_connections: + input_connections = step.input_connections + for input_connection in input_connections: if input_connection.input_subworkflow_step_id == subworkflow_step_id: is_data = input_connection.output_step.type != "parameter_input" replacement = self.replacement_for_connection( @@ -715,7 +716,18 @@ def subworkflow_progress( connection_found = True break - if not connection_found and not input_subworkflow_step.input_optional: + if not input_subworkflow_step.input_optional and not connection_found: + + if not input_connections: + # TODO: Prevent this on import / runtime ! + raise modules.FailWorkflowEvaluation( + InvocationUnexpectedFailure( + reason=FailureReason.unexpected_failure, + workflow_step_id=step.id, + details="Subworkflow has disconnected required input.", + ) + ) + raise modules.FailWorkflowEvaluation( InvocationFailureOutputNotFound( reason=FailureReason.output_not_found, From bcd9bb2cc930ee52f8fa5e9b1ab123d2d1875a66 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 3 Feb 2025 16:59:54 +0100 Subject: [PATCH 3/4] Add test for disconnected required input --- lib/galaxy_test/api/test_workflows.py | 31 +++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 48f623a19487..b97cccb9f15f 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -209,8 +209,8 @@ def _setup_workflow_run( def _ds_entry(self, history_content): return self.dataset_populator.ds_entry(history_content) - def _invocation_details(self, workflow_id, invocation_id, **kwds): - invocation_details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}", data=kwds) + def _invocation_details(self, workflow_id: Optional[str], invocation_id: str, **kwds): + invocation_details_response = self._get(f"invocations/{invocation_id}", data=kwds) self._assert_status_code_is(invocation_details_response, 200) invocation_details = invocation_details_response.json() return invocation_details @@ -3483,6 +3483,33 @@ def filter_jobs_by_tool(tool_id): ) assert output_filtered["element_count"] == 2, output_filtered + def test_subworkflow_missing_input_connection_error(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: [] +steps: + subworkflow_step: + run: + class: GalaxyWorkflow + inputs: + my_input: + type: data + steps: [] +""", + history_id=history_id, + assert_ok=False, + ) + workflow_details = self._invocation_details(summary.workflow_id, summary.invocation_id) + assert workflow_details["messages"] == [ + { + "details": "Subworkflow has disconnected required input.", + "reason": "unexpected_failure", + "workflow_step_id": 0, + } + ] + def test_workflow_request(self): workflow = self.workflow_populator.load_workflow(name="test_for_queue") workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow) From 2fc6191d124a24cf30153d269daec87cc47f0056 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 3 Feb 2025 17:00:16 +0100 Subject: [PATCH 4/4] Skip exception context in log message Logging at at error is sufficient, there's no extra information in the stack. --- lib/galaxy/workflow/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 0b774aef579c..cfda4b776eb7 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -263,7 +263,7 @@ def invoke(self) -> Dict[int, Any]: step_delayed = delayed_steps = True self.progress.mark_step_outputs_delayed(step, why=de.why) except Exception as e: - log_function = log.exception + log_function = log.error if isinstance(e, modules.FailWorkflowEvaluation) and e.why.reason in FAILURE_REASONS_EXPECTED: log_function = log.info log_function(