Skip to content

Commit

Permalink
MINIFICPP-2401 Make RocksDB verify_checksums read option configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez authored and szaszm committed Nov 21, 2024
1 parent cc072e5 commit e7a7241
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 19 deletions.
10 changes: 10 additions & 0 deletions CONFIGURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,16 @@ RocksDB has an option to set synchronous writes for its database, ensuring that
# in minifi.properties
nifi.content.repository.rocksdb.use.synchronous.writes=true

### Configuring checksum verification for RocksDB reads

RocksDB has an option to verify checksums when data is read from its database. In MiNiFi this option is set to false by default for better read performance. If you prefer to enable checksum verification, you can set the option to true separately for each repository and for the RocksDB state storage.

# in minifi.properties
nifi.content.repository.rocksdb.read.verify.checksums=false
nifi.flowfile.repository.rocksdb.read.verify.checksums=false
nifi.provenance.repository.rocksdb.read.verify.checksums=false
nifi.rocksdb.state.storage.read.verify.checksums=false

### Global RocksDB options

There are a few options for RocksDB that are set for all used RocksDB databases in MiNiFi:
Expand Down
6 changes: 6 additions & 0 deletions conf/minifi.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ nifi.content.repository.class.name=DatabaseContentRepository
# Use synchronous writes for the RocksDB content repository. Disable for better write performance, if data loss is acceptable in case of the host crashing.
# nifi.content.repository.rocksdb.use.synchronous.writes=true

# Verify the checksums of data read from the RocksDB repositories and the RocksDB state storage. Disabled by default for better read performance.
# nifi.content.repository.rocksdb.read.verify.checksums=false
# nifi.flowfile.repository.rocksdb.read.verify.checksums=false
# nifi.provenance.repository.rocksdb.read.verify.checksums=false
# nifi.rocksdb.state.storage.read.verify.checksums=false

## Relates to the internal workings of the rocksdb backend
# nifi.flowfile.repository.rocksdb.compaction.period=2 min
# nifi.database.content.repository.rocksdb.compaction.period=2 min
Expand Down
14 changes: 10 additions & 4 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
}

use_synchronous_writes_ = configuration->get(Configure::nifi_content_repository_rocksdb_use_synchronous_writes).value_or("true") != "false";
verify_checksums_in_rocksdb_reads_ = (configuration->get(Configure::nifi_content_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
logger_->log_debug("{} checksum verification in DatabaseContentRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");
return is_valid_;
}

Expand Down Expand Up @@ -195,7 +197,7 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const minifi::Re
// we can simply return a nullptr, which is also valid from the API when this stream is not valid.
if (!is_valid_ || !db_)
return nullptr;
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false, nullptr, true, verify_checksums_in_rocksdb_reads_);
}

bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
Expand All @@ -205,7 +207,9 @@ bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
}
std::string value;
rocksdb::Status status;
status = opendb->Get(rocksdb::ReadOptions(), streamId.getContentFullPath(), &value);
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
status = opendb->Get(options, streamId.getContentFullPath(), &value);
if (status.ok()) {
logger_->log_debug("{} exists", streamId.getContentFullPath());
return true;
Expand Down Expand Up @@ -288,7 +292,7 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::R
if (!is_valid_ || !db_)
return nullptr;
// append is already supported in all modes
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch, true, verify_checksums_in_rocksdb_reads_);
}

void DatabaseContentRepository::clearOrphans() {
Expand All @@ -302,7 +306,9 @@ void DatabaseContentRepository::clearOrphans() {
return;
}
std::vector<std::string> keys_to_be_deleted;
auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto key = it->key().ToString();
std::lock_guard<std::mutex> lock(count_map_mutex_);
Expand Down
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/DatabaseContentRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class DatabaseContentRepository : public core::ContentRepository {
std::vector<std::string> keys_to_delete_;
std::unique_ptr<utils::StoppableThread> gc_thread_;
bool use_synchronous_writes_ = true;
bool verify_checksums_in_rocksdb_reads_ = false;
};

} // namespace org::apache::nifi::minifi::core::repository
6 changes: 4 additions & 2 deletions extensions/rocksdb-repos/FlowFileLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@

namespace org::apache::nifi::minifi {

FlowFileLoader::FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo)
FlowFileLoader::FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo, bool verify_checksums_in_rocksdb_reads)
: db_(db),
content_repo_(std::move(content_repo)),
logger_(core::logging::LoggerFactory<FlowFileLoader>::getLogger()) {}
logger_(core::logging::LoggerFactory<FlowFileLoader>::getLogger()),
verify_checksums_in_rocksdb_reads_(verify_checksums_in_rocksdb_reads) {}

FlowFileLoader::~FlowFileLoader() {
stop();
Expand Down Expand Up @@ -74,6 +75,7 @@ utils::TaskRescheduleInfo FlowFileLoader::loadImpl(const std::vector<SwappedFlow
FlowFilePtrVec result;
result.reserve(flow_files.size());
rocksdb::ReadOptions read_options;
read_options.verify_checksums = verify_checksums_in_rocksdb_reads_;
std::vector<utils::SmallString<36>> serialized_keys;
serialized_keys.reserve(flow_files.size());
for (const auto& item : flow_files) {
Expand Down
3 changes: 2 additions & 1 deletion extensions/rocksdb-repos/FlowFileLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class FlowFileLoader {
static constexpr size_t thread_count_ = 1;

public:
FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo);
FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo, bool verify_checksums_in_rocksdb_reads);

~FlowFileLoader();

Expand All @@ -60,6 +60,7 @@ class FlowFileLoader {
// this ownership could be removed if that changes
std::shared_ptr<core::ContentRepository> content_repo_;
std::shared_ptr<core::logging::Logger> logger_;
bool verify_checksums_in_rocksdb_reads_ = false;
};

} // namespace org::apache::nifi::minifi
13 changes: 10 additions & 3 deletions extensions/rocksdb-repos/FlowFileRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal
return;
}
std::vector<std::string> values;
auto multistatus = opendb.MultiGet(rocksdb::ReadOptions{}, keys, &values);
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto multistatus = opendb.MultiGet(options, keys, &values);
gsl_Expects(keys.size() == values.size() && values.size() == multistatus.size());

for (size_t i = 0; i < keys.size(); ++i) {
Expand Down Expand Up @@ -142,7 +144,9 @@ void FlowFileRepository::initialize_repository() {
}
logger_->log_info("Reading existing flow files from database");

const auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
const auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
utils::Identifier container_id;
auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id);
Expand Down Expand Up @@ -180,7 +184,7 @@ void FlowFileRepository::initialize_repository() {

void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
content_repo_ = content_repo;
swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_);
swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_, verify_checksums_in_rocksdb_reads_);

initialize_repository();
}
Expand Down Expand Up @@ -208,6 +212,9 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");

verify_checksums_in_rocksdb_reads_ = (configure->get(Configure::nifi_flowfile_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
logger_->log_debug("{} checksum verification in FlowFileRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");

auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) {
minifi::internal::setCommonRocksDbOptions(options);
if (encrypted_env) {
Expand Down
7 changes: 6 additions & 1 deletion extensions/rocksdb-repos/ProvenanceRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::m
}
logger_->log_debug("MiNiFi Provenance Max Storage Time: [{}]", max_partition_millis_);

verify_checksums_in_rocksdb_reads_ = (config->get(Configure::nifi_provenance_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
logger_->log_debug("{} checksum verification in ProvenanceRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");

auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
minifi::internal::setCommonRocksDbOptions(db_opts);
};
Expand Down Expand Up @@ -75,7 +78,9 @@ bool ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::Seriali
if (!opendb) {
return false;
}
std::unique_ptr<rocksdb::Iterator> it(opendb->NewIterator(rocksdb::ReadOptions()));
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
std::unique_ptr<rocksdb::Iterator> it(opendb->NewIterator(options));
size_t requested_batch = max_size;
max_size = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
Expand Down
4 changes: 3 additions & 1 deletion extensions/rocksdb-repos/RocksDbRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ bool RocksDbRepository::Get(const std::string &key, std::string &value) {
if (!opendb) {
return false;
}
return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
return opendb->Get(options, key, &value).ok();
}

uint64_t RocksDbRepository::getRepositorySize() const {
Expand Down
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/RocksDbRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class RocksDbRepository : public ThreadedRepository {
std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::shared_ptr<logging::Logger> logger_;
std::thread thread_;
bool verify_checksums_in_rocksdb_reads_ = false;
};

} // namespace org::apache::nifi::minifi::core::repository
9 changes: 6 additions & 3 deletions extensions/rocksdb-repos/RocksDbStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@

namespace org::apache::nifi::minifi::io {

RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, minifi::internal::WriteBatch* batch, bool use_synchronous_writes)
RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, minifi::internal::WriteBatch* batch,
bool use_synchronous_writes, bool verify_checksums)
: BaseStream(),
path_(std::move(path)),
write_enable_(write_enable),
db_(db),
exists_([this] {
exists_([this, verify_checksums] {
auto opendb = db_->open();
return opendb && opendb->Get(rocksdb::ReadOptions(), path_, &value_).ok();
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums;
return opendb && opendb->Get(options, path_, &value_).ok();
}()),
offset_(0),
batch_(batch),
Expand Down
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/RocksDbStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class RocksDbStream : public io::BaseStream {
* It must already be initialized for read and write.
*/
explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false,
minifi::internal::WriteBatch* batch = nullptr, bool use_synchronous_writes = true);
minifi::internal::WriteBatch* batch = nullptr, bool use_synchronous_writes = true, bool verify_checksums = false);

~RocksDbStream() override {
close();
Expand Down
14 changes: 11 additions & 3 deletions extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ void RocksDbStateStorage::onEnable() {
default_write_options.sync = true;
}

verify_checksums_in_rocksdb_reads_ = (configuration_->get(Configure::nifi_rocksdb_state_storage_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);

logger_->log_trace("Enabled RocksDbStateStorage");
}

Expand Down Expand Up @@ -125,7 +127,9 @@ bool RocksDbStateStorage::get(const std::string& key, std::string& value) {
if (!opendb) {
return false;
}
rocksdb::Status status = opendb->Get(rocksdb::ReadOptions(), key, &value);
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
rocksdb::Status status = opendb->Get(options, key, &value);
if (!status.ok()) {
if (status.getState() != nullptr) {
logger_->log_error("Failed to Get key {} from RocksDB database at {}, error: {}", key.c_str(), directory_.c_str(), status.getState());
Expand All @@ -146,7 +150,9 @@ bool RocksDbStateStorage::get(std::unordered_map<std::string, std::string>& kvs)
return false;
}
kvs.clear();
auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
kvs.emplace(it->key().ToString(), it->value().ToString());
}
Expand Down Expand Up @@ -181,7 +187,9 @@ bool RocksDbStateStorage::clear() {
if (!opendb) {
return false;
}
auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
rocksdb::Status status = opendb->Delete(default_write_options, it->key());
if (!status.ok()) {
Expand Down
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class RocksDbStateStorage : public KeyValueStateStorage {
std::unique_ptr<minifi::internal::RocksDatabase> db_;
rocksdb::WriteOptions default_write_options;
AutoPersistor auto_persistor_;
bool verify_checksums_in_rocksdb_reads_ = false;

std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RocksDbStateStorage>::getLogger();
};
Expand Down
4 changes: 4 additions & 0 deletions libminifi/include/properties/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class Configuration : public Properties {
static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period";
static constexpr const char *nifi_dbcontent_repository_purge_period = "nifi.database.content.repository.purge.period";
static constexpr const char *nifi_content_repository_rocksdb_use_synchronous_writes = "nifi.content.repository.rocksdb.use.synchronous.writes";
static constexpr const char *nifi_content_repository_rocksdb_read_verify_checksums = "nifi.content.repository.rocksdb.read.verify.checksums";
static constexpr const char *nifi_flowfile_repository_rocksdb_read_verify_checksums = "nifi.flowfile.repository.rocksdb.read.verify.checksums";
static constexpr const char *nifi_provenance_repository_rocksdb_read_verify_checksums = "nifi.provenance.repository.rocksdb.read.verify.checksums";
static constexpr const char *nifi_rocksdb_state_storage_read_verify_checksums = "nifi.rocksdb.state.storage.read.verify.checksums";

static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
Expand Down
4 changes: 4 additions & 0 deletions libminifi/src/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal
{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_content_repository_rocksdb_use_synchronous_writes, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_content_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_flowfile_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_provenance_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_rocksdb_state_storage_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_dbcontent_repository_purge_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_remote_input_secure, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
Expand Down

0 comments on commit e7a7241

Please sign in to comment.