diff --git a/PROCESSORS.md b/PROCESSORS.md index fbccbb9c4d..274393223b 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -2213,16 +2213,16 @@ Put a document to Couchbase Server via Key/Value access. In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|------------------------------------------|---------------|----------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Couchbase Cluster Controller Service** | | | A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster. | -| **Bucket Name** | default | | The name of bucket to access.
**Supports Expression Language: true** | -| Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.
**Supports Expression Language: true** | -| Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.
**Supports Expression Language: true** | -| **Document Type** | Json | Json
Binary
String | Content type to store data as. | -| Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. If not specified, the FlowFile UUID will be used.
**Supports Expression Language: true** | -| **Persist To** | NONE | NONE
ACTIVE
ONE
TWO
THREE
FOUR | Durability constraint about disk persistence. | -| **Replicate To** | NONE | NONE
ONE
TWO
THREE | Durability constraint about replication. | +| Name | Default Value | Allowable Values | Description | +|------------------------------------------|---------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Couchbase Cluster Controller Service** | | | A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster. | +| **Bucket Name** | default | | The name of bucket to access.
**Supports Expression Language: true** | +| Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.
**Supports Expression Language: true** | +| Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.
**Supports Expression Language: true** | +| **Document Type** | Json | Json
Binary
String | Content type to store data as. | +| Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. If not specified, either the FlowFile uuid attribute or if that's not found a generated uuid will be used.
**Supports Expression Language: true** | +| **Persist To** | NONE | NONE
ACTIVE
ONE
TWO
THREE
FOUR | Durability constraint about disk persistence. | +| **Replicate To** | NONE | NONE
ONE
TWO
THREE | Durability constraint about replication. | ### Relationships diff --git a/extensions/couchbase/processors/PutCouchbaseKey.cpp b/extensions/couchbase/processors/PutCouchbaseKey.cpp index 77708debbe..6f8ed54ceb 100644 --- a/extensions/couchbase/processors/PutCouchbaseKey.cpp +++ b/extensions/couchbase/processors/PutCouchbaseKey.cpp @@ -56,7 +56,7 @@ void PutCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSess std::string document_id; if (!context.getProperty(DocumentId, document_id, flow_file.get()) || document_id.empty()) { - document_id = flow_file->getUUIDStr(); + document_id = flow_file->getAttribute(core::SpecialFlowAttribute::UUID).value_or(utils::IdGenerator::getIdGenerator()->generate().to_string()); } ::couchbase::upsert_options options; diff --git a/extensions/couchbase/processors/PutCouchbaseKey.h b/extensions/couchbase/processors/PutCouchbaseKey.h index ca79b7dba9..56ba8c52f4 100644 --- a/extensions/couchbase/processors/PutCouchbaseKey.h +++ b/extensions/couchbase/processors/PutCouchbaseKey.h @@ -100,7 +100,8 @@ class PutCouchbaseKey final : public core::AbstractProcessor { .withAllowedValues(magic_enum::enum_names()) .build(); EXTENSIONAPI static constexpr auto DocumentId = core::PropertyDefinitionBuilder<>::createProperty("Document Id") - .withDescription("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. If not specified, the FlowFile UUID will be used.") + .withDescription("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. " + "If not specified, either the FlowFile uuid attribute or if that's not found a generated uuid will be used.") .supportsExpressionLanguage(true) .build(); EXTENSIONAPI static constexpr auto PersistTo = core::PropertyDefinitionBuilder<6>::createProperty("Persist To") diff --git a/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp index 7adb02394b..2ffe683a8f 100644 --- a/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp +++ b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp @@ -27,6 +27,8 @@ namespace org::apache::nifi::minifi::couchbase::test { REGISTER_RESOURCE(MockCouchbaseClusterService, ControllerService); +const std::string TEST_UUID = "a53f0e78-b91a-4a82-939b-639174edb00b"; + struct ExpectedCallOptions { std::string bucket_name; std::string scope_name; @@ -40,6 +42,11 @@ struct ExpectedCallOptions { class PutCouchbaseKeyTestController : public TestController { public: PutCouchbaseKeyTestController() { + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService"); mock_couchbase_cluster_service_ = std::static_pointer_cast(controller_service_node->getControllerServiceImplementation()); proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService"); @@ -82,7 +89,7 @@ class PutCouchbaseKeyTestController : public TestController { auto upsert_parameters = mock_couchbase_cluster_service_->getUpsertParameters(); CHECK(upsert_parameters.document_type == expected_call_options.document_type); - std::string expected_doc_id = expected_call_options.doc_id.empty() ? flow_file->getUUID().to_string() : expected_call_options.doc_id; + auto expected_doc_id = expected_call_options.doc_id.empty() ? TEST_UUID : expected_call_options.doc_id; CHECK(upsert_parameters.document_id == expected_doc_id); CHECK(upsert_parameters.buffer == stringToByteVector(input)); @@ -110,12 +117,12 @@ class PutCouchbaseKeyTestController : public TestController { TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Invalid Couchbase cluster controller service", "[putcouchbasekey]") { proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService, "invalid"); - REQUIRE_THROWS_AS(controller_.trigger({minifi::test::InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}"}}), minifi::Exception); + REQUIRE_THROWS_AS(controller_.trigger({minifi::test::InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}", {{"uuid", TEST_UUID}}}}), minifi::Exception); } TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Invalid bucket name", "[putcouchbasekey]") { proc_->setProperty(processors::PutCouchbaseKey::BucketName, ""); - auto results = controller_.trigger({minifi::test::InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}"}}); + auto results = controller_.trigger({minifi::test::InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}", {{"uuid", TEST_UUID}}}}); REQUIRE(results[processors::PutCouchbaseKey::Failure].size() == 1); REQUIRE(LogTestController::getInstance().contains("Bucket '' is invalid or empty!", 1s)); } @@ -123,7 +130,7 @@ TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Invalid bucket name", "[putcouc TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put succeeeds with default properties", "[putcouchbasekey]") { proc_->setProperty(processors::PutCouchbaseKey::BucketName, "mybucket"); const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}"; - auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}}); verifyResults(results, processors::PutCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default", ::couchbase::persist_to::none, ::couchbase::replicate_to::none, CouchbaseValueType::Json, ""}, input); } @@ -137,7 +144,7 @@ TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put succeeeds with optional pro proc_->setProperty(processors::PutCouchbaseKey::PersistTo, "ACTIVE"); proc_->setProperty(processors::PutCouchbaseKey::ReplicateTo, "TWO"); const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}"; - auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}}); verifyResults(results, processors::PutCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "scope1", "collection1", ::couchbase::persist_to::active, ::couchbase::replicate_to::two, CouchbaseValueType::Binary, "important_doc"}, input); } @@ -146,7 +153,7 @@ TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put fails with default properti proc_->setProperty(processors::PutCouchbaseKey::BucketName, "mybucket"); mock_couchbase_cluster_service_->setUpsertError(CouchbaseErrorType::FATAL); const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}"; - auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}}); verifyResults(results, processors::PutCouchbaseKey::Failure, ExpectedCallOptions{"mybucket", "_default", "_default", ::couchbase::persist_to::none, ::couchbase::replicate_to::none, CouchbaseValueType::Json, ""}, input); } @@ -155,7 +162,7 @@ TEST_CASE_METHOD(PutCouchbaseKeyTestController, "FlowFile is transferred to retr proc_->setProperty(processors::PutCouchbaseKey::BucketName, "mybucket"); mock_couchbase_cluster_service_->setUpsertError(CouchbaseErrorType::TEMPORARY); const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}"; - auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}}); verifyResults(results, processors::PutCouchbaseKey::Retry, ExpectedCallOptions{"mybucket", "_default", "_default", ::couchbase::persist_to::none, ::couchbase::replicate_to::none, CouchbaseValueType::Json, ""}, input); }