diff --git a/Dockerfile b/Dockerfile index 387153d..7ae968c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM confluentinc/cp-kafka-connect-base:5.5.1 +FROM confluentinc/cp-kafka-connect-base:6.1.0 ARG PROJECT_VERSION ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" diff --git a/docker-compose.yml b/docker-compose.yml index cbbc7bd..a4354bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: cp-zookeeper: - image: confluentinc/cp-zookeeper:5.5.1 + image: confluentinc/cp-zookeeper:6.1.0 hostname: zookeeper container_name: zookeeper ports: @@ -11,7 +11,7 @@ services: ZOOKEEPER_TICK_TIME: 2000 cp-kafka: - image: confluentinc/cp-kafka:5.5.1 + image: confluentinc/cp-kafka:6.1.0 hostname: kafka container_name: kafka depends_on: @@ -32,7 +32,7 @@ services: CONFLUENT_METRICS_ENABLE: 'false' cp-schema-registry: - image: confluentinc/cp-schema-registry:5.5.1 + image: confluentinc/cp-schema-registry:6.1.0 hostname: schema-registry container_name: schema-registry depends_on: @@ -45,7 +45,7 @@ services: SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' connect-fs: - image: mmolimar/kafka-connect-fs:1.2.0 + image: mmolimar/kafka-connect-fs:1.3.0 container_name: connect depends_on: - cp-kafka diff --git a/docs/source/conf.py b/docs/source/conf.py index 7c79907..5e83243 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -55,9 +55,9 @@ # built documents. # # The short X.Y version. -version = '1.2' +version = '1.3' # The full version, including alpha/beta/rc tags. -release = '1.2' +release = '1.3' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/docs/source/config_options.rst b/docs/source/config_options.rst index d82f238..38968c1 100644 --- a/docs/source/config_options.rst +++ b/docs/source/config_options.rst @@ -57,8 +57,10 @@ General config properties for this connector. If you want to ingest data from S3, you can add credentials with: ``policy.fs.fs.s3a.access.key=`` and - ``policy.fs.fs.s3a.secret.key=`` -   + ``policy.fs.fs.s3a.secret.key=``. + Also, in case you want to configure a custom credentials provider, you should use + ``policy.fs.fs.s3a.aws.credentials.provider=`` property. + ``topic`` Topic in which copy data to. @@ -224,7 +226,7 @@ HDFS file watcher In order to configure custom properties for this policy, the name you must use is ``hdfs_file_watcher``. ``policy.hdfs_file_watcher.poll`` - Time to wait until the records retrieved from the file watcher will be sent to the source task. + Time to wait (in milliseconds) until the records retrieved from the file watcher will be sent to the source task. * Type: long * Default: ``5000`` @@ -237,6 +239,52 @@ In order to configure custom properties for this policy, the name you must use i * Default: ``20000`` * Importance: medium +.. _config_options-policies-s3events: + +S3 event notifications +-------------------------------------------- + +In order to configure custom properties for this policy, the name you must use is ``s3_event_notifications``. + +``policy.s3_event_notifications.queue`` + SQS queue name to retrieve messages from. + + * Type: string + * Importance: high + +``policy.s3_event_notifications.poll`` + Time to wait (in milliseconds) until the records retrieved from the queue will be sent to the source task. 
+ + * Type: long + * Default: ``5000`` + * Importance: medium + +``policy.s3_event_notifications.event_regex`` + Regular expression to filter event based on their types. + + * Type: string + * Default: ``.*`` + * Importance: medium + +``policy.s3_event_notifications.delete_messages`` + If messages from SQS should be removed after reading them. + + * Type: boolean + * Default: ``true`` + * Importance: medium + +``policy.s3_event_notifications.max_messages`` + Maximum number of messages to retrieve at a time (must be between 1 and 10). + + * Type: int + * Importance: medium + +``policy.s3_event_notifications.visibility_timeout`` + Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. + + * Type: int + * Importance: low + .. _config_options-filereaders: File readers @@ -357,6 +405,13 @@ In order to configure custom properties for this reader, the name you must use i * Default: ``true`` * Importance: medium +``file_reader.cobol.reader.is_text`` + If line ending characters will be used (LF / CRLF) as the record separator. + + * Type: boolean + * Default: ``false`` + * Importance: medium + ``file_reader.cobol.reader.ebcdic_code_page`` Code page to be used for EBCDIC to ASCII / Unicode conversions. @@ -448,6 +503,13 @@ In order to configure custom properties for this reader, the name you must use i * Default: ``false`` * Importance: low +``file_reader.cobol.reader.record_length`` + Specifies the length of the record disregarding the copybook record size. Implied the file has fixed record length. + + * Type: int + * Default: ``null`` + * Importance: low + ``file_reader.cobol.reader.length_field_name`` The name for a field that contains the record length. If not set, the copybook record length will be used. @@ -539,6 +601,13 @@ In order to configure custom properties for this reader, the name you must use i * Default: ``null`` * Importance: low +``file_reader.cobol.reader.record_extractor`` + Parser to be used to parse records. + + * Type: string + * Default: ``null`` + * Importance: low + ``file_reader.cobol.reader.rhp_additional_info`` Extra option to be passed to a custom record header parser. @@ -546,6 +615,13 @@ In order to configure custom properties for this reader, the name you must use i * Default: ``null`` * Importance: low +``file_reader.cobol.reader.re_additional_info`` + A string provided for the raw record extractor. + + * Type: string + * Default: ```` + * Importance: low + ``file_reader.cobol.reader.input_file_name_column`` A column name to add to each record containing the input file name. @@ -553,6 +629,13 @@ In order to configure custom properties for this reader, the name you must use i * Default: ```` * Importance: low +.. _config_options-filereaders-binary: + +Binary +-------------------------------------------- + +There are no extra configuration options for this file reader. + .. _config_options-filereaders-csv: CSV @@ -1258,6 +1341,13 @@ To configure custom properties for this reader, the name you must use is ``agnos * Default: ``dat`` * Importance: medium +``file_reader.agnostic.extensions.binary`` + A comma-separated string list with the accepted extensions for binary files. + + * Type: string[] + * Default: ``bin`` + * Importance: medium + ``file_reader.agnostic.extensions.csv`` A comma-separated string list with the accepted extensions for CSV files. 
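For reviewers tracing how the additions above fit together, below is a minimal sketch of a source task configuration that wires the new ``s3_event_notifications`` policy to the agnostic/binary reader. It reuses the ``FsSourceTaskConfig``, ``S3EventNotificationsPolicy`` and ``AgnosticFileReader`` constants exercised elsewhere in this diff; the bucket URI, topic, queue name and regex values are illustrative placeholders, not values taken from the change.

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
import com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy;

import java.util.HashMap;
import java.util.Map;

public class S3EventNotificationsConfigExample {

    // Hypothetical helper: builds a task config for the new policy and binary reader.
    static FsSourceTaskConfig buildExampleConfig() {
        Map<String, String> cfg = new HashMap<>();
        cfg.put(FsSourceTaskConfig.FS_URIS, "s3a://my-bucket/data");          // placeholder bucket/prefix
        cfg.put(FsSourceTaskConfig.TOPIC, "topic_test");                      // placeholder topic
        cfg.put(FsSourceTaskConfig.POLICY_CLASS, S3EventNotificationsPolicy.class.getName());
        cfg.put(FsSourceTaskConfig.POLICY_REGEXP, ".*\\.bin$");               // only pick up .bin objects
        cfg.put(FsSourceTaskConfig.FILE_READER_CLASS, AgnosticFileReader.class.getName());
        // Required: SQS queue that receives the S3 event notifications (placeholder name).
        cfg.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_QUEUE, "my-s3-events-queue");
        // Optional: only react to object-creation events.
        cfg.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_EVENT_REGEX, "ObjectCreated:.*");
        // Route ".bin" extensions to the new BinaryFileReader through the agnostic reader.
        cfg.put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_BINARY, "bin");
        return new FsSourceTaskConfig(cfg);
    }
}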
diff --git a/docs/source/connector.rst b/docs/source/connector.rst index f5e7891..70f7a2a 100644 --- a/docs/source/connector.rst +++ b/docs/source/connector.rst @@ -160,6 +160,7 @@ The are several file readers included which can read the following file formats: * ORC. * SequenceFile. * Cobol / EBCDIC. +* Other binary files. * CSV. * TSV. * Fixed-width. diff --git a/docs/source/filereaders.rst b/docs/source/filereaders.rst index d1297a2..5ebe05c 100644 --- a/docs/source/filereaders.rst +++ b/docs/source/filereaders.rst @@ -60,6 +60,26 @@ translate it into a Kafka message with the schema. More information about properties of this file reader :ref:`here`. +Binary +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +All other kind of binary files can be ingested using this reader. + +It just extracts the content plus some metadata such as: path, file owner, file group, length, access time, +and modification time. + +Each message will contain the following schema: + + * ``path``: File path (string). + * ``owner``: Owner of the file. (string). + * ``group``: Group associated with the file. (string). + * ``length``: Length of this file, in bytes. (long). + * ``access_time``: Access time of the file. (long). + * ``modification_time``: Modification time of the file (long). + * ``content``: Content of the file (bytes). + +More information about properties of this file reader :ref:`here`. + CSV ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -153,6 +173,7 @@ Default extensions for each format (configurable): * ORC: ``.orc`` * SequenceFile: ``.seq`` * Cobol / EBCDIC: ``.dat`` +* Other binary files: ``.bin`` * CSV: ``.csv`` * TSV: ``.tsv`` * FixedWidth: ``.fixed`` diff --git a/docs/source/policies.rst b/docs/source/policies.rst index 3225de6..4e39447 100644 --- a/docs/source/policies.rst +++ b/docs/source/policies.rst @@ -36,3 +36,15 @@ You can learn more about the properties of this policy :ref:`here`. diff --git a/pom.xml b/pom.xml index 0afe9f9..bd7de88 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.github.mmolimar.kafka.connect kafka-connect-fs - 1.2.0 + 1.3.0 jar kafka-connect-fs @@ -46,31 +46,31 @@ UTF-8 - 2.6.0 - 5.5.1 + 2.7.0 + 6.1.0 3.3.0 - hadoop3-2.1.5 + hadoop3-2.2.0 1.11.1 - 1.6.3 - 2.9.0 + 1.6.7 + 2.9.1 2.10.2 - 2.1.1 - 2.12.12 - 9.1.1 + 2.2.0 + 2.12.13 + 9.1.3 0.1.55 - 5.7.0 + 5.7.1 4.2 - 2.0.7 + 2.0.9 1.8 ${maven-compiler.source} 3.2.0 3.8.1 4.4.0 3.3.0 - 0.8.5 + 0.8.6 4.3.0 3.0.0-M5 - 0.11.3 + 0.12.0 @@ -139,6 +139,12 @@ za.co.absa.cobrix cobol-parser_2.12 ${cobrix.version} + + + org.scala-lang + scala-library + + com.cronutils @@ -150,6 +156,11 @@ jsch ${jsch.version} + + org.scala-lang + scala-library + ${scala.version} + @@ -298,11 +309,12 @@ into Kafka. The following file types are supported: Parquet, Avro, ORC, SequenceFile, - Cobol / EBCDIC, CSV, TSV, Fixed-width, JSON, XML, YAML and Text. + Cobol / EBCDIC, other binary files, CSV, TSV, Fixed-width, JSON, XML, YAML and Text. - Also, the connector has built-in support for file systems such as HDFS, S3, - Google Cloud Storage, Azure Blob Storage, Azure Data Lake Store, FTP, SFTP and - local file system, among others. + Also, the connector has built-in support for file systems such as HDFS, S3 (directly + or via messages from SNS/SQS queues due to S3 event notifications), Google Cloud + Storage, Azure Blob Storage, Azure Data Lake Store, FTP, SFTP and local file system, + among others. 
]]> https://github.com/mmolimar/kafka-connect-fs @@ -340,6 +352,7 @@ orc sequence cobol + binary csv tsv fixed diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java index 979695a..45d9a46 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java @@ -46,6 +46,7 @@ public String version() { } @Override + @SuppressWarnings("unchecked") public void start(Map properties) { log.info("{} Starting FS source task...", this); try { @@ -94,7 +95,7 @@ public List poll() { Struct record = reader.next(); // TODO change FileReader interface in the next major version boolean hasNext = (reader instanceof AbstractFileReader) ? - ((AbstractFileReader) reader).hasNextBatch() || reader.hasNext() : reader.hasNext(); + ((AbstractFileReader) reader).hasNextBatch() || reader.hasNext() : reader.hasNext(); records.add(convert(metadata, reader.currentOffset(), !hasNext, record)); } } catch (IOException | ConnectException e) { diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java index 8761a9d..63e5c7f 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java @@ -24,6 +24,7 @@ public class AgnosticFileReader extends AbstractFileReader reader; private Set parquetExtensions, avroExtensions, sequenceExtensions, orcExtensions, cobolExtensions, - csvExtensions, tsvExtensions, fixedExtensions, jsonExtensions, xmlExtensions, yamlExtensions; + binaryExtensions, csvExtensions, tsvExtensions, fixedExtensions, jsonExtensions, xmlExtensions, + yamlExtensions; public AgnosticFileReader(FileSystem fs, Path filePath, Map config) throws Exception { super(fs, filePath, new AgnosticAdapter(), config); @@ -46,12 +48,13 @@ public AgnosticFileReader(FileSystem fs, Path filePath, Map conf } } + @SuppressWarnings("unchecked") private AbstractFileReader readerByExtension(FileSystem fs, Path filePath, Map config) { int index = filePath.getName().lastIndexOf('.'); String extension = index == -1 || index == filePath.getName().length() - 1 ? 
"" : filePath.getName().substring(index + 1).toLowerCase(); - Class clz; + Class> clz; if (parquetExtensions.contains(extension)) { clz = ParquetFileReader.class; } else if (avroExtensions.contains(extension)) { @@ -62,6 +65,8 @@ private AbstractFileReader readerByExtension(FileSystem fs, Path filePat clz = OrcFileReader.class; } else if (cobolExtensions.contains(extension)) { clz = CobolFileReader.class; + } else if (binaryExtensions.contains(extension)) { + clz = BinaryFileReader.class; } else if (csvExtensions.contains(extension)) { clz = CsvFileReader.class; } else if (tsvExtensions.contains(extension)) { @@ -93,6 +98,8 @@ protected void configure(Map config) { .toLowerCase().split(",")).collect(Collectors.toSet()); this.cobolExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_COBOL, "dat") .toLowerCase().split(",")).collect(Collectors.toSet()); + this.binaryExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_BINARY, "bin") + .toLowerCase().split(",")).collect(Collectors.toSet()); this.csvExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_CSV, "csv") .toLowerCase().split(",")).collect(Collectors.toSet()); this.tsvExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_TSV, "tsv") diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/BinaryFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/BinaryFileReader.java new file mode 100644 index 0000000..fba3021 --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/BinaryFileReader.java @@ -0,0 +1,149 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.Map; + +public class BinaryFileReader extends AbstractFileReader { + + private static final String FIELD_PATH = "path"; + private static final String FIELD_OWNER = "owner"; + private static final String FIELD_GROUP = "group"; + private static final String FIELD_LENGTH = "length"; + private static final String FIELD_ACCESS_TIME = "access_time"; + private static final String FIELD_MODIFICATION_TIME = "modification_time"; + private static final String FIELD_CONTENT = "content"; + + protected static final int NUM_RECORDS = 1; + + private final FileStatus fileStatus; + private final Schema schema; + + private FSDataInputStream is; + private boolean closed; + + public BinaryFileReader(FileSystem fs, Path filePath, Map config) throws IOException { + super(fs, filePath, new BinaryToStruct(), config); + + this.is = getFs().open(getFilePath()); + this.fileStatus = getFs().getFileStatus(getFilePath()); + this.schema = buildSchema(); + this.closed = false; + } + + @Override + protected void configure(Map config) { + } + + @Override + protected BinaryRecord nextRecord() throws IOException { + return new BinaryRecord(schema, fileStatus, readFully(is)); + } + + @Override + protected boolean hasNextRecord() throws IOException { + return is.available() > 0; + } + + @Override + protected void seekFile(long offset) throws IOException { + if (offset == 0 && !isClosed()) { + is = 
getFs().open(getFilePath()); + } else if (!isClosed()){ + readFully(is); + } + } + + @Override + public void close() throws IOException { + closed = true; + is.close(); + } + + @Override + public boolean isClosed() { + return closed; + } + + private Schema buildSchema() { + return SchemaBuilder.struct() + .field(FIELD_PATH, Schema.STRING_SCHEMA) + .field(FIELD_OWNER, Schema.STRING_SCHEMA) + .field(FIELD_GROUP, Schema.STRING_SCHEMA) + .field(FIELD_LENGTH, Schema.INT64_SCHEMA) + .field(FIELD_ACCESS_TIME, Schema.INT64_SCHEMA) + .field(FIELD_MODIFICATION_TIME, Schema.INT64_SCHEMA) + .field(FIELD_CONTENT, Schema.BYTES_SCHEMA) + .build(); + } + + private byte[] readFully(FSDataInputStream in) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + while (true) { + baos.write(in.readByte()); + } + } catch (EOFException ignored) { + } + return baos.toByteArray(); + } + + static class BinaryToStruct implements ReaderAdapter { + + @Override + public Struct apply(BinaryRecord record) { + Struct struct = new Struct(record.schema); + record.schema.fields().forEach(field -> { + Object value = null; + switch (field.name()) { + case FIELD_PATH: + value = record.fileStatus.getPath().toString(); + break; + case FIELD_OWNER: + value = record.fileStatus.getOwner(); + break; + case FIELD_GROUP: + value = record.fileStatus.getGroup(); + break; + case FIELD_LENGTH: + value = record.fileStatus.getLen(); + break; + case FIELD_ACCESS_TIME: + value = record.fileStatus.getAccessTime(); + break; + case FIELD_MODIFICATION_TIME: + value = record.fileStatus.getModificationTime(); + break; + case FIELD_CONTENT: + value = record.content; + break; + } + struct.put(field, value); + }); + return struct; + } + } + + static class BinaryRecord { + + private final Schema schema; + private final FileStatus fileStatus; + private final byte[] content; + + BinaryRecord(Schema schema, FileStatus fileStatus, byte[] content) { + this.schema = schema; + this.fileStatus = fileStatus; + this.content = content; + } + + } +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReader.java index 0c835df..d8e9071 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReader.java @@ -51,12 +51,14 @@ public class CobolFileReader extends AbstractFileReader> initIterator() throws Exception { stream.close(); } stream = new FSStream(getFs(), getFilePath()); - return asJavaIterator(reader.getRecordIterator(stream, 0, 0, 0).map(it -> seqAsJavaList(it.seq()))); + return asJavaIterator(reader.getRecordIterator(stream, 0, 0, 0) + .map(it -> seqAsJavaList(it.seq()))); } private Schema extractSchema(CobolSchema cobolSchema) { @@ -124,7 +129,8 @@ private Schema schemaForField(Statement statement) { if (statement instanceof Group) { Group group = (Group) statement; SchemaBuilder childrenBuilder = SchemaBuilder.struct(); - seqAsJavaList(group.children()).forEach(child -> childrenBuilder.field(child.name(), schemaForField(child))); + seqAsJavaList(group.children()) + .forEach(child -> childrenBuilder.field(child.name(), schemaForField(child))); SchemaBuilder builder; if (group.isArray()) { builder = SchemaBuilder.array(childrenBuilder.build()); @@ -188,21 +194,23 @@ private String copybookContent(Map config) { private ReaderParameters getReaderParameters(Map config) { return new 
ReaderParameters( Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_EBCDIC, "true")), // isEbcdic + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_TEXT, "false")), // isText config.getOrDefault(FILE_READER_COBOL_READER_EBCDIC_CODE_PAGE, "common"), // ebcdicCodePage scala.Option.apply(config.get(FILE_READER_COBOL_READER_EBCDIC_CODE_PAGE_CLASS)), // ebcdicCodePageClass config.getOrDefault(FILE_READER_COBOL_READER_ASCII_CHARSET, ""), // asciiCharset Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_UFT16_BIG_ENDIAN, "true")), // isUtf16BigEndian FloatingPointFormat$.MODULE$.withNameOpt(config.getOrDefault(FILE_READER_COBOL_READER_FLOATING_POINT_FORMAT, "ibm")).get(), // floatingPointFormat Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_VARIABLE_SIZE_OCCURS, "false")), // variableSizeOccurs + scala.Option.apply(config.get(FILE_READER_COBOL_READER_RECORD_LENGTH)).map(Integer::parseInt), // recordLength scala.Option.apply(config.get(FILE_READER_COBOL_READER_LENGTH_FIELD_NAME)), // lengthFieldName Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_RECORD_SEQUENCE, "false")), // isRecordSequence Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_RDW_BIG_ENDIAN, "false")), // isRdwBigEndian Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_RDW_PART_REC_LENGTH, "false")), // isRdwPartRecLength Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_RDW_ADJUSTMENT, "0")), // rdwAdjustment Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_INDEX_GENERATION_NEEDED, "false")), // isIndexGenerationNeeded - scala.Option.apply(config.get(FILE_READER_COBOL_READER_INPUT_SPLIT_RECORDS)), // inputSplitRecords - scala.Option.apply(config.get(FILE_READER_COBOL_READER_INPUT_SPLIT_SIZE_MB)), // inputSplitSizeMB - scala.Option.apply(config.get(FILE_READER_COBOL_READER_HDFS_DEFAULT_BLOCK_SIZE)), // hdfsDefaultBlockSize + scala.Option.apply(config.get(FILE_READER_COBOL_READER_INPUT_SPLIT_RECORDS)).map(Integer::parseInt), // inputSplitRecords + scala.Option.apply(config.get(FILE_READER_COBOL_READER_INPUT_SPLIT_SIZE_MB)).map(Integer::parseInt), // inputSplitSizeMB + scala.Option.apply(config.get(FILE_READER_COBOL_READER_HDFS_DEFAULT_BLOCK_SIZE)).map(Integer::parseInt), // hdfsDefaultBlockSize Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_START_OFFSET, "0")), // startOffset Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_END_OFFSET, "0")), // endOffset Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_FILE_START_OFFSET, "0")), // fileStartOffset @@ -218,7 +226,9 @@ private ReaderParameters getReaderParameters(Map config) { scala.collection.immutable.Map$.MODULE$.empty(), // occursMappings DebugFieldsPolicy$.MODULE$.withNameOpt(config.getOrDefault(FILE_READER_COBOL_READER_DEBUG_FIELDS_POLICY, "none")).get(), // debugFieldsPolicy scala.Option.apply(config.get(FILE_READER_COBOL_READER_RECORD_HEADER_PARSER)), // recordHeaderParser + scala.Option.apply(config.get(FILE_READER_COBOL_READER_RECORD_EXTRACTOR)), // recordExtractor scala.Option.apply(config.get(FILE_READER_COBOL_READER_RHP_ADDITIONAL_INFO)), // rhpAdditionalInfo + config.getOrDefault(FILE_READER_COBOL_READER_RE_ADDITIONAL_INFO, ""), // reAdditionalInfo config.getOrDefault(FILE_READER_COBOL_READER_INPUT_FILE_NAME_COLUMN, "") // inputFileNameColumn ); } @@ -267,16 +277,14 @@ public void close() { private static class FSStream implements SimpleStream { - private final 
FileSystem fs; private final Path file; private final FSDataInputStream stream; private final long size; private long offset; FSStream(FileSystem fs, Path file) throws IOException { - this.fs = fs; this.file = file; - this.stream = this.fs.open(file); + this.stream = fs.open(file); this.size = fs.getContentSummary(file).getLength(); this.offset = stream.getPos(); } @@ -313,6 +321,7 @@ public void close() throws IOException { static class CobolToStruct implements ReaderAdapter { + @SuppressWarnings("unchecked") public Struct apply(CobolRecord record) { Struct struct = new Struct(record.schema); record.row.stream() @@ -324,6 +333,7 @@ public Struct apply(CobolRecord record) { return struct; } + @SuppressWarnings("unchecked") private Object mapValue(Schema schema, String fieldName, Object value) { if (value == null) { return null; @@ -374,6 +384,7 @@ private Map mapValues(Group group, Object[] values) { .collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), HashMap::putAll); } + @SuppressWarnings("unchecked") private Map.Entry transform(Statement child, Object value) { Object childValue; if (child instanceof Group && value instanceof Map) { diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java index 5e35da4..d55cf90 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java @@ -156,7 +156,7 @@ protected void postCheck() { public Iterator listFiles(FileSystem fs) throws IOException { return new Iterator() { - RemoteIterator it = fs.listFiles(fs.getWorkingDirectory(), recursive); + final RemoteIterator it = fs.listFiles(fs.getWorkingDirectory(), recursive); LocatedFileStatus current = null; private TailCall hasNextRec() { @@ -218,6 +218,7 @@ FileMetadata toMetadata(LocatedFileStatus fileStatus) { } @Override + @SuppressWarnings("unchecked") public FileReader offer(FileMetadata metadata, Map offsetMap) { FileSystem current = fileSystems.stream() .filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString())) diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java index b89923c..8cc64f7 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java @@ -118,11 +118,6 @@ public void close() throws IOException { super.close(); } - @Override - public String toString() { - return this.getClass().getSimpleName(); - } - private class EventStreamThread extends Thread { private final FileSystem fs; private final HdfsAdmin admin; diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/S3EventNotificationsPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/S3EventNotificationsPolicy.java new file mode 100644 index 0000000..facdf15 --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/S3EventNotificationsPolicy.java @@ -0,0 +1,224 @@ +package com.github.mmolimar.kafka.connect.fs.policy; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import 
com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; +import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Stream; + +public class S3EventNotificationsPolicy extends AbstractPolicy { + + private static final Logger log = LoggerFactory.getLogger(S3EventNotificationsPolicy.class); + private static final long DEFAULT_POLL = 5000L; + private static final String S3_EVENT_NOTIFICATIONS_POLICY_PREFIX = FsSourceTaskConfig.POLICY_PREFIX + + "s3_event_notifications."; + + public static final String S3_EVENT_NOTIFICATIONS_POLICY_QUEUE = S3_EVENT_NOTIFICATIONS_POLICY_PREFIX + "queue"; + public static final String S3_EVENT_NOTIFICATIONS_POLICY_POLL_MS = S3_EVENT_NOTIFICATIONS_POLICY_PREFIX + "poll"; + public static final String S3_EVENT_NOTIFICATIONS_POLICY_EVENT_REGEX = S3_EVENT_NOTIFICATIONS_POLICY_PREFIX + "event_regex"; + public static final String S3_EVENT_NOTIFICATIONS_POLICY_DELETE_MESSAGES = S3_EVENT_NOTIFICATIONS_POLICY_PREFIX + "delete_messages"; + public static final String S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES = S3_EVENT_NOTIFICATIONS_POLICY_PREFIX + "max_messages"; + public static final String S3_EVENT_NOTIFICATIONS_POLICY_VISIBILITY_TIMEOUT = S3_EVENT_NOTIFICATIONS_POLICY_PREFIX + "visibility_timeout"; + + private final Time time; + private final ObjectMapper mapper; + private final AmazonSQS sqs; + private final ReceiveMessageRequest request; + private final String queueUrl; + + private String queue; + private String eventNameRegex; + private Integer maxNumberOfMessages; + private Integer visibilityTimeout; + private long pollSleepMs; + private boolean isShutDown; + private boolean deleteMessages; + + public S3EventNotificationsPolicy(FsSourceTaskConfig conf) throws IOException { + super(conf); + + this.time = new SystemTime(); + this.mapper = new ObjectMapper(); + this.sqs = getSqsClient(); + this.queueUrl = this.sqs.getQueueUrl(getQueue()).getQueueUrl(); + this.request = new ReceiveMessageRequest() + .withMaxNumberOfMessages(maxNumberOfMessages) + .withVisibilityTimeout(visibilityTimeout) + .withQueueUrl(queueUrl); + this.isShutDown = false; + } + + @Override + protected void configPolicy(Map customConfigs) { + try { + this.pollSleepMs = Long.parseLong((String) customConfigs + .getOrDefault(S3_EVENT_NOTIFICATIONS_POLICY_POLL_MS, String.valueOf(DEFAULT_POLL))); + } catch (NumberFormatException nfe) { + throw new ConfigException(S3_EVENT_NOTIFICATIONS_POLICY_POLL_MS + " property is required and must be a " + + "number (long). 
Got: " + customConfigs.get(S3_EVENT_NOTIFICATIONS_POLICY_POLL_MS)); + } + + this.eventNameRegex = customConfigs.getOrDefault(S3_EVENT_NOTIFICATIONS_POLICY_EVENT_REGEX, ".*").toString(); + + try { + if (customConfigs.containsKey(S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES)) { + this.maxNumberOfMessages = Integer.valueOf((String) customConfigs + .get(S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES)); + if (this.maxNumberOfMessages < 1 || this.maxNumberOfMessages > 10) { + throw new IllegalArgumentException("Max number of messages must be between 1 and 10."); + } + } + } catch (IllegalArgumentException e) { + throw new ConfigException(S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES + " property is required and " + + "must be an integer between 1 and 10. Got: " + customConfigs.get(S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES)); + } + + try { + if (customConfigs.containsKey(S3_EVENT_NOTIFICATIONS_POLICY_VISIBILITY_TIMEOUT)) { + this.visibilityTimeout = Integer.valueOf((String) customConfigs + .get(S3_EVENT_NOTIFICATIONS_POLICY_VISIBILITY_TIMEOUT)); + } + } catch (NumberFormatException nfe) { + throw new ConfigException(S3_EVENT_NOTIFICATIONS_POLICY_VISIBILITY_TIMEOUT + " property is required and " + + "must be a number (int). Got: " + customConfigs.get(S3_EVENT_NOTIFICATIONS_POLICY_VISIBILITY_TIMEOUT)); + } + + if (customConfigs.get(S3_EVENT_NOTIFICATIONS_POLICY_QUEUE) == null || + customConfigs.get(S3_EVENT_NOTIFICATIONS_POLICY_QUEUE).toString().trim().equals("")) { + throw new ConfigException(S3_EVENT_NOTIFICATIONS_POLICY_QUEUE + " cannot be empty. Got: '" + + customConfigs.get(S3_EVENT_NOTIFICATIONS_POLICY_QUEUE) + "'."); + } + this.queue = customConfigs.get(S3_EVENT_NOTIFICATIONS_POLICY_QUEUE).toString().trim(); + this.deleteMessages = Boolean.parseBoolean(customConfigs + .getOrDefault(S3_EVENT_NOTIFICATIONS_POLICY_DELETE_MESSAGES, "true").toString()); + } + + @Override + public Iterator listFiles(FileSystem fs) { + return sqs.receiveMessage(request).getMessages() + .stream() + .flatMap(message -> parseMessage(message).stream()) + .filter(record -> record.eventName.matches(eventNameRegex)) + .filter(record -> fs.getWorkingDirectory().toString().startsWith(getUriPrefix() + record.bucketName)) + .map(record -> { + Path path = new Path(getUriPrefix() + record.bucketName + "/", record.objectKey); + Optional metadata = Optional.empty(); + try { + RemoteIterator it = fs.listFiles(path, false); + if (it.hasNext()) { + LocatedFileStatus status = it.next(); + if (status.isFile()) metadata = Optional.of(toMetadata(status)); + } + } catch (Exception ioe) { + log.warn("{} Cannot get file at path '{}': {}", this, path, ioe.getMessage()); + } + if (deleteMessages) { + log.trace("{} Removing message with ID '{}'.", this, record.messageId); + sqs.deleteMessage(queueUrl, record.receiptHandle); + } + + return metadata; + }) + .flatMap(metadataOpt -> metadataOpt.map(Stream::of).orElseGet(Stream::empty)) + .iterator(); + } + + private List parseMessage(Message message) { + List events = new ArrayList<>(); + try { + JsonNode content = mapper.readTree(message.getBody()); + if (content.has("Type") && content.get("Type").asText().equals("Notification")) { + content = mapper.readTree(content.get("Message").asText()); + } + if (!content.has("Records")) { + return events; + } + for (JsonNode record : content.get("Records")) { + String eventName = record.get("eventName").asText(); + if (record.has("s3")) { + String bucketName = record.get("s3").get("bucket").get("name").asText(); + String objectKey = 
record.get("s3").get("object").get("key").asText(); + events.add(new EventRecord(message.getMessageId(), message.getReceiptHandle(), + eventName, bucketName, objectKey)); + } + } + } catch (JsonProcessingException jpe) { + log.debug("{} Ignoring event due to cannot be parsed. Value: '{}'.", this, message.getBody()); + } + return events; + } + + private static class EventRecord { + final String messageId; + final String receiptHandle; + final String eventName; + final String bucketName; + final String objectKey; + + public EventRecord(String messageId, String receiptHandle, String eventName, String bucketName, String objectKey) { + this.messageId = messageId; + this.receiptHandle = receiptHandle; + this.eventName = eventName; + this.bucketName = bucketName; + // cleaning spaces + this.objectKey = objectKey.replace("+", " "); + } + } + + protected String getUriPrefix() { + return "s3a://"; + } + + protected AmazonSQS getSqsClient() { + return AmazonSQSClientBuilder.defaultClient(); + } + + protected String getQueue() { + return queue; + } + + @Override + protected boolean isPolicyCompleted() { + return isShutDown; + } + + @Override + public void interrupt() { + shutdown(); + super.interrupt(); + } + + @Override + public void postCheck() { + time.sleep(pollSleepMs); + } + + @Override + public void close() throws IOException { + shutdown(); + super.close(); + } + + private void shutdown() { + isShutDown = true; + sqs.shutdown(); + } + +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java index 9473837..d161fd5 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java @@ -26,7 +26,7 @@ public static Policy makePolicy(Class clazz, FsSourceTaskConfi private static T make(Class clazz, Object... 
args) { try { - Class[] constClasses = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new); + Class[] constClasses = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new); Constructor constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz, constClasses); return constructor.newInstance(args); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/AbstractHdfsFsConfig.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/AbstractHdfsFsConfig.java index f3fef89..fa1322f 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/AbstractHdfsFsConfig.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/AbstractHdfsFsConfig.java @@ -39,6 +39,6 @@ public URI getFsUri() { @Override public void close() throws IOException { fs.close(); - cluster.shutdown(true); + cluster.shutdown(true, true); } } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java index 1055b2a..5271086 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java @@ -264,4 +264,25 @@ public String getFileExtension() { return "dt"; } } + + @Nested + class AgnosticBinaryFileReaderTest extends BinaryFileReaderTest { + + @Override + protected Map getReaderConfig() { + Map config = super.getReaderConfig(); + config.put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_BINARY, getFileExtension()); + return config; + } + + @Override + public Class getReaderClass() { + return AgnosticFileReader.class; + } + + @Override + public String getFileExtension() { + return "bin"; + } + } } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/BinaryFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/BinaryFileReaderTest.java new file mode 100644 index 0000000..60083d3 --- /dev/null +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/BinaryFileReaderTest.java @@ -0,0 +1,117 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; + +public class BinaryFileReaderTest extends FileReaderTestBase { + + private static final String FILE_EXTENSION = "bin"; + + @Override + protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... args) throws IOException { + File binaryFile = File.createTempFile("test-", "." 
+ getFileExtension()); + byte[] content = "test".getBytes(); + Path path = new Path(new Path(fsConfig.getFsUri()), binaryFile.getName()); + Files.write(binaryFile.toPath(), content); + fsConfig.getFs().moveFromLocalFile(new Path(binaryFile.getAbsolutePath()), path); + IntStream.range(0, NUM_RECORDS).forEach(index -> fsConfig.offsetsByIndex().put(index, (long) 0)); + return path; + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void emptyFile(ReaderFsTestConfig fsConfig) throws IOException { + File tmp = File.createTempFile("test-", "." + getFileExtension()); + Path path = new Path(new Path(fsConfig.getFsUri()), tmp.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(tmp.getAbsolutePath()), path); + FileReader reader = getReader(fsConfig.getFs(), path, getReaderConfig()); + assertFalse(reader.hasNext()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Override + public void readAllData(ReaderFsTestConfig fsConfig) { + FileReader reader = fsConfig.getReader(); + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + assertEquals(1, recordCount, "The number of records in the file does not match"); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readAllDataInBatches(ReaderFsTestConfig fsConfig) { + Map config = getReaderConfig(); + int batchSize = 5; + config.put(FsSourceTaskConfig.FILE_READER_BATCH_SIZE, batchSize); + AbstractFileReader reader = (AbstractFileReader) getReader(fsConfig.getFs(), fsConfig.getDataFile(), config); + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNextBatch()) { + reader.nextBatch(); + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + assertEquals(1, recordCount % batchSize); + } + assertThrows(NoSuchElementException.class, reader::nextBatch); + assertEquals(1, recordCount, "The number of records in the file does not match"); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Disabled + public void invalidFileFormat(ReaderFsTestConfig fsConfig) throws IOException { + } + + @Override + protected Class getReaderClass() { + return BinaryFileReader.class; + } + + @Override + protected Map getReaderConfig() { + return new HashMap<>(); + } + + @Override + protected void checkData(Struct record, long index) { + assertAll( + () -> assertFalse(record.get("path").toString().isEmpty()), + () -> assertFalse(record.get("owner").toString().isEmpty()), + () -> assertFalse(record.get("group").toString().isEmpty()), + () -> assertEquals(record.getInt64("length"), 4L), + () -> assertNotNull(record.get("access_time")), + () -> assertNotNull(record.get("modification_time")), + () -> assertEquals(new String(record.getBytes("content")), "test") + ); + } + + @Override + protected String getFileExtension() { + return FILE_EXTENSION; + } +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java index 4531808..6e851f2 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java @@ -54,7 +54,7 @@ protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... 
args) throw FileSystem fs = fsConfig.getFs(); File parquetFile = File.createTempFile("test-", "." + getFileExtension()); - try (ParquetWriter writer = AvroParquetWriter.builder(new Path(parquetFile.toURI())) + try (ParquetWriter writer = AvroParquetWriter.builder(new Path(parquetFile.toURI())) .withConf(fs.getConf()).withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withSchema(readerSchema).build()) { IntStream.range(0, NUM_RECORDS).forEach(index -> { GenericRecord datum = new GenericData.Record(readerSchema); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java index 49449ea..a4571c0 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java @@ -20,7 +20,7 @@ import static org.junit.jupiter.api.Assertions.*; -abstract class UnivocityFileReaderTest extends FileReaderTestBase { +abstract class UnivocityFileReaderTest> extends FileReaderTestBase { protected static final String FIELD_COLUMN1 = "column_1"; protected static final String FIELD_COLUMN2 = "column_2"; @@ -111,7 +111,7 @@ public void readAllDataWithCustomHeaders(ReaderFsTestConfig fsConfig) throws IOE int recordCount = 0; while (reader.hasNext()) { Struct record = reader.next(); - checkDataWithHeaders(record, recordCount, headers); + checkDataWithHeaders(record, headers); recordCount++; } assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); @@ -265,6 +265,7 @@ public void invalidFileEncoding(ReaderFsTestConfig fsConfig) { } @Override + @SuppressWarnings("unchecked") protected Class getReaderClass() { return (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } @@ -284,7 +285,7 @@ protected void checkData(Struct record, long index) { ); } - protected void checkDataWithHeaders(Struct record, long index, String[] headers) { + protected void checkDataWithHeaders(Struct record, String[] headers) { assertAll(() -> assertEquals((byte) 2, record.get(headers[0])), () -> assertEquals((short) 4, record.get(headers[1])), () -> assertEquals(8, record.get(headers[2])), diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java index ac5e264..ef83d3b 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java @@ -50,6 +50,7 @@ public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOExcepti @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidCronExpression(PolicyFsTestConfig fsConfig) { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(CronPolicy.CRON_POLICY_EXPRESSION, "invalid"); @@ -69,6 +70,7 @@ public void invalidCronExpression(PolicyFsTestConfig fsConfig) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidEndDate(PolicyFsTestConfig fsConfig) { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(CronPolicy.CRON_POLICY_END_DATE, "invalid"); @@ -88,6 +90,7 @@ public void invalidEndDate(PolicyFsTestConfig fsConfig) { @ParameterizedTest 
@MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void canBeInterrupted(PolicyFsTestConfig fsConfig) throws IOException { try (Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() .getClass(FsSourceTaskConfig.POLICY_CLASS), diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java index f04db50..26d8dda 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java @@ -81,6 +81,7 @@ public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOExcepti @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException, IOException { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(FsSourceTaskConfig.FS_URIS, "hdfs://localhost:65432/data"); @@ -101,6 +102,7 @@ public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws Interrupt @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidPollTime(PolicyFsTestConfig fsConfig) { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_POLL_MS, "invalid"); @@ -120,6 +122,7 @@ public void invalidPollTime(PolicyFsTestConfig fsConfig) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidRetryTime(PolicyFsTestConfig fsConfig) { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_RETRY_MS, "invalid"); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java index b9ac7dc..7c97aee 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java @@ -48,6 +48,7 @@ public static void finishFs() throws IOException { } @BeforeEach + @SuppressWarnings("unchecked") public void initPolicy() { for (PolicyFsTestConfig fsConfig : TEST_FILE_SYSTEMS) { FsSourceTaskConfig sourceTaskConfig = buildSourceTaskConfig(fsConfig.getDirectories()); @@ -83,6 +84,7 @@ public void invalidArgs(PolicyFsTestConfig fsConfig) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidConfig(PolicyFsTestConfig fsConfig) { assertThrows(ConfigException.class, () -> ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() @@ -92,6 +94,7 @@ public void invalidConfig(PolicyFsTestConfig fsConfig) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidConfigCleanup(PolicyFsTestConfig fsConfig) { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(FsSourceTaskConfig.POLICY_CLEANUP, "invalid"); @@ -155,6 +158,7 @@ public void oneFilePerFs(PolicyFsTestConfig fsConfig) throws IOException, Interr @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void oneFilePerFsWithMoveCleanup(PolicyFsTestConfig fsConfig) throws 
IOException { FileSystem fs = fsConfig.getFs(); @@ -195,6 +199,7 @@ public void oneFilePerFsWithMoveCleanup(PolicyFsTestConfig fsConfig) throws IOEx @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void oneFilePerFsWithDeleteCleanup(PolicyFsTestConfig fsConfig) throws IOException { FileSystem fs = fsConfig.getFs(); @@ -258,6 +263,7 @@ public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOExcepti @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void dynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { Path dynamic = new Path(fsConfig.getFsUri().toString(), "${G}/${yyyy}/${MM}/${W}"); fsConfig.getFs().create(dynamic); @@ -287,6 +293,7 @@ public void dynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { Path dynamic = new Path(fsConfig.getFsUri().toString(), "${yyyy}/${MM}/${mmmmmmm}"); fsConfig.getFs().create(dynamic); @@ -308,6 +315,7 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/S3EventNotificationsPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/S3EventNotificationsPolicyTest.java new file mode 100644 index 0000000..f1e5370 --- /dev/null +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/S3EventNotificationsPolicyTest.java @@ -0,0 +1,348 @@ +package com.github.mmolimar.kafka.connect.fs.policy; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; +import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; +import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader; +import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.IllegalWorkerStateException; +import org.easymock.Capture; +import org.easymock.CaptureType; +import org.easymock.EasyMock; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.powermock.api.easymock.PowerMock; + +import java.io.IOException; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class S3EventNotificationsPolicyTest extends PolicyTestBase { + + static { + TEST_FILE_SYSTEMS = Collections.singletonList( + new LocalFsConfig() + ); + } + + @BeforeAll + public static void initFs() throws IOException { + for (PolicyFsTestConfig fsConfig : TEST_FILE_SYSTEMS) { + 
fsConfig.initFs(); + } + } + + @Override + protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { + Map cfg = new HashMap() {{ + String[] uris = directories.stream().map(Path::toString) + .toArray(String[]::new); + put(FsSourceTaskConfig.FS_URIS, String.join(",", uris)); + put(FsSourceTaskConfig.TOPIC, "topic_test"); + put(FsSourceTaskConfig.POLICY_CLASS, S3EventNotificationsPolicyMock.class.getName()); + put(FsSourceTaskConfig.FILE_READER_CLASS, TextFileReader.class.getName()); + put(FsSourceTaskConfig.POLICY_REGEXP, "^[0-9]*\\.txt$"); + put(FsSourceTaskConfig.POLICY_PREFIX_FS + "dfs.data.dir", "test"); + put(FsSourceTaskConfig.POLICY_PREFIX_FS + "fs.default.name", "hdfs://test"); + put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_QUEUE, "test"); + }}; + return new FsSourceTaskConfig(cfg); + } + + // This policy does not throw any exception. Just stop watching those nonexistent dirs + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Override + public void invalidDirectory(PolicyFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + fsConfig.getFs().delete(dir, true); + } + try { + fsConfig.getPolicy().execute(); + } finally { + for (Path dir : fsConfig.getDirectories()) { + fsConfig.getFs().mkdirs(dir); + } + } + } + + // This policy never ends. We have to interrupt it + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Override + public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOException { + fsConfig.getPolicy().execute(); + assertFalse(fsConfig.getPolicy().hasEnded()); + fsConfig.getPolicy().interrupt(); + assertTrue(fsConfig.getPolicy().hasEnded()); + assertThrows(IllegalWorkerStateException.class, () -> fsConfig.getPolicy().execute()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") + public void invalidPollTime(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_POLL_MS, "invalid"); + FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); + assertThrows(ConnectException.class, () -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)); + assertThrows(ConfigException.class, () -> { + try { + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg); + } catch (Exception e) { + throw e.getCause(); + } + }); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") + public void invalidMaxMessages(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES, "invalid"); + FsSourceTaskConfig cfgInvalid = new FsSourceTaskConfig(originals); + assertThrows(ConnectException.class, () -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfgInvalid)); + assertThrows(ConfigException.class, () -> { + try { + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfgInvalid); + } catch (Exception e) { + throw e.getCause(); + } + }); + + originals.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES, "100"); + FsSourceTaskConfig cfgMaxMessages = new 
FsSourceTaskConfig(originals); + assertThrows(ConnectException.class, () -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfgMaxMessages)); + assertThrows(ConfigException.class, () -> { + try { + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfgMaxMessages); + } catch (Exception e) { + throw e.getCause(); + } + }); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") + public void customMaxMessages(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_MAX_MESSAGES, "5"); + FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); + assertDoesNotThrow(() -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") + public void invalidVisibilityTimeout(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_VISIBILITY_TIMEOUT, "invalid"); + FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); + assertThrows(ConnectException.class, () -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)); + assertThrows(ConfigException.class, () -> { + try { + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg); + } catch (Exception e) { + throw e.getCause(); + } + }); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") + public void customVisibilityTimeout(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_VISIBILITY_TIMEOUT, "1"); + FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); + assertDoesNotThrow(() -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") + public void withoutQueue(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.remove(S3EventNotificationsPolicy.S3_EVENT_NOTIFICATIONS_POLICY_QUEUE); + FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); + assertThrows(ConnectException.class, () -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)); + assertThrows(ConfigException.class, () -> { + try { + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg); + } catch (Exception e) { + throw e.getCause(); + } + }); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Disabled + public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) { + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Disabled + public void recursiveDirectory(PolicyFsTestConfig fsConfig) { + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Override + public void oneFilePerFs(PolicyFsTestConfig 
+        FileSystem fs = fsConfig.getFs();
+        Path testDir = new Path(System.getProperty("java.io.tmpdir"));
+        for (Path dir : fsConfig.getDirectories()) {
+            testDir = new Path(dir.getParent().getParent(), "test");
+            fs.mkdirs(testDir);
+            fs.createNewFile(new Path(testDir, "0123456789.txt"));
+
+            // we wait till the FS has registered the files
+            Thread.sleep(5000);
+        }
+        Iterator<FileMetadata> it = fsConfig.getPolicy().execute();
+        assertTrue(it.hasNext());
+        it.next();
+        assertTrue(it.hasNext());
+        it.next();
+        assertFalse(it.hasNext());
+        fs.delete(testDir, true);
+    }
+
+    public static class S3EventNotificationsPolicyMock extends S3EventNotificationsPolicy {
+
+        public S3EventNotificationsPolicyMock(FsSourceTaskConfig conf) throws IOException {
+            super(conf);
+        }
+
+        @Override
+        protected String getUriPrefix() {
+            String prefix = "file:" + System.getProperty("java.io.tmpdir");
+            return prefix.endsWith("/") ? prefix : prefix + "/";
+        }
+
+        @Override
+        protected AmazonSQS getSqsClient() {
+            AmazonSQS sqs = PowerMock.createMock(AmazonSQS.class);
+
+            EasyMock.expect(sqs.getQueueUrl(getQueue()))
+                    .andReturn(new GetQueueUrlResult().withQueueUrl(getQueue()))
+                    .anyTimes();
+
+            // Queued messages cover the supported shapes: a raw S3 event, an SNS-wrapped event,
+            // a body without records and an invalid payload.
+            ReceiveMessageResult messageResult = new ReceiveMessageResult();
+            List<Message> messages = new ArrayList<>();
+            messages.add(new Message().withBody(MESSAGE_RAW));
+            messages.add(new Message().withBody(MESSAGE_SNS));
+            messages.add(new Message().withBody(MESSAGE_NO_RECORDS));
+            messages.add(new Message().withBody(MESSAGE_INVALID));
+            messageResult.setMessages(messages);
+            Capture<ReceiveMessageRequest> receiveRequest = Capture.newInstance(CaptureType.ALL);
+            EasyMock.expect(sqs.receiveMessage(EasyMock.capture(receiveRequest)))
+                    .andReturn(messageResult)
+                    .anyTimes();
+
+            Capture<String> anyString = Capture.newInstance(CaptureType.ALL);
+            EasyMock.expect(sqs.deleteMessage(EasyMock.capture(anyString), EasyMock.capture(anyString)))
+                    .andAnswer(() -> null)
+                    .anyTimes();
+
+            sqs.shutdown();
+            EasyMock.expectLastCall().andVoid().anyTimes();
+
+            EasyMock.checkOrder(sqs, false);
+            EasyMock.replay(sqs);
+
+            return sqs;
+        }
+
+        private static final String MESSAGE_RAW = "{\n" +
+                "  \"Records\" : [ {\n" +
+                "    \"eventVersion\" : \"2.1\",\n" +
+                "    \"eventSource\" : \"aws:s3\",\n" +
+                "    \"awsRegion\" : \"us-west-1\",\n" +
+                "    \"eventTime\" : \"2021-01-01T01:01:01.001Z\",\n" +
+                "    \"eventName\" : \"ObjectCreated:Put\",\n" +
+                "    \"userIdentity\" : {\n" +
+                "      \"principalId\" : \"AWS:AROA5EJEASFBABPAO11B11:test\"\n" +
+                "    },\n" +
+                "    \"requestParameters\" : {\n" +
+                "      \"sourceIPAddress\" : \"127.0.0.1\"\n" +
+                "    },\n" +
+                "    \"responseElements\" : {\n" +
+                "      \"x-amz-request-id\" : \"ACA811A1BD123C0F\",\n" +
+                "      \"x-amz-id-2\" : \"2r0F/Yw4hv6Sweqq91fEZlGam0tr4ScKMRWZA1LjOlC1RW/h4Xz45asxwoDpHDAK9f1ba\"\n" +
+                "    },\n" +
+                "    \"s3\" : {\n" +
+                "      \"s3SchemaVersion\" : \"1.0\",\n" +
+                "      \"configurationId\" : \"test\",\n" +
+                "      \"bucket\" : {\n" +
+                "        \"name\" : \"test\",\n" +
+                "        \"ownerIdentity\" : {\n" +
+                "          \"principalId\" : \"ACA811A1BD123C0F\"\n" +
+                "        },\n" +
+                "        \"arn\" : \"arn:aws:s3:::test\"\n" +
+                "      },\n" +
+                "      \"object\" : {\n" +
+                "        \"key\" : \"0123456789.txt\",\n" +
+                "        \"size\" : 0,\n" +
+                "        \"eTag\" : \"d41d8cd98f13g204e9239115xf8427e\",\n" +
+                "        \"sequencer\" : \"00211B24AA43A01199\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  } ]\n" +
+                "}\n";
+
+        private static final String MESSAGE_SNS = "{\n" +
+                "  \"Type\": \"Notification\",\n" +
+                "  \"MessageId\": \"a3091de2-0f4d-51e8-a6fb-456737913670\",\n" +
+                "  \"TopicArn\": \"arn:aws:sns:us-west-1:953167085750:test\",\n" +
\"arn:aws:sns:us-west-1:953167085750:test\",\n" + + " \"Subject\": \"Amazon S3 Notification\",\n" + + " \"Message\": \"" + "{\\\"Records\\\":[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us-west-1\\\",\\\"eventTime\\\":\\\"2021-01-01T01:01:01.001Z\\\",\\\"eventName\\\":\\\"ObjectCreated:Put\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:AROA5EJEASFBABPAO7BB11:test\\\"},\\\"requestParameters\\\":{\\\"sourceIPAddress\\\":\\\"127.0.0.1\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\"ACA8110A0D438C0F\\\",\\\"x-amz-id-2\\\":\\\"2r0F\\/Yi6hv6Sweqq91fEZlGam0tr4ScKMRWZA1LjOlC1RW\\/h4XzYjxHwoDpHDAK9f1ba\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\"configurationId\\\":\\\"test\\\",\\\"bucket\\\":{\\\"name\\\":\\\"test\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"ACA8110A0D438C0F\\\"},\\\"arn\\\":\\\"arn:aws:s3:::test\\\"},\\\"object\\\":{\\\"key\\\":\\\"f1\\/file.txt\\\",\\\"size\\\":0,\\\"eTag\\\":\\\"d41d8cd98f11b204e9239124cf8427e\\\",\\\"sequencer\\\":\\\"00200C24AA43A01199\\\"}}}]}" + "\",\n" + + " \"Timestamp\": \"2020-01-01T01:01:01.0001Z\",\n" + + " \"SignatureVersion\": \"1\",\n" + + " \"Signature\": \"pXQWpasLFJ3ecRAX0rcqwevxZpBSErUgcTqqIS4VAJtdQ7y5fqjEg+veJHIpBIBtd2bEUN7JMNkm8\",\n" + + " \"SigningCertURL\": \"https://sns.us-west-1.amazonaws.com/SimpleNotificationService-010a517c1823535cd94bdb92c3asdf.pem\",\n" + + " \"UnsubscribeURL\": \"https://sns.us-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-1:953167085750:test:8419c54c-316d-1234-bc6c-acbcede97e47\"\n" + + "}"; + + private static final String MESSAGE_NO_RECORDS = "{}"; + + private static final String MESSAGE_INVALID = "invalid"; + + } + + +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java index f70e184..f735f34 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java @@ -38,6 +38,7 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java index 8f340b0..91ad85e 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java @@ -38,6 +38,7 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidSleepTime(PolicyFsTestConfig fsConfig) { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(SleepyPolicy.SLEEPY_POLICY_SLEEP_MS, "invalid"); @@ -57,6 +58,7 @@ public void invalidSleepTime(PolicyFsTestConfig fsConfig) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") + @SuppressWarnings("unchecked") public void invalidMaxExecs(PolicyFsTestConfig fsConfig) { Map originals = 
         originals.put(SleepyPolicy.SLEEPY_POLICY_MAX_EXECS, "invalid");
@@ -76,6 +78,7 @@ public void invalidMaxExecs(PolicyFsTestConfig fsConfig) {
 
     @ParameterizedTest
     @MethodSource("fileSystemConfigProvider")
+    @SuppressWarnings("unchecked")
     public void invalidSleepFraction(PolicyFsTestConfig fsConfig) {
         Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
         originals.put(SleepyPolicy.SLEEPY_POLICY_SLEEP_FRACTION, "invalid");
@@ -95,6 +98,7 @@ public void invalidSleepFraction(PolicyFsTestConfig fsConfig) {
 
     @ParameterizedTest
     @MethodSource("fileSystemConfigProvider")
+    @SuppressWarnings("unchecked")
     public void sleepExecution(PolicyFsTestConfig fsConfig) throws IOException {
         Map<String, String> tConfig = fsConfig.getSourceTaskConfig().originalsStrings();
         tConfig.put(SleepyPolicy.SLEEPY_POLICY_SLEEP_MS, "1000");
@@ -113,6 +117,7 @@ public void sleepExecution(PolicyFsTestConfig fsConfig) throws IOException {
 
     @ParameterizedTest
     @MethodSource("fileSystemConfigProvider")
+    @SuppressWarnings("unchecked")
     public void defaultExecutions(PolicyFsTestConfig fsConfig) throws IOException {
         Map<String, String> tConfig = fsConfig.getSourceTaskConfig().originalsStrings();
         tConfig.put(SleepyPolicy.SLEEPY_POLICY_SLEEP_MS, "1");