Skip to content

Commit

Permalink
Review update
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Oct 29, 2024
1 parent 4c64de6 commit 0b86be8
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 15 deletions.
22 changes: 11 additions & 11 deletions extensions/python/types/PyProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ namespace org::apache::nifi::minifi::extensions::python {
namespace core = org::apache::nifi::minifi::core;

PyProcessSession::PyProcessSession(core::ProcessSession& session)
: session_(gsl::make_not_null(&session)) {
: session_(session) {
}

std::shared_ptr<core::FlowFile> PyProcessSession::get() {
auto flow_file = session_->get();
auto flow_file = session_.get();

if (flow_file == nullptr) {
return nullptr;
Expand All @@ -51,23 +51,23 @@ void PyProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow_file
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->transfer(flow_file, relationship);
session_.transfer(flow_file, relationship);
}

void PyProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& relationship_name) {
if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->transferToCustomRelationship(flow_file, relationship_name);
session_.transferToCustomRelationship(flow_file, relationship_name);
}

void PyProcessSession::read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject input_stream_callback) {
if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
session_.read(flow_file, [&input_stream_callback](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
return Long(Callable(input_stream_callback.getAttribute("process"))(std::weak_ptr(input_stream))).asInt64();
});
}
Expand All @@ -77,13 +77,13 @@ void PyProcessSession::write(const std::shared_ptr<core::FlowFile>& flow_file, B
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
session_.write(flow_file, [&output_stream_callback](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
return Long(Callable(output_stream_callback.getAttribute("process"))(std::weak_ptr(output_stream))).asInt64();
});
}

std::shared_ptr<core::FlowFile> PyProcessSession::create(const std::shared_ptr<core::FlowFile>& flow_file) {
auto result = session_->create(flow_file.get());
auto result = session_.create(flow_file.get());

flow_files_.push_back(result);
return result;
Expand All @@ -94,14 +94,14 @@ std::shared_ptr<core::FlowFile> PyProcessSession::clone(const std::shared_ptr<co
throw std::runtime_error("Flow file to clone is nullptr");
}

auto result = session_->clone(*flow_file);
auto result = session_.clone(*flow_file);

flow_files_.push_back(result);
return result;
}

void PyProcessSession::remove(const std::shared_ptr<core::FlowFile>& flow_file) {
session_->remove(flow_file);
session_.remove(flow_file);
flow_files_.erase(ranges::remove_if(flow_files_, [&flow_file](const auto& ff)-> bool { return ff == flow_file; }), flow_files_.end());
}

Expand All @@ -111,15 +111,15 @@ std::string PyProcessSession::getContentsAsString(const std::shared_ptr<core::Fl
}

std::string content;
session_->read(flow_file, [&content](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
session_.read(flow_file, [&content](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
content.resize(input_stream->size());
return gsl::narrow<int64_t>(input_stream->read(as_writable_bytes(std::span(content))));
});
return content;
}

void PyProcessSession::putAttribute(const std::shared_ptr<core::FlowFile>& flow_file, std::string_view key, const std::string& value) {
session_->putAttribute(*flow_file, key, value);
session_.putAttribute(*flow_file, key, value);
}

extern "C" {
Expand Down
4 changes: 2 additions & 2 deletions extensions/python/types/PyProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ class PyProcessSession {
void remove(const std::shared_ptr<core::FlowFile>& flow_file);
std::string getContentsAsString(const std::shared_ptr<core::FlowFile>& flow_file);
void putAttribute(const std::shared_ptr<core::FlowFile>& flow_file, std::string_view key, const std::string& value);
gsl::not_null<core::ProcessSession*> getSession() const { return session_; }
core::ProcessSession& getSession() const { return session_; }

private:
std::vector<std::shared_ptr<core::FlowFile>> flow_files_;
gsl::not_null<core::ProcessSession*> session_;
core::ProcessSession& session_;
};

struct PyProcessSessionObject {
Expand Down
4 changes: 3 additions & 1 deletion extensions/python/types/PyRecordSetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ static PyType_Spec PyRecordSetReaderTypeSpec{
};

int PyRecordSetReader::init(PyRecordSetReader* self, PyObject* args, PyObject*) {
gsl_Expects(self && args);
PyObject* weak_ptr_capsule = nullptr;
if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) {
return -1;
Expand All @@ -60,6 +61,7 @@ int PyRecordSetReader::init(PyRecordSetReader* self, PyObject* args, PyObject*)
}

PyObject* PyRecordSetReader::read(PyRecordSetReader* self, PyObject* args) {
gsl_Expects(self && args);
auto record_set_reader = self->record_set_reader_.lock();
if (!record_set_reader) {
PyErr_SetString(PyExc_AttributeError, "tried reading ssl context service outside 'on_trigger'");
Expand All @@ -84,7 +86,7 @@ PyObject* PyRecordSetReader::read(PyRecordSetReader* self, PyObject* args) {
return nullptr;
}

auto read_result = record_set_reader->read(flow_file, *process_session->getSession());
auto read_result = record_set_reader->read(flow_file, process_session->getSession());
if (!read_result) {
std::string error_message = "failed to read record set with the following error: " + read_result.error().message();
PyErr_SetString(PyExc_RuntimeError, error_message.c_str());
Expand Down
4 changes: 3 additions & 1 deletion extensions/python/types/PyRecordSetWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ static PyType_Spec PyRecordSetWriterTypeSpec{
};

int PyRecordSetWriter::init(PyRecordSetWriter* self, PyObject* args, PyObject*) {
gsl_Expects(self && args);
PyObject* weak_ptr_capsule = nullptr;
if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) {
return -1;
Expand All @@ -60,6 +61,7 @@ int PyRecordSetWriter::init(PyRecordSetWriter* self, PyObject* args, PyObject*)
}

PyObject* PyRecordSetWriter::write(PyRecordSetWriter* self, PyObject* args) {
gsl_Expects(self && args);
auto record_set_writer = self->record_set_writer_.lock();
if (!record_set_writer) {
PyErr_SetString(PyExc_AttributeError, "tried reading record set writer outside 'on_trigger'");
Expand Down Expand Up @@ -99,7 +101,7 @@ PyObject* PyRecordSetWriter::write(PyRecordSetWriter* self, PyObject* args) {
record_set.push_back(core::Record::fromJson(document));
}

record_set_writer->write(record_set, flow_file, *process_session->getSession());
record_set_writer->write(record_set, flow_file, process_session->getSession());
Py_RETURN_NONE;
}

Expand Down

0 comments on commit 0b86be8

Please sign in to comment.