From 3b0b52a1931a1db65de2a411af8c11e16abe9aa1 Mon Sep 17 00:00:00 2001 From: Luning Wang Date: Fri, 2 Dec 2022 16:11:28 +0800 Subject: [PATCH] Flink: Support log store --- .../org/apache/iceberg/io/WriteResult.java | 35 ++++- flink-example/build.gradle | 79 ++++++++++ .../apache/iceberg/flink/LogStoreExample.java | 97 +++++++++++++ flink/v1.16/build.gradle | 10 ++ .../iceberg/flink/FlinkConfigOptions.java | 12 ++ .../flink/FlinkDynamicTableFactory.java | 87 ++++++++++- .../iceberg/flink/IcebergTableSink.java | 68 ++++++--- .../kafka/KafkaLogDeserializationSchema.java | 101 +++++++++++++ .../iceberg/flink/kafka/KafkaLogOptions.java | 40 +++++ .../kafka/KafkaLogSerializationSchema.java | 103 +++++++++++++ .../flink/kafka/KafkaLogSinkProvider.java | 71 +++++++++ .../flink/kafka/KafkaLogSourceProvider.java | 116 +++++++++++++++ .../flink/kafka/KafkaLogStoreFactory.java | 137 ++++++++++++++++++ .../flink/kafka/KafkaSinkFunction.java | 77 ++++++++++ .../iceberg/flink/log/LogSinkFunction.java | 43 ++++++ .../iceberg/flink/log/LogSinkProvider.java | 32 ++++ .../iceberg/flink/log/LogSourceProvider.java | 36 +++++ .../flink/log/LogStoreOffsetsUtils.java | 48 ++++++ .../flink/log/LogStoreTableFactory.java | 93 ++++++++++++ .../iceberg/flink/log/LogWriteCallback.java | 47 ++++++ .../iceberg/flink/sink/DeltaManifests.java | 14 +- .../flink/sink/DeltaManifestsSerializer.java | 6 +- .../iceberg/flink/sink/FlinkManifestUtil.java | 9 +- .../apache/iceberg/flink/sink/FlinkSink.java | 20 ++- .../flink/sink/IcebergFilesCommitter.java | 36 ++++- .../flink/sink/IcebergStreamWriter.java | 124 +++++++++++++++- .../iceberg/flink/source/IcebergSource.java | 52 ++++++- .../flink/source/IcebergTableSource.java | 85 ++++++++--- .../org.apache.flink.table.factories.Factory | 2 + .../flink/sink/TestCompressionSettings.java | 2 +- .../flink/sink/TestIcebergStreamWriter.java | 2 +- .../source/TestFlinkLogStoreSourceSql.java | 99 +++++++++++++ settings.gradle | 2 + 33 files changed, 1726 insertions(+), 59 deletions(-) create mode 100644 flink-example/build.gradle create mode 100644 flink-example/src/main/java/org/apache/iceberg/flink/LogStoreExample.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogDeserializationSchema.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogOptions.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSerializationSchema.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSinkProvider.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSourceProvider.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogStoreFactory.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaSinkFunction.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkFunction.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkProvider.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSourceProvider.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreOffsetsUtils.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreTableFactory.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogWriteCallback.java create mode 100644 flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLogStoreSourceSql.java diff --git a/core/src/main/java/org/apache/iceberg/io/WriteResult.java b/core/src/main/java/org/apache/iceberg/io/WriteResult.java index c73b09f46c53..818b2db540f7 100644 --- a/core/src/main/java/org/apache/iceberg/io/WriteResult.java +++ b/core/src/main/java/org/apache/iceberg/io/WriteResult.java @@ -21,10 +21,12 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.CharSequenceSet; public class WriteResult implements Serializable { @@ -32,11 +34,17 @@ public class WriteResult implements Serializable { private DeleteFile[] deleteFiles; private CharSequence[] referencedDataFiles; + private Map logStorePartitionOffsets; + private WriteResult( - List dataFiles, List deleteFiles, CharSequenceSet referencedDataFiles) { + List dataFiles, + List deleteFiles, + CharSequenceSet referencedDataFiles, + Map logStorePartitionOffsets) { this.dataFiles = dataFiles.toArray(new DataFile[0]); this.deleteFiles = deleteFiles.toArray(new DeleteFile[0]); this.referencedDataFiles = referencedDataFiles.toArray(new CharSequence[0]); + this.logStorePartitionOffsets = logStorePartitionOffsets; } public DataFile[] dataFiles() { @@ -51,6 +59,14 @@ public CharSequence[] referencedDataFiles() { return referencedDataFiles; } + public Map logStorePartitionOffsets() { + return logStorePartitionOffsets; + } + + public void setLogStorePartitionOffsets(Map logStorePartitionOffsets) { + this.logStorePartitionOffsets = logStorePartitionOffsets; + } + public static Builder builder() { return new Builder(); } @@ -59,18 +75,20 @@ public static class Builder { private final List dataFiles; private final List deleteFiles; private final CharSequenceSet referencedDataFiles; + private Map logStorePartitionOffsets; private Builder() { this.dataFiles = Lists.newArrayList(); this.deleteFiles = Lists.newArrayList(); this.referencedDataFiles = CharSequenceSet.empty(); + this.logStorePartitionOffsets = Maps.newHashMap(); } public Builder add(WriteResult result) { addDataFiles(result.dataFiles); addDeleteFiles(result.deleteFiles); addReferencedDataFiles(result.referencedDataFiles); - + addOffsets(result.logStorePartitionOffsets); return this; } @@ -109,8 +127,19 @@ public Builder addReferencedDataFiles(Iterable files) { return this; } + public Builder addOffsets(Map newLogStorePartitionOffsets) { + for (Map.Entry entry : newLogStorePartitionOffsets.entrySet()) { + Long oldOffset = this.logStorePartitionOffsets.get(entry.getKey()); + Long newOffset = entry.getValue(); + if (oldOffset == null || oldOffset < newOffset) { + this.logStorePartitionOffsets.put(entry.getKey(), newOffset); + } + } + return this; + } + public WriteResult build() { - return new WriteResult(dataFiles, deleteFiles, referencedDataFiles); + return new WriteResult(dataFiles, deleteFiles, referencedDataFiles, logStorePartitionOffsets); } } } diff --git a/flink-example/build.gradle b/flink-example/build.gradle new file mode 100644 index 000000000000..bcb60e4ef73a --- /dev/null +++ b/flink-example/build.gradle @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +String flinkVersion = '1.16.0' +String flinkMajorVersion = '1.16' +String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") + +project(":iceberg-flink-example") { + apply plugin: 'com.github.johnrengelman.shadow' + + tasks.jar.dependsOn tasks.shadowJar + + dependencies { + implementation project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") + implementation "org.apache.flink:flink-table-api-java:${flinkVersion}" + implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}" + implementation 'org.apache.flink:flink-runtime:1.16.0' + implementation 'org.apache.flink:flink-table-runtime:1.16.0' +// implementation 'org.apache.flink:flink-table-planner-loader:1.16.0' + implementation "org.apache.flink:flink-sql-connector-hive-2.3.9_2.12:1.16.0" + implementation 'org.apache.flink:flink-json:1.16.0' + implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}" + implementation "org.apache.flink:flink-connector-base:${flinkVersion}" + implementation "org.apache.flink:flink-connector-files:${flinkVersion}" + implementation "org.apache.flink:flink-clients:1.16.0" + implementation "org.apache.hadoop:hadoop-client" + implementation 'org.apache.flink:flink-runtime-web:1.16.0' + implementation 'org.apache.flink:flink-sql-gateway-api:1.16.0' + implementation 'org.apache.flink:flink-table-planner_2.12:1.16.0' + implementation 'org.apache.flink:flink-csv:1.16.0' + } + + shadowJar { + configurations = [project.configurations.runtimeClasspath] + + zip64 true + + // include the LICENSE and NOTICE files for the shaded Jar + from(projectDir) { + include 'LICENSE' + include 'NOTICE' + } + + // Relocate dependencies to avoid conflicts + relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro' + relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' + relocate 'com.google', 'org.apache.iceberg.shaded.com.google' + relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml' + relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes' + relocate 'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework' + relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' + relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' + relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' + relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' + relocate 'org.apache.httpcomponents.client5', 'org.apache.iceberg.shaded.org.apache.httpcomponents.client5' + + classifier null + } + + jar { + enabled = false + } +} diff --git a/flink-example/src/main/java/org/apache/iceberg/flink/LogStoreExample.java b/flink-example/src/main/java/org/apache/iceberg/flink/LogStoreExample.java new file mode 100644 index 000000000000..6400ce7ba3ee --- /dev/null +++ b/flink-example/src/main/java/org/apache/iceberg/flink/LogStoreExample.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +public class LogStoreExample { + + private LogStoreExample() {} + + public static void main(String[] args) throws Exception { + + Configuration configuration = new Configuration(); +// configuration.setString("table.exec.iceberg.use-flip27-source", "true"); + configuration.setString("execution.checkpointing.interval", "60s"); + configuration.setString("state.checkpoint-storage", "jobmanager"); + configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); + + StreamExecutionEnvironment env = + StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); + TableEnvironment tEnv = StreamTableEnvironment.create(env); + + tEnv.executeSql( + "CREATE CATALOG hive_catalog WITH (\n" + + " 'type'='iceberg',\n" + + " 'uri'='thrift://flink03:9083',\n" + + " 'warehouse'='hdfs://ns1/dtInsight/hive/warehouse'\n" + + ")"); + + tEnv.executeSql("USE CATALOG hive_catalog"); + + tEnv.executeSql("USE sec_index"); + + tEnv.executeSql("CREATE TABLE default_catalog.default_database.f (\n" + + " id BIGINT,\n" + + " name STRING\n" + + ") WITH (\n" + + " 'connector' = 'filesystem',\n" + + " 'path' = 'file:///Users/ada/tmp/log-store',\n" + + " 'format' = 'csv'\n" + + ")"); + + // tEnv.executeSql("CREATE TABLE log_store_v2 (\n" + + // " id BIGINT COMMENT 'unique id',\n" + + // " name STRING\n" + + // ") WITH (\n" + + // " 'format-version' = '2',\n" + + // " 'log-store' = 'kafka',\n" + + // " 'kafka.bootstrap.servers' = '172.16.100.109:9092',\n" + + // " 'kafka.topic' = 'log-store.v2'\n" + + // ")"); + +// tEnv.executeSql("ALTER TABLE log_store_v1 SET ('log-store.kafka.bootstrap.servers'='172.16.100.109:9092')"); +// tEnv.executeSql("ALTER TABLE log_store_v1 SET ('log-store.kafka.topic'='log-store.v2')"); + + // tEnv.executeSql("INSERT INTO log_store_v2 VALUES (3, 'bar')"); +// tEnv.executeSql( +// "SELECT * FROM log_store_v2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/") +// .print(); + + tEnv.executeSql( + "INSERT INTO default_catalog.default_database.f SELECT * FROM log_store_v2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'log-store'='none') */") + ; + +// tEnv.executeSql( +// "INSERT INTO default_catalog.default_database.f VALUES(1, 'foo')"); +// ; + +// tEnv.executeSql( +// "SELECT * FROM log_store_v2 /*+ OPTIONS('log-store'='') */") +// .print(); + } +} diff --git a/flink/v1.16/build.gradle b/flink/v1.16/build.gradle index 139faf986825..a7f92057772b 100644 --- a/flink/v1.16/build.gradle +++ b/flink/v1.16/build.gradle @@ -33,6 +33,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation project(':iceberg-parquet') implementation project(':iceberg-hive-metastore') + implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}" + // for dropwizard histogram metrics implementation compileOnly "org.apache.flink:flink-metrics-dropwizard:${flinkVersion}" compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}" @@ -80,6 +82,14 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts') + testImplementation("org.apache.kafka:kafka_${scalaVersion}:2.8.1") { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'com.fasterxml.jackson.core' + exclude group: 'com.fasterxml.jackson.dataformat' + exclude group: 'com.fasterxml.jackson.module' + exclude group: 'com.fasterxml.jackson.datatype' + } + // By default, hive-exec is a fat/uber jar and it exports a guava library // that's really old. We use the core classifier to be able to override our guava // version. Luckily, hive-exec seems to work okay so far with this version of guava diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java index 7c7afd24ed8e..09c91809cd91 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java @@ -104,4 +104,16 @@ private FlinkConfigOptions() {} SplitAssignerType.SIMPLE + ": simple assigner that doesn't provide any guarantee on order or locality.")) .build()); + + public static final ConfigOption LOG_KEY_FORMAT = + ConfigOptions.key("log.key.format") + .stringType() + .defaultValue("json") + .withDescription("Specify the key message format of log system with primary key."); + + public static final ConfigOption LOG_FORMAT = + ConfigOptions.key("log.format") + .stringType() + .defaultValue("json") + .withDescription("Specify the message format of log system."); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java index 3f04d35e851c..38d23fa8cb37 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java @@ -23,6 +23,8 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.configuration.description.TextElement; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabaseImpl; @@ -35,10 +37,12 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.Preconditions; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.flink.log.LogStoreTableFactory; import org.apache.iceberg.flink.source.IcebergTableSource; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -47,6 +51,8 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory { static final String FACTORY_IDENTIFIER = "iceberg"; + public static final String NONE = "none"; + private static final ConfigOption CATALOG_NAME = ConfigOptions.key("catalog-name") .stringType() @@ -71,6 +77,27 @@ public class FlinkDynamicTableFactory .noDefaultValue() .withDescription("Table name managed in the underlying iceberg catalog and database."); + public static final ConfigOption LOG_STORE = + ConfigOptions.key("log-store") + .stringType() + .defaultValue(NONE) + .withDescription( + Description.builder() + .text("The log system used to keep changes of the table.") + .linebreak() + .linebreak() + .text("Possible values:") + .linebreak() + .list( + TextElement.text( + "\"none\": No log system, the data is written only to file store," + + " and the streaming read will be directly read from the file store.")) + .list( + TextElement.text( + "\"kafka\": Kafka log system, the data is double written to file" + + " store and kafka, and the streaming read will be read from kafka.")) + .build()); + private final FlinkCatalog catalog; public FlinkDynamicTableFactory() { @@ -100,7 +127,22 @@ public DynamicTableSource createDynamicTableSource(Context context) { objectIdentifier.getObjectName()); } - return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration()); + String logSystemType = tableProps.get(LOG_STORE.key()); + if (logSystemType.equalsIgnoreCase(NONE)) { + return new IcebergTableSource( + tableLoader, tableSchema, tableProps, context.getConfiguration(), null, null); + } else { + LogStoreTableFactory logStoreTableFactory = + FactoryUtil.discoverFactory( + context.getClassLoader(), LogStoreTableFactory.class, logSystemType); + return new IcebergTableSource( + tableLoader, + tableSchema, + tableProps, + context.getConfiguration(), + logStoreTableFactory, + context); + } } @Override @@ -119,7 +161,22 @@ public DynamicTableSink createDynamicTableSink(Context context) { catalogTable, writeProps, objectPath.getDatabaseName(), objectPath.getObjectName()); } - return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps); + String logSystemType = writeProps.get(LOG_STORE.key()); + if (logSystemType.equalsIgnoreCase(NONE)) { + return new IcebergTableSink( + tableLoader, tableSchema, context.getConfiguration(), writeProps, null, context); + } else { + LogStoreTableFactory logStoreTableFactory = + FactoryUtil.discoverFactory( + context.getClassLoader(), LogStoreTableFactory.class, logSystemType); + return new IcebergTableSink( + tableLoader, + tableSchema, + context.getConfiguration(), + writeProps, + logStoreTableFactory, + context); + } } @Override @@ -135,6 +192,7 @@ public Set> optionalOptions() { Set> options = Sets.newHashSet(); options.add(CATALOG_DATABASE); options.add(CATALOG_TABLE); + options.add(LOG_STORE); return options; } @@ -203,4 +261,29 @@ private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath ob Preconditions.checkNotNull(catalog, "Flink catalog cannot be null"); return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath)); } + + // private static Map stripPrefix(Map tableOptions, String + // prefix) { + // Map options = Maps.newHashMap(); + // tableOptions.keySet().stream() + // .filter(key -> key.startsWith(prefix)) + // .forEach( + // key -> { + // final String value = tableOptions.get(key); + // final String subKey = key.substring(prefix.length()); + // options.put(subKey, value); + // }); + // return options; + // } + + // private static Context newContext(Context context, Map newOptions) { + // return new FactoryUtil.DefaultDynamicTableContext( + // context.getObjectIdentifier(), + // context.getCatalogTable().copy(newOptions), + // // TODO What options we should use + // newOptions, + // context.getConfiguration(), + // context.getClassLoader(), + // context.isTemporary()); + // } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java index 1b9268569d9a..c7ab805fc549 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java @@ -32,8 +32,11 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; +import org.apache.iceberg.flink.log.LogSinkProvider; +import org.apache.iceberg.flink.log.LogStoreTableFactory; import org.apache.iceberg.flink.sink.FlinkSink; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -45,48 +48,77 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, private boolean overwrite = false; + private final LogStoreTableFactory logStoreTableFactory; + + private final DynamicTableFactory.Context context; + private IcebergTableSink(IcebergTableSink toCopy) { this.tableLoader = toCopy.tableLoader; this.tableSchema = toCopy.tableSchema; this.overwrite = toCopy.overwrite; this.readableConfig = toCopy.readableConfig; this.writeProps = toCopy.writeProps; + this.logStoreTableFactory = toCopy.logStoreTableFactory; + this.context = toCopy.context; } public IcebergTableSink( TableLoader tableLoader, TableSchema tableSchema, ReadableConfig readableConfig, - Map writeProps) { + Map writeProps, + LogStoreTableFactory logStoreTableFactory, + DynamicTableFactory.Context context) { this.tableLoader = tableLoader; this.tableSchema = tableSchema; this.readableConfig = readableConfig; this.writeProps = writeProps; + this.logStoreTableFactory = logStoreTableFactory; + this.context = context; } @Override - public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) { Preconditions.checkState( - !overwrite || context.isBounded(), + !overwrite || sinkContext.isBounded(), "Unbounded data stream doesn't support overwrite operation."); - List equalityColumns = tableSchema.getPrimaryKey().map(UniqueConstraint::getColumns).orElseGet(ImmutableList::of); - return new DataStreamSinkProvider() { - @Override - public DataStreamSink consumeDataStream( - ProviderContext providerContext, DataStream dataStream) { - return FlinkSink.forRowData(dataStream) - .tableLoader(tableLoader) - .tableSchema(tableSchema) - .equalityFieldColumns(equalityColumns) - .overwrite(overwrite) - .setAll(writeProps) - .flinkConf(readableConfig) - .append(); - } - }; + if (logStoreTableFactory != null) { + LogSinkProvider logSinkProvider = + logStoreTableFactory.createSinkProvider(this.context, sinkContext); + return new DataStreamSinkProvider() { + @Override + public DataStreamSink consumeDataStream( + ProviderContext providerContext, DataStream dataStream) { + return FlinkSink.forRowData(dataStream) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .equalityFieldColumns(equalityColumns) + .overwrite(overwrite) + .setAll(writeProps) + .flinkConf(readableConfig) + .logSinkProvider(logSinkProvider) + .append(); + } + }; + } else { + return new DataStreamSinkProvider() { + @Override + public DataStreamSink consumeDataStream( + ProviderContext providerContext, DataStream dataStream) { + return FlinkSink.forRowData(dataStream) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .equalityFieldColumns(equalityColumns) + .overwrite(overwrite) + .setAll(writeProps) + .flinkConf(readableConfig) + .append(); + } + }; + } } @Override diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogDeserializationSchema.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogDeserializationSchema.java new file mode 100644 index 000000000000..249389a832bb --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogDeserializationSchema.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.kafka; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema { + + private static final long serialVersionUID = 1L; + private final Schema schema; + private final List equalityFieldIds; + @Nullable private final DeserializationSchema primaryKeyDeserializer; + private final DeserializationSchema valueDeserializer; + @Nullable private RowDataWrapper keyWrapper; + + public KafkaLogDeserializationSchema( + Schema schema, + List equalityFieldIds, + @Nullable DeserializationSchema primaryKeyDeserializer, + DeserializationSchema valueDeserializer) { + this.schema = schema; + this.equalityFieldIds = equalityFieldIds; + this.primaryKeyDeserializer = primaryKeyDeserializer; + this.valueDeserializer = valueDeserializer; + if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) { + Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.keyWrapper = + new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct()); + } + } + + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception { + if (primaryKeyDeserializer != null) { + primaryKeyDeserializer.open(context); + } + valueDeserializer.open(context); + } + + @Override + public boolean isEndOfStream(RowData nextElement) { + return false; + } + + @Override + public RowData deserialize(ConsumerRecord record) { + throw new RuntimeException( + "Please invoke DeserializationSchema#deserialize(byte[], Collector) instead."); + } + + @Override + public void deserialize(ConsumerRecord record, Collector underCollector) + throws Exception { + if (equalityFieldIds.size() > 0 && record.value() == null) { + RowData key = primaryKeyDeserializer.deserialize(record.key()); + keyWrapper.wrap(key); + GenericRowData result = new GenericRowData(RowKind.DELETE, schema.columns().size()); + for (int i = 0; i < equalityFieldIds.size(); i++) { + result.setField(equalityFieldIds.get(i), keyWrapper.get(i, Object.class)); + } + underCollector.collect(result); + } else { + valueDeserializer.deserialize(record.value(), underCollector); + } + } + + @Override + public TypeInformation getProducedType() { + return null; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogOptions.java new file mode 100644 index 000000000000..62fb1d18b855 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.kafka; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** Options for kafka log. */ +public class KafkaLogOptions { + + private KafkaLogOptions() {} + + public static final ConfigOption BOOTSTRAP_SERVERS = + ConfigOptions.key("kafka.bootstrap.servers") + .stringType() + .noDefaultValue() + .withDescription("Required Kafka server connection string."); + + public static final ConfigOption TOPIC = + ConfigOptions.key("kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("Topic of this kafka table."); +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSerializationSchema.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSerializationSchema.java new file mode 100644 index 000000000000..3237ce17b62d --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSerializationSchema.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.kafka; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.data.RowDataProjection; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.kafka.clients.producer.ProducerRecord; + +public class KafkaLogSerializationSchema implements KafkaSerializationSchema { + + private static final long serialVersionUID = 1L; + + private final String topic; + @Nullable private final SerializationSchema primaryKeySerializer; + private final SerializationSchema valueSerializer; + @Nullable private Schema deleteSchema; + @Nullable private RowDataProjection keyProjection; + private final boolean upsert; + + public KafkaLogSerializationSchema( + String topic, + @Nullable SerializationSchema keySerializer, + SerializationSchema valueSerializer, + Schema schema, + RowType flinkSchema, + List equalityFieldIds, + boolean upsert) { + this.topic = topic; + this.primaryKeySerializer = keySerializer; + this.valueSerializer = valueSerializer; + if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) { + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.keyProjection = RowDataProjection.create(schema, deleteSchema); + } + this.upsert = upsert; + } + + @Override + public void open(SerializationSchema.InitializationContext context) throws Exception { + if (primaryKeySerializer != null) { + primaryKeySerializer.open(context); + } + valueSerializer.open(context); + } + + @Override + public ProducerRecord serialize(RowData element, @Nullable Long timestamp) { + RowKind kind = element.getRowKind(); + + byte[] primaryKeyBytes = null; + byte[] valueBytes = null; + if (primaryKeySerializer != null && upsert) { + primaryKeyBytes = primaryKeySerializer.serialize(keyProjection.wrap(element)); + // TODO accord with File writer? + if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) { + valueBytes = valueSerializer.serialize(element); + } else if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) { + primaryKeyBytes = primaryKeySerializer.serialize(keyProjection.wrap(element)); + valueBytes = null; + } + } else { + valueBytes = valueSerializer.serialize(element); + } + + return new ProducerRecord<>(topic, null, primaryKeyBytes, valueBytes); + } + + static RowData createProjectedRow( + RowData consumedRow, RowKind kind, RowData.FieldGetter[] fieldGetters) { + final int arity = fieldGetters.length; + final GenericRowData genericRowData = new GenericRowData(kind, arity); + for (int fieldPos = 0; fieldPos < arity; fieldPos++) { + genericRowData.setField(fieldPos, fieldGetters[fieldPos].getFieldOrNull(consumedRow)); + } + return genericRowData; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSinkProvider.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSinkProvider.java new file mode 100644 index 000000000000..9b21cf89cdf0 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSinkProvider.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.kafka; + +import java.util.List; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.log.LogSinkFunction; +import org.apache.iceberg.flink.log.LogSinkProvider; + +public class KafkaLogSinkProvider implements LogSinkProvider { + + private static final long serialVersionUID = 1L; + + private final String topic; + + private final Properties properties; + + @Nullable private final SerializationSchema keySerializer; + + private final SerializationSchema valueSerializer; + + public KafkaLogSinkProvider( + String topic, + Properties properties, + @Nullable SerializationSchema keySerializer, + SerializationSchema valueSerializer) { + this.topic = topic; + this.properties = properties; + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + } + + @Override + public LogSinkFunction createSink( + Table table, RowType flinkRowType, List equalityFieldIds, boolean upsert) { + return new KafkaSinkFunction( + topic, + createSerializationSchema(table.schema(), flinkRowType, equalityFieldIds, upsert), + properties); + } + + @VisibleForTesting + KafkaLogSerializationSchema createSerializationSchema( + Schema schema, RowType flinkRowType, List equalityFieldIds, boolean upsert) { + return new KafkaLogSerializationSchema( + topic, keySerializer, valueSerializer, schema, flinkRowType, equalityFieldIds, upsert); + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSourceProvider.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSourceProvider.java new file mode 100644 index 000000000000..29d6b866f7c0 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogSourceProvider.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.kafka; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.log.LogSourceProvider; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.common.TopicPartition; + +public class KafkaLogSourceProvider implements LogSourceProvider { + + private static final long serialVersionUID = 1L; + + private final String topic; + + private final Properties properties; + + private final DataType physicalType; + + private final int[] primaryKey; + + @Nullable private final DeserializationSchema primaryKeyDeserializer; + + private final DeserializationSchema valueDeserializer; + + public KafkaLogSourceProvider( + String topic, + Properties properties, + DataType physicalType, + int[] primaryKey, + @Nullable DeserializationSchema primaryKeyDeserializer, + DeserializationSchema valueDeserializer) { + this.topic = topic; + this.properties = properties; + this.physicalType = physicalType; + this.primaryKey = primaryKey; + this.primaryKeyDeserializer = primaryKeyDeserializer; + this.valueDeserializer = valueDeserializer; + } + + @Override + public KafkaSource createSource( + @Nullable Map offsets, TableLoader tableLoader) { + + Table table; + try (TableLoader loader = tableLoader) { + loader.open(); + table = tableLoader.loadTable(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return KafkaSource.builder() + .setTopics(topic) + .setStartingOffsets(toOffsetsInitializer(offsets)) + .setProperties(properties) + .setDeserializer(createDeserializationSchema(table)) + .setGroupId(UUID.randomUUID().toString()) + .build(); + } + + @VisibleForTesting + KafkaRecordDeserializationSchema createDeserializationSchema(Table table) { + Schema schema = table.schema(); + List equalityFieldIds = Lists.newArrayList(schema.identifierFieldIds()); + return KafkaRecordDeserializationSchema.of( + new KafkaLogDeserializationSchema( + schema, equalityFieldIds, primaryKeyDeserializer, valueDeserializer)); + } + + private OffsetsInitializer toOffsetsInitializer(@Nullable Map partitionOffsets) { + return partitionOffsets == null + ? OffsetsInitializer.earliest() + : OffsetsInitializer.offsets(toKafkaOffsets(partitionOffsets)); + } + + private Map toKafkaOffsets(Map partitionOffsets) { + Map offsets = Maps.newHashMap(); + partitionOffsets.forEach( + (bucket, offset) -> offsets.put(new TopicPartition(topic, bucket), offset)); + return offsets; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogStoreFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogStoreFactory.java new file mode 100644 index 000000000000..a3cf03371f4e --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaLogStoreFactory.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.kafka; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.iceberg.flink.log.LogStoreTableFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +public class KafkaLogStoreFactory implements LogStoreTableFactory { + + public static final String IDENTIFIER = "kafka"; + + public static final String KAFKA_PREFIX = IDENTIFIER + "."; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = Sets.newHashSet(); + options.add(KafkaLogOptions.BOOTSTRAP_SERVERS); + return options; + } + + @Override + public Set> optionalOptions() { + return Sets.newHashSet(); + } + + private String topic(DynamicTableFactory.Context context) { + return context.getCatalogTable().getOptions().get(KafkaLogOptions.TOPIC.key()); + } + + @Override + public KafkaLogSourceProvider createSourceProvider( + DynamicTableFactory.Context context, DynamicTableSource.Context sourceContext) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); + DataType physicalType = schema.toPhysicalRowDataType(); + + DeserializationSchema primaryKeyDeserializer = null; + int[] primaryKey = getPrimaryKeyIndexes(schema); + if (primaryKey.length > 0) { + DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey); + primaryKeyDeserializer = + LogStoreTableFactory.getKeyDecodingFormat(helper) + .createRuntimeDecoder(sourceContext, keyType); + } + DeserializationSchema valueDeserializer = + LogStoreTableFactory.getValueDecodingFormat(helper) + .createRuntimeDecoder(sourceContext, physicalType); + + return new KafkaLogSourceProvider( + topic(context), + toKafkaProperties(helper.getOptions()), + physicalType, + primaryKey, + primaryKeyDeserializer, + valueDeserializer); + } + + @Override + public KafkaLogSinkProvider createSinkProvider( + DynamicTableFactory.Context context, DynamicTableSink.Context sinkContext) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); + DataType physicalType = schema.toPhysicalRowDataType(); + + SerializationSchema keySerializer = null; + int[] primaryKey = getPrimaryKeyIndexes(schema); + if (primaryKey.length > 0) { + DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey); + keySerializer = + LogStoreTableFactory.getKeyEncodingFormat(helper) + .createRuntimeEncoder(sinkContext, keyType); + } + SerializationSchema valueSerializer = + LogStoreTableFactory.getValueEncodingFormat(helper) + .createRuntimeEncoder(sinkContext, physicalType); + + return new KafkaLogSinkProvider( + topic(context), toKafkaProperties(helper.getOptions()), keySerializer, valueSerializer); + } + + private int[] getPrimaryKeyIndexes(ResolvedSchema schema) { + final List columns = schema.getColumnNames(); + return schema + .getPrimaryKey() + .map(UniqueConstraint::getColumns) + .map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray()) + .orElseGet(() -> new int[] {}); + } + + public static Properties toKafkaProperties(ReadableConfig options) { + Properties properties = new Properties(); + Map optionMap = ((Configuration) options).toMap(); + optionMap.keySet().stream() + .filter(key -> key.startsWith(KAFKA_PREFIX)) + .forEach(key -> properties.put(key.substring(KAFKA_PREFIX.length()), optionMap.get(key))); + return properties; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaSinkFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaSinkFunction.java new file mode 100644 index 000000000000..f1df0007ea43 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/kafka/KafkaSinkFunction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.kafka; + +import java.util.Properties; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.flink.log.LogSinkFunction; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.ProducerRecord; + +public class KafkaSinkFunction extends FlinkKafkaProducer + implements LogSinkFunction { + + private LogSinkFunction.WriteCallback writeCallback; + + /** + * Creates a {@link KafkaSinkFunction} for a given topic. The sink produces its input to the + * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link + * ProducerRecord}, including partitioning information. + * + * @param defaultTopic The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + */ + public KafkaSinkFunction( + String defaultTopic, + KafkaSerializationSchema serializationSchema, + Properties producerConfig) { + super(defaultTopic, serializationSchema, producerConfig, Semantic.AT_LEAST_ONCE); + } + + @Override + public void setWriteCallback(LogSinkFunction.WriteCallback writeCallback) { + this.writeCallback = writeCallback; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + Callback baseCallback = Preconditions.checkNotNull(callback, "Base callback can not be null"); + callback = + (metadata, exception) -> { + if (writeCallback != null) { + writeCallback.onCompletion(metadata.partition(), metadata.offset()); + } + baseCallback.onCompletion(metadata, exception); + }; + } + + @Override + public void flush() throws FlinkKafkaException { + super.preCommit(super.currentTransaction()); + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkFunction.java new file mode 100644 index 000000000000..a2f9aa058fef --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkFunction.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.log; + +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +public interface LogSinkFunction extends SinkFunction { + + void setWriteCallback(WriteCallback writeCallback); + + /** Flush pending records. */ + void flush() throws Exception; + + /** + * A callback interface that the user can implement to know the offset of the bucket when the + * request is complete. + */ + interface WriteCallback { + + /** + * A callback method the user can implement to provide asynchronous handling of request + * completion. This method will be called when the record sent to the server has been + * acknowledged. + */ + void onCompletion(int partition, long offset); + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkProvider.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkProvider.java new file mode 100644 index 000000000000..b60b1feff344 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSinkProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.log; + +import java.io.Serializable; +import java.util.List; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Table; + +/** A {@link Serializable} sink provider for log store. */ +public interface LogSinkProvider extends Serializable { + + /** Creates a {@link LogSinkFunction} instance. */ + LogSinkFunction createSink( + Table table, RowType flinkRowType, List equalityFieldIds, boolean upsert); +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSourceProvider.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSourceProvider.java new file mode 100644 index 000000000000..8ac13ca865cd --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogSourceProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.log; + +import java.io.Serializable; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.flink.TableLoader; + +public interface LogSourceProvider extends Serializable { + /** + * Creates a {@link Source} instance. + * + * @param partitionOffsets configure to specify the startup offset. + */ + Source createSource( + @Nullable Map partitionOffsets, TableLoader loader); +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreOffsetsUtils.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreOffsetsUtils.java new file mode 100644 index 000000000000..af96186451db --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreOffsetsUtils.java @@ -0,0 +1,48 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ +package org.apache.iceberg.flink.log; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class LogStoreOffsetsUtils { + + private LogStoreOffsetsUtils() {} + + public static String offsetsToString(Map offsets) { + List lists = Lists.newArrayList(); + offsets.forEach((k, v) -> lists.add(k + ":" + v)); + return String.join(",", lists); + } + + public static Map stringToOffsets(String str) { + if (str == null || str.length() == 0) { + return Maps.newHashMap(); + } + return Arrays.stream(str.split(",")) + .map(s -> s.split(":")) + .collect(Collectors.toMap(s -> Integer.parseInt(s[0]), s -> Long.parseLong(s[1]))); + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreTableFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreTableFactory.java new file mode 100644 index 000000000000..bd57e71a1ff8 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogStoreTableFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.log; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.iceberg.flink.FlinkConfigOptions; + +/** + * Base interface for configuring a default log table connector. The log table is used by managed + * table factory. + * + *

Log tables are for processing only unbounded data. Support streaming reading and streaming + * writing. + */ +public interface LogStoreTableFactory extends DynamicTableFactory { + + String LOG_STORE_OFFSETS = "log-offsets"; + + /** + * Creates a {@link LogSourceProvider} instance from a {@link CatalogTable} and additional context + * information. + */ + LogSourceProvider createSourceProvider( + DynamicTableFactory.Context context, DynamicTableSource.Context sourceContext); + + /** + * Creates a {@link LogSinkProvider} instance from a {@link CatalogTable} and additional context + * information. + */ + LogSinkProvider createSinkProvider( + DynamicTableFactory.Context context, DynamicTableSink.Context sinkContext); + + // -------------------------------------------------------------------------------------------- + + static DecodingFormat> getKeyDecodingFormat( + FactoryUtil.TableFactoryHelper helper) { + DecodingFormat> format = + helper.discoverDecodingFormat( + DeserializationFormatFactory.class, FlinkConfigOptions.LOG_KEY_FORMAT); + return format; + } + + static EncodingFormat> getKeyEncodingFormat( + FactoryUtil.TableFactoryHelper helper) { + EncodingFormat> format = + helper.discoverEncodingFormat( + SerializationFormatFactory.class, FlinkConfigOptions.LOG_KEY_FORMAT); + return format; + } + + static DecodingFormat> getValueDecodingFormat( + FactoryUtil.TableFactoryHelper helper) { + DecodingFormat> format = + helper.discoverDecodingFormat( + DeserializationFormatFactory.class, FlinkConfigOptions.LOG_FORMAT); + return format; + } + + static EncodingFormat> getValueEncodingFormat( + FactoryUtil.TableFactoryHelper helper) { + EncodingFormat> format = + helper.discoverEncodingFormat( + SerializationFormatFactory.class, FlinkConfigOptions.LOG_FORMAT); + return format; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogWriteCallback.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogWriteCallback.java new file mode 100644 index 000000000000..5c035848d524 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/log/LogWriteCallback.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.log; + +import java.util.Map; +import java.util.concurrent.atomic.LongAccumulator; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class LogWriteCallback implements LogSinkFunction.WriteCallback { + + private final Map offsetMap = Maps.newConcurrentMap(); + + @Override + public void onCompletion(int partition, long offset) { + LongAccumulator acc = offsetMap.get(partition); + if (acc == null) { + // computeIfAbsent will lock on the key + acc = offsetMap.computeIfAbsent(partition, k -> new LongAccumulator(Long::max, 0)); + } // else lock free + + // Save the next offset, what we need to provide to the hybrid reading is the starting + // offset of the next transaction + acc.accumulate(offset + 1); + } + + public Map offsets() { + Map offsets = Maps.newHashMap(); + offsetMap.forEach((k, v) -> offsets.put(k, v.longValue())); + return offsets; + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java index 036970c06d5b..79bf043d274a 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java @@ -31,17 +31,23 @@ class DeltaManifests { private final ManifestFile deleteManifest; private final CharSequence[] referencedDataFiles; + private final String logStoreOffsets; + DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) { - this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES); + this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES, ""); } DeltaManifests( - ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) { + ManifestFile dataManifest, + ManifestFile deleteManifest, + CharSequence[] referencedDataFiles, + String logStoreOffsets) { Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null."); this.dataManifest = dataManifest; this.deleteManifest = deleteManifest; this.referencedDataFiles = referencedDataFiles; + this.logStoreOffsets = logStoreOffsets; } ManifestFile dataManifest() { @@ -56,6 +62,10 @@ CharSequence[] referencedDataFiles() { return referencedDataFiles; } + String logStoreOffsets() { + return logStoreOffsets; + } + List manifests() { List manifests = Lists.newArrayListWithCapacity(2); if (dataManifest != null) { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java index c4d6e713bb73..191e74bf7bff 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java @@ -70,6 +70,8 @@ public byte[] serialize(DeltaManifests deltaManifests) throws IOException { out.writeUTF(referencedDataFiles[i].toString()); } + out.writeUTF(deltaManifests.logStoreOffsets()); + return binaryOut.toByteArray(); } @@ -117,6 +119,8 @@ private DeltaManifests deserializeV2(byte[] serialized) throws IOException { referencedDataFiles[i] = in.readUTF(); } - return new DeltaManifests(dataManifest, deleteManifest, referencedDataFiles); + String kafkaOffsets = in.readUTF(); + + return new DeltaManifests(dataManifest, deleteManifest, referencedDataFiles, kafkaOffsets); } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 25badc372abf..f887b282c4f9 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.function.Supplier; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -30,6 +31,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.flink.log.LogStoreOffsetsUtils; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -101,7 +103,9 @@ static DeltaManifests writeCompletedFiles( deleteManifest = deleteManifestWriter.toManifestFile(); } - return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles()); + String kafkaOffsets = LogStoreOffsetsUtils.offsetsToString(result.logStorePartitionOffsets()); + return new DeltaManifests( + dataManifest, deleteManifest, result.referencedDataFiles(), kafkaOffsets); } static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) @@ -121,6 +125,9 @@ static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) } } + Map offsets = + LogStoreOffsetsUtils.stringToOffsets(deltaManifests.logStoreOffsets()); + builder.addOffsets(offsets); return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build(); } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 81706e582413..af7a72aa941f 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -59,6 +59,7 @@ import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.log.LogSinkProvider; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -140,6 +141,8 @@ public static class Builder { private final Map writeOptions = Maps.newHashMap(); private FlinkWriteConf flinkWriteConf = null; + private LogSinkProvider logSinkProvider; + private Builder() {} private Builder forRowData(DataStream newRowDataInput) { @@ -316,6 +319,11 @@ public Builder setSnapshotProperty(String property, String value) { return this; } + public Builder logSinkProvider(LogSinkProvider newLogSinkProvider) { + this.logSinkProvider = newLogSinkProvider; + return this; + } + private DataStreamSink chainIcebergOperators() { Preconditions.checkArgument( inputCreator != null, @@ -456,7 +464,8 @@ private SingleOutputStreamOperator appendWriter( } IcebergStreamWriter streamWriter = - createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds); + createStreamWriter( + table, flinkWriteConf, flinkRowType, equalityFieldIds, logSinkProvider); int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism; SingleOutputStreamOperator writerStream = @@ -567,7 +576,8 @@ static IcebergStreamWriter createStreamWriter( Table table, FlinkWriteConf flinkWriteConf, RowType flinkRowType, - List equalityFieldIds) { + List equalityFieldIds, + LogSinkProvider logSinkProvider) { Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); Table serializableTable = SerializableTable.copyOf(table); @@ -581,7 +591,11 @@ static IcebergStreamWriter createStreamWriter( writeProperties(table, format, flinkWriteConf), equalityFieldIds, flinkWriteConf.upsertMode()); - return new IcebergStreamWriter<>(table.name(), taskWriterFactory); + return new IcebergStreamWriter<>( + table.name(), + taskWriterFactory, + logSinkProvider.createSink( + table, flinkRowType, equalityFieldIds, flinkWriteConf.upsertMode())); } /** diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 8aa2c0304eb0..674de4c79060 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -47,6 +47,8 @@ import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.log.LogStoreOffsetsUtils; +import org.apache.iceberg.flink.log.LogStoreTableFactory; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -305,14 +307,25 @@ private void replacePartitions( summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files."); // Commit the overwrite transaction. ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool); + + WriteResult.Builder builder = WriteResult.builder(); for (WriteResult result : pendingResults.values()) { Preconditions.checkState( result.referencedDataFiles().length == 0, "Should have no referenced data files."); Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); + builder.add(result); } + String logStorePartitionOffsets = + LogStoreOffsetsUtils.offsetsToString(builder.build().logStorePartitionOffsets()); + LOG.info("replacePartitions logStorePartitionOffsets: {}", logStorePartitionOffsets); commitOperation( - dynamicOverwrite, summary, "dynamic partition overwrite", newFlinkJobId, checkpointId); + dynamicOverwrite, + summary, + "dynamic partition overwrite", + newFlinkJobId, + checkpointId, + logStorePartitionOffsets); } private void commitDeltaTxn( @@ -323,13 +336,21 @@ private void commitDeltaTxn( if (summary.deleteFilesCount() == 0) { // To be compatible with iceberg format V1. AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); + + WriteResult.Builder builder = WriteResult.builder(); for (WriteResult result : pendingResults.values()) { Preconditions.checkState( result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); + builder.add(result); } - commitOperation(appendFiles, summary, "append", newFlinkJobId, checkpointId); + + String logStorePartitionOffsets = + LogStoreOffsetsUtils.offsetsToString(builder.build().logStorePartitionOffsets()); + LOG.info("commitDeltaTxn no deletes logStorePartitionOffsets: {}", logStorePartitionOffsets); + commitOperation( + appendFiles, summary, "append", newFlinkJobId, checkpointId, logStorePartitionOffsets); } else { // To be compatible with iceberg format V2. for (Map.Entry e : pendingResults.entrySet()) { @@ -350,7 +371,12 @@ private void commitDeltaTxn( Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); - commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, e.getKey()); + String logStorePartitionOffsets = + LogStoreOffsetsUtils.offsetsToString(result.logStorePartitionOffsets()); + LOG.info( + "commitDeltaTxn has deletes logStorePartitionOffsets: {}", logStorePartitionOffsets); + commitOperation( + rowDelta, summary, "rowDelta", newFlinkJobId, e.getKey(), logStorePartitionOffsets); } } } @@ -360,13 +386,15 @@ private void commitOperation( CommitSummary summary, String description, String newFlinkJobId, - long checkpointId) { + long checkpointId, + String logStoreOffsets) { LOG.info("Committing {} to table {} with summary: {}", description, table.name(), summary); snapshotProperties.forEach(operation::set); // custom snapshot metadata properties will be overridden if they conflict with internal ones // used by the sink. operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); operation.set(FLINK_JOB_ID, newFlinkJobId); + operation.set(LogStoreTableFactory.LOG_STORE_OFFSETS, logStoreOffsets); long startNano = System.nanoTime(); operation.commit(); // abort is automatically called if this fails. diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index 9ea0349fb057..18508f564dc3 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -20,11 +20,25 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; +import org.apache.iceberg.flink.log.LogSinkFunction; +import org.apache.iceberg.flink.log.LogWriteCallback; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; @@ -42,14 +56,45 @@ class IcebergStreamWriter extends AbstractStreamOperator private transient int attemptId; private transient IcebergStreamWriterMetrics writerMetrics; - IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory) { + private LogSinkFunction logSinkFunction; + + @Nullable private transient LogWriteCallback logCallback; + private transient SimpleContext sinkContext; + private long currentWatermark = Long.MIN_VALUE; + + IcebergStreamWriter( + String fullTableName, + TaskWriterFactory taskWriterFactory, + LogSinkFunction logSinkFunction) { this.fullTableName = fullTableName; this.taskWriterFactory = taskWriterFactory; + // TODO FTS调用logSinkFunction很多函数,Iceberg是否也需要调用,这个要看下Kafka connector code的实现 + this.logSinkFunction = logSinkFunction; setChainingStrategy(ChainingStrategy.ALWAYS); } @Override - public void open() { + public void setup( + StreamTask containingTask, + StreamConfig config, + Output> output) { + super.setup(containingTask, config, output); + if (logSinkFunction != null) { + FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext()); + } + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + if (logSinkFunction != null) { + StreamingFunctionUtils.restoreFunctionState(context, logSinkFunction); + } + } + + @Override + public void open() throws Exception { this.subTaskId = getRuntimeContext().getIndexOfThisSubtask(); this.attemptId = getRuntimeContext().getAttemptNumber(); this.writerMetrics = new IcebergStreamWriterMetrics(super.metrics, fullTableName); @@ -59,6 +104,13 @@ public void open() { // Initialize the task writer. this.writer = taskWriterFactory.create(); + + this.sinkContext = new SimpleContext(getProcessingTimeService()); + if (logSinkFunction != null) { + FunctionUtils.openFunction(logSinkFunction, new Configuration()); + logCallback = new LogWriteCallback(); + logSinkFunction.setWriteCallback(logCallback); + } } @Override @@ -70,6 +122,26 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { @Override public void processElement(StreamRecord element) throws Exception { writer.write(element.getValue()); + if (logSinkFunction != null) { + logSinkFunction.invoke(element.getValue(), sinkContext); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + if (logSinkFunction != null) { + StreamingFunctionUtils.snapshotFunctionState( + context, getOperatorStateBackend(), logSinkFunction); + } + } + + @Override + public void finish() throws Exception { + super.finish(); + if (logSinkFunction != null) { + logSinkFunction.finish(); + } } @Override @@ -79,6 +151,27 @@ public void close() throws Exception { writer.close(); writer = null; } + if (logSinkFunction != null) { + FunctionUtils.closeFunction(logSinkFunction); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + + if (logSinkFunction instanceof CheckpointListener) { + ((CheckpointListener) logSinkFunction).notifyCheckpointComplete(checkpointId); + } + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + super.notifyCheckpointAborted(checkpointId); + + if (logSinkFunction instanceof CheckpointListener) { + ((CheckpointListener) logSinkFunction).notifyCheckpointAborted(checkpointId); + } } @Override @@ -109,6 +202,7 @@ private void flush() throws IOException { long startNano = System.nanoTime(); WriteResult result = writer.complete(); + result.setLogStorePartitionOffsets(logCallback.offsets()); writerMetrics.updateFlushResult(result); output.collect(new StreamRecord<>(result)); writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano)); @@ -117,4 +211,30 @@ private void flush() throws IOException { // prepareSnapshotPreBarrier happening after endInput. writer = null; } + + private class SimpleContext implements SinkFunction.Context { + + @Nullable private Long timestamp; + + private final ProcessingTimeService processingTimeService; + + SimpleContext(ProcessingTimeService processingTimeService) { + this.processingTimeService = processingTimeService; + } + + @Override + public long currentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } + + @Override + public long currentWatermark() { + return currentWatermark; + } + + @Override + public Long timestamp() { + return timestamp; + } + } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index df95b3e43497..afe42f75bed9 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; @@ -40,10 +41,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.log.LogStoreOffsetsUtils; +import org.apache.iceberg.flink.log.LogStoreTableFactory; import org.apache.iceberg.flink.source.assigner.SplitAssigner; import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory; import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator; @@ -58,6 +62,7 @@ import org.apache.iceberg.flink.source.reader.RowDataReaderFunction; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,17 +80,28 @@ public class IcebergSource implements Source logStoreOffsets; + IcebergSource( TableLoader tableLoader, ScanContext scanContext, ReaderFunction readerFunction, SplitAssignerFactory assignerFactory, - Table table) { + Table table, + boolean logStoreEnabled, + Map logStoreOffsets) { this.tableLoader = tableLoader; this.scanContext = scanContext; this.readerFunction = readerFunction; this.assignerFactory = assignerFactory; this.table = table; + this.logStoreEnabled = logStoreEnabled; + this.logStoreOffsets = logStoreOffsets; + } + + public Map getLogStoreOffsets() { + return logStoreOffsets; } String name() { @@ -133,7 +149,9 @@ private Table lazyTable() { @Override public Boundedness getBoundedness() { - return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED; + return scanContext.isStreaming() && !logStoreEnabled + ? Boundedness.CONTINUOUS_UNBOUNDED + : Boundedness.BOUNDED; } @Override @@ -179,7 +197,7 @@ private SplitEnumerator createEnumer assigner = assignerFactory.createAssigner(enumState.pendingSplits()); } - if (scanContext.isStreaming()) { + if (scanContext.isStreaming() && !logStoreEnabled) { ContinuousSplitPlanner splitPlanner = new ContinuousSplitPlannerImpl(lazyTable(), scanContext, planningThreadName()); return new ContinuousIcebergEnumerator( @@ -213,6 +231,8 @@ public static class Builder { private TableSchema projectedFlinkSchema; private Boolean exposeLocality; + private boolean logStoreEnabled; + Builder() {} public Builder tableLoader(TableLoader loader) { @@ -335,6 +355,11 @@ public Builder exposeLocality(boolean newExposeLocality) { return this; } + public Builder logStoreEnabled(boolean newLogStoreEnabled) { + this.logStoreEnabled = newLogStoreEnabled; + return this; + } + public Builder properties(Map properties) { contextBuilder.fromProperties(properties); return this; @@ -368,10 +393,29 @@ public IcebergSource build() { this.readerFunction = (ReaderFunction) rowDataReaderFunction; } + Map offsetsMap = Maps.newHashMap(); + if (logStoreEnabled) { + Snapshot snapshot = table.currentSnapshot(); + if (snapshot != null) { + long snapshotId = snapshot.snapshotId(); + contextBuilder.useSnapshotId(snapshotId); + String kafkaOffsets = + Optional.ofNullable(snapshot.summary().get(LogStoreTableFactory.LOG_STORE_OFFSETS)) + .orElse(""); + offsetsMap.putAll(LogStoreOffsetsUtils.stringToOffsets(kafkaOffsets)); + } + } + checkRequired(); // Since builder already load the table, pass it to the source to avoid double loading return new IcebergSource( - tableLoader, context, readerFunction, splitAssignerFactory, table); + tableLoader, + context, + readerFunction, + splitAssignerFactory, + table, + logStoreEnabled, + offsetsMap); } private void checkRequired() { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java index 46b33a5d842b..0be021f317f0 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java @@ -25,7 +25,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.source.hybrid.HybridSource; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -40,11 +42,14 @@ import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.types.DataType; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.FlinkFilters; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.log.LogSourceProvider; +import org.apache.iceberg.flink.log.LogStoreTableFactory; import org.apache.iceberg.flink.source.assigner.SplitAssignerType; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -68,6 +73,10 @@ public class IcebergTableSource private final boolean isLimitPushDown; private final ReadableConfig readableConfig; + private final LogStoreTableFactory logStoreTableFactory; + + private final DynamicTableFactory.Context context; + private IcebergTableSource(IcebergTableSource toCopy) { this.loader = toCopy.loader; this.schema = toCopy.schema; @@ -77,14 +86,28 @@ private IcebergTableSource(IcebergTableSource toCopy) { this.limit = toCopy.limit; this.filters = toCopy.filters; this.readableConfig = toCopy.readableConfig; + this.logStoreTableFactory = toCopy.logStoreTableFactory; + this.context = toCopy.context; } public IcebergTableSource( TableLoader loader, TableSchema schema, Map properties, - ReadableConfig readableConfig) { - this(loader, schema, properties, null, false, -1, ImmutableList.of(), readableConfig); + ReadableConfig readableConfig, + LogStoreTableFactory logStoreTableFactory, + DynamicTableFactory.Context context) { + this( + loader, + schema, + properties, + null, + false, + -1, + ImmutableList.of(), + readableConfig, + logStoreTableFactory, + context); } private IcebergTableSource( @@ -95,7 +118,9 @@ private IcebergTableSource( boolean isLimitPushDown, long limit, List filters, - ReadableConfig readableConfig) { + ReadableConfig readableConfig, + LogStoreTableFactory logStoreTableFactory, + DynamicTableFactory.Context context) { this.loader = loader; this.schema = schema; this.properties = properties; @@ -104,6 +129,8 @@ private IcebergTableSource( this.limit = limit; this.filters = filters; this.readableConfig = readableConfig; + this.logStoreTableFactory = logStoreTableFactory; + this.context = context; } @Override @@ -128,24 +155,48 @@ private DataStream createDataStream(StreamExecutionEnvironment execEnv) .build(); } - private DataStreamSource createFLIP27Stream(StreamExecutionEnvironment env) { + private DataStreamSource createFLIP27Stream( + StreamExecutionEnvironment env, ScanContext scanContext) { SplitAssignerType assignerType = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE); - IcebergSource source = - IcebergSource.forRowData() - .tableLoader(loader) - .assignerFactory(assignerType.factory()) - .properties(properties) - .project(getProjectedSchema()) - .limit(limit) - .filters(filters) - .flinkConfig(readableConfig) - .build(); + + LogSourceProvider logSourceProvider; + Source finalSource; + IcebergSource fileSource; + if (logStoreTableFactory == null) { + fileSource = + IcebergSource.forRowData() + .tableLoader(loader) + .assignerFactory(assignerType.factory()) + .properties(properties) + .project(getProjectedSchema()) + .limit(limit) + .filters(filters) + .flinkConfig(readableConfig) + .build(); + finalSource = fileSource; + } else { + logSourceProvider = logStoreTableFactory.createSourceProvider(this.context, scanContext); + fileSource = + IcebergSource.forRowData() + .tableLoader(loader) + .assignerFactory(assignerType.factory()) + .properties(properties) + .project(getProjectedSchema()) + .limit(limit) + .filters(filters) + .flinkConfig(readableConfig) + .logStoreEnabled(true) + .build(); + Source logSource = logSourceProvider.createSource(fileSource.getLogStoreOffsets(), loader); + finalSource = HybridSource.builder(fileSource).addSource(logSource).build(); + } + DataStreamSource stream = env.fromSource( - source, + finalSource, WatermarkStrategy.noWatermarks(), - source.name(), + fileSource.name(), TypeInformation.of(RowData.class)); return stream; } @@ -204,7 +255,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon public DataStream produceDataStream( ProviderContext providerContext, StreamExecutionEnvironment execEnv) { if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) { - return createFLIP27Stream(execEnv); + return createFLIP27Stream(execEnv, runtimeProviderContext); } else { return createDataStream(execEnv); } diff --git a/flink/v1.16/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink/v1.16/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 29a9955a7e20..88b0713c21f4 100644 --- a/flink/v1.16/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink/v1.16/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,3 +14,5 @@ # limitations under the License. org.apache.iceberg.flink.FlinkDynamicTableFactory +org.apache.iceberg.flink.kafka.KafkaLogStoreFactory + diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 49f472b7325e..1df92e910d8c 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -215,7 +215,7 @@ private static OneInputStreamOperatorTestHarness createIce icebergTable, override, new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index fa69c5d4d1fd..7e9b964d2293 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -376,7 +376,7 @@ private OneInputStreamOperatorTestHarness createIcebergStr icebergTable, Maps.newHashMap(), new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLogStoreSourceSql.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLogStoreSourceSql.java new file mode 100644 index 000000000000..62437e962f45 --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLogStoreSourceSql.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.junit.Test; + +public class TestFlinkLogStoreSourceSql extends TestSqlBase { + + @Override + public void before() throws IOException { + SqlHelpers.sql( + getTableEnv(), + "CREATE CATALOG iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", + catalogResource.warehouse()); + SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog"); + getTableEnv() + .getConfig() + .getConfiguration() + .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); + } + + public static final Schema SCHEMA = + new Schema( + required(1, "data", Types.StringType.get()), + required(2, "id", Types.LongType.get()), + required(3, "dt", Types.StringType.get())); + + public static final String TABLE = "log_store"; + + public static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DATABASE, TABLE); + + @Test + public void testR() throws Exception { + Table table = + catalogResource + .catalog() + .createTable( + TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC, null, Maps.newHashMap()); + + List writeRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); + writeRecords.get(0).set(1, 123L); + writeRecords.get(0).set(2, "2020-03-20"); + writeRecords.get(1).set(1, 456L); + writeRecords.get(1).set(2, "2020-03-20"); + + GenericAppenderHelper helper = + new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(writeRecords.get(0)); + + DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords); + DataFile dataFile2 = + helper.writeFile( + TestHelpers.Row.of("2020-03-21", 0), + RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + + org.apache.iceberg.flink.TestHelpers.assertRecords( + run(Maps.newHashMap(), "where dt='2020-03-20' and id=123", "*"), + expectedRecords, + TestFixtures.SCHEMA); + } +} diff --git a/settings.gradle b/settings.gradle index d1a14abe5b0f..7717b8d07d44 100644 --- a/settings.gradle +++ b/settings.gradle @@ -34,6 +34,7 @@ include 'hive-metastore' include 'nessie' include 'gcp' include 'dell' +include 'flink-example' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -51,6 +52,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' project(':dell').name = 'iceberg-dell' +project(':flink-example').name = 'iceberg-flink-example' if (null != System.getProperty("allVersions")) { System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions"))