From bb6b8d0950a2e1a590ff05e9b59a935b320ba5ae Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 4 Nov 2024 15:25:57 +0100 Subject: [PATCH 1/3] MINIFICPP-2401 Make RocksDB verify_checksums read option configurable --- CONFIGURE.md | 7 +++++++ conf/minifi.properties | 3 +++ .../rocksdb-repos/DatabaseContentRepository.cpp | 14 ++++++++++---- .../rocksdb-repos/DatabaseContentRepository.h | 1 + extensions/rocksdb-repos/FlowFileLoader.cpp | 6 ++++-- extensions/rocksdb-repos/FlowFileLoader.h | 3 ++- extensions/rocksdb-repos/FlowFileRepository.cpp | 13 ++++++++++--- extensions/rocksdb-repos/ProvenanceRepository.cpp | 7 ++++++- extensions/rocksdb-repos/RocksDbRepository.cpp | 4 +++- extensions/rocksdb-repos/RocksDbRepository.h | 1 + extensions/rocksdb-repos/RocksDbStream.cpp | 9 ++++++--- extensions/rocksdb-repos/RocksDbStream.h | 2 +- .../controllers/RocksDbStateStorage.cpp | 14 +++++++++++--- .../controllers/RocksDbStateStorage.h | 1 + libminifi/include/properties/Configuration.h | 1 + libminifi/src/Configuration.cpp | 1 + 16 files changed, 68 insertions(+), 19 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index c645a5bc4b..14f6629432 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -533,6 +533,13 @@ RocksDB has an option to set synchronous writes for its database, ensuring that # in minifi.properties nifi.content.repository.rocksdb.use.synchronous.writes=true +### Configuring checksum verification for RocksDB reads + +RocksDB has an option to verify checksums for its database reads. This option is set to false by default for better performance. If you prefer to enable checksum verification you can set this option to true. + + # in minifi.properties + nifi.rocksdb.read.verify.checksums=false + ### Global RocksDB options There are a few options for RocksDB that are set for all used RocksDB databases in MiNiFi: diff --git a/conf/minifi.properties b/conf/minifi.properties index 425e6e64b0..46cd211489 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -38,6 +38,9 @@ nifi.content.repository.class.name=DatabaseContentRepository # Use synchronous writes for the RocksDB content repository. Disable for better write performance, if data loss is acceptable in case of the host crashing. # nifi.content.repository.rocksdb.use.synchronous.writes=true +# Verify checksum of the data read from the RocksDB content repository. Disabled by default for better read performance. +# nifi.rocksdb.read.verify.checksums=false + ## Relates to the internal workings of the rocksdb backend # nifi.flowfile.repository.rocksdb.compaction.period=2 min # nifi.database.content.repository.rocksdb.compaction.period=2 min diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 7f22d9e736..577fdd66a8 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -82,6 +82,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptrget(Configure::nifi_content_repository_rocksdb_use_synchronous_writes).value_or("true") != "false"; + verify_checksums_in_rocksdb_reads_ = configuration->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + logger_->log_debug("{} checksum verification in DatabaseContentRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); return is_valid_; } @@ -195,7 +197,7 @@ std::shared_ptr DatabaseContentRepository::read(const minifi::Re // we can simply return a nullptr, which is also valid from the API when this stream is not valid. if (!is_valid_ || !db_) return nullptr; - return std::make_shared(claim.getContentFullPath(), gsl::make_not_null(db_.get()), false); + return std::make_shared(claim.getContentFullPath(), gsl::make_not_null(db_.get()), false, nullptr, true, verify_checksums_in_rocksdb_reads_); } bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) { @@ -205,7 +207,9 @@ bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) { } std::string value; rocksdb::Status status; - status = opendb->Get(rocksdb::ReadOptions(), streamId.getContentFullPath(), &value); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + status = opendb->Get(options, streamId.getContentFullPath(), &value); if (status.ok()) { logger_->log_debug("{} exists", streamId.getContentFullPath()); return true; @@ -288,7 +292,7 @@ std::shared_ptr DatabaseContentRepository::write(const minifi::R if (!is_valid_ || !db_) return nullptr; // append is already supported in all modes - return std::make_shared(claim.getContentFullPath(), gsl::make_not_null(db_.get()), true, batch); + return std::make_shared(claim.getContentFullPath(), gsl::make_not_null(db_.get()), true, batch, true, verify_checksums_in_rocksdb_reads_); } void DatabaseContentRepository::clearOrphans() { @@ -302,7 +306,9 @@ void DatabaseContentRepository::clearOrphans() { return; } std::vector keys_to_be_deleted; - auto it = opendb->NewIterator(rocksdb::ReadOptions()); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + auto it = opendb->NewIterator(options); for (it->SeekToFirst(); it->Valid(); it->Next()) { auto key = it->key().ToString(); std::lock_guard lock(count_map_mutex_); diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index f8f5f48cba..eb78204f5f 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -106,6 +106,7 @@ class DatabaseContentRepository : public core::ContentRepository { std::vector keys_to_delete_; std::unique_ptr gc_thread_; bool use_synchronous_writes_ = true; + bool verify_checksums_in_rocksdb_reads_ = false; }; } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/FlowFileLoader.cpp b/extensions/rocksdb-repos/FlowFileLoader.cpp index f2576a47fd..1ab12fd49f 100644 --- a/extensions/rocksdb-repos/FlowFileLoader.cpp +++ b/extensions/rocksdb-repos/FlowFileLoader.cpp @@ -29,10 +29,11 @@ namespace org::apache::nifi::minifi { -FlowFileLoader::FlowFileLoader(gsl::not_null db, std::shared_ptr content_repo) +FlowFileLoader::FlowFileLoader(gsl::not_null db, std::shared_ptr content_repo, bool verify_checksums_in_rocksdb_reads) : db_(db), content_repo_(std::move(content_repo)), - logger_(core::logging::LoggerFactory::getLogger()) {} + logger_(core::logging::LoggerFactory::getLogger()), + verify_checksums_in_rocksdb_reads_(verify_checksums_in_rocksdb_reads) {} FlowFileLoader::~FlowFileLoader() { stop(); @@ -74,6 +75,7 @@ utils::TaskRescheduleInfo FlowFileLoader::loadImpl(const std::vector> serialized_keys; serialized_keys.reserve(flow_files.size()); for (const auto& item : flow_files) { diff --git a/extensions/rocksdb-repos/FlowFileLoader.h b/extensions/rocksdb-repos/FlowFileLoader.h index 2287c7ddfa..79277f6573 100644 --- a/extensions/rocksdb-repos/FlowFileLoader.h +++ b/extensions/rocksdb-repos/FlowFileLoader.h @@ -39,7 +39,7 @@ class FlowFileLoader { static constexpr size_t thread_count_ = 1; public: - FlowFileLoader(gsl::not_null db, std::shared_ptr content_repo); + FlowFileLoader(gsl::not_null db, std::shared_ptr content_repo, bool verify_checksums_in_rocksdb_reads); ~FlowFileLoader(); @@ -60,6 +60,7 @@ class FlowFileLoader { // this ownership could be removed if that changes std::shared_ptr content_repo_; std::shared_ptr logger_; + bool verify_checksums_in_rocksdb_reads_ = false; }; } // namespace org::apache::nifi::minifi diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index e6ba15155c..5d3f2fefeb 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -89,7 +89,9 @@ void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal return; } std::vector values; - auto multistatus = opendb.MultiGet(rocksdb::ReadOptions{}, keys, &values); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + auto multistatus = opendb.MultiGet(options, keys, &values); gsl_Expects(keys.size() == values.size() && values.size() == multistatus.size()); for (size_t i = 0; i < keys.size(); ++i) { @@ -142,7 +144,9 @@ void FlowFileRepository::initialize_repository() { } logger_->log_info("Reading existing flow files from database"); - const auto it = opendb->NewIterator(rocksdb::ReadOptions()); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + const auto it = opendb->NewIterator(options); for (it->SeekToFirst(); it->Valid(); it->Next()) { utils::Identifier container_id; auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span(), content_repo_, container_id); @@ -180,7 +184,7 @@ void FlowFileRepository::initialize_repository() { void FlowFileRepository::loadComponent(const std::shared_ptr &content_repo) { content_repo_ = content_repo; - swap_loader_ = std::make_unique(gsl::make_not_null(db_.get()), content_repo_); + swap_loader_ = std::make_unique(gsl::make_not_null(db_.get()), content_repo_, verify_checksums_in_rocksdb_reads_); initialize_repository(); } @@ -208,6 +212,9 @@ bool FlowFileRepository::initialize(const std::shared_ptr &configure) const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME}); logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext"); + verify_checksums_in_rocksdb_reads_ = configure->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + logger_->log_debug("{} checksum verification in FlowFileRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); + auto db_options = [encrypted_env] (minifi::internal::Writable& options) { minifi::internal::setCommonRocksDbOptions(options); if (encrypted_env) { diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 68f404fe31..df1a9db80c 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -39,6 +39,9 @@ bool ProvenanceRepository::initialize(const std::shared_ptrlog_debug("MiNiFi Provenance Max Storage Time: [{}]", max_partition_millis_); + verify_checksums_in_rocksdb_reads_ = config->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + logger_->log_debug("{} checksum verification in ProvenanceRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); + auto db_options = [] (minifi::internal::Writable& db_opts) { minifi::internal::setCommonRocksDbOptions(db_opts); }; @@ -75,7 +78,9 @@ bool ProvenanceRepository::getElements(std::vector it(opendb->NewIterator(rocksdb::ReadOptions())); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + std::unique_ptr it(opendb->NewIterator(options)); size_t requested_batch = max_size; max_size = 0; for (it->SeekToFirst(); it->Valid(); it->Next()) { diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp b/extensions/rocksdb-repos/RocksDbRepository.cpp index 5f44050f1f..f09d0f9360 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.cpp +++ b/extensions/rocksdb-repos/RocksDbRepository.cpp @@ -79,7 +79,9 @@ bool RocksDbRepository::Get(const std::string &key, std::string &value) { if (!opendb) { return false; } - return opendb->Get(rocksdb::ReadOptions(), key, &value).ok(); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + return opendb->Get(options, key, &value).ok(); } uint64_t RocksDbRepository::getRepositorySize() const { diff --git a/extensions/rocksdb-repos/RocksDbRepository.h b/extensions/rocksdb-repos/RocksDbRepository.h index c4e606cfc0..f29ff97585 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.h +++ b/extensions/rocksdb-repos/RocksDbRepository.h @@ -59,6 +59,7 @@ class RocksDbRepository : public ThreadedRepository { std::unique_ptr db_; std::shared_ptr logger_; std::thread thread_; + bool verify_checksums_in_rocksdb_reads_ = false; }; } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp index 1ba4cdec99..7889622b5a 100644 --- a/extensions/rocksdb-repos/RocksDbStream.cpp +++ b/extensions/rocksdb-repos/RocksDbStream.cpp @@ -28,14 +28,17 @@ namespace org::apache::nifi::minifi::io { -RocksDbStream::RocksDbStream(std::string path, gsl::not_null db, bool write_enable, minifi::internal::WriteBatch* batch, bool use_synchronous_writes) +RocksDbStream::RocksDbStream(std::string path, gsl::not_null db, bool write_enable, minifi::internal::WriteBatch* batch, + bool use_synchronous_writes, bool verify_checksums) : BaseStream(), path_(std::move(path)), write_enable_(write_enable), db_(db), - exists_([this] { + exists_([this, verify_checksums] { auto opendb = db_->open(); - return opendb && opendb->Get(rocksdb::ReadOptions(), path_, &value_).ok(); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums; + return opendb && opendb->Get(options, path_, &value_).ok(); }()), offset_(0), batch_(batch), diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h index ec903ee4f6..8333115157 100644 --- a/extensions/rocksdb-repos/RocksDbStream.h +++ b/extensions/rocksdb-repos/RocksDbStream.h @@ -45,7 +45,7 @@ class RocksDbStream : public io::BaseStream { * It must already be initialized for read and write. */ explicit RocksDbStream(std::string path, gsl::not_null db, bool write_enable = false, - minifi::internal::WriteBatch* batch = nullptr, bool use_synchronous_writes = true); + minifi::internal::WriteBatch* batch = nullptr, bool use_synchronous_writes = true, bool verify_checksums = false); ~RocksDbStream() override { close(); diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp index 965165ced0..a086c3bca9 100644 --- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp +++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp @@ -93,6 +93,8 @@ void RocksDbStateStorage::onEnable() { default_write_options.sync = true; } + verify_checksums_in_rocksdb_reads_ = configuration_->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + logger_->log_trace("Enabled RocksDbStateStorage"); } @@ -125,7 +127,9 @@ bool RocksDbStateStorage::get(const std::string& key, std::string& value) { if (!opendb) { return false; } - rocksdb::Status status = opendb->Get(rocksdb::ReadOptions(), key, &value); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + rocksdb::Status status = opendb->Get(options, key, &value); if (!status.ok()) { if (status.getState() != nullptr) { logger_->log_error("Failed to Get key {} from RocksDB database at {}, error: {}", key.c_str(), directory_.c_str(), status.getState()); @@ -146,7 +150,9 @@ bool RocksDbStateStorage::get(std::unordered_map& kvs) return false; } kvs.clear(); - auto it = opendb->NewIterator(rocksdb::ReadOptions()); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + auto it = opendb->NewIterator(options); for (it->SeekToFirst(); it->Valid(); it->Next()) { kvs.emplace(it->key().ToString(), it->value().ToString()); } @@ -181,7 +187,9 @@ bool RocksDbStateStorage::clear() { if (!opendb) { return false; } - auto it = opendb->NewIterator(rocksdb::ReadOptions()); + rocksdb::ReadOptions options; + options.verify_checksums = verify_checksums_in_rocksdb_reads_; + auto it = opendb->NewIterator(options); for (it->SeekToFirst(); it->Valid(); it->Next()) { rocksdb::Status status = opendb->Delete(default_write_options, it->key()); if (!status.ok()) { diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h index c1dc81e12f..d4b4a8ad8e 100644 --- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h +++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h @@ -93,6 +93,7 @@ class RocksDbStateStorage : public KeyValueStateStorage { std::unique_ptr db_; rocksdb::WriteOptions default_write_options; AutoPersistor auto_persistor_; + bool verify_checksums_in_rocksdb_reads_ = false; std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); }; diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index cea438ea92..f8d38bd8e0 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -78,6 +78,7 @@ class Configuration : public Properties { static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period"; static constexpr const char *nifi_dbcontent_repository_purge_period = "nifi.database.content.repository.purge.period"; static constexpr const char *nifi_content_repository_rocksdb_use_synchronous_writes = "nifi.content.repository.rocksdb.use.synchronous.writes"; + static constexpr const char *nifi_rocksdb_read_verify_checksums = "nifi.rocksdb.read.verify.checksums"; static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 116746c9c5..442d4a4175 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -54,6 +54,7 @@ const std::unordered_map Date: Tue, 5 Nov 2024 15:05:47 +0100 Subject: [PATCH 2/3] Configure checksum verification on repository level --- CONFIGURE.md | 5 ++++- conf/minifi.properties | 7 +++++-- extensions/rocksdb-repos/DatabaseContentRepository.cpp | 2 +- extensions/rocksdb-repos/FlowFileRepository.cpp | 2 +- extensions/rocksdb-repos/ProvenanceRepository.cpp | 2 +- .../rocksdb-repos/controllers/RocksDbStateStorage.cpp | 2 +- libminifi/include/properties/Configuration.h | 5 ++++- libminifi/src/Configuration.cpp | 5 ++++- 8 files changed, 21 insertions(+), 9 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 14f6629432..51e6219ae2 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -538,7 +538,10 @@ RocksDB has an option to set synchronous writes for its database, ensuring that RocksDB has an option to verify checksums for its database reads. This option is set to false by default for better performance. If you prefer to enable checksum verification you can set this option to true. # in minifi.properties - nifi.rocksdb.read.verify.checksums=false + nifi.content.repository.rocksdb.read.verify.checksums=false + nifi.flowfile.repository.rocksdb.read.verify.checksums=false + nifi.provenance.repository.rocksdb.read.verify.checksums=false + nifi.rocksdb.state.storage.read.verify.checksums=false ### Global RocksDB options diff --git a/conf/minifi.properties b/conf/minifi.properties index 46cd211489..1a18fb076d 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -38,8 +38,11 @@ nifi.content.repository.class.name=DatabaseContentRepository # Use synchronous writes for the RocksDB content repository. Disable for better write performance, if data loss is acceptable in case of the host crashing. # nifi.content.repository.rocksdb.use.synchronous.writes=true -# Verify checksum of the data read from the RocksDB content repository. Disabled by default for better read performance. -# nifi.rocksdb.read.verify.checksums=false +# Verify checksum of the data read from a RocksDB repository. Disabled by default for better read performance. +# nifi.content.repository.rocksdb.read.verify.checksums=false +# nifi.flowfile.repository.rocksdb.read.verify.checksums=false +# nifi.provenance.repository.rocksdb.read.verify.checksums=false +# nifi.rocksdb.state.storage.read.verify.checksums=false ## Relates to the internal workings of the rocksdb backend # nifi.flowfile.repository.rocksdb.compaction.period=2 min diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 577fdd66a8..f7d75aa766 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -82,7 +82,7 @@ bool DatabaseContentRepository::initialize(const std::shared_ptrget(Configure::nifi_content_repository_rocksdb_use_synchronous_writes).value_or("true") != "false"; - verify_checksums_in_rocksdb_reads_ = configuration->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = configuration->get(Configure::nifi_content_repository_rocksdb_read_verify_checksums).value_or("false") == "true"; logger_->log_debug("{} checksum verification in DatabaseContentRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); return is_valid_; } diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 5d3f2fefeb..783e747056 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -212,7 +212,7 @@ bool FlowFileRepository::initialize(const std::shared_ptr &configure) const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME}); logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext"); - verify_checksums_in_rocksdb_reads_ = configure->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = configure->get(Configure::nifi_flowfile_repository_rocksdb_read_verify_checksums).value_or("false") == "true"; logger_->log_debug("{} checksum verification in FlowFileRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); auto db_options = [encrypted_env] (minifi::internal::Writable& options) { diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index df1a9db80c..90e035506b 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -39,7 +39,7 @@ bool ProvenanceRepository::initialize(const std::shared_ptrlog_debug("MiNiFi Provenance Max Storage Time: [{}]", max_partition_millis_); - verify_checksums_in_rocksdb_reads_ = config->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = config->get(Configure::nifi_provenance_repository_rocksdb_read_verify_checksums).value_or("false") == "true"; logger_->log_debug("{} checksum verification in ProvenanceRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); auto db_options = [] (minifi::internal::Writable& db_opts) { diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp index a086c3bca9..c4855b6281 100644 --- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp +++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp @@ -93,7 +93,7 @@ void RocksDbStateStorage::onEnable() { default_write_options.sync = true; } - verify_checksums_in_rocksdb_reads_ = configuration_->get(Configure::nifi_rocksdb_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = configuration_->get(Configure::nifi_rocksdb_state_storage_read_verify_checksums).value_or("false") == "true"; logger_->log_trace("Enabled RocksDbStateStorage"); } diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index f8d38bd8e0..f5b7bc41bc 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -78,7 +78,10 @@ class Configuration : public Properties { static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period"; static constexpr const char *nifi_dbcontent_repository_purge_period = "nifi.database.content.repository.purge.period"; static constexpr const char *nifi_content_repository_rocksdb_use_synchronous_writes = "nifi.content.repository.rocksdb.use.synchronous.writes"; - static constexpr const char *nifi_rocksdb_read_verify_checksums = "nifi.rocksdb.read.verify.checksums"; + static constexpr const char *nifi_content_repository_rocksdb_read_verify_checksums = "nifi.content.repository.rocksdb.read.verify.checksums"; + static constexpr const char *nifi_flowfile_repository_rocksdb_read_verify_checksums = "nifi.flowfile.repository.rocksdb.read.verify.checksums"; + static constexpr const char *nifi_provenance_repository_rocksdb_read_verify_checksums = "nifi.provenance.repository.rocksdb.read.verify.checksums"; + static constexpr const char *nifi_rocksdb_state_storage_read_verify_checksums = "nifi.rocksdb.state.storage.read.verify.checksums"; static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 442d4a4175..df50a7d728 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -54,7 +54,10 @@ const std::unordered_map Date: Thu, 14 Nov 2024 15:04:04 +0100 Subject: [PATCH 3/3] Review update --- extensions/rocksdb-repos/DatabaseContentRepository.cpp | 2 +- extensions/rocksdb-repos/FlowFileRepository.cpp | 2 +- extensions/rocksdb-repos/ProvenanceRepository.cpp | 2 +- extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index f7d75aa766..262f3aef55 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -82,7 +82,7 @@ bool DatabaseContentRepository::initialize(const std::shared_ptrget(Configure::nifi_content_repository_rocksdb_use_synchronous_writes).value_or("true") != "false"; - verify_checksums_in_rocksdb_reads_ = configuration->get(Configure::nifi_content_repository_rocksdb_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = (configuration->get(Configure::nifi_content_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false); logger_->log_debug("{} checksum verification in DatabaseContentRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); return is_valid_; } diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 783e747056..b53ec26540 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -212,7 +212,7 @@ bool FlowFileRepository::initialize(const std::shared_ptr &configure) const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME}); logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext"); - verify_checksums_in_rocksdb_reads_ = configure->get(Configure::nifi_flowfile_repository_rocksdb_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = (configure->get(Configure::nifi_flowfile_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false); logger_->log_debug("{} checksum verification in FlowFileRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); auto db_options = [encrypted_env] (minifi::internal::Writable& options) { diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 90e035506b..5afe75a4b6 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -39,7 +39,7 @@ bool ProvenanceRepository::initialize(const std::shared_ptrlog_debug("MiNiFi Provenance Max Storage Time: [{}]", max_partition_millis_); - verify_checksums_in_rocksdb_reads_ = config->get(Configure::nifi_provenance_repository_rocksdb_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = (config->get(Configure::nifi_provenance_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false); logger_->log_debug("{} checksum verification in ProvenanceRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using"); auto db_options = [] (minifi::internal::Writable& db_opts) { diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp index c4855b6281..ceac73e706 100644 --- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp +++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp @@ -93,7 +93,7 @@ void RocksDbStateStorage::onEnable() { default_write_options.sync = true; } - verify_checksums_in_rocksdb_reads_ = configuration_->get(Configure::nifi_rocksdb_state_storage_read_verify_checksums).value_or("false") == "true"; + verify_checksums_in_rocksdb_reads_ = (configuration_->get(Configure::nifi_rocksdb_state_storage_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false); logger_->log_trace("Enabled RocksDbStateStorage"); }