MINIFICPP-2401 Make RocksDB verify_checksums read option configurable #1892

Closed
wants to merge 3 commits into from
10 changes: 10 additions & 0 deletions CONFIGURE.md
@@ -533,6 +533,16 @@ RocksDB has an option to set synchronous writes for its database, ensuring that
# in minifi.properties
nifi.content.repository.rocksdb.use.synchronous.writes=true

### Configuring checksum verification for RocksDB reads

RocksDB can verify checksums on database reads. MiNiFi sets this option to false by default for better read performance. To enable checksum verification for a repository, set the corresponding property to true.

# in minifi.properties
nifi.content.repository.rocksdb.read.verify.checksums=false
nifi.flowfile.repository.rocksdb.read.verify.checksums=false
nifi.provenance.repository.rocksdb.read.verify.checksums=false
nifi.rocksdb.state.storage.read.verify.checksums=false
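
The four properties above are read independently, and each one falls back to false when it is absent. The following is a minimal, self-contained sketch of that lookup; `parseProperties`, `toBool` and `verifyChecksums` are hypothetical names for illustration only, and this `toBool` is a simplified stand-in that accepts just the exact strings "true" and "false", not MiNiFi's own conversion.

```cpp
#include <map>
#include <optional>
#include <sstream>
#include <string>

// Hypothetical helper: parse "key=value" lines, skipping blank lines and '#' comments.
std::map<std::string, std::string> parseProperties(const std::string& text) {
  std::map<std::string, std::string> props;
  std::istringstream in(text);
  std::string line;
  while (std::getline(in, line)) {
    if (line.empty() || line[0] == '#') continue;
    const auto eq = line.find('=');
    if (eq == std::string::npos) continue;
    props[line.substr(0, eq)] = line.substr(eq + 1);
  }
  return props;
}

// Simplified stand-in for a string-to-bool conversion (assumption: exact match only).
std::optional<bool> toBool(const std::string& value) {
  if (value == "true") return true;
  if (value == "false") return false;
  return std::nullopt;
}

// A missing or unparsable value yields false, matching the documented default.
bool verifyChecksums(const std::map<std::string, std::string>& props, const std::string& key) {
  const auto it = props.find(key);
  if (it == props.end()) return false;
  return toBool(it->second).value_or(false);
}
```

Under these assumptions, enabling verification for the content repository leaves the other three repositories at their default unless their own properties are also set.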

### Global RocksDB options

There are a few options for RocksDB that are set for all used RocksDB databases in MiNiFi:
6 changes: 6 additions & 0 deletions conf/minifi.properties
@@ -38,6 +38,12 @@ nifi.content.repository.class.name=DatabaseContentRepository
# Use synchronous writes for the RocksDB content repository. Disable for better write performance, if data loss is acceptable in case of the host crashing.
# nifi.content.repository.rocksdb.use.synchronous.writes=true

# Verify checksum of the data read from a RocksDB repository. Disabled by default for better read performance.
# nifi.content.repository.rocksdb.read.verify.checksums=false
# nifi.flowfile.repository.rocksdb.read.verify.checksums=false
# nifi.provenance.repository.rocksdb.read.verify.checksums=false
# nifi.rocksdb.state.storage.read.verify.checksums=false

## Relates to the internal workings of the rocksdb backend
# nifi.flowfile.repository.rocksdb.compaction.period=2 min
# nifi.database.content.repository.rocksdb.compaction.period=2 min
14 changes: 10 additions & 4 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -82,6 +82,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
}

use_synchronous_writes_ = configuration->get(Configure::nifi_content_repository_rocksdb_use_synchronous_writes).value_or("true") != "false";
verify_checksums_in_rocksdb_reads_ = (configuration->get(Configure::nifi_content_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
logger_->log_debug("{} checksum verification in DatabaseContentRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");
return is_valid_;
}

@@ -195,7 +197,7 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const minifi::Re
// we can simply return a nullptr, which is also valid from the API when this stream is not valid.
if (!is_valid_ || !db_)
return nullptr;
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false, nullptr, true, verify_checksums_in_rocksdb_reads_);
}

bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
@@ -205,7 +207,9 @@ bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
}
std::string value;
rocksdb::Status status;
status = opendb->Get(rocksdb::ReadOptions(), streamId.getContentFullPath(), &value);
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
status = opendb->Get(options, streamId.getContentFullPath(), &value);
if (status.ok()) {
logger_->log_debug("{} exists", streamId.getContentFullPath());
return true;
@@ -288,7 +292,7 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::R
if (!is_valid_ || !db_)
return nullptr;
// append is already supported in all modes
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch, true, verify_checksums_in_rocksdb_reads_);
}

void DatabaseContentRepository::clearOrphans() {
@@ -302,7 +306,9 @@ void DatabaseContentRepository::clearOrphans() {
return;
}
std::vector<std::string> keys_to_be_deleted;
auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto key = it->key().ToString();
std::lock_guard<std::mutex> lock(count_map_mutex_);
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -106,6 +106,7 @@ class DatabaseContentRepository : public core::ContentRepository {
std::vector<std::string> keys_to_delete_;
std::unique_ptr<utils::StoppableThread> gc_thread_;
bool use_synchronous_writes_ = true;
bool verify_checksums_in_rocksdb_reads_ = false;
};

} // namespace org::apache::nifi::minifi::core::repository
6 changes: 4 additions & 2 deletions extensions/rocksdb-repos/FlowFileLoader.cpp
@@ -29,10 +29,11 @@

namespace org::apache::nifi::minifi {

FlowFileLoader::FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo)
FlowFileLoader::FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo, bool verify_checksums_in_rocksdb_reads)
: db_(db),
content_repo_(std::move(content_repo)),
logger_(core::logging::LoggerFactory<FlowFileLoader>::getLogger()) {}
logger_(core::logging::LoggerFactory<FlowFileLoader>::getLogger()),
verify_checksums_in_rocksdb_reads_(verify_checksums_in_rocksdb_reads) {}

FlowFileLoader::~FlowFileLoader() {
stop();
@@ -74,6 +75,7 @@ utils::TaskRescheduleInfo FlowFileLoader::loadImpl(const std::vector<SwappedFlow
FlowFilePtrVec result;
result.reserve(flow_files.size());
rocksdb::ReadOptions read_options;
read_options.verify_checksums = verify_checksums_in_rocksdb_reads_;
std::vector<utils::SmallString<36>> serialized_keys;
serialized_keys.reserve(flow_files.size());
for (const auto& item : flow_files) {
3 changes: 2 additions & 1 deletion extensions/rocksdb-repos/FlowFileLoader.h
@@ -39,7 +39,7 @@ class FlowFileLoader {
static constexpr size_t thread_count_ = 1;

public:
FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo);
FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo, bool verify_checksums_in_rocksdb_reads);

~FlowFileLoader();

@@ -60,6 +60,7 @@ class FlowFileLoader {
// this ownership could be removed if that changes
std::shared_ptr<core::ContentRepository> content_repo_;
std::shared_ptr<core::logging::Logger> logger_;
bool verify_checksums_in_rocksdb_reads_ = false;
};

} // namespace org::apache::nifi::minifi
13 changes: 10 additions & 3 deletions extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -89,7 +89,9 @@ void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal
return;
}
std::vector<std::string> values;
auto multistatus = opendb.MultiGet(rocksdb::ReadOptions{}, keys, &values);
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto multistatus = opendb.MultiGet(options, keys, &values);
gsl_Expects(keys.size() == values.size() && values.size() == multistatus.size());

for (size_t i = 0; i < keys.size(); ++i) {
@@ -142,7 +144,9 @@ void FlowFileRepository::initialize_repository() {
}
logger_->log_info("Reading existing flow files from database");

const auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
const auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
utils::Identifier container_id;
auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id);
@@ -180,7 +184,7 @@ void FlowFileRepository::initialize_repository() {

void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
content_repo_ = content_repo;
swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_);
swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_, verify_checksums_in_rocksdb_reads_);

initialize_repository();
}
@@ -208,6 +212,9 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");

verify_checksums_in_rocksdb_reads_ = (configure->get(Configure::nifi_flowfile_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
logger_->log_debug("{} checksum verification in FlowFileRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");

auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) {
minifi::internal::setCommonRocksDbOptions(options);
if (encrypted_env) {
7 changes: 6 additions & 1 deletion extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -39,6 +39,9 @@ bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::m
}
logger_->log_debug("MiNiFi Provenance Max Storage Time: [{}]", max_partition_millis_);

verify_checksums_in_rocksdb_reads_ = (config->get(Configure::nifi_provenance_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
logger_->log_debug("{} checksum verification in ProvenanceRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");

auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
minifi::internal::setCommonRocksDbOptions(db_opts);
};
@@ -75,7 +78,9 @@ bool ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::Seriali
if (!opendb) {
return false;
}
std::unique_ptr<rocksdb::Iterator> it(opendb->NewIterator(rocksdb::ReadOptions()));
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
std::unique_ptr<rocksdb::Iterator> it(opendb->NewIterator(options));
size_t requested_batch = max_size;
max_size = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
4 changes: 3 additions & 1 deletion extensions/rocksdb-repos/RocksDbRepository.cpp
@@ -79,7 +79,9 @@ bool RocksDbRepository::Get(const std::string &key, std::string &value) {
if (!opendb) {
return false;
}
return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
return opendb->Get(options, key, &value).ok();
}

uint64_t RocksDbRepository::getRepositorySize() const {
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/RocksDbRepository.h
@@ -59,6 +59,7 @@ class RocksDbRepository : public ThreadedRepository {
std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::shared_ptr<logging::Logger> logger_;
std::thread thread_;
bool verify_checksums_in_rocksdb_reads_ = false;
};

} // namespace org::apache::nifi::minifi::core::repository
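
Each read path in this change builds a `rocksdb::ReadOptions` and copies the repository's flag into `verify_checksums` before calling `Get`, `MultiGet` or `NewIterator`. A minimal sketch of that repeated step as one function follows. `makeReadOptions` is a hypothetical helper that is not part of this change, and the `ReadOptions` struct is a one-field stand-in so the sketch compiles without RocksDB; its default of true is an assumption that mirrors the upstream RocksDB default for `verify_checksums`.

```cpp
// Stand-in for rocksdb::ReadOptions, reduced to the single field used here.
// Assumption: the real struct defaults verify_checksums to true.
struct ReadOptions {
  bool verify_checksums = true;
};

// Hypothetical helper (not in the PR): build read options from the configured flag,
// replacing the two-line pattern repeated at every call site.
inline ReadOptions makeReadOptions(bool verify_checksums_in_rocksdb_reads) {
  ReadOptions options;
  options.verify_checksums = verify_checksums_in_rocksdb_reads;
  return options;
}
```

If that assumption about the RocksDB default holds, a default-constructed options object verifies checksums, so reads that previously passed `rocksdb::ReadOptions()` now skip verification unless the new property is set to true.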
9 changes: 6 additions & 3 deletions extensions/rocksdb-repos/RocksDbStream.cpp
@@ -28,14 +28,17 @@

namespace org::apache::nifi::minifi::io {

RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, minifi::internal::WriteBatch* batch, bool use_synchronous_writes)
RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, minifi::internal::WriteBatch* batch,
bool use_synchronous_writes, bool verify_checksums)
: BaseStream(),
path_(std::move(path)),
write_enable_(write_enable),
db_(db),
exists_([this] {
exists_([this, verify_checksums] {
auto opendb = db_->open();
return opendb && opendb->Get(rocksdb::ReadOptions(), path_, &value_).ok();
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums;
return opendb && opendb->Get(options, path_, &value_).ok();
}()),
offset_(0),
batch_(batch),
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/RocksDbStream.h
@@ -45,7 +45,7 @@ class RocksDbStream : public io::BaseStream {
* It must already be initialized for read and write.
*/
explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false,
minifi::internal::WriteBatch* batch = nullptr, bool use_synchronous_writes = true);
minifi::internal::WriteBatch* batch = nullptr, bool use_synchronous_writes = true, bool verify_checksums = false);

~RocksDbStream() override {
close();
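
Because `verify_checksums` is appended as the last defaulted parameter, a caller that wants to set it has to spell out every defaulted argument before it, which is why the updated `read()` call passes `false, nullptr, true` ahead of the flag. A reduced sketch of that behaviour, assuming the same parameter order and defaults as the declaration above: `StreamArgs`, `makeStreamArgs` and the empty `WriteBatch` are illustrative stand-ins, and the database pointer parameter is omitted.

```cpp
#include <string>
#include <utility>

struct WriteBatch {};  // placeholder for minifi::internal::WriteBatch

// Illustrative record of the arguments a stream would receive.
struct StreamArgs {
  std::string path;
  bool write_enable;
  WriteBatch* batch;
  bool use_synchronous_writes;
  bool verify_checksums;
};

// Same trailing defaults as the RocksDbStream constructor in this change
// (assumption: the db parameter is left out to keep the sketch self-contained).
StreamArgs makeStreamArgs(std::string path, bool write_enable = false, WriteBatch* batch = nullptr,
                          bool use_synchronous_writes = true, bool verify_checksums = false) {
  return StreamArgs{std::move(path), write_enable, batch, use_synchronous_writes, verify_checksums};
}
```

Existing callers that omit the trailing arguments keep compiling and get `verify_checksums == false`; only callers that opt in need to repeat the earlier defaults.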
14 changes: 11 additions & 3 deletions extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
@@ -93,6 +93,8 @@ void RocksDbStateStorage::onEnable() {
default_write_options.sync = true;
}

verify_checksums_in_rocksdb_reads_ = (configuration_->get(Configure::nifi_rocksdb_state_storage_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);

logger_->log_trace("Enabled RocksDbStateStorage");
}

@@ -125,7 +127,9 @@ bool RocksDbStateStorage::get(const std::string& key, std::string& value) {
if (!opendb) {
return false;
}
rocksdb::Status status = opendb->Get(rocksdb::ReadOptions(), key, &value);
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
rocksdb::Status status = opendb->Get(options, key, &value);
if (!status.ok()) {
if (status.getState() != nullptr) {
logger_->log_error("Failed to Get key {} from RocksDB database at {}, error: {}", key.c_str(), directory_.c_str(), status.getState());
@@ -146,7 +150,9 @@ bool RocksDbStateStorage::get(std::unordered_map<std::string, std::string>& kvs)
return false;
}
kvs.clear();
auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
kvs.emplace(it->key().ToString(), it->value().ToString());
}
@@ -181,7 +187,9 @@ bool RocksDbStateStorage::clear() {
if (!opendb) {
return false;
}
auto it = opendb->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
rocksdb::Status status = opendb->Delete(default_write_options, it->key());
if (!status.ok()) {
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
@@ -93,6 +93,7 @@ class RocksDbStateStorage : public KeyValueStateStorage {
std::unique_ptr<minifi::internal::RocksDatabase> db_;
rocksdb::WriteOptions default_write_options;
AutoPersistor auto_persistor_;
bool verify_checksums_in_rocksdb_reads_ = false;

std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RocksDbStateStorage>::getLogger();
};
4 changes: 4 additions & 0 deletions libminifi/include/properties/Configuration.h
@@ -78,6 +78,10 @@ class Configuration : public Properties {
static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period";
static constexpr const char *nifi_dbcontent_repository_purge_period = "nifi.database.content.repository.purge.period";
static constexpr const char *nifi_content_repository_rocksdb_use_synchronous_writes = "nifi.content.repository.rocksdb.use.synchronous.writes";
static constexpr const char *nifi_content_repository_rocksdb_read_verify_checksums = "nifi.content.repository.rocksdb.read.verify.checksums";
static constexpr const char *nifi_flowfile_repository_rocksdb_read_verify_checksums = "nifi.flowfile.repository.rocksdb.read.verify.checksums";
static constexpr const char *nifi_provenance_repository_rocksdb_read_verify_checksums = "nifi.provenance.repository.rocksdb.read.verify.checksums";
static constexpr const char *nifi_rocksdb_state_storage_read_verify_checksums = "nifi.rocksdb.state.storage.read.verify.checksums";

static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
4 changes: 4 additions & 0 deletions libminifi/src/Configuration.cpp
@@ -54,6 +54,10 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal
{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_content_repository_rocksdb_use_synchronous_writes, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_content_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_flowfile_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_provenance_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_rocksdb_state_storage_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_dbcontent_repository_purge_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_remote_input_secure, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},