Skip to content

Commit

Permalink
NIFI-12340: Only call PythonBridge.onProcessorRemoved for python base…
Browse files Browse the repository at this point in the history
…d processors, not all processors. Do so in a background virtual thread. Also specified explicit versions for openai dependencies to ensure correctness, since the newly released 1.x is not yet compatible with the latest langchain
  • Loading branch information
markap14 committed Nov 9, 2023
1 parent 3dcfc91 commit 5aa6ee3
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.parameter.ParameterProviderInstantiationException;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
import org.apache.nifi.controller.flowrepository.FlowRepositoryClientInstantiationException;
import org.apache.nifi.controller.parameter.ParameterProviderInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
Expand Down Expand Up @@ -86,9 +87,14 @@ public void reload(final ProcessorNode existingNode, final String newType, final
extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
}

// Before creating a new processor, ensure that we notify the Python Bridge that we're removing the old one.
// Ensure that we notify the Python Bridge that we're removing the old processor, if the Processor is Python based.
// This way we can shutdown the Process if necessary before creating a new processor (which may then spawn a new process).
flowController.getPythonBridge().onProcessorRemoved(id, existingNode.getComponentType(), existingNode.getBundleCoordinate().getVersion());
// There is no need to wait for this to complete, and it may require communicating over local socket so run in a background (virtual) thread.
if (PythonBundle.isPythonCoordinate(bundleCoordinate)) {
Thread.ofVirtual().name("Notify Python Processor " + id + " Removed").start(() -> {
flowController.getPythonBridge().onProcessorRemoved(id, existingNode.getComponentType(), existingNode.getBundleCoordinate().getVersion());
});
}

// create a new node with firstTimeAdded as true so lifecycle methods get fired
// attempt the creation to make sure it works before firing the OnRemoved methods below
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private Object convertOutput(final Method method, final Object output) {
final Class<?> outputType = output.getClass();
final Class<?>[] parameters = { returnType };
final Class<?>[] arguments = { outputType };
final List<TypeConverter> converters = new ArrayList<TypeConverter>();
final List<TypeConverter> converters = new ArrayList<>();
final int cost = MethodInvoker.buildConverters(converters, parameters, arguments);

if (cost == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ProcessorDetails:
version = '2.0.0-SNAPSHOT'
description = "Submits a prompt to ChatGPT, writing the results either to a FlowFile attribute or to the contents of the FlowFile"
tags = ["text", "chatgpt", "gpt", "machine learning", "ML", "artificial intelligence", "ai", "document", "langchain"]
dependencies = ['langchain', 'openai', 'jsonpath-ng']
dependencies = ['langchain==0.0.331', 'openai==0.28.1', 'jsonpath-ng']


MODEL = PropertyDescriptor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

# Shared requirements
openai
openai==0.28.1

# Chroma requirements
chromadb==0.4.14
Expand All @@ -26,4 +26,4 @@ requests
# Pinecone requirements
pinecone-client
tiktoken
langchain
langchain==0.0.331

0 comments on commit 5aa6ee3

Please sign in to comment.