Skip to content

Commit

Permalink
Merge pull request #82 from mmolimar/develop
Browse files Browse the repository at this point in the history
Relese version 1.3.0
  • Loading branch information
mmolimar authored Feb 16, 2021
2 parents 98c5641 + 79f6e4f commit 6b03a6d
Show file tree
Hide file tree
Showing 27 changed files with 1,082 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM confluentinc/cp-kafka-connect-base:5.5.1
FROM confluentinc/cp-kafka-connect-base:6.1.0

ARG PROJECT_VERSION
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
cp-zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
image: confluentinc/cp-zookeeper:6.1.0
hostname: zookeeper
container_name: zookeeper
ports:
Expand All @@ -11,7 +11,7 @@ services:
ZOOKEEPER_TICK_TIME: 2000

cp-kafka:
image: confluentinc/cp-kafka:5.5.1
image: confluentinc/cp-kafka:6.1.0
hostname: kafka
container_name: kafka
depends_on:
Expand All @@ -32,7 +32,7 @@ services:
CONFLUENT_METRICS_ENABLE: 'false'

cp-schema-registry:
image: confluentinc/cp-schema-registry:5.5.1
image: confluentinc/cp-schema-registry:6.1.0
hostname: schema-registry
container_name: schema-registry
depends_on:
Expand All @@ -45,7 +45,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

connect-fs:
image: mmolimar/kafka-connect-fs:1.2.0
image: mmolimar/kafka-connect-fs:1.3.0
container_name: connect
depends_on:
- cp-kafka
Expand Down
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
# built documents.
#
# The short X.Y version.
version = '1.2'
version = '1.3'
# The full version, including alpha/beta/rc tags.
release = '1.2'
release = '1.3'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
96 changes: 93 additions & 3 deletions docs/source/config_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ General config properties for this connector.
If you want to ingest data from S3, you can add credentials with:
``policy.fs.fs.s3a.access.key=<ACCESS_KEY>``
and
``policy.fs.fs.s3a.secret.key=<SECRET_KEY>``

``policy.fs.fs.s3a.secret.key=<SECRET_KEY>``.
Also, in case you want to configure a custom credentials provider, you should use
``policy.fs.fs.s3a.aws.credentials.provider=<CLASS>`` property.

``topic``
Topic in which copy data to.

Expand Down Expand Up @@ -224,7 +226,7 @@ HDFS file watcher
In order to configure custom properties for this policy, the name you must use is ``hdfs_file_watcher``.

``policy.hdfs_file_watcher.poll``
Time to wait until the records retrieved from the file watcher will be sent to the source task.
Time to wait (in milliseconds) until the records retrieved from the file watcher will be sent to the source task.

* Type: long
* Default: ``5000``
Expand All @@ -237,6 +239,52 @@ In order to configure custom properties for this policy, the name you must use i
* Default: ``20000``
* Importance: medium

.. _config_options-policies-s3events:

S3 event notifications
--------------------------------------------

In order to configure custom properties for this policy, the name you must use is ``s3_event_notifications``.

``policy.s3_event_notifications.queue``
SQS queue name to retrieve messages from.

* Type: string
* Importance: high

``policy.s3_event_notifications.poll``
Time to wait (in milliseconds) until the records retrieved from the queue will be sent to the source task.

* Type: long
* Default: ``5000``
* Importance: medium

``policy.s3_event_notifications.event_regex``
Regular expression to filter event based on their types.

* Type: string
* Default: ``.*``
* Importance: medium

``policy.s3_event_notifications.delete_messages``
If messages from SQS should be removed after reading them.

* Type: boolean
* Default: ``true``
* Importance: medium

``policy.s3_event_notifications.max_messages``
Maximum number of messages to retrieve at a time (must be between 1 and 10).

* Type: int
* Importance: medium

``policy.s3_event_notifications.visibility_timeout``
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests.

* Type: int
* Importance: low

.. _config_options-filereaders:

File readers
Expand Down Expand Up @@ -357,6 +405,13 @@ In order to configure custom properties for this reader, the name you must use i
* Default: ``true``
* Importance: medium

``file_reader.cobol.reader.is_text``
If line ending characters will be used (LF / CRLF) as the record separator.

* Type: boolean
* Default: ``false``
* Importance: medium

``file_reader.cobol.reader.ebcdic_code_page``
Code page to be used for EBCDIC to ASCII / Unicode conversions.

Expand Down Expand Up @@ -448,6 +503,13 @@ In order to configure custom properties for this reader, the name you must use i
* Default: ``false``
* Importance: low

``file_reader.cobol.reader.record_length``
Specifies the length of the record disregarding the copybook record size. Implied the file has fixed record length.

* Type: int
* Default: ``null``
* Importance: low

``file_reader.cobol.reader.length_field_name``
The name for a field that contains the record length. If not set, the copybook record length will be used.

Expand Down Expand Up @@ -539,20 +601,41 @@ In order to configure custom properties for this reader, the name you must use i
* Default: ``null``
* Importance: low

``file_reader.cobol.reader.record_extractor``
Parser to be used to parse records.

* Type: string
* Default: ``null``
* Importance: low

``file_reader.cobol.reader.rhp_additional_info``
Extra option to be passed to a custom record header parser.

* Type: string
* Default: ``null``
* Importance: low

``file_reader.cobol.reader.re_additional_info``
A string provided for the raw record extractor.

* Type: string
* Default: ````
* Importance: low

``file_reader.cobol.reader.input_file_name_column``
A column name to add to each record containing the input file name.

* Type: string
* Default: ````
* Importance: low

.. _config_options-filereaders-binary:

Binary
--------------------------------------------

There are no extra configuration options for this file reader.

.. _config_options-filereaders-csv:

CSV
Expand Down Expand Up @@ -1258,6 +1341,13 @@ To configure custom properties for this reader, the name you must use is ``agnos
* Default: ``dat``
* Importance: medium

``file_reader.agnostic.extensions.binary``
A comma-separated string list with the accepted extensions for binary files.

* Type: string[]
* Default: ``bin``
* Importance: medium

``file_reader.agnostic.extensions.csv``
A comma-separated string list with the accepted extensions for CSV files.

Expand Down
1 change: 1 addition & 0 deletions docs/source/connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ The are several file readers included which can read the following file formats:
* ORC.
* SequenceFile.
* Cobol / EBCDIC.
* Other binary files.
* CSV.
* TSV.
* Fixed-width.
Expand Down
21 changes: 21 additions & 0 deletions docs/source/filereaders.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ translate it into a Kafka message with the schema.

More information about properties of this file reader :ref:`here<config_options-filereaders-cobol>`.

Binary
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

All other kind of binary files can be ingested using this reader.

It just extracts the content plus some metadata such as: path, file owner, file group, length, access time,
and modification time.

Each message will contain the following schema:

* ``path``: File path (string).
* ``owner``: Owner of the file. (string).
* ``group``: Group associated with the file. (string).
* ``length``: Length of this file, in bytes. (long).
* ``access_time``: Access time of the file. (long).
* ``modification_time``: Modification time of the file (long).
* ``content``: Content of the file (bytes).

More information about properties of this file reader :ref:`here<config_options-filereaders-binary>`.

CSV
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down Expand Up @@ -153,6 +173,7 @@ Default extensions for each format (configurable):
* ORC: ``.orc``
* SequenceFile: ``.seq``
* Cobol / EBCDIC: ``.dat``
* Other binary files: ``.bin``
* CSV: ``.csv``
* TSV: ``.tsv``
* FixedWidth: ``.fixed``
Expand Down
12 changes: 12 additions & 0 deletions docs/source/policies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,15 @@ You can learn more about the properties of this policy :ref:`here<config_options
.. attention:: The URIs included in the general property ``fs.uris`` will be filtered and only those
ones which start with the prefix ``hdfs://`` will be watched. Also, this policy
will only work for Hadoop versions 2.6.0 or higher.

S3 event notifications
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

It uses S3 event notifications sent from S3 to process files which have been created or modified in S3.
These notifications will be read from a AWS-SQS queue and they can be sent to SQS directly from S3 or via
AWS-SNS, either as a SNS notification or a raw message in the subscription.

Just use it when you have S3 URIs and the event notifications in the S3 bucket must be enabled to a SNS
topic or a SQS queue.

You can learn more about the properties of this policy :ref:`here<config_options-policies-s3events>`.
47 changes: 30 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.github.mmolimar.kafka.connect</groupId>
<artifactId>kafka-connect-fs</artifactId>
<version>1.2.0</version>
<version>1.3.0</version>
<packaging>jar</packaging>

<name>kafka-connect-fs</name>
Expand Down Expand Up @@ -46,31 +46,31 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.6.0</kafka.version>
<confluent.version>5.5.1</confluent.version>
<kafka.version>2.7.0</kafka.version>
<confluent.version>6.1.0</confluent.version>
<hadoop.version>3.3.0</hadoop.version>
<gcs-connector.version>hadoop3-2.1.5</gcs-connector.version>
<gcs-connector.version>hadoop3-2.2.0</gcs-connector.version>
<parquet.version>1.11.1</parquet.version>
<orc.version>1.6.3</orc.version>
<univocity.version>2.9.0</univocity.version>
<orc.version>1.6.7</orc.version>
<univocity.version>2.9.1</univocity.version>
<jackson-dataformat.version>2.10.2</jackson-dataformat.version>
<cobrix.version>2.1.1</cobrix.version>
<scala.version>2.12.12</scala.version>
<cron-utils.version>9.1.1</cron-utils.version>
<cobrix.version>2.2.0</cobrix.version>
<scala.version>2.12.13</scala.version>
<cron-utils.version>9.1.3</cron-utils.version>
<jsch.version>0.1.55</jsch.version>
<junit-jupiter.version>5.7.0</junit-jupiter.version>
<junit-jupiter.version>5.7.1</junit-jupiter.version>
<easymock.version>4.2</easymock.version>
<powermock.version>2.0.7</powermock.version>
<powermock.version>2.0.9</powermock.version>
<maven-compiler.source>1.8</maven-compiler.source>
<maven-compiler.target>${maven-compiler.source}</maven-compiler.target>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-scala-plugin.version>4.4.0</maven-scala-plugin.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<maven-jacoco-plugin.version>0.8.5</maven-jacoco-plugin.version>
<maven-jacoco-plugin.version>0.8.6</maven-jacoco-plugin.version>
<maven-coveralls-plugin.version>4.3.0</maven-coveralls-plugin.version>
<maven-surfire-plugin.version>3.0.0-M5</maven-surfire-plugin.version>
<maven-kafka-connect-plugin.version>0.11.3</maven-kafka-connect-plugin.version>
<maven-kafka-connect-plugin.version>0.12.0</maven-kafka-connect-plugin.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -139,6 +139,12 @@
<groupId>za.co.absa.cobrix</groupId>
<artifactId>cobol-parser_2.12</artifactId>
<version>${cobrix.version}</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
Expand All @@ -150,6 +156,11 @@
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -298,11 +309,12 @@
into Kafka.
The following file types are supported: Parquet, Avro, ORC, SequenceFile,
Cobol / EBCDIC, CSV, TSV, Fixed-width, JSON, XML, YAML and Text.
Cobol / EBCDIC, other binary files, CSV, TSV, Fixed-width, JSON, XML, YAML and Text.
Also, the connector has built-in support for file systems such as HDFS, S3,
Google Cloud Storage, Azure Blob Storage, Azure Data Lake Store, FTP, SFTP and
local file system, among others.
Also, the connector has built-in support for file systems such as HDFS, S3 (directly
or via messages from SNS/SQS queues due to S3 event notifications), Google Cloud
Storage, Azure Blob Storage, Azure Data Lake Store, FTP, SFTP and local file system,
among others.
]]></description>
<sourceUrl>https://github.com/mmolimar/kafka-connect-fs</sourceUrl>

Expand Down Expand Up @@ -340,6 +352,7 @@
<tag>orc</tag>
<tag>sequence</tag>
<tag>cobol</tag>
<tag>binary</tag>
<tag>csv</tag>
<tag>tsv</tag>
<tag>fixed</tag>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public String version() {
}

@Override
@SuppressWarnings("unchecked")
public void start(Map<String, String> properties) {
log.info("{} Starting FS source task...", this);
try {
Expand Down Expand Up @@ -94,7 +95,7 @@ public List<SourceRecord> poll() {
Struct record = reader.next();
// TODO change FileReader interface in the next major version
boolean hasNext = (reader instanceof AbstractFileReader) ?
((AbstractFileReader) reader).hasNextBatch() || reader.hasNext() : reader.hasNext();
((AbstractFileReader<?>) reader).hasNextBatch() || reader.hasNext() : reader.hasNext();
records.add(convert(metadata, reader.currentOffset(), !hasNext, record));
}
} catch (IOException | ConnectException e) {
Expand Down
Loading

0 comments on commit 6b03a6d

Please sign in to comment.