Skip to content

Commit

Permalink
MINIFICPP-2454 Create SplitRecord processor
Browse files Browse the repository at this point in the history
Signed-off-by: Ferenc Gerlits <[email protected]>
This closes #1866
  • Loading branch information
lordgamez authored and fgerlits committed Nov 15, 2024
1 parent 153c16f commit 8f53fbf
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 7 deletions.
49 changes: 43 additions & 6 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ limitations under the License.
- [RetryFlowFile](#RetryFlowFile)
- [RouteOnAttribute](#RouteOnAttribute)
- [RouteText](#RouteText)
- [SegmentContent](#SegmentContent)
- [SplitContent](#SplitContent)
- [SplitRecord](#SplitRecord)
- [SplitText](#SplitText)
- [TailEventLog](#TailEventLog)
- [TailFile](#TailFile)
Expand Down Expand Up @@ -2833,12 +2835,12 @@ In the list below, the names of required properties appear in bold. Any other pr

### Output Attributes

| Attribute | Relationship | Description |
|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------|
| fragment.identifier | | All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute |
| fragment.index | | A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile |
| fragment.count | | The number of segments generated from the parent FlowFile |
| segment.original.filename | | The filename of the parent FlowFile |
| Attribute | Relationship | Description |
|---------------------------|--------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
| fragment.identifier | | All segments produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available) |
| fragment.index | | A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile |
| fragment.count | | The number of segments generated from the parent FlowFile |
| segment.original.filename | | The filename of the parent FlowFile |


## SplitContent
Expand Down Expand Up @@ -2875,6 +2877,41 @@ In the list below, the names of required properties appear in bold. Any other pr
| segment.original.filename | | The filename of the parent FlowFile |


## SplitRecord

### Description

Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles

### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|-----------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------|
| **Record Reader** | | | Specifies the Controller Service to use for reading incoming data |
| **Record Writer** | | | Specifies the Controller Service to use for writing out the records |
| **Records Per Split** | | | Specifies how many records should be written to each 'split' or 'segment' FlowFile<br/>**Supports Expression Language: true** |

### Relationships

| Name | Description |
|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship. |
| splits | The individual 'segments' of the original FlowFile will be routed to this relationship. |
| original | Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship. |

### Output Attributes

| Attribute | Relationship | Description |
|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| record.count | splits | The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship. |
| fragment.identifier | splits | All split FlowFiles produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available) |
| fragment.index | splits | A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile |
| fragment.count | splits | The number of split FlowFiles generated from the parent FlowFile |
| segment.original.filename | splits | The filename of the parent FlowFile |


## SplitText

### Description
Expand Down
3 changes: 2 additions & 1 deletion extensions/standard-processors/processors/SegmentContent.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class SegmentContent final : public core::Processor {
EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Segments};

EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute =
core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"};
core::OutputAttributeDefinition<0>{"fragment.identifier", {},
"All segments produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available)"};
EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute =
core::OutputAttributeDefinition<0>{"fragment.index", {}, "A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile"};
EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of segments generated from the parent FlowFile"};
Expand Down
119 changes: 119 additions & 0 deletions extensions/standard-processors/processors/SplitRecord.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SplitRecord.h"

#include "core/Resource.h"
#include "nonstd/expected.hpp"
#include "utils/GeneralUtils.h"

namespace org::apache::nifi::minifi::processors {
namespace {
template<typename RecordSetIO>
std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property, const utils::Identifier& processor_uuid) {
std::string service_name;
if (context.getProperty(property, service_name) && !IsNullOrEmpty(service_name)) {
auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid));
if (!record_set_io)
return nullptr;
return record_set_io;
}
return nullptr;
}
} // namespace

void SplitRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
record_set_reader_ = getRecordSetIO<core::RecordSetReader>(context, RecordReader, getUUID());
if (!record_set_reader_) {
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Reader property is missing or invalid");
}
record_set_writer_ = getRecordSetIO<core::RecordSetWriter>(context, RecordWriter, getUUID());
if (!record_set_writer_) {
throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Writer property is missing or invalid");
}
}

nonstd::expected<std::size_t, std::string> SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file) {
std::string value;
std::size_t records_per_split = 0;
if (context.getProperty(RecordsPerSplit, value, &original_flow_file)) {
if (!core::Property::StringToInt(value, records_per_split)) {
return nonstd::make_unexpected("Failed to convert Records Per Split property to an integer");
} else if (records_per_split < 1) {
return nonstd::make_unexpected("Records Per Split should be set to a number larger than 0");
}
} else {
return nonstd::make_unexpected("Records Per Split should be set to a valid number larger than 0");
}
return records_per_split;
}

void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
const auto original_flow_file = session.get();
if (!original_flow_file) {
yield();
return;
}

auto records_per_split = readRecordsPerSplit(context, *original_flow_file);
if (!records_per_split) {
logger_->log_error("Failed to read Records Per Split property: {}", records_per_split.error());
session.transfer(original_flow_file, Failure);
return;
}

auto record_set = record_set_reader_->read(original_flow_file, session);
if (!record_set) {
logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message());
session.transfer(original_flow_file, Failure);
return;
}

std::size_t current_index = 0;
const auto fragment_identifier = original_flow_file->getAttribute(core::SpecialFlowAttribute::UUID).value_or(utils::IdGenerator::getIdGenerator()->generate().to_string());
std::size_t fragment_index = 0;
const auto fragment_count = utils::intdiv_ceil(record_set->size(), records_per_split.value());
while (current_index < record_set->size()) {
auto split_flow_file = session.create(original_flow_file.get());
if (!split_flow_file) {
logger_->log_error("Failed to create a new flow file for record set");
session.transfer(original_flow_file, Failure);
return;
}

core::RecordSet slice_record_set;
slice_record_set.reserve(*records_per_split);
for (std::size_t i = 0; i < records_per_split.value() && current_index < record_set->size(); ++i, ++current_index) {
slice_record_set.push_back(std::move(record_set->at(current_index)));
}

split_flow_file->setAttribute("record.count", std::to_string(slice_record_set.size()));
split_flow_file->setAttribute("fragment.identifier", fragment_identifier);
split_flow_file->setAttribute("fragment.index", std::to_string(fragment_index));
split_flow_file->setAttribute("fragment.count", std::to_string(fragment_count));
split_flow_file->setAttribute("segment.original.filename", original_flow_file->getAttribute("filename").value_or(""));

record_set_writer_->write(slice_record_set, split_flow_file, session);
session.transfer(split_flow_file, Splits);
++fragment_index;
}

session.transfer(original_flow_file, Original);
}

REGISTER_RESOURCE(SplitRecord, Processor);

} // namespace org::apache::nifi::minifi::processors
96 changes: 96 additions & 0 deletions extensions/standard-processors/processors/SplitRecord.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "core/Annotation.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessSessionFactory.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/RelationshipDefinition.h"
#include "core/logging/Logger.h"
#include "controllers/RecordSetReader.h"
#include "controllers/RecordSetWriter.h"
#include "core/AbstractProcessor.h"

namespace org::apache::nifi::minifi::processors {

class SplitRecord final : public core::AbstractProcessor<SplitRecord> {
public:
using core::AbstractProcessor<SplitRecord>::AbstractProcessor;

EXTENSIONAPI static constexpr const char* Description = "Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles";

EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
.withDescription("Specifies the Controller Service to use for reading incoming data")
.isRequired(true)
.withAllowedTypes<minifi::core::RecordSetReader>()
.build();
EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
.withDescription("Specifies the Controller Service to use for writing out the records")
.isRequired(true)
.withAllowedTypes<minifi::core::RecordSetWriter>()
.build();
EXTENSIONAPI static constexpr auto RecordsPerSplit = core::PropertyDefinitionBuilder<>::createProperty("Records Per Split")
.withDescription("Specifies how many records should be written to each 'split' or 'segment' FlowFile")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
RecordReader,
RecordWriter,
RecordsPerSplit
});

EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure",
"If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship."};
EXTENSIONAPI static constexpr auto Splits = core::RelationshipDefinition{"splits",
"The individual 'segments' of the original FlowFile will be routed to this relationship."};
EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original",
"Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship."};
EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Splits, Original};

EXTENSIONAPI static constexpr auto RecordCount = core::OutputAttributeDefinition<1>{"record.count", {Splits},
"The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship."};
EXTENSIONAPI static constexpr auto FragmentIdentifier = core::OutputAttributeDefinition<1>{"fragment.identifier", {Splits},
"All split FlowFiles produced from the same parent FlowFile will have this attribute set to the same UUID (which is the UUID of the parent FlowFile, if available)"};
EXTENSIONAPI static constexpr auto FragmentIndex = core::OutputAttributeDefinition<1>{"fragment.index", {Splits},
"A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"};
EXTENSIONAPI static constexpr auto FragmentCount = core::OutputAttributeDefinition<1>{"fragment.count", {Splits}, "The number of split FlowFiles generated from the parent FlowFile"};
EXTENSIONAPI static constexpr auto SegmentOriginalFilename = core::OutputAttributeDefinition<1>{"segment.original.filename", {Splits}, "The filename of the parent FlowFile"};
EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 5>{RecordCount, FragmentIdentifier, FragmentIndex, FragmentCount, SegmentOriginalFilename};

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;

void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;

private:
static nonstd::expected<std::size_t, std::string> readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file);

std::shared_ptr<core::RecordSetReader> record_set_reader_;
std::shared_ptr<core::RecordSetWriter> record_set_writer_;

std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SplitRecord>::getLogger(uuid_)};
};

} // namespace org::apache::nifi::minifi::processors
Loading

0 comments on commit 8f53fbf

Please sign in to comment.