Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8501 Support for JDBC offset and schema history stores #80

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.debezium.operator.api.model.source.storage.offset.ConfigMapOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.FileOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.InMemoryOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.JdbcOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.KafkaOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.RedisOffsetStore;
import io.debezium.operator.docs.annotations.Documented;
Expand All @@ -37,6 +38,8 @@ public class Offset implements ConfigMappable<DebeziumServer> {
private RedisOffsetStore redis;
@JsonPropertyDescription("Kafka backing store configuration")
private KafkaOffsetStore kafka;
@JsonPropertyDescription("JDBC backing store configuration")
private JdbcOffsetStore jdbc;
@JsonPropertyDescription("Config map backed offset store configuration")
private ConfigMapOffsetStore configMap;
@JsonPropertyDescription("Arbitrary offset store configuration")
Expand Down Expand Up @@ -85,6 +88,14 @@ public void setKafka(KafkaOffsetStore kafka) {
this.kafka = kafka;
}

public JdbcOffsetStore getJdbc() {
return jdbc;
}

public void setJdbc(JdbcOffsetStore jdbc) {
this.jdbc = jdbc;
}

public ConfigMapOffsetStore getConfigMap() {
return configMap;
}
Expand All @@ -103,7 +114,7 @@ public void setStore(CustomStore store) {

@JsonIgnore
public Store getActiveStore() {
return Stream.of(file, memory, redis, kafka, configMap, store)
return Stream.of(file, memory, redis, kafka, jdbc, configMap, store)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(InMemoryOffsetStore::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.debezium.operator.api.model.source.storage.Store;
import io.debezium.operator.api.model.source.storage.schema.FileSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.InMemorySchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.JdbcSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.KafkaSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.RedisSchemaHistoryStore;
import io.debezium.operator.docs.annotations.Documented;
Expand All @@ -36,6 +37,8 @@ public class SchemaHistory implements ConfigMappable<DebeziumServer> {
private RedisSchemaHistoryStore redis;
@JsonPropertyDescription("Kafka backed schema history store configuration")
private KafkaSchemaHistoryStore kafka;
@JsonPropertyDescription("JDBC backed schema history store configuration")
private JdbcSchemaHistoryStore jdbc;
@JsonPropertyDescription("Arbitrary schema history store configuration")
private CustomStore store;
@JsonPropertyDescription("Additional common schema history store configuration properties.")
Expand Down Expand Up @@ -73,6 +76,14 @@ public void setKafka(KafkaSchemaHistoryStore kafka) {
this.kafka = kafka;
}

public JdbcSchemaHistoryStore getJdbc() {
return jdbc;
}

public void setJdbc(JdbcSchemaHistoryStore jdbc) {
this.jdbc = jdbc;
}

public CustomStore getStore() {
return store;
}
Expand All @@ -91,7 +102,7 @@ public void setConfig(ConfigProperties config) {

@JsonIgnore
public Store getActiveStore() {
return Stream.of(file, memory, redis, kafka, store)
return Stream.of(file, memory, redis, kafka, jdbc, store)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(InMemorySchemaHistoryStore::new);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;

public class JdbcStore extends AbstractStore {
public static final String CONFIG_PREFIX = "jdbc";

@JsonPropertyDescription("JDBC connection URL")
private String url;

@JsonPropertyDescription("Username used to connect to the storage database")
private String user;

@JsonPropertyDescription("Password used to connect to the storage database")
private String password;

@JsonPropertyDescription("Retry delay on connection failure (in milliseconds)")
private long retryDelay;

@JsonPropertyDescription("Maximum number of retries on connection failure")
private int maxRetries;

public JdbcStore(String type) {
super(CONFIG_PREFIX, type);
}

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public long getRetryDelay() {
return retryDelay;
}

public void setRetryDelay(long retryDelay) {
this.retryDelay = retryDelay;
}

public int getMaxRetries() {
return maxRetries;
}

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

@Override
protected ConfigMapping<DebeziumServer> typeConfiguration(DebeziumServer primary) {
return super.typeConfiguration(primary)
.put("url", url)
.put("user", user)
.put("password", password)
.put("wait.retry.delay.ms", retryDelay)
.put("retry.max.attempts", maxRetries);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage.offset;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.source.storage.JdbcStore;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class JdbcOffsetStore extends JdbcStore {

public static final String TYPE = "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore";

@JsonPropertyDescription("The configuration of the offset table")
private JdbcOffsetTableConfig table = new JdbcOffsetTableConfig();

public JdbcOffsetStore() {
super(TYPE);
}

public JdbcOffsetTableConfig getTable() {
return table;
}

public void setTable(JdbcOffsetTableConfig table) {
this.table = table;
}

@Override
protected ConfigMapping<DebeziumServer> typeConfiguration(DebeziumServer primary) {
return super.typeConfiguration(primary)
.putAll("table", table);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage.offset;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMappable;
import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class JdbcOffsetTableConfig implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("The name of the offset table")
@JsonProperty(required = false)
private String name;

@JsonPropertyDescription("DDL statement to create the offset table")
@JsonProperty(required = false)
private String ddl;

@JsonPropertyDescription("Statement used to select from the offset table")
@JsonProperty(required = false)
private String select;

@JsonPropertyDescription("Statement used to insert into the offset table")
@JsonProperty(required = false)
private String insert;

@JsonPropertyDescription("Statement used to update the offset table")
@JsonProperty(required = false)
private String delete;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getDdl() {
return ddl;
}

public void setDdl(String ddl) {
this.ddl = ddl;
}

public String getSelect() {
return select;
}

public void setSelect(String select) {
this.select = select;
}

public String getInsert() {
return insert;
}

public void setInsert(String insert) {
this.insert = insert;
}

public String getDelete() {
return delete;
}

public void setDelete(String delete) {
this.delete = delete;
}

@Override
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.put("name", name)
.put("ddl", ddl)
.put("select", select)
.put("insert", insert)
.put("delete", delete);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage.schema;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.source.storage.JdbcStore;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class JdbcSchemaHistoryStore extends JdbcStore {

public static final String TYPE = "io.debezium.storage.jdbc.history.JdbcSchemaHistory";

@JsonPropertyDescription("The configuration of the offset table")
private JdbcSchemaHistoryTableConfig table = new JdbcSchemaHistoryTableConfig();

public JdbcSchemaHistoryStore() {
super(TYPE);
}

public JdbcSchemaHistoryTableConfig getTable() {
return table;
}

public void setTable(JdbcSchemaHistoryTableConfig table) {
this.table = table;
}

@Override
protected ConfigMapping<DebeziumServer> typeConfiguration(DebeziumServer primary) {
return super.typeConfiguration(primary)
.putAll("table", table);
}
}
Loading
Loading