Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: transforming workflow failover config #2250

Merged
merged 5 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/modules/java-protobuf/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ NOTE: Lightbend provides Tier 1 support for the [.group-java]#Java# [.group-scal

Your development project needs to include the Kalix [.group-java]#Java# [.group-scala]#Scala# Protobuf SDK and logic to start the gRPC server. You define your components in gRPC descriptors and use `protoc` to compile them. Finally, you implement business logic for service components.

To save the work of starting from scratch, the Java xref:java-protobuf:project-template.adoc[code generation tool] creates a project from a template, complete with descriptors and implementations. Or, you can start from one of our fully implemented https://docs.kalix.io/samples/index.html[sample applications].
To save the work of starting from scratch, the Java xref:java-protobuf:project-template.adoc[code generation tool] creates a project from a template, complete with descriptors and implementations.

== Prerequisites

Expand Down
2 changes: 2 additions & 0 deletions docs/src/modules/java-protobuf/pages/workflows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ include::example$java-protobuf-transfer-workflow-compensation/src/main/java/com/
<1> Sets a failover transition in case of a workflow timeout.
<2> Sets a default failover transition for all steps with maximum number of retries.
<3> Overrides the step recovery strategy for the `deposit` step.
<4> Failover steps should be added like any other steps.

Scala::
+
Expand All @@ -307,6 +308,7 @@ include::example$scala-protobuf-transfer-workflow-compensation/src/main/scala/co
<1> Sets a failover transition in case of a workflow timeout.
<2> Sets a default failover transition for all steps with maximum number of retries.
<3> Overrides the step recovery strategy for the `deposit` step.
<4> Failover steps should be added like any other steps.


NOTE: In case of a workflow timeout one last failover step can be performed. Transitions from that failover step will be ignored.
Expand Down
2 changes: 1 addition & 1 deletion docs/src/modules/java/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Docker:: Kalix requires https://docs.docker.com/get-docker/[Docker {tab-icon}, w

== Getting Started

You can start a new Kalix service using our xref:java:getting-started.adoc[Getting started] guide. If you prefer to first explore a fully implemented Kalix service, you can try one of our https://docs.kalix.io/samples/index.html[beginner samples].
You can start a new Kalix service using our xref:java:getting-started.adoc[Getting started] guide.

On the other hand, if you would rather spend some time exploring our documentation, here are some main features you will find in this section:

Expand Down
1 change: 1 addition & 0 deletions docs/src/modules/java/pages/workflows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ include::example$java-spring-transfer-workflow-compensation/src/main/java/com/ex
<1> Sets a failover transition in case of a workflow timeout.
<2> Sets a default failover transition for all steps with maximum number of retries.
<3> Overrides the step recovery strategy for the `deposit` step.
<4> Failover steps should be added like any other steps.


NOTE: In case of a workflow timeout one last failover step can be performed. Transitions from that failover step will be ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ public WorkflowDef<TransferState> definition() {
.defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // <2>
.addStep(withdraw)
.addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // <3>
// end::recover-strategy[]
.addStep(compensateWithdraw)
.addStep(waitForAcceptation)
.addStep(compensateWithdraw) // <4>
.addStep(failoverHandler);
// end::recover-strategy[]
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ public WorkflowDef<TransferState> definition() {
.defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // <2>
.addStep(withdraw)
.addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // <3>
// end::recover-strategy[]
.addStep(compensateWithdraw)
.addStep(waitForAcceptation)
.addStep(compensateWithdraw) // <4>
.addStep(failoverHandler);
// end::recover-strategy[]
}

@PutMapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ class TransferWorkflow(context: WorkflowContext) extends AbstractTransferWorkflo
.defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // <2>
.addStep(withdraw)
.addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // <3>
// end::recover-strategy[]
.addStep(compensateWithdraw)
.addStep(waitForAcceptation)
.addStep(compensateWithdraw) // <4>
.addStep(failoverHandler);
// end::recover-strategy[]
}

override def start(currentState: TransferState, transfer: Transfer): Effect[Empty] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public Optional<Step> findByName(String name) {
*/
public WorkflowDef<S> addStep(Step step) {
addStepWithValidation(step);
step.timeout().ifPresent(timeout -> stepConfigs.add(new StepConfig(step.name(), Optional.of(timeout), Optional.empty())));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,16 @@ private[scalasdk] final class JavaWorkflowAdapter[S >: Null](scalaSdkWorkflow: A
case kalix.scalasdk.impl.workflow.WorkflowEffectImpl.TransitionalEffectImpl(javaEffect) => javaEffect
}
})
javaWorkflowDef.addStep(javaCallStep)
scalaDefinition.stepConfigs
.find(_.stepName == callStep.name)
.flatMap { stepConfig =>
stepConfig.timeout.map(_.toJava).foreach(javaCallStep.timeout)
stepConfig.recoverStrategy
} match {
case Some(recoverStrategy) => javaWorkflowDef.addStep(javaCallStep, convertToJava(recoverStrategy))
case None => javaWorkflowDef.addStep(javaCallStep)
}

case asyncCallStep: AsyncCallStep[Any @unchecked, Any @unchecked, Any @unchecked] =>
val javaAsyncCallStep = new javasdk.workflow.AbstractWorkflow.AsyncCallStep(
asyncCallStep.name,
Expand All @@ -95,7 +104,16 @@ private[scalasdk] final class JavaWorkflowAdapter[S >: Null](scalaSdkWorkflow: A
case kalix.scalasdk.impl.workflow.WorkflowEffectImpl.TransitionalEffectImpl(javaEffect) => javaEffect
}
})
javaWorkflowDef.addStep(javaAsyncCallStep)

scalaDefinition.stepConfigs
.find(_.stepName == asyncCallStep.name)
.flatMap { stepConfig =>
stepConfig.timeout.map(_.toJava).foreach(javaAsyncCallStep.timeout)
stepConfig.recoverStrategy
} match {
case Some(recoverStrategy) => javaWorkflowDef.addStep(javaAsyncCallStep, convertToJava(recoverStrategy))
case None => javaWorkflowDef.addStep(javaAsyncCallStep)
}
}
scalaDefinition.workflowTimeout.map(_.toJava).foreach(javaWorkflowDef.timeout)
scalaDefinition.stepTimeout.map(_.toJava).foreach(javaWorkflowDef.defaultStepTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ object AbstractWorkflow {
*/
def addStep(step: AbstractWorkflow.Step): AbstractWorkflow.WorkflowDef[S] = {
addStepWithValidation(step)
step.timeout.foreach(timeout =>
_stepConfigs.addOne(AbstractWorkflow.StepConfig(step.name, Option(timeout), None)))
this
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package kalix.scalasdk.impl.workflow

import java.time.Duration
import java.util.concurrent.CompletableFuture

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.CollectionHasAsScala

import com.google.protobuf.empty.Empty
import kalix.javasdk.impl.GrpcDeferredCall
import kalix.javasdk.impl.MetadataImpl
import kalix.scalasdk.impl.ScalaDeferredCallAdapter
import kalix.scalasdk.workflow.AbstractWorkflow
import kalix.scalasdk.workflow.AbstractWorkflow.maxRetries
import kalix.scalasdk.workflow.ProtoWorkflow
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class JavaWorkflowAdapterSpec extends AnyWordSpec with Matchers {

"JavaWorkflowAdapter" should {
"convert scala workflow definition to java" in {
val adapted = new JavaWorkflowAdapter(new DummyWorkflow)

val definition = adapted.definition()
val steps = definition.getSteps.asScala
steps should have size 2
val step1 = steps.find(_.name() == "step1").get
val step2 = steps.find(_.name() == "step2").get
step1.timeout() shouldBe empty
step2.timeout().get() shouldBe Duration.ofSeconds(10)

val stepConfigs = definition.getStepConfigs.asScala
val step1Config = stepConfigs.find(_.stepName == "step1").get
val step2Config = stepConfigs.find(_.stepName == "step2").get

step1Config.recoverStrategy.get().maxRetries shouldBe 2
step1Config.recoverStrategy.get().failoverStepName shouldBe "step2"
step1Config.timeout.isPresent shouldBe false

step2Config.recoverStrategy.isPresent shouldBe false
step2Config.timeout.get() shouldBe Duration.ofSeconds(10)

definition.getStepTimeout().get() shouldBe Duration.ofSeconds(3)
definition.getStepRecoverStrategy.get().maxRetries shouldBe 3
definition.getStepRecoverStrategy.get().failoverStepName shouldBe "step1"
definition.getFailoverStepName.get() shouldBe "step3"
definition.getFailoverMaxRetries.get().maxRetries shouldBe 10
definition.getWorkflowTimeout.get() shouldBe Duration.ofSeconds(7)
}
}
}

class DummyWorkflow extends ProtoWorkflow[Empty] {
override def emptyState: Empty = Empty()

override def definition: AbstractWorkflow.WorkflowDef[Empty] = {
val step1 = step("step1")
.call { _: Empty =>
ScalaDeferredCallAdapter(
GrpcDeferredCall(
Empty(),
MetadataImpl.Empty,
"service1",
"method1",
_ => CompletableFuture.completedFuture(Empty())))
}
.andThen(_ => effects.end)

val step2 = step("step2")
.asyncCall { _: Empty =>
Future.successful(Empty())
}
.andThen(_ => effects.end)
.timeout(10.seconds)

workflow
.timeout(7.seconds)
.defaultStepTimeout(3.seconds)
.defaultStepRecoverStrategy(maxRetries(3).failoverTo("step1"))
.failoverTo("step3", maxRetries(10))
.addStep(step1, maxRetries(2).failoverTo("step2"))
.addStep(step2)
}
}
2 changes: 1 addition & 1 deletion styles/config/vocabularies/Base/accept.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dev
facto
enum
env
failover
[Ff]ailover
[Gg]itHub
googleCloud
grpcui
Expand Down
Loading