With the release of Debezium 2.3, incremental snapshots can be triggered in more ways than an insert into the signal table. The Kafka signal channel, which was previously available only for MySQL with GTIDs enabled, is now an option for all connectors.
In this example, we create a simple setup in which we insert data into a PostgreSQL database, start a Kafka Connect connector, and finally trigger a snapshot by producing a record to the signal topic.
For simplicity, we start an environment without any security.
Start the environment
docker-compose up -d
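The compose file itself is not shown here; a minimal sketch could look like the following. The image versions, the mypassword password, and the listener layout are assumptions — any setup that exposes Kafka on localhost:9092 (and as broker:29092 inside the network), Kafka Connect on port 8083, and PostgreSQL with wal_level=logical will do.
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0   # version is an assumption
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.4.0       # version is an assumption
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # two listeners: one for the host (localhost:9092), one for the containers (broker:29092)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  connect:
    image: quay.io/debezium/connect:2.3
    depends_on: [broker]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: broker:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
  postgres:
    image: postgres:15
    container_name: postgres   # so that "docker exec -it postgres ..." works below
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword   # assumed password; must match connector.json
      POSTGRES_DB: postgres
    command: ["postgres", "-c", "wal_level=logical"]   # logical decoding required by Debezium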
Check that Kafka Connect contains the Debezium connector
curl -s -XGET http://localhost:8083/connector-plugins | jq '.[].class'
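Among other plugins, the output should include the PostgreSQL connector class:
"io.debezium.connector.postgresql.PostgresConnector"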
Enter PostgreSQL
docker exec -it postgres psql -U myuser -d postgres
Create the data table
CREATE TABLE characters (
id INT PRIMARY KEY,
first_name VARCHAR(50),
last_name VARCHAR(50)
);
INSERT INTO characters VALUES (1, 'luke', 'skywalker');
INSERT INTO characters VALUES (2, 'anakin', 'skywalker');
INSERT INTO characters VALUES (3, 'padmé', 'amidala');
SELECT * FROM characters;
Create the signal table (still required)
CREATE TABLE debezium_signal (id VARCHAR(100) PRIMARY KEY, type VARCHAR(100) NOT NULL, data VARCHAR(2048) NULL);
For the Kafka signal channel, we need to add the following to the connector configuration:
"signal.enabled.channels": "source,kafka",
"signal.kafka.topic": "signal-topic",
"signal.kafka.bootstrap.servers": "broker:29092"
Additional options for the signal consumer can be passed through with the signal.consumer.* prefix (for example, security settings); since this environment runs without security, we do not need any of them here.
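Putting it all together, a minimal connector.json could look like the following sketch. The name test-connector, the mypassword password, and plugin.name pgoutput are assumptions matching the compose sketch above; topic.prefix test and the signal table match the names used in the rest of this post. Note that the signal table itself must be part of table.include.list so that the source channel can pick up the inserts.
{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "myuser",
    "database.password": "mypassword",
    "database.dbname": "postgres",
    "topic.prefix": "test",
    "table.include.list": "public.characters,public.debezium_signal",
    "signal.data.collection": "public.debezium_signal",
    "signal.enabled.channels": "source,kafka",
    "signal.kafka.topic": "signal-topic",
    "signal.kafka.bootstrap.servers": "broker:29092"
  }
}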
Deploy the connector
curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors | jq
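To verify that the connector is running (the name test-connector comes from the sketch above):
curl -s http://localhost:8083/connectors/test-connector/status | jq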
We consume the topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic test.public.characters --from-beginning
and should see the three messages of the characters table.
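Each record carries the usual Debezium change-event envelope. Abridged (schema and source metadata trimmed), a row read by the initial snapshot looks roughly like this:
{
  "before": null,
  "after": { "id": 1, "first_name": "luke", "last_name": "skywalker" },
  "op": "r"
}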
Trigger a snapshot via the signal table
INSERT INTO debezium_signal (id, type, data) VALUES ('ad-hoc', 'execute-snapshot', '{"data-collections": ["public.characters"],"type":"incremental"}');
Produce into the signal topic
kafka-console-producer --bootstrap-server localhost:9092 --topic signal-topic --property parse.key=true --property key.separator=":"
Ensure that the message key equals the topic.prefix connector configuration (here: test).
test:{"type":"execute-snapshot","data": {"data-collections": ["public.characters"], "type": "INCREMENTAL"}}
Now we should see 9 messages in the topic (3 from the initial inserts + 3 from the signal-table snapshot + 3 from the Kafka-signal snapshot).
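The Kafka channel also accepts a stop-snapshot signal, which is handy for aborting a long-running incremental snapshot of a large table; the format mirrors the execute-snapshot message, again keyed with the topic.prefix:
test:{"type":"stop-snapshot","data": {"data-collections": ["public.characters"], "type": "INCREMENTAL"}}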