Releases: lensesio/stream-reactor
Stream Reactor 8.1.26
Azure Service Bus Source
Removes the offset and source maps from the Connect record created. For Azure messages with many annotations, the Connect offset storage would otherwise take up a lot of memory.
GCP Pub Sub Source
The source was not setting the headers on the Connect record it creates; this is now fixed.
Stream Reactor 8.1.25
See the release notes.
Stream Reactor 8.1.24
See the release notes.
Stream Reactor 8.1.23
DataLakes (S3, GCP) source fixes
Polling Backoff
The connector incurs high costs when there is no data available in the buckets because it continuously polls the data lake in a tight loop, as controlled by Kafka Connect.
From this version, a backoff queue is used by default, introducing a standard method for backing off calls to the underlying cloud platform.
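To illustrate the general idea of backing off on empty polls, here is a minimal sketch using a plain exponential backoff. It is not the connector's implementation: the `PollBackoff` class, its parameters, and the `simulate` helper are hypothetical names chosen for this example, and the release notes do not state which delays the connector uses.

```python
class PollBackoff:
    """Hypothetical helper: tracks how long to wait before the next poll."""

    def __init__(self, initial_ms=100, max_ms=60_000, multiplier=2.0):
        self.initial_ms = initial_ms
        self.max_ms = max_ms
        self.multiplier = multiplier
        self._next_ms = initial_ms

    def on_empty_poll(self):
        """Return the delay to apply now, then grow the delay for next time."""
        delay = self._next_ms
        self._next_ms = min(int(self._next_ms * self.multiplier), self.max_ms)
        return delay

    def on_records(self):
        """Data arrived: reset so the next empty poll starts small again."""
        self._next_ms = self.initial_ms


def simulate(poll_results, backoff):
    """Return the delay (ms) chosen after each poll; 0 means poll again at once."""
    delays = []
    for record_count in poll_results:
        if record_count == 0:
            delays.append(backoff.on_empty_poll())
        else:
            backoff.on_records()
            delays.append(0)
    return delays


# Three empty polls, one poll with 5 records, then another empty poll.
print(simulate([0, 0, 0, 5, 0], PollBackoff(initial_ms=100, max_ms=300)))
```

The delay grows while the bucket stays empty and resets as soon as data is found, so an idle connector issues far fewer list calls to the cloud platform.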
Avoid filtering by lastSeenFile where a post process action is configured
When ordering by LastModified and a post-process action is configured, avoid filtering to the latest result.
This change avoids bugs caused by inconsistent LastModified dates used for sorting.
If LastModified sorting is used, ensure objects do not arrive late, or use a post-processing step to handle them.
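The following sketch shows why filtering on the last seen position can lose a late-arriving object, and why the filter is unnecessary once processed objects are removed by a post-process action. It is a simplified model, not the connector's code: the listing is a list of dicts, and the "last seen" state is reduced to a single timestamp.

```python
from datetime import datetime, timedelta


def list_new_objects(objects, last_seen=None):
    """Sort by last-modified; optionally drop everything at or before last_seen."""
    ordered = sorted(objects, key=lambda o: (o["last_modified"], o["key"]))
    if last_seen is None:
        return ordered
    return [o for o in ordered if o["last_modified"] > last_seen]


t0 = datetime(2025, 1, 1, 12, 0, 0)
bucket = [
    {"key": "a.json", "last_modified": t0},
    {"key": "c.json", "last_modified": t0 + timedelta(seconds=10)},
]

# First pass: both objects are processed and the latest timestamp is remembered.
processed = list_new_objects(bucket)
last_seen = processed[-1]["last_modified"]

# An object now appears with a timestamp earlier than the last one seen.
bucket.append({"key": "b.json", "last_modified": t0 + timedelta(seconds=5)})

# Filtering on last_seen silently skips the late object.
with_filter = [o["key"] for o in list_new_objects(bucket, last_seen)]

# With a post-process action, processed objects are no longer in the bucket,
# so everything still listed is unprocessed and no filter is needed.
processed_keys = {o["key"] for o in processed}
remaining = [o for o in bucket if o["key"] not in processed_keys]
without_filter = [o["key"] for o in list_new_objects(remaining)]

print(with_filter, without_filter)
```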
Add a flag to populate Kafka headers with the watermark partition/offset
- This adds a connector property for the GCP Storage and S3 sources:
connect.s3.source.write.watermark.header
connect.gcpstorage.source.write.watermark.header
If set to true, the headers of the source record produced will include details of the source and the line number within the file.
If set to false (the default), the headers won't be set.
Currently this does not apply when using the envelope mode.
Stream Reactor 8.1.22
Enhance DataLake Source Connectors: Robust State Management and Move Location Path Handling
This release addresses two critical issues:
- Corrupted connector state when DELETE/MOVE is used: The connector is designed to store the last processed document and its location within its state for every message sent to Kafka. This mechanism ensures that the connector can resume processing from the correct point after a restart. However, when the connector is configured with a post-operation to move or delete processed objects within the data lake, it still stores the last processed object in its state. If the connector restarts and the referenced object has been moved or deleted externally, the state points to a non-existent object, causing the connector to fail. The previous workaround required manually cleaning the state and restarting the connector, which is inefficient and error-prone.
- Incorrect handling of move location prefixes: When configuring the move location within the data lake, a prefix ending with a forward slash (/) resulted in malformed keys like a//b. Such paths can break compatibility with query engines like Athena, which may not handle double slashes properly.
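The double-slash problem can be sketched as follows. This is an illustrative helper, not the connector's code: `build_move_key` is a hypothetical name, and it assumes the target key is formed by joining the configured prefix and the object key with a single slash.

```python
def build_move_key(prefix, key):
    """Join a move-location prefix and an object key with exactly one slash."""
    cleaned_prefix = prefix.rstrip("/")
    cleaned_key = key.lstrip("/")
    if not cleaned_prefix:
        # No prefix configured: the key is used on its own.
        return cleaned_key
    return f"{cleaned_prefix}/{cleaned_key}"


# A naive join of "a/" and "b" with "/" would give "a//b".
print(build_move_key("a/", "b"))
print(build_move_key("archive/", "2024/file.json"))
```

Normalising both sides of the join means the result is the same whether or not the configured prefix ends with a slash.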
Stream Reactor 8.1.21
Azure Service Bus source
Performance improvements in the source to handle higher throughput. The code now uses the prefetch count and disables auto-complete. The following connector configs were added:
- connect.servicebus.source.prefetch.count: The number of messages to prefetch from ServiceBus.
- connect.servicebus.source.complete.retries.max: The maximum number of retries to attempt while completing a message.
- connect.servicebus.source.complete.retries.min.backoff.ms: The minimum duration in milliseconds for the first backoff.
- connect.servicebus.source.sleep.on.empty.poll.ms: The duration in milliseconds to sleep when no records are returned from the poll. This avoids a tight loop in Connect.
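As a rough sketch of how these four properties might be layered onto an existing connector configuration before submitting it as JSON: the values below are placeholders chosen for illustration, not the connector's defaults or recommendations, and `with_tuning` and the base config are hypothetical.

```python
import json

# Placeholder values for illustration only; they are not the connector's
# defaults or recommendations.
SERVICEBUS_SOURCE_TUNING = {
    "connect.servicebus.source.prefetch.count": "500",
    "connect.servicebus.source.complete.retries.max": "3",
    "connect.servicebus.source.complete.retries.min.backoff.ms": "1000",
    "connect.servicebus.source.sleep.on.empty.poll.ms": "250",
}


def with_tuning(base_config, tuning=SERVICEBUS_SOURCE_TUNING):
    """Return a new config dict with the tuning properties layered on top."""
    merged = dict(base_config)
    merged.update(tuning)
    return merged


# Hypothetical base config; a real connector config needs further properties.
base = {"name": "servicebus-source"}
print(json.dumps(with_tuning(base), indent=2))
```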
Stream Reactor 8.1.20
NullPointerException information lost (#174)
The change adds extra logging to identify where the exception occurs, and changes the error log information.
Co-authored-by: stheppi, Andrew Stevenson
Stream Reactor 8.1.19
Fix: Prevent ElasticSearch from Skipping Records After Tombstone (#172)
Overview
This pull request addresses a critical bug in the ElasticSearch 6 (ES6) and 7 (ES7) sinks where records following a tombstone are inadvertently skipped during insertion. The issue stemmed from an erroneous return statement that halted the processing of subsequent records.
Background
Previously, when a tombstone record was encountered within a sequence of records to be written to ElasticSearch, the insertion process exited prematurely because of a return instruction. All records following the tombstone were ignored, leading to incomplete data ingestion and potential inconsistencies within the ElasticSearch indices.
Changes Made
- Refactored insert method: the original insert method has been decomposed into smaller, more focused functions. This improves readability and maintainability and makes testing easier.
- Detailed log entries: added log statements at key points within the insertion workflow.
- ES errors handled: previously, failures reported in the response from ElasticSearch were ignored. With this change, if any item in the batch fails, the sink raises an exception.
- Avoid sending empty requests.
- Fix the unit tests.
Co-authored-by: stheppi
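The class of bug described here, an early return inside a loop, can be reproduced in a few lines. The sketch below is not the sink's code: records are plain dicts, a tombstone is modelled as a record whose value is None, and the fixed version simply skips tombstones, which is an assumption made to keep the example small.

```python
def insert_buggy(records, write):
    """Stops at the first tombstone, so later records are never written."""
    for record in records:
        if record["value"] is None:
            return  # bug: leaves the whole function, not just this iteration
        write(record)


def insert_fixed(records, write):
    """Skips the tombstone and carries on with the remaining records."""
    for record in records:
        if record["value"] is None:
            continue  # only this record is skipped
        write(record)


def written_keys(insert_fn, records):
    """Run an insert function and collect the keys it wrote."""
    written = []
    insert_fn(records, lambda record: written.append(record["key"]))
    return written


batch = [
    {"key": "k1", "value": {"a": 1}},
    {"key": "k2", "value": None},  # tombstone
    {"key": "k3", "value": {"a": 3}},
]

print(written_keys(insert_buggy, batch))
print(written_keys(insert_fixed, batch))
```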
Stream Reactor 8.1.18
🚀 New Features
Sequential Message Sending for Azure Service Bus
- Introduced a new KCQL property: batch.enabled (default: true).
- Users can now disable batching to send messages sequentially, addressing specific scenarios with large message sizes (e.g., >1 MB).
- Why this matters: Batching improves performance but can fail for large messages. Sequential sending ensures reliability in such cases.
- How to use: Configure batch.enabled=false in the KCQL mapping to enable sequential sending.
Post-Processing for Datalake Cloud Source Connectors
- Added post-processing capabilities for AWS S3 and GCP Storage source connectors (Azure Datalake Gen 2 support coming soon).
- New KCQL properties:
- post.process.action: Defines the action (DELETE or MOVE) to perform on source files after successful processing.
- post.process.action.bucket: Specifies the target bucket for the MOVE action (required for MOVE).
- post.process.action.prefix: Specifies a new prefix for the file’s location when using the MOVE action (required for MOVE).
- Use cases:
- Free up storage space by deleting files.
- Archive or organize processed files by moving them to a new location.
- Example 1: Delete files:
INSERT INTO `my-bucket`
SELECT * FROM `my-topic`
PROPERTIES ('post.process.action'=`DELETE`)
- Example 2: Move files to an archive bucket:
INSERT INTO `my-bucket:archive/`
SELECT * FROM `my-topic`
PROPERTIES (
'post.process.action'=`MOVE`,
'post.process.action.bucket'=`archive-bucket`,
'post.process.action.prefix'=`archive/`
)
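The rules stated above (the action is DELETE or MOVE, and MOVE requires both a bucket and a prefix) can be expressed as a small validation sketch. This is not the connector's validation logic: `validate_post_process` is a hypothetical helper operating on a plain dict of KCQL properties, and it assumes values are matched exactly as written.

```python
ACTION = "post.process.action"
BUCKET = "post.process.action.bucket"
PREFIX = "post.process.action.prefix"


def validate_post_process(properties):
    """Return the configured action, or None if no post-processing is set."""
    action = properties.get(ACTION)
    if action is None:
        return None
    if action not in ("DELETE", "MOVE"):
        raise ValueError(f"Unsupported {ACTION}: {action!r}")
    if action == "MOVE":
        # MOVE needs to know where the processed file should go.
        missing = [name for name in (BUCKET, PREFIX) if not properties.get(name)]
        if missing:
            raise ValueError(f"MOVE requires: {', '.join(missing)}")
    return action


print(validate_post_process({ACTION: "DELETE"}))
print(validate_post_process(
    {ACTION: "MOVE", BUCKET: "archive-bucket", PREFIX: "archive/"}
))
```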
🛠 Dependency Updates
Updated Azure Service Bus Dependencies
- azure-core updated to version 1.54.1.
- azure-messaging-servicebus updated to version 7.17.6.
These updates ensure compatibility with the latest Azure SDKs and improve stability and performance.
Upgrade Notes
- Review the new KCQL properties and configurations for Azure Service Bus and Datalake connectors.
- Ensure compatibility with the updated Azure Service Bus dependencies if you use custom extensions or integrations.
Thank you to all contributors! 🎉
Stream Reactor 8.1.17
Feat/es suport pk from key (#162)
ElasticSearch Document Primary Key
The ES sink connector lacked the ability to take the primary key from the record Key or a Header. No SMT would help move data from the Key into the Value payload, so the connector could not work in scenarios where the Key or a Header carries information to be used as part of the ElasticSearch document primary key.
- Refines TransformAndExtractPK to take the Key and Headers, and adds the tests that were missing for PrimaryKeyExtractor, JsonPayloadExtractor and TransformAndExtractPK.
- Reduces the code complexity.
- Improves the JSON payload test by mixing in OptionValues, reducing the code required.
- Makes _key/_value/_header constants.
- Avoids deserialising the key as JSON if there is no _key path in the primary keys list.
- Enhances PK path extraction by allowing the path to be specified as _key or as a nested path such as _key.fieldA.fieldB. This broadens the scope of supported incoming types, ensuring compatibility with all Kafka Connect Struct types as well as schemaless input, and makes primary key extraction more flexible and robust across data formats.
- Fixes the unit tests and the handling of bytes/string.
- Removes an unused import.
Co-authored-by: stheppi, David Sloan
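A minimal sketch of resolving paths such as _key or _key.fieldA.fieldB against a record follows. It models schemaless input only, using plain dicts, and is not the connector's TransformAndExtractPK logic: `resolve_path` is a hypothetical function, and treating unprefixed paths as reads from the value is an assumption of this example.

```python
KEY_ROOT = "_key"
VALUE_ROOT = "_value"
HEADER_ROOT = "_header"


def resolve_path(path, key, value, headers):
    """Walk a dotted path starting from the key, the value, or the headers."""
    parts = path.split(".")
    roots = {KEY_ROOT: key, VALUE_ROOT: value, HEADER_ROOT: headers}
    if parts[0] in roots:
        current, fields = roots[parts[0]], parts[1:]
    else:
        # Assumption for this sketch: unprefixed paths read from the value.
        current, fields = value, parts
    for field in fields:
        if not isinstance(current, dict) or field not in current:
            raise KeyError(f"Path {path!r} not found at {field!r}")
        current = current[field]
    return current


record_key = {"fieldA": {"fieldB": 42}}
record_value = {"id": 7}
record_headers = {"tenant": "acme"}

print(resolve_path("_key.fieldA.fieldB", record_key, record_value, record_headers))
print(resolve_path("_header.tenant", record_key, record_value, record_headers))
```

Because the path is split on dots, this sketch cannot address field or header names that themselves contain a dot.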