Skip to content

Commit

Permalink
Parallelize checksum computation in filesystem writer
Browse files Browse the repository at this point in the history
  • Loading branch information
mhx committed Apr 5, 2021
1 parent 713b187 commit ef722f0
Showing 1 changed file with 108 additions and 110 deletions.
218 changes: 108 additions & 110 deletions src/dwarfs/filesystem_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ namespace {

class fsblock {
public:
fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data);
fsblock(section_type type, block_compressor const& bc,
std::shared_ptr<block_data>&& data, uint32_t number);

fsblock(section_type type, compression_type compression,
folly::ByteRange data);
folly::ByteRange data, uint32_t number);

void compress(worker_group& wg) { impl_->compress(wg); }
void wait_until_compressed() { impl_->wait_until_compressed(); }
Expand All @@ -60,6 +60,8 @@ class fsblock {
folly::ByteRange data() const { return impl_->data(); }
size_t uncompressed_size() const { return impl_->uncompressed_size(); }
size_t size() const { return impl_->size(); }
uint32_t number() const { return impl_->number(); }
section_header_v2 const& header() const { return impl_->header(); }

class impl {
public:
Expand All @@ -72,112 +74,97 @@ class fsblock {
virtual folly::ByteRange data() const = 0;
virtual size_t uncompressed_size() const = 0;
virtual size_t size() const = 0;
virtual uint32_t number() const = 0;
virtual section_header_v2 const& header() const = 0;
};

static void
build_section_header(section_header_v2& sh, fsblock::impl const& fsb);

private:
std::unique_ptr<impl> impl_;
};

template <typename LoggerPolicy>
class raw_fsblock : public fsblock::impl {
private:
class state {
public:
state(std::shared_ptr<block_data>&& data, logger& lgr)
: compressed_(false)
, data_(std::move(data))
, LOG_PROXY_INIT(lgr) {}

void compress(const block_compressor& bc) {
std::shared_ptr<block_data> tmp;

{
auto td = LOG_TIMED_TRACE;
public:
// Creates a block that holds `data` as handed in until the job queued by
// compress() replaces it with the compressor's output. `number` is the value
// later returned by number().
// NOTE(review): `bc` is stored as a reference (bc_), so it presumably has to
// outlive this object -- confirm against the callers.
raw_fsblock(section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data, uint32_t number)
: type_{type}
, bc_{bc}
// data->size() is read before `data` is moved from: members are initialized
// in declaration order and uncompressed_size_ is declared ahead of data_.
, uncompressed_size_{data->size()}
, data_{std::move(data)}
, number_{number} {}

tmp = std::make_shared<block_data>(bc.compress(data_->vec()));
void compress(worker_group& wg) override {
std::promise<void> prom;
future_ = prom.get_future();

td << "block compression finished";
}
wg.add_job([this, prom = std::move(prom)]() mutable {
auto tmp = std::make_shared<block_data>(bc_.compress(data_->vec()));

{
std::lock_guard lock(mx_);
data_.swap(tmp);
compressed_ = true;
}

cond_.notify_one();
}

void wait() {
std::unique_lock lock(mx_);
cond_.wait(lock, [&]() -> bool { return compressed_; });
}

std::vector<uint8_t> const& data() const { return data_->vec(); }

size_t size() const {
std::lock_guard lock(mx_);
return data_->size();
}

private:
mutable std::mutex mx_;
std::condition_variable cond_;
std::atomic<bool> compressed_;
std::shared_ptr<block_data> data_;
LOG_PROXY_DECL(LoggerPolicy);
};

public:
raw_fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data)
: type_(type)
, bc_(bc)
, uncompressed_size_(data->size())
, state_(std::make_shared<state>(std::move(data), lgr))
, LOG_PROXY_INIT(lgr) {}
tmp.reset();

void compress(worker_group& wg) override {
LOG_TRACE << "block queued for compression";
fsblock::build_section_header(header_, *this);

std::shared_ptr<state> s = state_;

wg.add_job([&, s] {
LOG_TRACE << "block compression started";
s->compress(bc_);
prom.set_value();
});
}

void wait_until_compressed() override { state_->wait(); }
void wait_until_compressed() override { future_.wait(); }

section_type type() const override { return type_; }

compression_type compression() const override { return bc_.type(); }

folly::ByteRange data() const override { return state_->data(); }
folly::ByteRange data() const override { return data_->vec(); }

size_t uncompressed_size() const override { return uncompressed_size_; }

size_t size() const override { return state_->size(); }
// Returns the current payload size in bytes. mx_ is taken because the job
// queued by compress() swaps data_ under the same mutex; the result is the
// size of the original data before that swap and of the compressor's output
// afterwards.
size_t size() const override {
std::lock_guard lock(mx_);
return data_->size();
}

uint32_t number() const override { return number_; }

section_header_v2 const& header() const override { return header_; }

private:
const section_type type_;
block_compressor const& bc_;
const size_t uncompressed_size_;
std::shared_ptr<state> state_;
LOG_PROXY_DECL(LoggerPolicy);
mutable std::mutex mx_;
std::shared_ptr<block_data> data_;
std::future<void> future_;
uint32_t const number_;
section_header_v2 header_;
};

class compressed_fsblock : public fsblock::impl {
public:
compressed_fsblock(section_type type, compression_type compression,
folly::ByteRange range)
: type_(type)
, compression_(compression)
, range_(range) {}
folly::ByteRange range, uint32_t number)
: type_{type}
, compression_{compression}
, range_{range}
, number_{number} {}

void compress(worker_group&) override {}
void wait_until_compressed() override {}
// Queues a job on `wg` that builds this block's section header. No
// compression is performed here; the job only calls build_section_header()
// and then fulfils the promise that wait_until_compressed() waits on.
// NOTE(review): the job captures `this`, so the block presumably must stay
// alive until the job has run -- confirm against the callers.
void compress(worker_group& wg) override {
  std::promise<void> header_ready;
  future_ = header_ready.get_future();

  wg.add_job([this, header_ready = std::move(header_ready)]() mutable {
    fsblock::build_section_header(header_, *this);
    header_ready.set_value();
  });
}

void wait_until_compressed() override { future_.wait(); }

section_type type() const override { return type_; }
compression_type compression() const override { return compression_; }
Expand All @@ -187,20 +174,52 @@ class compressed_fsblock : public fsblock::impl {
size_t uncompressed_size() const override { return range_.size(); }
size_t size() const override { return range_.size(); }

uint32_t number() const override { return number_; }

section_header_v2 const& header() const override { return header_; }

private:
const section_type type_;
const compression_type compression_;
section_type const type_;
compression_type const compression_;
folly::ByteRange range_;
std::future<void> future_;
uint32_t const number_;
section_header_v2 header_;
};

fsblock::fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data)
: impl_(make_unique_logging_object<impl, raw_fsblock, logger_policies>(
lgr, type, bc, std::move(data))) {}
// Builds an fsblock backed by raw_fsblock: `data` is handed over together
// with the compressor `bc`, and `type` and `number` are forwarded unchanged.
fsblock::fsblock(section_type type, block_compressor const& bc,
std::shared_ptr<block_data>&& data, uint32_t number)
: impl_(std::make_unique<raw_fsblock>(type, bc, std::move(data), number)) {}

fsblock::fsblock(section_type type, compression_type compression,
folly::ByteRange data)
: impl_(std::make_unique<compressed_fsblock>(type, compression, data)) {}
folly::ByteRange data, uint32_t number)
: impl_(std::make_unique<compressed_fsblock>(type, compression, data,
number)) {}

// Populates the section header `sh` for the block `fsb`.
//
// The identification fields (magic, version, number, type, compression,
// length) are written first. Two checksums are then computed, in this order:
//   1. XXH3-64 over the header bytes from `number` to the end of the struct,
//      followed by the block payload; the result is stored in sh.xxh3_64.
//   2. SHA2-512/256 over the header bytes from `xxh3_64` to the end of the
//      struct, followed by the block payload; the result is stored in
//      sh.sha2_512_256.
// The SHA input range starts at the xxh3_64 field, so the XXH3 step has to
// run first.
//
// A failing finalize() is reported through DWARFS_CHECK.
void fsblock::build_section_header(section_header_v2& sh,
                                   fsblock::impl const& fsb) {
  auto const payload = fsb.data();

  // Identification / bookkeeping fields.
  ::memcpy(&sh.magic[0], "DWARFS", 6);
  sh.major = MAJOR_VERSION;
  sh.minor = MINOR_VERSION;
  sh.number = fsb.number();
  sh.type = static_cast<uint16_t>(fsb.type());
  sh.compression = static_cast<uint16_t>(fsb.compression());
  sh.length = payload.size();

  // Number of header bytes covered by each checksum (from the named field
  // through the end of the struct).
  size_t const xxh_header_bytes =
      sizeof(section_header_v2) - offsetof(section_header_v2, number);
  size_t const sha_header_bytes =
      sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64);

  checksum xxh(checksum::algorithm::XXH3_64);
  xxh.update(&sh.number, xxh_header_bytes);
  xxh.update(payload.data(), payload.size());
  DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed");

  checksum sha(checksum::algorithm::SHA2_512_256);
  sha.update(&sh.xxh3_64, sha_header_bytes);
  sha.update(payload.data(), payload.size());
  DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed");
}

template <typename LoggerPolicy>
class filesystem_writer_ final : public filesystem_writer::impl {
Expand All @@ -226,8 +245,7 @@ class filesystem_writer_ final : public filesystem_writer::impl {
private:
void write_section(section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc);
void write(section_type type, compression_type compression,
folly::ByteRange range);
void write(fsblock const& fsb);
void write(const char* data, size_t size);
template <typename T>
void write(const T& obj);
Expand Down Expand Up @@ -321,7 +339,7 @@ void filesystem_writer_<LoggerPolicy>::writer_thread() {
<< size_with_unit(fsb->uncompressed_size()) << " to "
<< size_with_unit(fsb->size());

write(fsb->type(), fsb->compression(), fsb->data());
write(*fsb);
}
}

Expand Down Expand Up @@ -355,34 +373,11 @@ void filesystem_writer_<LoggerPolicy>::write(folly::ByteRange range) {
}

template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write(section_type type,
compression_type compression,
folly::ByteRange range) {
section_header_v2 sh;
::memcpy(&sh.magic[0], "DWARFS", 6);
sh.major = MAJOR_VERSION;
sh.minor = MINOR_VERSION;
sh.number = section_number_++;
sh.type = static_cast<uint16_t>(type);
sh.compression = static_cast<uint16_t>(compression);
sh.length = range.size();

checksum xxh(checksum::algorithm::XXH3_64);
xxh.update(&sh.number,
sizeof(section_header_v2) - offsetof(section_header_v2, number));
xxh.update(range.data(), range.size());
DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed");
void filesystem_writer_<LoggerPolicy>::write(fsblock const& fsb) {
write(fsb.header());
write(fsb.data());

checksum sha(checksum::algorithm::SHA2_512_256);
sha.update(&sh.xxh3_64,
sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64));
sha.update(range.data(), range.size());
DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed");

write(sh);
write(range);

if (type == section_type::BLOCK) {
if (fsb.type() == section_type::BLOCK) {
prog_.blocks_written++;
}
}
Expand All @@ -400,7 +395,7 @@ void filesystem_writer_<LoggerPolicy>::write_section(
}

auto fsb =
std::make_unique<fsblock>(LOG_GET_LOGGER, type, bc, std::move(data));
std::make_unique<fsblock>(type, bc, std::move(data), section_number_++);

fsb->compress(wg_);

Expand All @@ -415,7 +410,10 @@ void filesystem_writer_<LoggerPolicy>::write_section(
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_compressed_section(
section_type type, compression_type compression, folly::ByteRange data) {
auto fsb = std::make_unique<fsblock>(type, compression, data);
auto fsb =
std::make_unique<fsblock>(type, compression, data, section_number_++);

fsb->compress(wg_);

{
std::lock_guard lock(mx_);
Expand Down

0 comments on commit ef722f0

Please sign in to comment.