Skip to content

Commit

Permalink
MINIFICPP-2279 Add support for RecordTransform NiFi python processors
Browse files Browse the repository at this point in the history
Signed-off-by: Ferenc Gerlits <[email protected]>
This closes #1863
  • Loading branch information
lordgamez authored and fgerlits committed Nov 15, 2024
1 parent 71a4239 commit 153c16f
Show file tree
Hide file tree
Showing 26 changed files with 969 additions and 40 deletions.
2 changes: 2 additions & 0 deletions docker/test/integration/cluster/ImageStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __build_minifi_cpp_image_with_nifi_python_processors(self, python_option):
COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py
COPY FailureWithContent.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py
COPY TransferToOriginal.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py
COPY SetRecordField.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/SetRecordField.py
RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\
Expand Down Expand Up @@ -205,6 +206,7 @@ def build_full_python_resource_path(resource):
build_full_python_resource_path("CreateNothing.py"),
build_full_python_resource_path("FailureWithContent.py"),
build_full_python_resource_path("TransferToOriginal.py"),
build_full_python_resource_path("SetRecordField.py"),
])

def __build_http_proxy_image(self):
Expand Down
3 changes: 2 additions & 1 deletion docker/test/integration/features/fetch_modbus_tcp.feature
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ Feature: Minifi C++ can act as a modbus tcp master

Scenario: MiNiFi can fetch data from a modbus slave
Given a FetchModbusTcp processor
And a JsonRecordSetWriter controller service is set up for FetchModbusTcp
And a JsonRecordSetWriter controller service is set up with "One Line Per Object" output grouping
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "Unit Identifier" property of the FetchModbusTcp processor is set to "255"
And the "Record Set Writer" property of the FetchModbusTcp processor is set to "JsonRecordSetWriter"
And there is an accessible PLC with modbus enabled
And PLC register has been set with h@52=123 command
And PLC register has been set with h@5678/f=1.75 command
Expand Down
24 changes: 24 additions & 0 deletions docker/test/integration/features/python.feature
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,27 @@ Feature: MiNiFi can use python processors in its flows
When all instances start up

Then the Minifi logs contain the following message: "Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases." in less than 60 seconds

@USE_NIFI_PYTHON_PROCESSORS
Scenario: MiNiFi C++ supports RecordTransform native python processors
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"group": "group1", "name": "John"}\n{"group": "group1", "name": "Jane"}\n{"group": "group2", "name": "Kyle"}\n{"name": "Zoe"}' is present in '/tmp/input'
And a file with the content '{"group": "group1", "name": "Steve"}\n{}' is present in '/tmp/input'
And a SetRecordField processor with the "Record Reader" property set to "JsonRecordSetReader"
And the "Record Writer" property of the SetRecordField processor is set to "JsonRecordSetWriter"
And a JsonRecordSetReader controller service is set up
And a JsonRecordSetWriter controller service is set up with "Array" output grouping
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And the "Log Payload" property of the LogAttribute processor is set to "true"
And python is installed on the MiNiFi agent with a pre-created virtualenv

And the "success" relationship of the GetFile processor is connected to the SetRecordField
And the "success" relationship of the SetRecordField processor is connected to the LogAttribute

When all instances start up

Then the Minifi logs contain the following message: '[{"group":"group1","name":"John"},{"group":"group1","name":"Jane"}]' in less than 60 seconds
And the Minifi logs contain the following message: '[{"group":"group2","name":"Kyle"}]' in less than 5 seconds
And the Minifi logs contain the following message: '[{"name":"Zoe"}]' in less than 5 seconds
And the Minifi logs contain the following message: '[{"group":"group1","name":"Steve"}]' in less than 5 seconds
And the Minifi logs contain the following message: '[{}]' in less than 5 seconds
21 changes: 14 additions & 7 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from minifi.controllers.ODBCService import ODBCService
from minifi.controllers.KubernetesControllerService import KubernetesControllerService
from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
from minifi.controllers.JsonRecordSetReader import JsonRecordSetReader

from behave import given, then, when
from behave.model_describe import ModelDescriptor
Expand Down Expand Up @@ -271,6 +272,7 @@ def step_impl(context):


@given("a file with the content \"{content}\" is present in \"{path}\"")
@given("a file with the content '{content}' is present in '{path}'")
@then("a file with the content \"{content}\" is placed in \"{path}\"")
def step_impl(context, content, path):
context.test.add_test_data(path, content)
Expand Down Expand Up @@ -401,14 +403,19 @@ def step_impl(context, processor_name):
processor.set_property('SSL Context Service', ssl_context_service.name)


# RecordSetWriters
@given("a JsonRecordSetWriter controller service is set up for {processor_name}")
def step_impl(context, processor_name):
json_record_set_writer = JsonRecordSetWriter()
# Record set reader and writer
@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping")
def step_impl(context, output_grouping: str):
json_record_set_writer = JsonRecordSetWriter(name="JsonRecordSetWriter", output_grouping=output_grouping)
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container.add_controller(json_record_set_writer)

processor = context.test.get_node_by_name(processor_name)
processor.controller_services.append(json_record_set_writer)
processor.set_property('Record Set Writer', json_record_set_writer.name)

@given("a JsonRecordSetReader controller service is set up")
def step_impl(context):
json_record_set_reader = JsonRecordSetReader("JsonRecordSetReader")
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container.add_controller(json_record_set_reader)


# Kubernetes
Expand Down
23 changes: 23 additions & 0 deletions docker/test/integration/minifi/controllers/JsonRecordSetReader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ..core.ControllerService import ControllerService


class JsonRecordSetReader(ControllerService):
def __init__(self, name=None):
super(JsonRecordSetReader, self).__init__(name=name)
self.service_class = 'JsonRecordSetReader'
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


class JsonRecordSetWriter(ControllerService):
def __init__(self, name=None, cert=None, key=None, ca_cert=None, passphrase=None, use_system_cert_store=None):
def __init__(self, name=None, output_grouping='One Line Per Object'):
super(JsonRecordSetWriter, self).__init__(name=name)
self.service_class = 'JsonRecordSetWriter'
self.properties['Output Grouping'] = 'One Line Per Object'
self.properties['Output Grouping'] = output_grouping
26 changes: 26 additions & 0 deletions docker/test/integration/minifi/processors/SetRecordField.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.Processor import Processor


class SetRecordField(Processor):
def __init__(self, context):
super(SetRecordField, self).__init__(
context=context,
clazz='SetRecordField',
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
properties={},
schedule={'scheduling strategy': 'EVENT_DRIVEN'},
auto_terminate=[])
57 changes: 57 additions & 0 deletions docker/test/integration/resources/python/SetRecordField.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.properties import PropertyDescriptor
from nifiapi.properties import StandardValidators
from nifiapi.properties import ExpressionLanguageScope
from nifiapi.recordtransform import RecordTransformResult
from nifiapi.recordtransform import RecordTransform


class SetRecordField(RecordTransform):
class Java:
implements = ['org.apache.nifi.python.processor.RecordTransform']

class ProcessorDetails:
version = '0.0.1-SNAPSHOT'

def __init__(self, **kwargs):
super().__init__()

def transform(self, context, record, schema, attributemap):
# Update dictionary based on the dynamic properties provided by user
for key in context.getProperties().keys():
if not key.dynamic:
continue

propname = key.name
record[propname] = context.getProperty(propname).evaluateAttributeExpressions(attributemap).getValue()

# Determine the partition
if 'group' in record:
partition = {'group': record['group']}
else:
partition = None

# Return the result
return RecordTransformResult(record=record, relationship='success', partition=partition)

def getDynamicPropertyDescriptor(self, name):
return PropertyDescriptor(
name=name,
description="Specifies the value to set for the '" + name + "' field",
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
validators=[StandardValidators.ALWAYS_VALID]
)
4 changes: 2 additions & 2 deletions extensions/python/PYTHON.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ The SentimentAnalysis processor will perform a Vader Sentiment Analysis. This re

## Using NiFi Python Processors

MiNiFi C++ supports the use of NiFi Python processors, that are inherited from the FlowFileTransform or the FlowFileSource base class. To use these processors, copy the Python processor module to the nifi_python_processors subdirectory of the python directory. By default, the python directory is ${minifi_root}/minifi-python. To see how to write NiFi Python processors, please refer to the Python Developer Guide under the [Apache NiFi documentation](https://nifi.apache.org/documentation/v2/).
MiNiFi C++ supports the use of NiFi Python processors, that are inherited from the FlowFileTransform, RecordTransform or the FlowFileSource base class. To use these processors, copy the Python processor module to the nifi_python_processors subdirectory of the python directory. By default, the python directory is ${minifi_root}/minifi-python. To see how to write NiFi Python processors, please refer to the Python Developer Guide under the [Apache NiFi documentation](https://nifi.apache.org/documentation/v2/).

In the flow configuration these Python processors can be referenced by their fully qualified class name, which looks like this: org.apache.nifi.minifi.processors.nifi_python_processors.<package_name>.<processor_name>. For example, the fully qualified class name of the PromptChatGPT processor implemented in the file nifi_python_processors/PromptChatGPT.py is org.apache.nifi.minifi.processors.nifi_python_processors.PromptChatGPT. If a processor is copied under a subdirectory, because it is part of a python submodule, the submodule name will be appended to the fully qualified class name. For example, if the QueryPinecone processor is implemented in the QueryPinecone.py file that is copied to nifi_python_processors/vectorstores/QueryPinecone.py, the fully qualified class name will be org.apache.nifi.minifi.processors.nifi_python_processors.vectorstores.QueryPinecone in the configuration file.

**NOTE:** The name of the NiFi Python processor file should match the class name in the file, otherwise the processor will not be found.

Due to some differences between the NiFi and MiNiFi C++ processors and implementation, there are some limitations using the NiFi Python processors:
- Record based processors are not yet supported in MiNiFi C++, so the NiFi Python processors inherited from RecordTransform are not supported.
- Schemas are not supported in MiNiFi C++, so the schemas are ignored and passed as None in the `transform` method in the RecordTransform NiFi Python processors.
- There are some validators in NiFi that are not present in MiNiFi C++, so some property validations will be missing using the NiFi Python processors.
- MiNiFi C++ only supports expression language with flow file attributes, so only FLOWFILE_ATTRIBUTES expression language scope is supported, otherwise the expression language will not be evaluated.
- MiNiFi C++ does not support property dependencies, so the property dependencies will be ignored. If a property depends on another property, the property will not be required.
Expand Down
6 changes: 5 additions & 1 deletion extensions/python/PythonBindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "types/PyStateManager.h"
#include "types/PyDataConverter.h"
#include "types/PySSLContextService.h"
#include "types/PyRecordSetReader.h"
#include "types/PyRecordSetWriter.h"

namespace org::apache::nifi::minifi::extensions::python {
extern "C" {
Expand Down Expand Up @@ -62,7 +64,9 @@ PyInit_minifi_native(void) {
std::make_pair(PyInputStream::typeObject(), "InputStream"),
std::make_pair(PyOutputStream::typeObject(), "OutputStream"),
std::make_pair(PyStateManager::typeObject(), "StateManager"),
std::make_pair(PySSLContextService::typeObject(), "SSLContextService")
std::make_pair(PySSLContextService::typeObject(), "SSLContextService"),
std::make_pair(PyRecordSetReader::typeObject(), "RecordSetReader"),
std::make_pair(PyRecordSetWriter::typeObject(), "RecordSetWriter")
});

for (const auto& type : types) {
Expand Down
6 changes: 0 additions & 6 deletions extensions/python/pythonprocessors/nifiapi/flowfilesource.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ def getAttributes(self):


class FlowFileSource(ProcessorBase):
# These will be added through the python bindings using C API
logger = None
REL_SUCCESS = None
REL_FAILURE = None
REL_ORIGINAL = None

def onTrigger(self, context: ProcessContext, session: ProcessSession):
context_proxy = ProcessContextProxy(context, self)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ def getAttributes(self):


class FlowFileTransform(ProcessorBase):
# These will be added through the python bindings using C API
logger = None
REL_SUCCESS = None
REL_FAILURE = None
REL_ORIGINAL = None

def onTrigger(self, context: ProcessContext, session: ProcessSession):
original_flow_file = session.get()
if not original_flow_file:
Expand Down
Loading

0 comments on commit 153c16f

Please sign in to comment.