Skip to content

Commit

Permalink
Merge pull request #14 from mmolimar/feature/dfr_enhacement
Browse files Browse the repository at this point in the history
Default value in DelimitedTextFileReader
  • Loading branch information
mmolimar authored Dec 4, 2017
2 parents 7618fc7 + d41f475 commit 1f93c45
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 1 deletion.
7 changes: 7 additions & 0 deletions docs/source/config_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,10 @@ In order to configure custom properties for this reader, the name you must use i
* Type: string
* Importance: medium

``file_reader.delimited.default_value``
Sets a default value in a column when its value is null. This is due to the record is malformed (it does not contain
all expected columns).

* Type: string
* Default: null
* Importance: low
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX;
Expand All @@ -18,13 +19,15 @@ public class DelimitedTextFileReader extends AbstractFileReader<DelimitedTextFil
public static final String FILE_READER_DELIMITED_HEADER = FILE_READER_DELIMITED + "header";
public static final String FILE_READER_DELIMITED_TOKEN = FILE_READER_DELIMITED + "token";
public static final String FILE_READER_DELIMITED_ENCODING = FILE_READER_DELIMITED + "encoding";
public static final String FILE_READER_DELIMITED_DEFAULT_VALUE = FILE_READER_DELIMITED + "default_value";

private static final String DEFAULT_COLUMN_NAME = "column";

private final TextFileReader inner;
private final Schema schema;
private DelimitedTextOffset offset;
private String token;
private String defaultValue;
private boolean hasHeader;

public DelimitedTextFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
Expand Down Expand Up @@ -61,13 +64,29 @@ protected void configure(Map<String, Object> config) {
throw new IllegalArgumentException(FILE_READER_DELIMITED_TOKEN + " property cannot be empty for DelimitedTextFileReader");
}
this.token = config.get(FILE_READER_DELIMITED_TOKEN).toString();
this.defaultValue = config.get(FILE_READER_DELIMITED_DEFAULT_VALUE) == null ?
null : config.get(FILE_READER_DELIMITED_DEFAULT_VALUE).toString();
this.hasHeader = Boolean.valueOf((String) config.get(FILE_READER_DELIMITED_HEADER));
}

@Override
protected DelimitedRecord nextRecord() {
offset.inc();
return new DelimitedRecord(schema, inner.nextRecord().getValue().split(token));
String values[] = inner.nextRecord().getValue().split(token);
return new DelimitedRecord(schema, defaultValue != null ? fillNullValues(values) : values);
}

private String[] fillNullValues(final String[] values) {
return IntStream.range(0, schema.fields().size())
.mapToObj(index -> {
if (index < values.length) {
return values[index];
} else {
return defaultValue;
}
})
.collect(Collectors.toList())
.toArray(new String[0]);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,37 @@ public void readAllDataWithoutHeader() throws Throwable {

}

@Test
public void readAllDataWithMalformedRows() throws Throwable {
File tmp = File.createTempFile("test-", "");
try (FileWriter writer = new FileWriter(tmp)) {
writer.append(FIELD_COLUMN1 + "," + FIELD_COLUMN2 + "," + FIELD_COLUMN3 + "," + FIELD_COLUMN4 + "\n");
writer.append("dummy\n");
writer.append("dummy\n");
}
Map<String, Object> cfg = new HashMap<String, Object>() {{
put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "true");
put(DelimitedTextFileReader.FILE_READER_DELIMITED_DEFAULT_VALUE, "custom_value");
}};
Path path = new Path(new Path(fsUri), tmp.getName());
fs.moveFromLocalFile(new Path(tmp.getAbsolutePath()), path);
reader = getReader(fs, path, cfg);

assertTrue(reader.hasNext());

int recordCount = 0;
while (reader.hasNext()) {
Struct record = reader.next();
assertTrue(record.get(FIELD_COLUMN1).equals("dummy"));
assertTrue(record.get(FIELD_COLUMN2).equals("custom_value"));
assertTrue(record.get(FIELD_COLUMN3).equals("custom_value"));
assertTrue(record.get(FIELD_COLUMN4).equals("custom_value"));
recordCount++;
}
assertEquals("The number of records in the file does not match", 2, recordCount);
}

@Test
public void seekFileWithoutHeader() throws Throwable {
Path file = createDataFile(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,37 @@ public void readAllDataWithoutHeader() throws Throwable {

}

@Test
public void readAllDataWithMalformedRows() throws Throwable {
File tmp = File.createTempFile("test-", "");
try (FileWriter writer = new FileWriter(tmp)) {
writer.append(FIELD_COLUMN1 + "," + FIELD_COLUMN2 + "," + FIELD_COLUMN3 + "," + FIELD_COLUMN4 + "\n");
writer.append("dummy\n");
writer.append("dummy\n");
}
Map<String, Object> cfg = new HashMap<String, Object>() {{
put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
put(DelimitedTextFileReader.FILE_READER_DELIMITED_HEADER, "true");
put(DelimitedTextFileReader.FILE_READER_DELIMITED_DEFAULT_VALUE, "custom_value");
}};
Path path = new Path(new Path(fsUri), tmp.getName());
fs.moveFromLocalFile(new Path(tmp.getAbsolutePath()), path);
reader = getReader(fs, path, cfg);

assertTrue(reader.hasNext());

int recordCount = 0;
while (reader.hasNext()) {
Struct record = reader.next();
assertTrue(record.get(FIELD_COLUMN1).equals("dummy"));
assertTrue(record.get(FIELD_COLUMN2).equals("custom_value"));
assertTrue(record.get(FIELD_COLUMN3).equals("custom_value"));
assertTrue(record.get(FIELD_COLUMN4).equals("custom_value"));
recordCount++;
}
assertEquals("The number of records in the file does not match", 2, recordCount);
}

@Test
public void seekFileWithoutHeader() throws Throwable {
Path file = createDataFile(false);
Expand Down

0 comments on commit 1f93c45

Please sign in to comment.