This example demonstrates how to implement the Saga pattern for realizing distributed transactions across multiple microservices, in a safe and reliable way using change data capture.
Based on the outbox pattern, this implementation avoids unsafe dual writes to a service's database and Apache Kafka by channeling all outgoing messages through the originating service's database and capturing them from there using CDC and Debezium.
There are three services involved:
- order-service: originator and orchestrator of the Saga
- customer-service: validates whether an incoming order is within the customer's credit limit and approves or rejects it accordingly
- payment-service executes the payment associated to an incoming order
Build and start up:
$ mvn clean verify
$ export DEBEZIUM_VERSION=2.1
$ docker-compose up --build
Register the connectors for the different services:
$ http PUT http://localhost:8083/connectors/order-outbox-connector/config < register-order-connector.json
$ http PUT http://localhost:8083/connectors/payment-outbox-connector/config < register-payment-connector.json
$ http PUT http://localhost:8083/connectors/credit-outbox-connector/config < register-credit-connector.json
As an example, here is the connector for capturing outbox events from the order service's database:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
/* database coordinates */
"database.hostname": "order-db",
"database.port": "5432",
"database.user": "orderuser",
"database.password": "orderpw",
"database.dbname" : "orderdb",
"topic.prefix": "dbserver1",
/* only capture changes from the outboxevent table */
"schema.include.list": "purchaseorder",
"table.include.list" : "purchaseorder.outboxevent",
"tombstones.on.delete" : "false",
"poll.interval.ms": "100",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
/* apply the outbox event routing SMT */
"transforms" : "saga",
"transforms.saga.type" : "io.debezium.transforms.outbox.EventRouter",
"transforms.saga.route.topic.replacement" : "${routedByValue}.request"
}
Place an order:
$ http POST http://localhost:8080/orders < requests/place-order.json
HTTP/1.1 200 OK
Content-Length: 32
Content-Type: application/json
{
"orderId": 1,
"status": "CREATED"
}
Examine the emitted messages for credit-approval
and payment
in Apache Kafka:
$ docker run --tty --rm \
--network saga-network \
quay.io/debezium/tooling:1.2 \
kafkacat -b kafka:9092 -C -o beginning -q \
-f "{\"key\":%k, \"headers\":\"%h\"}\n%s\n" \
-t credit-approval.request
{"key":17ef865e-39bf-404d-8d35-25c61ae0e082, "headers":"id=6ab3c538-5899-4a61-aa22-ebf5dee45b9d"}
{"order-id":1,"customer-id":456,"payment-due":59,"credit-card-no":"xxxx-yyyy-dddd-aaaa","type":"REQUEST"}
$ docker run --tty --rm \
--network saga-network \
quay.io/debezium/tooling:1.2 \
kafkacat -b kafka:9092 -C -o beginning -q \
-f "{\"key\":%k, \"headers\":\"%h\"}\n%s\n" \
-t payment.request
{"key":17ef865e-39bf-404d-8d35-25c61ae0e082, "headers":"id=e88e463f-047d-49a9-be08-988a1552c571"}
{"order-id":1,"customer-id":456,"payment-due":59,"credit-card-no":"xxxx-yyyy-dddd-aaaa","type":"REQUEST"}
Examine the saga state in the order service's database:
$ docker run --tty --rm -i \
--network saga-network \
quay.io/debezium/tooling:1.2 \
bash -c 'pgcli postgresql://orderuser:orderpw@order-db:5432/orderdb'
select * from purchaseorder.sagastate;
+--------------------------------------+------------------------------------------------------------------------------------------+----------+---------------------------------------------------+-----------------+-----------+
| id | payload | status | stepstate | type | version |
|--------------------------------------+------------------------------------------------------------------------------------------+----------+---------------------------------------------------+-----------------+-----------|
| 17ef865e-39bf-404d-8d35-25c61ae0e082 | {"order-id":1,"customer-id":456,"payment-due":59,"credit-card-no":"xxxx-yyyy-dddd-aaaa","type":"REQUEST"} | COMPLETED | {"credit-approval":"SUCCEEDED","payment":"SUCCEEDED"} | order-placement | 2 |
+--------------------------------------+------------------------------------------------------------------------------------------+----------+---------------------------------------------------+-----------------+-----------+
Alternatively, you also can access pgAdmin on http://localhost:5050 ([email protected]/admin).
Place an order with an invalid credit card number (the payment service rejects any number that ends with "9999"):
$ http POST http://localhost:8080/orders < requests/place-invalid-order1.json
Observe how the saga's state is ABORTED
, with the payment
step in state FAILED
, and the credit-approval
first in state COMPENSATING
, then COMPENSATED
.
Now place an order which exceeds the credit limit (customer 456 has an initial credit limit of $500.00, and this order exceeds this; alternatively, you can place the valid order request a number of times, until the accumlated value exceeds the limit):
$ http POST http://localhost:8080/orders < requests/place-invalid-order2.json
Observe how the saga's state again is ABORTED
, with the step states set accordingly.
Now stop the payment service and place a valid order again:
$ docker-compose stop payment-service
$ http POST http://localhost:8080/orders < requests/place-order.json
Observe how the saga remains in state STARTED
, with the credit-approval
step in state SUCCEEDED
and the payment
step in state STARTED
.
Start the payment service again (docker-compose start payment-service
) and observe how the saga completes.
Set the ADVERTISED_HOST_NAME env variable of the kafka service in docker-compose.yml to the address of your host machine.
$ docker-compose up --build --scale order-service=0 --scale payment-service=0 --scale customer-service=0
$ mvn compile quarkus:dev -f order-service/pom.xml
$ mvn compile quarkus:dev -f payment-service/pom.xml
$ mvn compile quarkus:dev -f customer-service/pom.xml
Listing all topics:
$ docker-compose exec kafka /kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
Register connector for logging the saga state:
$ http PUT http://localhost:8083/connectors/order-sagastate-connector/config < register-sagastate-connector.json
Examining saga state log:
$ docker run --tty --rm \
--network saga-network \
quay.io/debezium/tooling:1.2 \
kafkacat -b kafka:9092 -C -o beginning -q \
-f "{\"key\":%k, \"headers\":\"%h\"}\n%s\n" \
-t dbserver4.purchaseorder.sagastate
Examining saga execution via Jaeger tracing:
$ open http://localhost:16686/