From 984b94cca8d49311f63ccfc98686f6b7ebc54015 Mon Sep 17 00:00:00 2001
From: Mario Molina
Date: Sat, 9 Dec 2017 12:28:47 -0600
Subject: [PATCH 1/3] New Agnostic file reader

---
 .../fs/file/reader/AbstractFileReader.java   |   4 +
 .../fs/file/reader/AgnosticFileReader.java   | 123 ++++++++++++++++++
 .../connect/fs/util/ReflectionUtils.java     |   6 +-
 3 files changed, 128 insertions(+), 5 deletions(-)
 create mode 100644 src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java

diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java
index 6c53356..4e1b474 100644
--- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java
+++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java
@@ -46,4 +46,8 @@ public final Struct next() {
 
     protected abstract T nextRecord();
 
+    protected ReaderAdapter<T> getAdapter() {
+        return adapter;
+    }
+
 }

diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java
new file mode 100644
index 0000000..5e025da
--- /dev/null
+++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java
@@ -0,0 +1,123 @@
+package com.github.mmolimar.kafka.connect.fs.file.reader;
+
+import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.connect.data.Struct;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX;
+
+public class AgnosticFileReader extends AbstractFileReader<AgnosticFileReader.AgnosticRecord> {
+
+    private static final String FILE_READER_AGNOSTIC = FILE_READER_PREFIX + "agnostic.";
+    private static final String FILE_READER_AGNOSTIC_EXTENSIONS = FILE_READER_AGNOSTIC + "extensions.";
+    public static final String FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET = FILE_READER_AGNOSTIC_EXTENSIONS + "parquet";
+    public static final String FILE_READER_AGNOSTIC_EXTENSIONS_AVRO = FILE_READER_AGNOSTIC_EXTENSIONS + "avro";
+    public static final String FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE = FILE_READER_AGNOSTIC_EXTENSIONS + "sequence";
+    public static final String FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED = FILE_READER_AGNOSTIC_EXTENSIONS + "delimited";
+
+    private final AbstractFileReader reader;
+    private List<String> parquetExtensions, avroExtensions, sequenceExtensions, delimitedExtensions;
+
+    public AgnosticFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
+        super(fs, filePath, new AgnosticAdapter(), config);
+
+        try {
+            reader = (AbstractFileReader) readerByExtension(fs, filePath, config);
+        } catch (RuntimeException | IOException e) {
+            throw e;
+        } catch (Throwable t) {
+            throw new IOException("An error has occurred when creating a concrete reader", t);
+        }
+    }
+
+    private FileReader readerByExtension(FileSystem fs, Path filePath, Map<String, Object> config)
+            throws Throwable {
+        int index = filePath.getName().lastIndexOf('.');
+        String extension = index == -1 || index == filePath.getName().length() - 1 ? "" :
+                filePath.getName().substring(index + 1).toLowerCase();
+
+        Class<? extends FileReader> clz;
+        if (parquetExtensions.contains(extension)) {
+            clz = ParquetFileReader.class;
+        } else if (avroExtensions.contains(extension)) {
+            clz = AvroFileReader.class;
+        } else if (sequenceExtensions.contains(extension)) {
+            clz = SequenceFileReader.class;
+        } else if (delimitedExtensions.contains(extension)) {
+            clz = DelimitedTextFileReader.class;
+        } else {
+            clz = TextFileReader.class;
+        }
+
+        return ReflectionUtils.makeReader(clz, fs, filePath, config);
+    }
+
+    @Override
+    protected void configure(Map<String, Object> config) {
+        this.parquetExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET) == null ?
+                Arrays.asList("parquet") :
+                Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET).toString().toLowerCase().split(","));
+        this.avroExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_AVRO) == null ?
+                Arrays.asList("avro") :
+                Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_AVRO).toString().toLowerCase().split(","));
+        this.sequenceExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE) == null ?
+                Arrays.asList("seq") :
+                Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE).toString().toLowerCase().split(","));
+        this.delimitedExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED) == null ?
+                Arrays.asList("tsv", "csv") :
+                Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED).toString().toLowerCase().split(","));
+    }
+
+    @Override
+    public boolean hasNext() {
+        return reader.hasNext();
+    }
+
+    @Override
+    public void seek(Offset offset) {
+        reader.seek(offset);
+    }
+
+    @Override
+    public Offset currentOffset() {
+        return reader.currentOffset();
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    protected AgnosticRecord nextRecord() {
+        return new AgnosticRecord(reader.getAdapter(), reader.nextRecord());
+    }
+
+    static class AgnosticAdapter implements ReaderAdapter<AgnosticRecord> {
+
+        public AgnosticAdapter() {
+        }
+
+        @Override
+        public Struct apply(AgnosticRecord ag) {
+            return ag.adapter.apply(ag.record);
+        }
+    }
+
+    static class AgnosticRecord {
+        private final ReaderAdapter adapter;
+        private final Object record;
+
+        public AgnosticRecord(ReaderAdapter adapter, Object record) {
+            this.adapter = adapter;
+            this.record = record;
+        }
+    }
+}

diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java
index 39d1c23..babe70c 100644
--- a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java
+++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java
@@ -25,15 +25,11 @@ public static Policy makePolicy(Class clazz, FsSourceTaskConfig
 
     private static <T> T make(Class<T> clazz, Object... args) throws Throwable {
         try {
-            if (args == null || args.length == 0) {
-                return (T) clazz.getConstructor().newInstance();
-            }
             Class[] constClasses = Arrays.stream(args).map(arg -> arg.getClass()).toArray(Class[]::new);
             Constructor constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz, constClasses);
             return (T) constructor.newInstance(args);
-        } catch (NoSuchMethodException |
-                IllegalAccessException |
+        } catch (IllegalAccessException |
                 InstantiationException |
                 InvocationTargetException e) {
             throw e.getCause();

From 0d7322349912899fd80a2a7521d6f08afe25efa5 Mon Sep 17 00:00:00 2001
From: Mario Molina
Date: Sat, 9 Dec 2017 12:30:24 -0600
Subject: [PATCH 2/3] Adapting tests for the new agnostic file reader

---
 .../fs/file/reader/FileReaderTestBase.java    |  5 ++--
 .../file/reader/hdfs/AvroFileReaderTest.java  | 11 ++++++--
 .../hdfs/DelimitedTextFileReaderTest.java     | 13 ++++++---
 .../reader/hdfs/ParquetFileReaderTest.java    | 18 ++++++++++---
 .../reader/hdfs/SequenceFileReaderTest.java   | 14 +++++++---
 .../file/reader/hdfs/TextFileReaderTest.java  | 12 +++++++--
 .../file/reader/local/AvroFileReaderTest.java | 19 ++++++++++---
 .../local/DelimitedTextFileReaderTest.java    | 24 ++++++++++++++---
 .../reader/local/ParquetFileReaderTest.java   | 27 +++++++++++++++----
 .../reader/local/SequenceFileReaderTest.java  | 16 ++++++++---
 .../file/reader/local/TextFileReaderTest.java | 12 +++++++--
 11 files changed, 138 insertions(+), 33 deletions(-)

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java
index ef63849..e4aa2b4 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java
@@ -68,7 +68,7 @@ public void fileDoesNotExist() throws Throwable {
 
     @Test(expected = IOException.class)
     public void emptyFile() throws Throwable {
-        File tmp = File.createTempFile("test-", "");
+        File tmp = File.createTempFile("test-", "." + getFileExtension());
         Path path = new Path(new Path(fsUri), tmp.getName());
         fs.moveFromLocalFile(new Path(tmp.getAbsolutePath()), path);
         getReader(fs, path, readerConfig);
@@ -76,7 +76,7 @@ public void emptyFile() throws Throwable {
 
     @Test(expected = IOException.class)
     public void invalidFileFormat() throws Throwable {
-        File tmp = File.createTempFile("test-", "");
+        File tmp = File.createTempFile("test-", "." + getFileExtension());
         try (BufferedWriter writer = new BufferedWriter(new FileWriter(tmp))) {
             writer.write("test");
         }
@@ -150,5 +150,6 @@ protected final FileReader getReader(FileSystem fs, Path path, Map

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/AvroFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/AvroFileReaderTest.java
 
     private static Path createDataFile() throws IOException {
-        File avroFile = File.createTempFile("test-", ".avro");
+        File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION);
         DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
         try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) {
             dataFileWriter.setFlushOnEveryBlock(true);
@@ -103,4 +105,9 @@ protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_"));
         assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/DelimitedTextFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/DelimitedTextFileReaderTest.java
index 5aec386..da5304d 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/DelimitedTextFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/DelimitedTextFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
 import org.apache.hadoop.fs.FileSystem;
@@ -27,10 +28,11 @@ public class DelimitedTextFileReaderTest extends HdfsFileReaderTestBase {
     private static final String FIELD_COLUMN2 = "column_2";
     private static final String FIELD_COLUMN3 = "column_3";
     private static final String FIELD_COLUMN4 = "column_4";
+    private static final String FILE_EXTENSION = "csv";
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = DelimitedTextFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile(true);
         readerConfig = new HashMap<String, Object>() {{
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
@@ -39,7 +41,7 @@ public static void setUp() throws IOException {
     }
 
     private static Path createDataFile(boolean header) throws IOException {
-        File txtFile = File.createTempFile("test-", ".txt");
+        File txtFile = File.createTempFile("test-", "." + FILE_EXTENSION);
 
         try (FileWriter writer = new FileWriter(txtFile)) {
             if (header)
@@ -102,7 +104,7 @@ public void readAllDataWithoutHeader() throws Throwable {
 
     @Test
     public void readAllDataWithMalformedRows() throws Throwable {
-        File tmp = File.createTempFile("test-", "");
+        File tmp = File.createTempFile("test-", "." + getFileExtension());
         try (FileWriter writer = new FileWriter(tmp)) {
             writer.append(FIELD_COLUMN1 + "," + FIELD_COLUMN2 + "," + FIELD_COLUMN3 + "," + FIELD_COLUMN4 + "\n");
             writer.append("dummy\n");
@@ -200,4 +202,9 @@ protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_COLUMN3).toString().startsWith(index + "_"));
         assertTrue(record.get(FIELD_COLUMN4).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/ParquetFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/ParquetFileReaderTest.java
index bbf1c6f..ae0e82c 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/ParquetFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/ParquetFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.ParquetFileReader;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
@@ -34,21 +35,24 @@ public class ParquetFileReaderTest extends HdfsFileReaderTestBase {
     private static final String FIELD_INDEX = "index";
     private static final String FIELD_NAME = "name";
     private static final String FIELD_SURNAME = "surname";
+    private static final String FILE_EXTENSION = "parquet";
 
     private static Schema readerSchema;
     private static Schema projectionSchema;
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = ParquetFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile();
         readerConfig = new HashMap<>();
     }
 
     private static Path createDataFile() throws IOException {
-        File parquetFile = File.createTempFile("test-", ".parquet");
-        readerSchema = new Schema.Parser().parse(com.github.mmolimar.kafka.connect.fs.file.reader.local.ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people.avsc"));
-        projectionSchema = new Schema.Parser().parse(com.github.mmolimar.kafka.connect.fs.file.reader.local.ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people_projection.avsc"));
+        File parquetFile = File.createTempFile("test-", "." + FILE_EXTENSION);
+        readerSchema = new Schema.Parser().parse(
+                ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people.avsc"));
+        projectionSchema = new Schema.Parser().parse(
+                ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people_projection.avsc"));
 
         try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(parquetFile.toURI()))
                 .withConf(fs.getConf()).withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withSchema(readerSchema).build()) {
@@ -138,4 +142,10 @@ protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_"));
         assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
+
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/SequenceFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/SequenceFileReaderTest.java
index 2136a03..d7e6ba0 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/SequenceFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/SequenceFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.SequenceFileReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -26,10 +27,11 @@ public class SequenceFileReaderTest extends HdfsFileReaderTestBase {
 
     private static final String FIELD_NAME_KEY = "key";
     private static final String FIELD_NAME_VALUE = "value";
+    private static final String FILE_EXTENSION = "seq";
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = SequenceFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile();
         readerConfig = new HashMap<String, Object>() {{
             put(SequenceFileReader.FILE_READER_SEQUENCE_FIELD_NAME_KEY, FIELD_NAME_KEY);
@@ -38,7 +40,7 @@ public static void setUp() throws IOException {
     }
 
     private static Path createDataFile() throws IOException {
-        File seqFile = File.createTempFile("test-", ".seq");
+        File seqFile = File.createTempFile("test-", "." + FILE_EXTENSION);
         try (SequenceFile.Writer writer = SequenceFile.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(seqFile.getAbsolutePath())),
                 SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class))) {
@@ -71,7 +73,7 @@ private static Path createDataFile() throws IOException {
 
     @Test
     public void defaultFieldNames() throws Throwable {
-        Map customReaderCfg = new HashMap();
+        Map<String, Object> customReaderCfg = new HashMap<>();
         reader = getReader(fs, dataFile, customReaderCfg);
         assertTrue(reader.getFilePath().equals(dataFile));
 
@@ -100,4 +102,10 @@ private void checkData(String keyFieldName, String valueFieldName, Struct record
         assertTrue((Integer) record.get(keyFieldName) == index);
         assertTrue(record.get(valueFieldName).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
+
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/TextFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/TextFileReaderTest.java
index 858207f..0c37d4d 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/TextFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/TextFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.connect.data.Struct;
@@ -22,10 +23,11 @@ public class TextFileReaderTest extends HdfsFileReaderTestBase {
 
     private static final String FIELD_NAME_VALUE = "custom_field_name";
+    private static final String FILE_EXTENSION = "txt";
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = TextFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile();
         readerConfig = new HashMap<String, Object>() {{
             put(TextFileReader.FILE_READER_TEXT_FIELD_NAME_VALUE, FIELD_NAME_VALUE);
     }
 
     private static Path createDataFile() throws IOException {
-        File txtFile = File.createTempFile("test-", ".txt");
+        File txtFile = File.createTempFile("test-", "." + FILE_EXTENSION);
 
         try (FileWriter writer = new FileWriter(txtFile)) {
             IntStream.range(0, NUM_RECORDS).forEach(index -> {
@@ -91,4 +93,10 @@ protected Offset getOffset(long offset) {
     protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_NAME_VALUE).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
+
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/AvroFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/AvroFileReaderTest.java
index 1e28206..de4ed20 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/AvroFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/AvroFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.local;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
@@ -29,19 +30,22 @@ public class AvroFileReaderTest extends LocalFileReaderTestBase {
     private static final String FIELD_INDEX = "index";
     private static final String FIELD_NAME = "name";
     private static final String FIELD_SURNAME = "surname";
+    private static final String FILE_EXTENSION = "avr";
 
     private static Schema schema;
 
     @BeforeClass
     public static void setUp() throws IOException {
         schema = new Schema.Parser().parse(AvroFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people.avsc"));
-        readerClass = AvroFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile();
-        readerConfig = new HashMap<>();
+        readerConfig = new HashMap<String, Object>() {{
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_AVRO, FILE_EXTENSION);
+        }};
     }
 
     private static Path createDataFile() throws IOException {
-        File avroFile = File.createTempFile("test-", ".avro");
+        File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION);
         DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
         try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) {
             dataFileWriter.setFlushOnEveryBlock(true);
@@ -70,6 +74,7 @@ private static Path createDataFile() throws IOException {
     public void readerWithSchema() throws Throwable {
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(AvroFileReader.FILE_READER_AVRO_SCHEMA, schema.toString());
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_AVRO, getFileExtension());
         }};
         reader = getReader(fs, dataFile, cfg);
         readAllData();
@@ -79,6 +84,7 @@ public void readerWithInvalidSchema() throws Throwable {
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(AvroFileReader.FILE_READER_AVRO_SCHEMA, Schema.create(Schema.Type.STRING).toString());
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_AVRO, getFileExtension());
         }};
         reader = getReader(fs, dataFile, cfg);
         readAllData();
@@ -88,6 +94,7 @@ public void readerWithUnparseableSchema() throws Throwable {
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(AvroFileReader.FILE_READER_AVRO_SCHEMA, "invalid schema");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_AVRO, getFileExtension());
         }};
         getReader(fs, dataFile, cfg);
     }
@@ -103,4 +110,10 @@ protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_"));
         assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
+
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/DelimitedTextFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/DelimitedTextFileReaderTest.java
index f684523..5884240 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/DelimitedTextFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/DelimitedTextFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.local;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
 import org.apache.hadoop.fs.FileSystem;
@@ -27,19 +28,21 @@ public class DelimitedTextFileReaderTest extends LocalFileReaderTestBase {
     private static final String FIELD_COLUMN2 = "column_2";
     private static final String FIELD_COLUMN3 = "column_3";
     private static final String FIELD_COLUMN4 = "column_4";
+    private static final String FILE_EXTENSION = "tcsv";
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = DelimitedTextFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile(true);
         readerConfig = new HashMap<String, Object>() {{
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "true");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED, FILE_EXTENSION);
         }};
     }
 
     private static Path createDataFile(boolean header) throws IOException {
-        File txtFile = File.createTempFile("test-", ".txt");
+        File txtFile = File.createTempFile("test-", "." + FILE_EXTENSION);
 
         try (FileWriter writer = new FileWriter(txtFile)) {
             if (header)
@@ -74,7 +77,10 @@ public void invalidFileFormat() throws Throwable {
 
     @Test(expected = IllegalArgumentException.class)
     public void invaliConfigArgs() throws Throwable {
         try {
-            readerClass.getConstructor(FileSystem.class, Path.class, Map.class).newInstance(fs, dataFile, new HashMap<>());
+            readerClass.getConstructor(FileSystem.class, Path.class, Map.class).newInstance(fs, dataFile,
+                    new HashMap<String, Object>() {{
+                        put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED, FILE_EXTENSION);
+                    }});
         } catch (Exception e) {
             throw e.getCause();
         }
@@ -86,6 +92,7 @@ public void readAllDataWithoutHeader() throws Throwable {
         FileReader reader = getReader(fs, file, new HashMap<String, Object>() {{
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "false");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED, getFileExtension());
         }});
 
         assertTrue(reader.hasNext());
@@ -102,7 +109,7 @@ public void readAllDataWithoutHeader() throws Throwable {
 
     @Test
     public void readAllDataWithMalformedRows() throws Throwable {
-        File tmp = File.createTempFile("test-", "");
+        File tmp = File.createTempFile("test-", "." + getFileExtension());
         try (FileWriter writer = new FileWriter(tmp)) {
             writer.append(FIELD_COLUMN1 + "," + FIELD_COLUMN2 + "," + FIELD_COLUMN3 + "," + FIELD_COLUMN4 + "\n");
             writer.append("dummy\n");
@@ -112,6 +119,7 @@ public void readAllDataWithMalformedRows() throws Throwable {
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "true");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_DEFAULT_VALUE, "custom_value");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED, getFileExtension());
         }};
         Path path = new Path(new Path(fsUri), tmp.getName());
         fs.moveFromLocalFile(new Path(tmp.getAbsolutePath()), path);
@@ -137,6 +145,7 @@ public void seekFileWithoutHeader() throws Throwable {
         FileReader reader = getReader(fs, file, new HashMap<String, Object>() {{
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "false");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED, getFileExtension());
         }});
 
         assertTrue(reader.hasNext());
@@ -170,6 +179,7 @@ public void validFileEncoding() throws Throwable {
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "true");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_ENCODING, "Cp1252");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED, getFileExtension());
         }};
         getReader(fs, dataFile, cfg);
     }
@@ -180,6 +190,7 @@ public void invalidFileEncoding() throws Throwable {
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "true");
             put(DelimitedTextFileReader.FILE_READER_DELIMITED_ENCODING, "invalid_charset");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED, getFileExtension());
         }};
         getReader(fs, dataFile, cfg);
     }
@@ -200,4 +211,9 @@ protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_COLUMN3).toString().startsWith(index + "_"));
         assertTrue(record.get(FIELD_COLUMN4).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/ParquetFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/ParquetFileReaderTest.java
index 8ed59ff..91c1eb6 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/ParquetFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/ParquetFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.local;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.ParquetFileReader;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
@@ -34,21 +35,26 @@ public class ParquetFileReaderTest extends LocalFileReaderTestBase {
     private static final String FIELD_INDEX = "index";
     private static final String FIELD_NAME = "name";
     private static final String FIELD_SURNAME = "surname";
+    private static final String FILE_EXTENSION = "prqt";
 
     private static Schema readerSchema;
     private static Schema projectionSchema;
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = ParquetFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile();
-        readerConfig = new HashMap<>();
+        readerConfig = new HashMap<String, Object>() {{
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET, FILE_EXTENSION);
+        }};
     }
 
     private static Path createDataFile() throws IOException {
-        File parquetFile = File.createTempFile("test-", ".parquet");
-        readerSchema = new Schema.Parser().parse(ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people.avsc"));
-        projectionSchema = new Schema.Parser().parse(ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people_projection.avsc"));
+        File parquetFile = File.createTempFile("test-", "." + FILE_EXTENSION);
+        readerSchema = new Schema.Parser().parse(
+                ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people.avsc"));
+        projectionSchema = new Schema.Parser().parse(
+                ParquetFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people_projection.avsc"));
 
         try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path(parquetFile.toURI()))
                 .withConf(fs.getConf()).withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withSchema(readerSchema).build()) {
@@ -75,6 +81,7 @@ private static Path createDataFile() throws IOException {
     public void readerWithSchema() throws Throwable {
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(ParquetFileReader.FILE_READER_PARQUET_SCHEMA, readerSchema.toString());
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET, getFileExtension());
         }};
         reader = getReader(FileSystem.newInstance(fsUri, new Configuration()), dataFile, cfg);
         readAllData();
@@ -84,6 +91,7 @@ public void readerWithProjection() throws Throwable {
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(ParquetFileReader.FILE_READER_PARQUET_PROJECTION, projectionSchema.toString());
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET, getFileExtension());
         }};
         reader = getReader(FileSystem.newInstance(fsUri, new Configuration()), dataFile, cfg);
         while (reader.hasNext()) {
@@ -105,6 +113,7 @@ public void readerWithInvalidProjection() throws Throwable {
                 .endRecord();
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(ParquetFileReader.FILE_READER_PARQUET_PROJECTION, testSchema.toString());
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET, getFileExtension());
         }};
         reader = getReader(FileSystem.newInstance(fsUri, new Configuration()), dataFile, cfg);
         readAllData();
@@ -114,6 +123,7 @@ public void readerWithInvalidSchema() throws Throwable {
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(ParquetFileReader.FILE_READER_PARQUET_SCHEMA, Schema.create(Schema.Type.STRING).toString());
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET, getFileExtension());
         }};
         reader = getReader(FileSystem.newInstance(fsUri, new Configuration()), dataFile, cfg);
         readAllData();
@@ -123,6 +133,7 @@ public void readerWithUnparseableSchema() throws Throwable {
         Map<String, Object> cfg = new HashMap<String, Object>() {{
             put(ParquetFileReader.FILE_READER_PARQUET_SCHEMA, "invalid schema");
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET, getFileExtension());
         }};
         getReader(FileSystem.newInstance(fsUri, new Configuration()), dataFile, cfg);
     }
@@ -138,4 +149,10 @@ protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_"));
         assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
+
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/SequenceFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/SequenceFileReaderTest.java
index 05dd1d8..8d53cb8 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/SequenceFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/SequenceFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.local;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.SequenceFileReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -26,19 +27,21 @@ public class SequenceFileReaderTest extends LocalFileReaderTestBase {
 
     private static final String FIELD_NAME_KEY = "custom_field_key";
     private static final String FIELD_NAME_VALUE = "custom_field_name";
+    private static final String FILE_EXTENSION = "sq";
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = SequenceFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile();
         readerConfig = new HashMap<String, Object>() {{
             put(SequenceFileReader.FILE_READER_SEQUENCE_FIELD_NAME_KEY, FIELD_NAME_KEY);
             put(SequenceFileReader.FILE_READER_SEQUENCE_FIELD_NAME_VALUE, FIELD_NAME_VALUE);
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE, FILE_EXTENSION);
         }};
     }
 
     private static Path createDataFile() throws IOException {
-        File seqFile = File.createTempFile("test-", ".seq");
+        File seqFile = File.createTempFile("test-", "." + FILE_EXTENSION);
         try (SequenceFile.Writer writer = SequenceFile.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(seqFile.getAbsolutePath())),
                 SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class))) {
@@ -71,7 +74,9 @@ private static Path createDataFile() throws IOException {
 
     @Test
     public void defaultFieldNames() throws Throwable {
-        Map customReaderCfg = new HashMap();
+        Map<String, Object> customReaderCfg = new HashMap<String, Object>() {{
+            put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE, getFileExtension());
+        }};
         reader = getReader(fs, dataFile, customReaderCfg);
         assertTrue(reader.getFilePath().equals(dataFile));
 
@@ -100,4 +105,9 @@ private void checkData(String keyFieldName, String valueFieldName, Struct record
         assertTrue((Integer) record.get(keyFieldName) == index);
         assertTrue(record.get(valueFieldName).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
 }

diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/TextFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/TextFileReaderTest.java
index bd874ff..53d9a98 100644
--- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/TextFileReaderTest.java
+++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/local/TextFileReaderTest.java
@@ -1,6 +1,7 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader.local;
 
 import com.github.mmolimar.kafka.connect.fs.file.Offset;
+import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
 import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.connect.data.Struct;
@@ -22,10 +23,11 @@ public class TextFileReaderTest extends LocalFileReaderTestBase {
 
     private static final String FIELD_NAME_VALUE = "custom_field_name";
+    private static final String FILE_EXTENSION = "txt";
 
     @BeforeClass
     public static void setUp() throws IOException {
-        readerClass = TextFileReader.class;
+        readerClass = AgnosticFileReader.class;
         dataFile = createDataFile();
         readerConfig = new HashMap<String, Object>() {{
             put(TextFileReader.FILE_READER_TEXT_FIELD_NAME_VALUE, FIELD_NAME_VALUE);
     }
 
     private static Path createDataFile() throws IOException {
-        File txtFile = File.createTempFile("test-", ".txt");
+        File txtFile = File.createTempFile("test-", "." + FILE_EXTENSION);
 
         try (FileWriter writer = new FileWriter(txtFile)) {
             IntStream.range(0, NUM_RECORDS).forEach(index -> {
@@ -91,4 +93,10 @@ protected Offset getOffset(long offset) {
     protected void checkData(Struct record, long index) {
         assertTrue(record.get(FIELD_NAME_VALUE).toString().startsWith(index + "_"));
     }
+
+    @Override
+    protected String getFileExtension() {
+        return FILE_EXTENSION;
+    }
+
 }

From 999dfe56b1c46c867316e20440d541344b53f930 Mon Sep 17 00:00:00 2001
From: Mario Molina
Date: Sun, 10 Dec 2017 11:54:11 -0600
Subject: [PATCH 3/3] Agnostic file reader documentation

---
 docs/source/config_options.rst | 33 +++++++++++++++++++++++++++++++++
 docs/source/filereaders.rst    | 28 +++++++++++++++++++++++++---
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/docs/source/config_options.rst b/docs/source/config_options.rst
index 89b5d44..c2ff638 100644
--- a/docs/source/config_options.rst
+++ b/docs/source/config_options.rst
@@ -276,3 +276,36 @@ In order to configure custom properties for this reader, the name you must use i
   * Type: string
   * Default: null
   * Importance: low
+
+Agnostic
+--------------------------------------------
+
+In order to configure custom properties for this reader, the name you must use is ``agnostic``.
+
+``file_reader.agnostic.extensions.parquet``
+  A comma-separated list of accepted file extensions for Parquet files.
+
+  * Type: string
+  * Default: parquet
+  * Importance: medium
+
+``file_reader.agnostic.extensions.avro``
+  A comma-separated list of accepted file extensions for Avro files.
+
+  * Type: string
+  * Default: avro
+  * Importance: medium
+
+``file_reader.agnostic.extensions.sequence``
+  A comma-separated list of accepted file extensions for Sequence files.
+
+  * Type: string
+  * Default: seq
+  * Importance: medium
+
+``file_reader.agnostic.extensions.delimited``
+  A comma-separated list of accepted file extensions for Delimited text files.
+
+  * Type: string
+  * Default: tsv,csv
+  * Importance: medium

diff --git a/docs/source/filereaders.rst b/docs/source/filereaders.rst
index b89d120..37c76f3 100644
--- a/docs/source/filereaders.rst
+++ b/docs/source/filereaders.rst
@@ -8,10 +8,12 @@ to Kafka is created by transforming the record by means of
 `Confluent avro-converter `__ API.
 
+More information about properties of this file reader :ref:`here`.
+
 Parquet
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Read files with `Parquet `__ format.
+Reads files with `Parquet `__ format.
 
 The reader takes advantage of the Parquet-Avro API and uses the Parquet file
 as if it were an Avro file, so the message sent to Kafka is built in the same
@@ -22,6 +24,8 @@ way as the Avro file reader does.
    over and over again and has to seek the file, the performance
    can be affected.
 
+More information about properties of this file reader :ref:`here`.
+
 SequenceFile
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -32,8 +36,7 @@ This reader can process this file format and build a Kafka message with the
 key/value pair. These two values are named ``key`` and ``value`` in the
 message by default but you can customize these field names.
 
-More information about properties of this file reader
-:ref:`here`.
+More information about properties of this file reader :ref:`here`.
 
 Text
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -44,6 +47,8 @@ Each line represents one record which will be in a field named ``value`` in
 the message sent to Kafka by default but you can customize these field names.
 
+More information about properties of this file reader :ref:`here`.
+
 Delimited text
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -56,3 +61,20 @@ Also, the token delimiter for columns is configurable.
 
 More information about properties of this file reader :ref:`here`.
 
+Agnostic
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This reader is actually a wrapper around the readers listed above.
+
+It picks the proper internal reader (Parquet, Avro, SequenceFile, Text or Delimited text)
+based on the file extension; if no extension matches, it falls back to the Text file reader.
+
+Default extensions for each format:
+
+* Parquet: .parquet
+* Avro: .avro
+* SequenceFile: .seq
+* Delimited text: .tsv, .csv
+* Text: any other file extension.
+
+More information about properties of this file reader :ref:`here`.
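+
+As a quick sketch of how this maps to a connector configuration (assuming the connector's
+``file_reader.class`` property selects the reader, and using ``parq`` purely as an example
+of an extra extension)::
+
+    file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader
+    file_reader.agnostic.extensions.parquet=parquet,parq
+    file_reader.agnostic.extensions.delimited=tsv,csv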