Skip to content

Commit

Permalink
Merge pull request #17 from mmolimar/feature/agnostic_freader
Browse files Browse the repository at this point in the history
New file reader: AgnosticFileReader
  • Loading branch information
mmolimar authored Dec 10, 2017
2 parents 1f93c45 + 999dfe5 commit 3a4b3f2
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 41 deletions.
33 changes: 33 additions & 0 deletions docs/source/config_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,36 @@ In order to configure custom properties for this reader, the name you must use i
* Type: string
* Default: null
* Importance: low

Agnostic
--------------------------------------------

In order to configure custom properties for this reader, the name you must use is ``agnostic``.

``file_reader.agnostic.extensions.parquet``
A comma-separated string list with the accepted extensions for Parquet files.

* Type: string
* Default: parquet
* Importance: medium

``file_reader.agnostic.extensions.avro``
A comma-separated string list with the accepted extensions for Avro files.

* Type: string
* Default: avro
* Importance: medium

``file_reader.agnostic.extensions.sequence``
A comma-separated string list with the accepted extensions for Sequence files.

* Type: string
* Default: seq
* Importance: medium

``file_reader.agnostic.extensions.delimited``
A comma-separated string list with the accepted extensions for Delimited text files.

* Type: string
* Default: tsv,csv
* Importance: medium
28 changes: 25 additions & 3 deletions docs/source/filereaders.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ to Kafka is created by transforming the record by means of
`Confluent avro-converter <https://github.com/confluentinc/schema-registry/tree/master/avro-converter>`__
API.

More information about properties of this file reader :ref:`here<config_options-filereaders-avro>`.

Parquet
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Read files with `Parquet <https://parquet.apache.org/>`__ format.
Reads files with `Parquet <https://parquet.apache.org/>`__ format.

The reader takes advantage of the Parquet-Avro API and uses the Parquet file
as if it were an Avro file, so the message sent to Kafka is built in the same
Expand All @@ -22,6 +24,8 @@ way as the Avro file reader does.
over and over again and has to seek the file, the performance
can be affected.

More information about properties of this file reader :ref:`here<config_options-filereaders-parquet>`.

SequenceFile
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -32,8 +36,7 @@ This reader can process this file format and build a Kafka message with the
key/value pair. These two values are named ``key`` and ``value`` in the message
by default but you can customize these field names.

More information about properties of this file reader
:ref:`here<config_options-filereaders-sequencefile>`.
More information about properties of this file reader :ref:`here<config_options-filereaders-sequencefile>`.

Text
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -44,6 +47,8 @@ Each line represents one record which will be in a field
named ``value`` in the message sent to Kafka by default but you can
customize these field names.

More information about properties of this file reader :ref:`here<config_options-filereaders-text>`.

Delimited text
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -56,3 +61,20 @@ Also, the token delimiter for columns is configurable.

More information about properties of this file reader :ref:`here<config_options-filereaders-delimited>`.

Agnostic
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Actually, this reader is a wrapper of the readers listing above.

It tries to read any kind of file format using an internal reader based on the file extension,
applying the proper one (Parquet, Avro, SecuenceFile, Text or Delimited text). In case of no
extension has been matched, the Text file reader will be applied.

Default extensions for each format:
* Parquet: .parquet
* Avro: .avro
* SequenceFile: .seq
* Delimited text: .tsv, .csv
* Text: any other sort of file extension.

More information about properties of this file reader :ref:`here<config_options-filereaders-agnostic>`.
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ public final Struct next() {

protected abstract T nextRecord();

protected ReaderAdapter<T> getAdapter() {
return adapter;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.github.mmolimar.kafka.connect.fs.file.reader;

import com.github.mmolimar.kafka.connect.fs.file.Offset;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Struct;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX;

public class AgnosticFileReader extends AbstractFileReader<AgnosticFileReader.AgnosticRecord> {

private static final String FILE_READER_AGNOSTIC = FILE_READER_PREFIX + "agnostic.";
private static final String FILE_READER_AGNOSTIC_EXTENSIONS = FILE_READER_AGNOSTIC + "extensions.";
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET = FILE_READER_AGNOSTIC_EXTENSIONS + "parquet";
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_AVRO = FILE_READER_AGNOSTIC_EXTENSIONS + "avro";
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE = FILE_READER_AGNOSTIC_EXTENSIONS + "sequence";
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED = FILE_READER_AGNOSTIC_EXTENSIONS + "delimited";

private final AbstractFileReader reader;
private List<String> parquetExtensions, avroExtensions, sequenceExtensions, delimitedExtensions;

public AgnosticFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
super(fs, filePath, new AgnosticAdapter(), config);

try {
reader = (AbstractFileReader) readerByExtension(fs, filePath, config);
} catch (RuntimeException | IOException e) {
throw e;
} catch (Throwable t) {
throw new IOException("An error has ocurred when creating a concrete reader", t);
}
}

private FileReader readerByExtension(FileSystem fs, Path filePath, Map<String, Object> config)
throws Throwable {
int index = filePath.getName().lastIndexOf('.');
String extension = index == -1 || index == filePath.getName().length() - 1 ? "" :
filePath.getName().substring(index + 1).toLowerCase();

Class<? extends FileReader> clz;
if (parquetExtensions.contains(extension)) {
clz = ParquetFileReader.class;
} else if (avroExtensions.contains(extension)) {
clz = AvroFileReader.class;
} else if (sequenceExtensions.contains(extension)) {
clz = SequenceFileReader.class;
} else if (delimitedExtensions.contains(extension)) {
clz = DelimitedTextFileReader.class;
} else {
clz = TextFileReader.class;
}

return ReflectionUtils.makeReader(clz, fs, filePath, config);
}

@Override
protected void configure(Map<String, Object> config) {
this.parquetExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET) == null ?
Arrays.asList("parquet") :
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET).toString().toLowerCase().split(","));
this.avroExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_AVRO) == null ?
Arrays.asList("avro") :
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_AVRO).toString().toLowerCase().split(","));
this.sequenceExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE) == null ?
Arrays.asList("seq") :
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE).toString().toLowerCase().split(","));
this.delimitedExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED) == null ?
Arrays.asList("tsv", "csv") :
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED).toString().toLowerCase().split(","));
}

@Override
public boolean hasNext() {
return reader.hasNext();
}

@Override
public void seek(Offset offset) {
reader.seek(offset);
}

@Override
public Offset currentOffset() {
return reader.currentOffset();
}

@Override
public void close() throws IOException {
reader.close();
}

@Override
protected AgnosticRecord nextRecord() {
return new AgnosticRecord(reader.getAdapter(), reader.nextRecord());
}

static class AgnosticAdapter implements ReaderAdapter<AgnosticRecord> {

public AgnosticAdapter() {
}

@Override
public Struct apply(AgnosticRecord ag) {
return ag.adapter.apply(ag.record);
}
}

static class AgnosticRecord {
private final ReaderAdapter<Object> adapter;
private final Object record;

public AgnosticRecord(ReaderAdapter<Object> adapter, Object record) {
this.adapter = adapter;
this.record = record;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@ public static Policy makePolicy(Class<? extends Policy> clazz, FsSourceTaskConfi

private static <T> T make(Class<T> clazz, Object... args) throws Throwable {
try {
if (args == null || args.length == 0) {
return (T) clazz.getConstructor().newInstance();
}
Class[] constClasses = Arrays.stream(args).map(arg -> arg.getClass()).toArray(Class[]::new);

Constructor constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz, constClasses);
return (T) constructor.newInstance(args);
} catch (NoSuchMethodException |
IllegalAccessException |
} catch (IllegalAccessException |
InstantiationException |
InvocationTargetException e) {
throw e.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ public void fileDoesNotExist() throws Throwable {

@Test(expected = IOException.class)
public void emptyFile() throws Throwable {
File tmp = File.createTempFile("test-", "");
File tmp = File.createTempFile("test-", "." + getFileExtension());
Path path = new Path(new Path(fsUri), tmp.getName());
fs.moveFromLocalFile(new Path(tmp.getAbsolutePath()), path);
getReader(fs, path, readerConfig);
}

@Test(expected = IOException.class)
public void invalidFileFormat() throws Throwable {
File tmp = File.createTempFile("test-", "");
File tmp = File.createTempFile("test-", "." + getFileExtension());
try (BufferedWriter writer = new BufferedWriter(new FileWriter(tmp))) {
writer.write("test");
}
Expand Down Expand Up @@ -150,5 +150,6 @@ protected final FileReader getReader(FileSystem fs, Path path, Map<String, Objec

protected abstract void checkData(Struct record, long index);

protected abstract String getFileExtension();

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;

import com.github.mmolimar.kafka.connect.fs.file.Offset;
import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
import com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -29,19 +30,20 @@ public class AvroFileReaderTest extends HdfsFileReaderTestBase {
private static final String FIELD_INDEX = "index";
private static final String FIELD_NAME = "name";
private static final String FIELD_SURNAME = "surname";
private static final String FILE_EXTENSION = "avro";

private static Schema schema;

@BeforeClass
public static void setUp() throws IOException {
schema = new Schema.Parser().parse(AvroFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people.avsc"));
readerClass = AvroFileReader.class;
readerClass = AgnosticFileReader.class;
dataFile = createDataFile();
readerConfig = new HashMap<>();
}

private static Path createDataFile() throws IOException {
File avroFile = File.createTempFile("test-", ".avro");
File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION);
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) {
dataFileWriter.setFlushOnEveryBlock(true);
Expand Down Expand Up @@ -103,4 +105,9 @@ protected void checkData(Struct record, long index) {
assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_"));
assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_"));
}

@Override
protected String getFileExtension() {
return FILE_EXTENSION;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;

import com.github.mmolimar.kafka.connect.fs.file.Offset;
import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
import com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -27,10 +28,11 @@ public class DelimitedTextFileReaderTest extends HdfsFileReaderTestBase {
private static final String FIELD_COLUMN2 = "column_2";
private static final String FIELD_COLUMN3 = "column_3";
private static final String FIELD_COLUMN4 = "column_4";
private static final String FILE_EXTENSION = "csv";

@BeforeClass
public static void setUp() throws IOException {
readerClass = DelimitedTextFileReader.class;
readerClass = AgnosticFileReader.class;
dataFile = createDataFile(true);
readerConfig = new HashMap<String, Object>() {{
put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
Expand All @@ -39,7 +41,7 @@ public static void setUp() throws IOException {
}

private static Path createDataFile(boolean header) throws IOException {
File txtFile = File.createTempFile("test-", ".txt");
File txtFile = File.createTempFile("test-", "." + FILE_EXTENSION);
try (FileWriter writer = new FileWriter(txtFile)) {

if (header)
Expand Down Expand Up @@ -102,7 +104,7 @@ public void readAllDataWithoutHeader() throws Throwable {

@Test
public void readAllDataWithMalformedRows() throws Throwable {
File tmp = File.createTempFile("test-", "");
File tmp = File.createTempFile("test-", "." + getFileExtension());
try (FileWriter writer = new FileWriter(tmp)) {
writer.append(FIELD_COLUMN1 + "," + FIELD_COLUMN2 + "," + FIELD_COLUMN3 + "," + FIELD_COLUMN4 + "\n");
writer.append("dummy\n");
Expand Down Expand Up @@ -200,4 +202,9 @@ protected void checkData(Struct record, long index) {
assertTrue(record.get(FIELD_COLUMN3).toString().startsWith(index + "_"));
assertTrue(record.get(FIELD_COLUMN4).toString().startsWith(index + "_"));
}

@Override
protected String getFileExtension() {
return FILE_EXTENSION;
}
}
Loading

0 comments on commit 3a4b3f2

Please sign in to comment.