From 5aa6ee38c1e0b1fb1ab0b35a1de9ebd3e9123410 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 9 Nov 2023 09:48:57 -0500 Subject: [PATCH] NIFI-12340: Only call PythonBridge.onProcessorRemoved for python based processors, not all processors. Do so in a background virtual thread. Also specified explicit versions for openai dependencies to ensure correctness, since the newly released 1.x is not yet compatible with the latest langchain --- .../nifi/controller/StandardReloadComponent.java | 14 ++++++++++---- .../py4j/client/PythonProxyInvocationHandler.java | 2 +- .../src/main/python/PromptChatGPT.py | 2 +- .../src/main/python/vectorstores/requirements.txt | 4 ++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java index e0b93a8a9d6a..49b9ff0c4f8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java @@ -20,21 +20,22 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; -import org.apache.nifi.controller.parameter.ParameterProviderInstantiationException; import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException; import org.apache.nifi.controller.flowrepository.FlowRepositoryClientInstantiationException; +import org.apache.nifi.controller.parameter.ParameterProviderInstantiationException; import org.apache.nifi.controller.service.ControllerServiceInvocationHandler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.flowanalysis.FlowAnalysisRule; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogRepositoryFactory; +import org.apache.nifi.logging.StandardLoggingContext; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.nar.PythonBundle; import org.apache.nifi.parameter.ParameterProvider; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.SimpleProcessLogger; -import org.apache.nifi.logging.StandardLoggingContext; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.FlowRegistryClientNode; @@ -86,9 +87,14 @@ public void reload(final ProcessorNode existingNode, final String newType, final extensionManager.closeURLClassLoader(id, existingInstanceClassLoader); } - // Before creating a new processor, ensure that we notify the Python Bridge that we're removing the old one. + // Ensure that we notify the Python Bridge that we're removing the old processor, if the Processor is Python based. // This way we can shutdown the Process if necessary before creating a new processor (which may then spawn a new process). - flowController.getPythonBridge().onProcessorRemoved(id, existingNode.getComponentType(), existingNode.getBundleCoordinate().getVersion()); + // There is no need to wait for this to complete, and it may require communicating over local socket so run in a background (virtual) thread. + if (PythonBundle.isPythonCoordinate(bundleCoordinate)) { + Thread.ofVirtual().name("Notify Python Processor " + id + " Removed").start(() -> { + flowController.getPythonBridge().onProcessorRemoved(id, existingNode.getComponentType(), existingNode.getBundleCoordinate().getVersion()); + }); + } // create a new node with firstTimeAdded as true so lifecycle methods get fired // attempt the creation to make sure it works before firing the OnRemoved methods below diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java index 7d43eaa137c9..3a13d0fcad10 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/client/PythonProxyInvocationHandler.java @@ -90,7 +90,7 @@ private Object convertOutput(final Method method, final Object output) { final Class outputType = output.getClass(); final Class[] parameters = { returnType }; final Class[] arguments = { outputType }; - final List converters = new ArrayList(); + final List converters = new ArrayList<>(); final int cost = MethodInvoker.buildConverters(converters, parameters, arguments); if (cost == -1) { diff --git a/nifi-python-extensions/nifi-openai-module/src/main/python/PromptChatGPT.py b/nifi-python-extensions/nifi-openai-module/src/main/python/PromptChatGPT.py index b459d6d32943..62528f95ee18 100644 --- a/nifi-python-extensions/nifi-openai-module/src/main/python/PromptChatGPT.py +++ b/nifi-python-extensions/nifi-openai-module/src/main/python/PromptChatGPT.py @@ -33,7 +33,7 @@ class ProcessorDetails: version = '2.0.0-SNAPSHOT' description = "Submits a prompt to ChatGPT, writing the results either to a FlowFile attribute or to the contents of the FlowFile" tags = ["text", "chatgpt", "gpt", "machine learning", "ML", "artificial intelligence", "ai", "document", "langchain"] - dependencies = ['langchain', 'openai', 'jsonpath-ng'] + dependencies = ['langchain==0.0.331', 'openai==0.28.1', 'jsonpath-ng'] MODEL = PropertyDescriptor( diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt index 6cac27591c6b..4e0669a38eee 100644 --- a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt +++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt @@ -14,7 +14,7 @@ # limitations under the License. # Shared requirements -openai +openai==0.28.1 # Chroma requirements chromadb==0.4.14 @@ -26,4 +26,4 @@ requests # Pinecone requirements pinecone-client tiktoken -langchain +langchain==0.0.331