diff --git a/core/api/jrpc/value_converter.hpp b/core/api/jrpc/value_converter.hpp index 1b12dcd75e..5bfeac3d2f 100644 --- a/core/api/jrpc/value_converter.hpp +++ b/core/api/jrpc/value_converter.hpp @@ -70,6 +70,10 @@ namespace kagome::api { inline jsonrpc::Value makeValue(const primitives::Version &); inline jsonrpc::Value makeValue(const primitives::Justification &); inline jsonrpc::Value makeValue(const primitives::RpcMethods &); + inline jsonrpc::Value makeValue( + const primitives::events::RemoveAfterFinalizationParams &val); + inline jsonrpc::Value makeValue( + const primitives::events::RemoveAfterFinalizationParams::HeaderInfo &val); inline jsonrpc::Value makeValue(const uint32_t &val) { return static_cast(val); @@ -150,6 +154,17 @@ namespace kagome::api { return value; } + inline jsonrpc::Value makeValue( + const primitives::events::RemoveAfterFinalizationParams &val) { + return makeValue(val.removed); + } + + inline jsonrpc::Value makeValue( + const primitives::events::RemoveAfterFinalizationParams::HeaderInfo + &val) { + return makeValue(val.hash); + } + template inline jsonrpc::Value makeValue(const common::Blob &val) { return makeValue(BufferView{val}); diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index 41c6dfbb96..b4cadc891d 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -813,9 +813,13 @@ namespace kagome::blockchain { OUTCOME_TRY(p.storage_->putJustification(justification, block_hash)); - std::vector retired_hashes; + std::vector< + primitives::events::RemoveAfterFinalizationParams::HeaderInfo> + retired_hashes; for (auto parent = node->parent(); parent; parent = parent->parent()) { - retired_hashes.emplace_back(parent->info.hash); + retired_hashes.emplace_back( + primitives::events::RemoveAfterFinalizationParams::HeaderInfo{ + parent->info.hash, parent->info.number}); } auto changes = p.tree_->finalize(node); @@ -846,12 +850,13 @@ namespace kagome::blockchain { main_pool_handler_->execute( [weak{weak_from_this()}, - retired_hashes{std::move(retired_hashes)}] { + retired{primitives::events::RemoveAfterFinalizationParams{ + std::move(retired_hashes), header.number}}] { if (auto self = weak.lock()) { self->chain_events_engine_->notify( primitives::events::ChainEventType:: kDeactivateAfterFinalization, - retired_hashes); + retired); } }); @@ -1301,7 +1306,8 @@ namespace kagome::blockchain { } std::vector extrinsics; - std::vector retired_hashes; + std::vector + retired_hashes; // remove from storage retired_hashes.reserve(changes.prune.size()); @@ -1329,33 +1335,39 @@ namespace kagome::blockchain { BOOST_ASSERT(block_header_opt.has_value()); OUTCOME_TRY(p.state_pruner_->pruneDiscarded(block_header_opt.value())); } - retired_hashes.emplace_back(block.hash); + retired_hashes.emplace_back( + primitives::events::RemoveAfterFinalizationParams::HeaderInfo{ + block.hash, block.number}); OUTCOME_TRY(p.storage_->removeBlock(block.hash)); } // trying to return extrinsics back to transaction pool - main_pool_handler_->execute([extrinsics{std::move(extrinsics)}, - wself{weak_from_this()}, - retired_hashes{ - std::move(retired_hashes)}]() mutable { - if (auto self = wself.lock()) { - auto eo = self->block_tree_data_.sharedAccess( - [&](const BlockTreeData &p) { return p.extrinsic_observer_; }); - - for (auto &&extrinsic : extrinsics) { - auto result = eo->onTxMessage(extrinsic); - if (result) { - SL_DEBUG(self->log_, "Tx {} was reapplied", result.value().toHex()); - } else { - SL_DEBUG(self->log_, "Tx was skipped: {}", result.error()); - } - } + main_pool_handler_->execute( + [extrinsics{std::move(extrinsics)}, + wself{weak_from_this()}, + retired{primitives::events::RemoveAfterFinalizationParams{ + std::move(retired_hashes), + getLastFinalizedNoLock(p).number}}]() mutable { + if (auto self = wself.lock()) { + auto eo = self->block_tree_data_.sharedAccess( + [&](const BlockTreeData &p) { return p.extrinsic_observer_; }); + + for (auto &&extrinsic : extrinsics) { + auto result = eo->onTxMessage(extrinsic); + if (result) { + SL_DEBUG( + self->log_, "Tx {} was reapplied", result.value().toHex()); + } else { + SL_DEBUG(self->log_, "Tx was skipped: {}", result.error()); + } + } - self->chain_events_engine_->notify( - primitives::events::ChainEventType::kDeactivateAfterFinalization, - retired_hashes); - } - }); + self->chain_events_engine_->notify( + primitives::events::ChainEventType:: + kDeactivateAfterFinalization, + retired); + } + }); return outcome::success(); } diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index 9bad3c92ea..2bcaf93e30 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -570,31 +570,31 @@ namespace kagome::parachain { REINVOKE(*approval_thread_handler_, clearCaches, event); approvals_cache_.exclusiveAccess([&](auto &approvals_cache) { - for (const auto &lost : event) { + for (const auto &lost : event.removed) { SL_TRACE(logger_, "Cleaning up stale pending messages.(block hash={})", - lost); - pending_known_.erase(lost); - active_tranches_.erase(lost); - approving_context_map_.erase(lost); + lost.hash); + pending_known_.erase(lost.hash); + active_tranches_.erase(lost.hash); + approving_context_map_.erase(lost.hash); /// TODO(iceseer): `blocks_by_number_` clear on finalization - if (auto block_entry = storedBlockEntries().get(lost)) { + if (auto block_entry = storedBlockEntries().get(lost.hash)) { for (const auto &candidate : block_entry->get().candidates) { recovery_->remove(candidate.second); storedCandidateEntries().extract(candidate.second); if (auto it_cached = approvals_cache.find(candidate.second); it_cached != approvals_cache.end()) { ApprovalCache &approval_cache = it_cached->second; - approval_cache.blocks_.erase(lost); + approval_cache.blocks_.erase(lost.hash); if (approval_cache.blocks_.empty()) { approvals_cache.erase(it_cached); } } } - storedBlockEntries().extract(lost); + storedBlockEntries().extract(lost.hash); } - storedDistribBlockEntries().extract(lost); + storedDistribBlockEntries().extract(lost.hash); } }); } @@ -1240,110 +1240,112 @@ namespace kagome::parachain { ValidatorIndex validator_index, Hash block_hash, GroupIndex backing_group) { - auto on_recover_complete = [wself{weak_from_this()}, - hashed_candidate{hashed_candidate}, - block_hash, - session_index, - validator_index, - relay_block_hash]( - std::optional< - outcome::result> - &&opt_result) mutable { - auto self = wself.lock(); - if (!self) { - return; - } + auto on_recover_complete = + [wself{weak_from_this()}, + hashed_candidate{hashed_candidate}, + block_hash, + session_index, + validator_index, + relay_block_hash]( + std::optional> + &&opt_result) mutable { + auto self = wself.lock(); + if (!self) { + return; + } - const auto &candidate_receipt = hashed_candidate.get(); - if (!opt_result) { // Unavailable - self->logger_->warn( - "No available parachain data.(session index={}, candidate " - "hash={}, relay block hash={})", - session_index, - hashed_candidate.getHash(), - relay_block_hash); - return; - } + const auto &candidate_receipt = hashed_candidate.get(); + if (!opt_result) { // Unavailable + self->logger_->warn( + "No available parachain data.(session index={}, candidate " + "hash={}, relay block hash={})", + session_index, + hashed_candidate.getHash(), + relay_block_hash); + return; + } - if (opt_result->has_error()) { - self->logger_->warn( - "Parachain data recovery failed.(error={}, session index={}, " - "candidate hash={}, relay block hash={})", - opt_result->error(), - session_index, - hashed_candidate.getHash(), - relay_block_hash); - self->dispute_coordinator_.get()->issueLocalStatement( - session_index, - hashed_candidate.getHash(), - hashed_candidate.get(), - false); - return; - } - auto &available_data = opt_result->value(); - auto result = self->parachain_host_->validation_code_by_hash( - block_hash, candidate_receipt.descriptor.validation_code_hash); - if (result.has_error() || !result.value()) { - self->logger_->warn( - "Approval state is failed. Block hash {}, session index {}, " - "validator index {}, relay parent {}", - block_hash, - session_index, - validator_index, - candidate_receipt.descriptor.relay_parent); - return; /// ApprovalState::failed - } + if (opt_result->has_error()) { + self->logger_->warn( + "Parachain data recovery failed.(error={}, session index={}, " + "candidate hash={}, relay block hash={})", + opt_result->error(), + session_index, + hashed_candidate.getHash(), + relay_block_hash); + self->dispute_coordinator_.get()->issueLocalStatement( + session_index, + hashed_candidate.getHash(), + hashed_candidate.get(), + false); + return; + } + auto &available_data = opt_result->value(); + auto result = self->parachain_host_->validation_code_by_hash( + block_hash, candidate_receipt.descriptor.validation_code_hash); + if (result.has_error() || !result.value()) { + self->logger_->warn( + "Approval state is failed. Block hash {}, session index {}, " + "validator index {}, relay parent {}", + block_hash, + session_index, + validator_index, + candidate_receipt.descriptor.relay_parent); + return; /// ApprovalState::failed + } - self->logger_->info( - "Make exhaustive validation. Candidate hash {}, validator index " - "{}, block hash {}", - hashed_candidate.getHash(), - validator_index, - block_hash); + self->logger_->info( + "Make exhaustive validation. Candidate hash {}, validator index " + "{}, block hash {}", + hashed_candidate.getHash(), + validator_index, + block_hash); - runtime::ValidationCode &validation_code = *result.value(); + runtime::ValidationCode &validation_code = *result.value(); - auto cb = [weak_self{wself}, - hashed_candidate, - session_index, - validator_index, - relay_block_hash](outcome::result outcome) { - auto self = weak_self.lock(); - if (not self) { - return; - } - const auto &candidate_receipt = hashed_candidate.get(); - self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) { - if (auto it = approvals_cache.find(hashed_candidate.getHash()); - it != approvals_cache.end()) { - ApprovalCache &ac = it->second; - ac.approval_result = outcome.has_error() - ? ApprovalOutcome::Failed - : ApprovalOutcome::Approved; - } - }); - if (outcome.has_error()) { - self->logger_->warn( - "Approval validation failed.(parachain id={}, relay parent={}, error={})", - candidate_receipt.descriptor.para_id, - candidate_receipt.descriptor.relay_parent, - outcome.error()); - self->dispute_coordinator_.get()->issueLocalStatement( - session_index, - hashed_candidate.getHash(), - candidate_receipt, - false); - } else { - self->issue_approval( - hashed_candidate.getHash(), validator_index, relay_block_hash); - } - }; - self->pvf_->pvfValidate(available_data.validation_data, - available_data.pov, - candidate_receipt, - validation_code, - std::move(cb)); - }; + auto cb = [weak_self{wself}, + hashed_candidate, + session_index, + validator_index, + relay_block_hash](outcome::result outcome) { + auto self = weak_self.lock(); + if (not self) { + return; + } + const auto &candidate_receipt = hashed_candidate.get(); + self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) { + if (auto it = approvals_cache.find(hashed_candidate.getHash()); + it != approvals_cache.end()) { + ApprovalCache &ac = it->second; + ac.approval_result = outcome.has_error() + ? ApprovalOutcome::Failed + : ApprovalOutcome::Approved; + } + }); + if (outcome.has_error()) { + self->logger_->warn( + "Approval validation failed.(parachain id={}, relay " + "parent={}, error={})", + candidate_receipt.descriptor.para_id, + candidate_receipt.descriptor.relay_parent, + outcome.error()); + self->dispute_coordinator_.get()->issueLocalStatement( + session_index, + hashed_candidate.getHash(), + candidate_receipt, + false); + } else { + self->issue_approval(hashed_candidate.getHash(), + validator_index, + relay_block_hash); + } + }; + self->pvf_->pvfValidate(available_data.validation_data, + available_data.pov, + candidate_receipt, + validation_code, + std::move(cb)); + }; recovery_->recover(hashed_candidate, session_index, diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index f441922c95..8642edfe75 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -794,12 +794,36 @@ namespace kagome::parachain { const primitives::events::RemoveAfterFinalizationParams &event) { REINVOKE(*main_pool_handler_, onDeactivateBlocks, event); - for (const auto &lost : event) { - SL_TRACE(logger_, "Remove from storages.(relay parent={})", lost); - - backing_store_->onDeactivateLeaf(lost); - av_store_->remove(lost); - bitfield_store_->remove(lost); + for (const auto &lost : event.removed) { + SL_TRACE(logger_, + "Remove from storages.(relay parent={}, number={})", + lost.hash, + lost.number); + + backing_store_->onDeactivateLeaf(lost.hash); + av_store_->remove(lost.hash); + bitfield_store_->remove(lost.hash); + } + + for (auto it = our_current_state_.state_by_relay_parent.begin(); + it != our_current_state_.state_by_relay_parent.end();) { + const auto &hash = it->first; + const auto &per_relay_state = it->second; + const auto header = block_tree_->getBlockHeader(hash); + + const bool keep = header.has_value() + && per_relay_state.prospective_parachains_mode + && (header.value().number + + per_relay_state.prospective_parachains_mode + ->allowed_ancestry_len + + 1) + >= event.finalized; + if (keep) { + ++it; + } else { + our_current_state_.implicit_view->deactivate_leaf(hash); + it = our_current_state_.state_by_relay_parent.erase(it); + } } } diff --git a/core/primitives/event_types.hpp b/core/primitives/event_types.hpp index 14edcdf12d..1ae24e606d 100644 --- a/core/primitives/event_types.hpp +++ b/core/primitives/event_types.hpp @@ -48,7 +48,14 @@ namespace kagome::primitives::events { using HeadsEventParams = ref_t; using RuntimeVersionEventParams = ref_t; using NewRuntimeEventParams = ref_t; - using RemoveAfterFinalizationParams = std::vector; + struct RemoveAfterFinalizationParams { + struct HeaderInfo { + primitives::BlockHash hash; + primitives::BlockNumber number; + }; + std::vector removed; + primitives::BlockNumber finalized; + }; using ChainEventParams = boost::variantclearCaches(event); + std::vector removed; + removed.reserve(event.removed.size()); + std::transform(event.removed.begin(), + event.removed.end(), + std::back_inserter(removed), + [](const auto &bi) { return bi.hash; }); + self->clearCaches(removed); } });