From 8f53fbf6c705d97728aff7d2aa97126ecab06b8b Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 11 Sep 2024 11:30:00 +0200 Subject: [PATCH] MINIFICPP-2454 Create SplitRecord processor Signed-off-by: Ferenc Gerlits This closes #1866 --- PROCESSORS.md | 49 +++++++- .../processors/SegmentContent.h | 3 +- .../processors/SplitRecord.cpp | 119 ++++++++++++++++++ .../processors/SplitRecord.h | 96 ++++++++++++++ .../tests/unit/SplitRecordTests.cpp | 96 ++++++++++++++ 5 files changed, 356 insertions(+), 7 deletions(-) create mode 100644 extensions/standard-processors/processors/SplitRecord.cpp create mode 100644 extensions/standard-processors/processors/SplitRecord.h create mode 100644 extensions/standard-processors/tests/unit/SplitRecordTests.cpp diff --git a/PROCESSORS.md b/PROCESSORS.md index 9a09eae09a..e2ecf5eac4 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -93,7 +93,9 @@ limitations under the License. - [RetryFlowFile](#RetryFlowFile) - [RouteOnAttribute](#RouteOnAttribute) - [RouteText](#RouteText) +- [SegmentContent](#SegmentContent) - [SplitContent](#SplitContent) +- [SplitRecord](#SplitRecord) - [SplitText](#SplitText) - [TailEventLog](#TailEventLog) - [TailFile](#TailFile) @@ -2833,12 +2835,12 @@ In the list below, the names of required properties appear in bold. Any other pr ### Output Attributes -| Attribute | Relationship | Description | -|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------| -| fragment.identifier | | All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute | -| fragment.index | | A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile | -| fragment.count | | The number of segments generated from the parent FlowFile | -| segment.original.filename | | The filename of the parent FlowFile | +| Attribute | Relationship | Description | +|---------------------------|--------------|------------------------------------------------------------------------------------------------------------------------------------------------------------| +| fragment.identifier | | All segments produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available) | +| fragment.index | | A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile | +| fragment.count | | The number of segments generated from the parent FlowFile | +| segment.original.filename | | The filename of the parent FlowFile | ## SplitContent @@ -2875,6 +2877,41 @@ In the list below, the names of required properties appear in bold. Any other pr | segment.original.filename | | The filename of the parent FlowFile | +## SplitRecord + +### Description + +Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|-----------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------| +| **Record Reader** | | | Specifies the Controller Service to use for reading incoming data | +| **Record Writer** | | | Specifies the Controller Service to use for writing out the records | +| **Records Per Split** | | | Specifies how many records should be written to each 'split' or 'segment' FlowFile
**Supports Expression Language: true** | + +### Relationships + +| Name | Description | +|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship. | +| splits | The individual 'segments' of the original FlowFile will be routed to this relationship. | +| original | Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship. | + +### Output Attributes + +| Attribute | Relationship | Description | +|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| record.count | splits | The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship. | +| fragment.identifier | splits | All split FlowFiles produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available) | +| fragment.index | splits | A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile | +| fragment.count | splits | The number of split FlowFiles generated from the parent FlowFile | +| segment.original.filename | splits | The filename of the parent FlowFile | + + ## SplitText ### Description diff --git a/extensions/standard-processors/processors/SegmentContent.h b/extensions/standard-processors/processors/SegmentContent.h index 76e6f083c9..b0a87b037d 100644 --- a/extensions/standard-processors/processors/SegmentContent.h +++ b/extensions/standard-processors/processors/SegmentContent.h @@ -53,7 +53,8 @@ class SegmentContent final : public core::Processor { EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Segments}; EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute = - core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"}; + core::OutputAttributeDefinition<0>{"fragment.identifier", {}, + "All segments produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available)"}; EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.index", {}, "A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile"}; EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of segments generated from the parent FlowFile"}; diff --git a/extensions/standard-processors/processors/SplitRecord.cpp b/extensions/standard-processors/processors/SplitRecord.cpp new file mode 100644 index 0000000000..91e3f50d4e --- /dev/null +++ b/extensions/standard-processors/processors/SplitRecord.cpp @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "SplitRecord.h" + +#include "core/Resource.h" +#include "nonstd/expected.hpp" +#include "utils/GeneralUtils.h" + +namespace org::apache::nifi::minifi::processors { +namespace { +template +std::shared_ptr getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property, const utils::Identifier& processor_uuid) { + std::string service_name; + if (context.getProperty(property, service_name) && !IsNullOrEmpty(service_name)) { + auto record_set_io = std::dynamic_pointer_cast(context.getControllerService(service_name, processor_uuid)); + if (!record_set_io) + return nullptr; + return record_set_io; + } + return nullptr; +} +} // namespace + +void SplitRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + record_set_reader_ = getRecordSetIO(context, RecordReader, getUUID()); + if (!record_set_reader_) { + throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Reader property is missing or invalid"); + } + record_set_writer_ = getRecordSetIO(context, RecordWriter, getUUID()); + if (!record_set_writer_) { + throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Writer property is missing or invalid"); + } +} + +nonstd::expected SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file) { + std::string value; + std::size_t records_per_split = 0; + if (context.getProperty(RecordsPerSplit, value, &original_flow_file)) { + if (!core::Property::StringToInt(value, records_per_split)) { + return nonstd::make_unexpected("Failed to convert Records Per Split property to an integer"); + } else if (records_per_split < 1) { + return nonstd::make_unexpected("Records Per Split should be set to a number larger than 0"); + } + } else { + return nonstd::make_unexpected("Records Per Split should be set to a valid number larger than 0"); + } + return records_per_split; +} + +void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto original_flow_file = session.get(); + if (!original_flow_file) { + yield(); + return; + } + + auto records_per_split = readRecordsPerSplit(context, *original_flow_file); + if (!records_per_split) { + logger_->log_error("Failed to read Records Per Split property: {}", records_per_split.error()); + session.transfer(original_flow_file, Failure); + return; + } + + auto record_set = record_set_reader_->read(original_flow_file, session); + if (!record_set) { + logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message()); + session.transfer(original_flow_file, Failure); + return; + } + + std::size_t current_index = 0; + const auto fragment_identifier = original_flow_file->getAttribute(core::SpecialFlowAttribute::UUID).value_or(utils::IdGenerator::getIdGenerator()->generate().to_string()); + std::size_t fragment_index = 0; + const auto fragment_count = utils::intdiv_ceil(record_set->size(), records_per_split.value()); + while (current_index < record_set->size()) { + auto split_flow_file = session.create(original_flow_file.get()); + if (!split_flow_file) { + logger_->log_error("Failed to create a new flow file for record set"); + session.transfer(original_flow_file, Failure); + return; + } + + core::RecordSet slice_record_set; + slice_record_set.reserve(*records_per_split); + for (std::size_t i = 0; i < records_per_split.value() && current_index < record_set->size(); ++i, ++current_index) { + slice_record_set.push_back(std::move(record_set->at(current_index))); + } + + split_flow_file->setAttribute("record.count", std::to_string(slice_record_set.size())); + split_flow_file->setAttribute("fragment.identifier", fragment_identifier); + split_flow_file->setAttribute("fragment.index", std::to_string(fragment_index)); + split_flow_file->setAttribute("fragment.count", std::to_string(fragment_count)); + split_flow_file->setAttribute("segment.original.filename", original_flow_file->getAttribute("filename").value_or("")); + + record_set_writer_->write(slice_record_set, split_flow_file, session); + session.transfer(split_flow_file, Splits); + ++fragment_index; + } + + session.transfer(original_flow_file, Original); +} + +REGISTER_RESOURCE(SplitRecord, Processor); + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/SplitRecord.h b/extensions/standard-processors/processors/SplitRecord.h new file mode 100644 index 0000000000..ab45b999d0 --- /dev/null +++ b/extensions/standard-processors/processors/SplitRecord.h @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "core/Annotation.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/RelationshipDefinition.h" +#include "core/logging/Logger.h" +#include "controllers/RecordSetReader.h" +#include "controllers/RecordSetWriter.h" +#include "core/AbstractProcessor.h" + +namespace org::apache::nifi::minifi::processors { + +class SplitRecord final : public core::AbstractProcessor { + public: + using core::AbstractProcessor::AbstractProcessor; + + EXTENSIONAPI static constexpr const char* Description = "Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles"; + + EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader") + .withDescription("Specifies the Controller Service to use for reading incoming data") + .isRequired(true) + .withAllowedTypes() + .build(); + EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer") + .withDescription("Specifies the Controller Service to use for writing out the records") + .isRequired(true) + .withAllowedTypes() + .build(); + EXTENSIONAPI static constexpr auto RecordsPerSplit = core::PropertyDefinitionBuilder<>::createProperty("Records Per Split") + .withDescription("Specifies how many records should be written to each 'split' or 'segment' FlowFile") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::to_array({ + RecordReader, + RecordWriter, + RecordsPerSplit + }); + + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", + "If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship."}; + EXTENSIONAPI static constexpr auto Splits = core::RelationshipDefinition{"splits", + "The individual 'segments' of the original FlowFile will be routed to this relationship."}; + EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", + "Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship."}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Splits, Original}; + + EXTENSIONAPI static constexpr auto RecordCount = core::OutputAttributeDefinition<1>{"record.count", {Splits}, + "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship."}; + EXTENSIONAPI static constexpr auto FragmentIdentifier = core::OutputAttributeDefinition<1>{"fragment.identifier", {Splits}, + "All split FlowFiles produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available)"}; + EXTENSIONAPI static constexpr auto FragmentIndex = core::OutputAttributeDefinition<1>{"fragment.index", {Splits}, + "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"}; + EXTENSIONAPI static constexpr auto FragmentCount = core::OutputAttributeDefinition<1>{"fragment.count", {Splits}, "The number of split FlowFiles generated from the parent FlowFile"}; + EXTENSIONAPI static constexpr auto SegmentOriginalFilename = core::OutputAttributeDefinition<1>{"segment.original.filename", {Splits}, "The filename of the parent FlowFile"}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::array{RecordCount, FragmentIdentifier, FragmentIndex, FragmentCount, SegmentOriginalFilename}; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + + private: + static nonstd::expected readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file); + + std::shared_ptr record_set_reader_; + std::shared_ptr record_set_writer_; + + std::shared_ptr logger_{core::logging::LoggerFactory::getLogger(uuid_)}; +}; + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/SplitRecordTests.cpp b/extensions/standard-processors/tests/unit/SplitRecordTests.cpp new file mode 100644 index 0000000000..7ae0374dcf --- /dev/null +++ b/extensions/standard-processors/tests/unit/SplitRecordTests.cpp @@ -0,0 +1,96 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "processors/SplitRecord.h" +#include "unit/SingleProcessorTestController.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" + +namespace org::apache::nifi::minifi::test { + +class SplitRecordTestController : public TestController { + public: + SplitRecordTestController() { + controller_.plan->addController("JsonRecordSetReader", "JsonRecordSetReader"); + controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter"); + REQUIRE(controller_.getProcessor()); + controller_.getProcessor()->setProperty(processors::SplitRecord::RecordReader, "JsonRecordSetReader"); + controller_.getProcessor()->setProperty(processors::SplitRecord::RecordWriter, "JsonRecordSetWriter"); + } + + void verifyResults(const ProcessorTriggerResult& results, const std::vector& expected_contents) const { + REQUIRE(results.at(processors::SplitRecord::Original).size() == 1); + REQUIRE(results.at(processors::SplitRecord::Splits).size() == expected_contents.size()); + auto& split_results = results.at(processors::SplitRecord::Splits); + const auto fragment_identifier = split_results[0]->getAttribute("fragment.identifier").value(); + const auto original_filename = results.at(processors::SplitRecord::Original)[0]->getAttribute("filename").value(); + for (size_t i = 0; i < expected_contents.size(); ++i) { + rapidjson::Document result_document; + result_document.Parse(controller_.plan->getContent(split_results[i]).c_str()); + rapidjson::Document expected_document; + expected_document.Parse(expected_contents[i].c_str()); + CHECK(result_document == expected_document); + CHECK(split_results[i]->getAttribute("record.count").value() == std::to_string(minifi::utils::string::split(expected_contents[i], "},{").size())); + CHECK(split_results[i]->getAttribute("fragment.index").value() == std::to_string(i)); + CHECK(split_results[i]->getAttribute("fragment.count").value() == std::to_string(expected_contents.size())); + CHECK(split_results[i]->getAttribute("fragment.identifier").value() == fragment_identifier); + CHECK(split_results[i]->getAttribute("segment.original.filename").value() == original_filename); + } + } + + protected: + SingleProcessorTestController controller_{std::make_unique("SplitRecord")}; +}; + +TEST_CASE_METHOD(SplitRecordTestController, "Invalid Records Per Split property", "[splitrecord]") { + controller_.getProcessor()->setProperty(processors::SplitRecord::RecordsPerSplit, "invalid"); + auto results = controller_.trigger({InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}"}}); + REQUIRE(results[processors::SplitRecord::Failure].size() == 1); + REQUIRE(LogTestController::getInstance().contains("Failed to convert Records Per Split property to an integer", 1s)); +} + +TEST_CASE_METHOD(SplitRecordTestController, "Records Per Split property should be greater than zero", "[splitrecord]") { + controller_.getProcessor()->setProperty(processors::SplitRecord::RecordsPerSplit, "${id}"); + auto results = controller_.trigger({InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}", {{"id", "0"}}}}); + REQUIRE(results[processors::SplitRecord::Failure].size() == 1); + REQUIRE(LogTestController::getInstance().contains("Records Per Split should be set to a number larger than 0", 1s)); +} + +TEST_CASE_METHOD(SplitRecordTestController, "Invalid records in flow file result in zero splits", "[splitrecord]") { + controller_.getProcessor()->setProperty(processors::SplitRecord::RecordsPerSplit, "1"); + auto results = controller_.trigger({InputFlowFileData{ R"({"name": "John)"}}); + CHECK(results[processors::SplitRecord::Splits].empty()); + REQUIRE(results[processors::SplitRecord::Original].size() == 1); + CHECK(controller_.plan->getContent(results.at(processors::SplitRecord::Original)[0]) == "{\"name\": \"John"); +} + +TEST_CASE_METHOD(SplitRecordTestController, "Split records one by one", "[splitrecord]") { + controller_.getProcessor()->setProperty(processors::SplitRecord::RecordsPerSplit, "1"); + auto results = controller_.trigger({InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}"}}); + verifyResults(results, {R"([{"name":"John"}])", R"([{"name":"Jill"}])"}); +} + +TEST_CASE_METHOD(SplitRecordTestController, "Split records two by two", "[splitrecord]") { + controller_.getProcessor()->setProperty(processors::SplitRecord::RecordsPerSplit, "2"); + auto results = controller_.trigger({InputFlowFileData{"{\"a\": \"1\", \"b\": \"2\"}\n{\"c\": \"3\"}\n{\"d\": \"4\", \"e\": \"5\"}\n{\"f\": \"6\"}\n{\"g\": \"7\", \"h\": \"8\"}\n"}}); + verifyResults(results, {R"([{"a":"1","b":"2"},{"c":"3"}])", R"([{"d":"4","e":"5"},{"f":"6"}])", R"([{"g":"7","h":"8"}])"}); +} + +} // namespace org::apache::nifi::minifi::test