Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[24.2] Fix error message when subworkflow input connection missing #19526

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7911,12 +7911,12 @@ class Workflow(Base, Dictifiable, RepresentById):
has_cycles: Mapped[Optional[bool]]
has_errors: Mapped[Optional[bool]]
reports_config: Mapped[Optional[bytes]] = mapped_column(JSONType)
creator_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType)
creator_metadata: Mapped[Optional[List[Dict[str, Any]]]] = mapped_column(JSONType)
license: Mapped[Optional[str]] = mapped_column(TEXT)
source_metadata: Mapped[Optional[Dict[str, str]]] = mapped_column(JSONType)
uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType)

steps = relationship(
steps: Mapped[List["WorkflowStep"]] = relationship(
"WorkflowStep",
back_populates="workflow",
primaryjoin=(lambda: Workflow.id == WorkflowStep.workflow_id),
Expand Down Expand Up @@ -7967,7 +7967,7 @@ def to_dict(self, view="collection", value_mapper=None):
return rval

@property
def steps_by_id(self):
def steps_by_id(self) -> Dict[int, "WorkflowStep"]:
steps = {}
for step in self.steps:
step_id = step.id
Expand Down Expand Up @@ -8142,7 +8142,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime):
back_populates="workflow_step",
)
post_job_actions = relationship("PostJobAction", back_populates="workflow_step", cascade_backrefs=False)
inputs = relationship("WorkflowStepInput", back_populates="workflow_step")
inputs: Mapped[List["WorkflowStepInput"]] = relationship("WorkflowStepInput", back_populates="workflow_step")
workflow_outputs: Mapped[List["WorkflowOutput"]] = relationship(
back_populates="workflow_step", cascade_backrefs=False
)
Expand Down Expand Up @@ -8486,16 +8486,16 @@ class WorkflowStepConnection(Base, RepresentById):
output_name: Mapped[Optional[str]] = mapped_column(TEXT)
input_subworkflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"), index=True)

input_step_input = relationship(
input_step_input: Mapped["WorkflowStepInput"] = relationship(
"WorkflowStepInput",
back_populates="connections",
cascade="all",
primaryjoin=(lambda: WorkflowStepConnection.input_step_input_id == WorkflowStepInput.id),
)
input_subworkflow_step = relationship(
input_subworkflow_step: Mapped[Optional["WorkflowStep"]] = relationship(
"WorkflowStep", primaryjoin=(lambda: WorkflowStepConnection.input_subworkflow_step_id == WorkflowStep.id)
)
output_step = relationship(
output_step: Mapped["WorkflowStep"] = relationship(
"WorkflowStep",
back_populates="output_connections",
cascade="all",
Expand All @@ -8519,7 +8519,7 @@ def input_name(self):

@property
def input_step(self) -> Optional[WorkflowStep]:
return self.input_step_input and self.input_step_input.workflow_step
return self.input_step_input.workflow_step

@property
def input_step_id(self):
Expand Down Expand Up @@ -8736,7 +8736,7 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl
order_by=lambda: WorkflowInvocationStep.order_index,
cascade_backrefs=False,
)
workflow = relationship("Workflow")
workflow: Mapped[Workflow] = relationship("Workflow")
output_dataset_collections = relationship(
"WorkflowInvocationOutputDatasetCollectionAssociation",
back_populates="workflow_invocation",
Expand Down Expand Up @@ -9659,7 +9659,7 @@ class WorkflowRequestStepState(Base, Dictifiable, Serializable):
ForeignKey("workflow_invocation.id", onupdate="CASCADE", ondelete="CASCADE"), index=True
)
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"))
value: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
value: Mapped[Optional[Dict[str, Any]]] = mapped_column(MutableJSONType)
workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="step_states")

Expand Down
18 changes: 15 additions & 3 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def invoke(self) -> Dict[int, Any]:
step_delayed = delayed_steps = True
self.progress.mark_step_outputs_delayed(step, why=de.why)
except Exception as e:
log_function = log.exception
log_function = log.error
if isinstance(e, modules.FailWorkflowEvaluation) and e.why.reason in FAILURE_REASONS_EXPECTED:
log_function = log.info
log_function(
Expand Down Expand Up @@ -704,7 +704,8 @@ def subworkflow_progress(
for input_subworkflow_step in subworkflow.input_steps:
connection_found = False
subworkflow_step_id = input_subworkflow_step.id
for input_connection in step.input_connections:
input_connections = step.input_connections
for input_connection in input_connections:
if input_connection.input_subworkflow_step_id == subworkflow_step_id:
is_data = input_connection.output_step.type != "parameter_input"
replacement = self.replacement_for_connection(
Expand All @@ -715,7 +716,18 @@ def subworkflow_progress(
connection_found = True
break

if not connection_found and not input_subworkflow_step.input_optional:
if not input_subworkflow_step.input_optional and not connection_found:

if not input_connections:
# TODO: Prevent this on import / runtime !
raise modules.FailWorkflowEvaluation(
InvocationUnexpectedFailure(
reason=FailureReason.unexpected_failure,
workflow_step_id=step.id,
details="Subworkflow has disconnected required input.",
)
)

raise modules.FailWorkflowEvaluation(
InvocationFailureOutputNotFound(
reason=FailureReason.output_not_found,
Expand Down
2 changes: 2 additions & 0 deletions lib/galaxy/workflow/run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ def build_workflow_run_configs(
step = steps_by_id[key]
if step.type == "parameter_input":
module_injector.inject(step)
assert step.module
input_param = step.module.get_runtime_inputs(step.module)["input"]
try:
input_param.validate(input_dict, trans=trans)
Expand Down Expand Up @@ -546,6 +547,7 @@ def add_parameter(name: str, value: str, type: WorkflowRequestInputParameter.typ
for step in workflow.steps:
steps_by_id[step.id] = step
assert step.module
assert step.state
serializable_runtime_state = step.module.encode_runtime_state(step, step.state)

step_state = WorkflowRequestStepState()
Expand Down
31 changes: 29 additions & 2 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ def _setup_workflow_run(
def _ds_entry(self, history_content):
return self.dataset_populator.ds_entry(history_content)

def _invocation_details(self, workflow_id, invocation_id, **kwds):
invocation_details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}", data=kwds)
def _invocation_details(self, workflow_id: Optional[str], invocation_id: str, **kwds):
invocation_details_response = self._get(f"invocations/{invocation_id}", data=kwds)
self._assert_status_code_is(invocation_details_response, 200)
invocation_details = invocation_details_response.json()
return invocation_details
Expand Down Expand Up @@ -3483,6 +3483,33 @@ def filter_jobs_by_tool(tool_id):
)
assert output_filtered["element_count"] == 2, output_filtered

def test_subworkflow_missing_input_connection_error(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs: []
steps:
subworkflow_step:
run:
class: GalaxyWorkflow
inputs:
my_input:
type: data
steps: []
""",
history_id=history_id,
assert_ok=False,
)
workflow_details = self._invocation_details(summary.workflow_id, summary.invocation_id)
assert workflow_details["messages"] == [
{
"details": "Subworkflow has disconnected required input.",
"reason": "unexpected_failure",
"workflow_step_id": 0,
}
]

def test_workflow_request(self):
workflow = self.workflow_populator.load_workflow(name="test_for_queue")
workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow)
Expand Down
2 changes: 1 addition & 1 deletion test/unit/workflows/test_workflow_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def _set_previous_progress(self, outputs):

workflow_invocation_step_state = model.WorkflowRequestStepState()
workflow_invocation_step_state.workflow_step_id = step_id
workflow_invocation_step_state.value = cast(bytes, True)
workflow_invocation_step_state.value = {"my_param": True}
self.invocation.step_states.append(workflow_invocation_step_state)

def _step(self, index):
Expand Down
Loading