Skip to content

Commit

Permalink
memory fix
Browse files Browse the repository at this point in the history
Signed-off-by: iceseer <[email protected]>
  • Loading branch information
iceseer committed Jun 7, 2024
1 parent d7c508a commit 8073861
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 143 deletions.
15 changes: 15 additions & 0 deletions core/api/jrpc/value_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ namespace kagome::api {
inline jsonrpc::Value makeValue(const primitives::Version &);
inline jsonrpc::Value makeValue(const primitives::Justification &);
inline jsonrpc::Value makeValue(const primitives::RpcMethods &);
inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams &val);
inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams::HeaderInfo &val);

inline jsonrpc::Value makeValue(const uint32_t &val) {
return static_cast<int64_t>(val);
Expand Down Expand Up @@ -150,6 +154,17 @@ namespace kagome::api {
return value;
}

inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams &val) {
return makeValue(val.removed);
}

inline jsonrpc::Value makeValue(
const primitives::events::RemoveAfterFinalizationParams::HeaderInfo
&val) {
return makeValue(val.hash);
}

template <size_t N>
inline jsonrpc::Value makeValue(const common::Blob<N> &val) {
return makeValue(BufferView{val});
Expand Down
66 changes: 39 additions & 27 deletions core/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,9 +813,13 @@ namespace kagome::blockchain {

OUTCOME_TRY(p.storage_->putJustification(justification, block_hash));

std::vector<primitives::BlockHash> retired_hashes;
std::vector<
primitives::events::RemoveAfterFinalizationParams::HeaderInfo>
retired_hashes;
for (auto parent = node->parent(); parent; parent = parent->parent()) {
retired_hashes.emplace_back(parent->info.hash);
retired_hashes.emplace_back(
primitives::events::RemoveAfterFinalizationParams::HeaderInfo{
parent->info.hash, parent->info.number});
}

auto changes = p.tree_->finalize(node);
Expand Down Expand Up @@ -846,12 +850,13 @@ namespace kagome::blockchain {

main_pool_handler_->execute(
[weak{weak_from_this()},
retired_hashes{std::move(retired_hashes)}] {
retired{primitives::events::RemoveAfterFinalizationParams{
std::move(retired_hashes), header.number}}] {
if (auto self = weak.lock()) {
self->chain_events_engine_->notify(
primitives::events::ChainEventType::
kDeactivateAfterFinalization,
retired_hashes);
retired);
}
});

Expand Down Expand Up @@ -1301,7 +1306,8 @@ namespace kagome::blockchain {
}

std::vector<primitives::Extrinsic> extrinsics;
std::vector<primitives::BlockHash> retired_hashes;
std::vector<primitives::events::RemoveAfterFinalizationParams::HeaderInfo>
retired_hashes;

// remove from storage
retired_hashes.reserve(changes.prune.size());
Expand Down Expand Up @@ -1329,33 +1335,39 @@ namespace kagome::blockchain {
BOOST_ASSERT(block_header_opt.has_value());
OUTCOME_TRY(p.state_pruner_->pruneDiscarded(block_header_opt.value()));
}
retired_hashes.emplace_back(block.hash);
retired_hashes.emplace_back(
primitives::events::RemoveAfterFinalizationParams::HeaderInfo{
block.hash, block.number});
OUTCOME_TRY(p.storage_->removeBlock(block.hash));
}

// trying to return extrinsics back to transaction pool
main_pool_handler_->execute([extrinsics{std::move(extrinsics)},
wself{weak_from_this()},
retired_hashes{
std::move(retired_hashes)}]() mutable {
if (auto self = wself.lock()) {
auto eo = self->block_tree_data_.sharedAccess(
[&](const BlockTreeData &p) { return p.extrinsic_observer_; });

for (auto &&extrinsic : extrinsics) {
auto result = eo->onTxMessage(extrinsic);
if (result) {
SL_DEBUG(self->log_, "Tx {} was reapplied", result.value().toHex());
} else {
SL_DEBUG(self->log_, "Tx was skipped: {}", result.error());
}
}
main_pool_handler_->execute(
[extrinsics{std::move(extrinsics)},
wself{weak_from_this()},
retired{primitives::events::RemoveAfterFinalizationParams{
std::move(retired_hashes),
getLastFinalizedNoLock(p).number}}]() mutable {
if (auto self = wself.lock()) {
auto eo = self->block_tree_data_.sharedAccess(
[&](const BlockTreeData &p) { return p.extrinsic_observer_; });

for (auto &&extrinsic : extrinsics) {
auto result = eo->onTxMessage(extrinsic);
if (result) {
SL_DEBUG(
self->log_, "Tx {} was reapplied", result.value().toHex());
} else {
SL_DEBUG(self->log_, "Tx was skipped: {}", result.error());
}
}

self->chain_events_engine_->notify(
primitives::events::ChainEventType::kDeactivateAfterFinalization,
retired_hashes);
}
});
self->chain_events_engine_->notify(
primitives::events::ChainEventType::
kDeactivateAfterFinalization,
retired);
}
});

return outcome::success();
}
Expand Down
218 changes: 110 additions & 108 deletions core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,31 +570,31 @@ namespace kagome::parachain {
REINVOKE(*approval_thread_handler_, clearCaches, event);

approvals_cache_.exclusiveAccess([&](auto &approvals_cache) {
for (const auto &lost : event) {
for (const auto &lost : event.removed) {
SL_TRACE(logger_,
"Cleaning up stale pending messages.(block hash={})",
lost);
pending_known_.erase(lost);
active_tranches_.erase(lost);
approving_context_map_.erase(lost);
lost.hash);
pending_known_.erase(lost.hash);
active_tranches_.erase(lost.hash);
approving_context_map_.erase(lost.hash);
/// TODO(iceseer): `blocks_by_number_` clear on finalization

if (auto block_entry = storedBlockEntries().get(lost)) {
if (auto block_entry = storedBlockEntries().get(lost.hash)) {
for (const auto &candidate : block_entry->get().candidates) {
recovery_->remove(candidate.second);
storedCandidateEntries().extract(candidate.second);
if (auto it_cached = approvals_cache.find(candidate.second);
it_cached != approvals_cache.end()) {
ApprovalCache &approval_cache = it_cached->second;
approval_cache.blocks_.erase(lost);
approval_cache.blocks_.erase(lost.hash);
if (approval_cache.blocks_.empty()) {
approvals_cache.erase(it_cached);
}
}
}
storedBlockEntries().extract(lost);
storedBlockEntries().extract(lost.hash);
}
storedDistribBlockEntries().extract(lost);
storedDistribBlockEntries().extract(lost.hash);
}
});
}
Expand Down Expand Up @@ -1240,110 +1240,112 @@ namespace kagome::parachain {
ValidatorIndex validator_index,
Hash block_hash,
GroupIndex backing_group) {
auto on_recover_complete = [wself{weak_from_this()},
hashed_candidate{hashed_candidate},
block_hash,
session_index,
validator_index,
relay_block_hash](
std::optional<
outcome::result<runtime::AvailableData>>
&&opt_result) mutable {
auto self = wself.lock();
if (!self) {
return;
}
auto on_recover_complete =
[wself{weak_from_this()},
hashed_candidate{hashed_candidate},
block_hash,
session_index,
validator_index,
relay_block_hash](
std::optional<outcome::result<runtime::AvailableData>>
&&opt_result) mutable {
auto self = wself.lock();
if (!self) {
return;
}

const auto &candidate_receipt = hashed_candidate.get();
if (!opt_result) { // Unavailable
self->logger_->warn(
"No available parachain data.(session index={}, candidate "
"hash={}, relay block hash={})",
session_index,
hashed_candidate.getHash(),
relay_block_hash);
return;
}
const auto &candidate_receipt = hashed_candidate.get();
if (!opt_result) { // Unavailable
self->logger_->warn(
"No available parachain data.(session index={}, candidate "
"hash={}, relay block hash={})",
session_index,
hashed_candidate.getHash(),
relay_block_hash);
return;
}

if (opt_result->has_error()) {
self->logger_->warn(
"Parachain data recovery failed.(error={}, session index={}, "
"candidate hash={}, relay block hash={})",
opt_result->error(),
session_index,
hashed_candidate.getHash(),
relay_block_hash);
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
hashed_candidate.get(),
false);
return;
}
auto &available_data = opt_result->value();
auto result = self->parachain_host_->validation_code_by_hash(
block_hash, candidate_receipt.descriptor.validation_code_hash);
if (result.has_error() || !result.value()) {
self->logger_->warn(
"Approval state is failed. Block hash {}, session index {}, "
"validator index {}, relay parent {}",
block_hash,
session_index,
validator_index,
candidate_receipt.descriptor.relay_parent);
return; /// ApprovalState::failed
}
if (opt_result->has_error()) {
self->logger_->warn(
"Parachain data recovery failed.(error={}, session index={}, "
"candidate hash={}, relay block hash={})",
opt_result->error(),
session_index,
hashed_candidate.getHash(),
relay_block_hash);
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
hashed_candidate.get(),
false);
return;
}
auto &available_data = opt_result->value();
auto result = self->parachain_host_->validation_code_by_hash(
block_hash, candidate_receipt.descriptor.validation_code_hash);
if (result.has_error() || !result.value()) {
self->logger_->warn(
"Approval state is failed. Block hash {}, session index {}, "
"validator index {}, relay parent {}",
block_hash,
session_index,
validator_index,
candidate_receipt.descriptor.relay_parent);
return; /// ApprovalState::failed
}

self->logger_->info(
"Make exhaustive validation. Candidate hash {}, validator index "
"{}, block hash {}",
hashed_candidate.getHash(),
validator_index,
block_hash);
self->logger_->info(
"Make exhaustive validation. Candidate hash {}, validator index "
"{}, block hash {}",
hashed_candidate.getHash(),
validator_index,
block_hash);

runtime::ValidationCode &validation_code = *result.value();
runtime::ValidationCode &validation_code = *result.value();

auto cb = [weak_self{wself},
hashed_candidate,
session_index,
validator_index,
relay_block_hash](outcome::result<Pvf::Result> outcome) {
auto self = weak_self.lock();
if (not self) {
return;
}
const auto &candidate_receipt = hashed_candidate.get();
self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) {
if (auto it = approvals_cache.find(hashed_candidate.getHash());
it != approvals_cache.end()) {
ApprovalCache &ac = it->second;
ac.approval_result = outcome.has_error()
? ApprovalOutcome::Failed
: ApprovalOutcome::Approved;
}
});
if (outcome.has_error()) {
self->logger_->warn(
"Approval validation failed.(parachain id={}, relay parent={}, error={})",
candidate_receipt.descriptor.para_id,
candidate_receipt.descriptor.relay_parent,
outcome.error());
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
candidate_receipt,
false);
} else {
self->issue_approval(
hashed_candidate.getHash(), validator_index, relay_block_hash);
}
};
self->pvf_->pvfValidate(available_data.validation_data,
available_data.pov,
candidate_receipt,
validation_code,
std::move(cb));
};
auto cb = [weak_self{wself},
hashed_candidate,
session_index,
validator_index,
relay_block_hash](outcome::result<Pvf::Result> outcome) {
auto self = weak_self.lock();
if (not self) {
return;
}
const auto &candidate_receipt = hashed_candidate.get();
self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) {
if (auto it = approvals_cache.find(hashed_candidate.getHash());
it != approvals_cache.end()) {
ApprovalCache &ac = it->second;
ac.approval_result = outcome.has_error()
? ApprovalOutcome::Failed
: ApprovalOutcome::Approved;
}
});
if (outcome.has_error()) {
self->logger_->warn(
"Approval validation failed.(parachain id={}, relay "
"parent={}, error={})",
candidate_receipt.descriptor.para_id,
candidate_receipt.descriptor.relay_parent,
outcome.error());
self->dispute_coordinator_.get()->issueLocalStatement(
session_index,
hashed_candidate.getHash(),
candidate_receipt,
false);
} else {
self->issue_approval(hashed_candidate.getHash(),
validator_index,
relay_block_hash);
}
};
self->pvf_->pvfValidate(available_data.validation_data,
available_data.pov,
candidate_receipt,
validation_code,
std::move(cb));
};

recovery_->recover(hashed_candidate,
session_index,
Expand Down
Loading

0 comments on commit 8073861

Please sign in to comment.