Skip to content

Commit

Permalink
Merge branch 'main' into feat/1414-rich-text-component
Browse files Browse the repository at this point in the history
  • Loading branch information
LynxLynxx authored Jan 17, 2025
2 parents 99d03a7 + 5c875d3 commit 4b3ba29
Show file tree
Hide file tree
Showing 50 changed files with 2,640 additions and 49 deletions.
9 changes: 7 additions & 2 deletions catalyst-gateway/bin/src/cardano/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::{debug, error, info, warn};

use crate::{
db::index::{
block::index_block,
block::{index_block, roll_forward},
queries::sync_status::{
get::{get_sync_status, SyncStatus},
update::update_sync_status,
Expand Down Expand Up @@ -453,14 +453,19 @@ impl SyncTask {
},
}

// TODO: IF there is only 1 chain follower left in sync_tasks, then all
// IF there is only 1 chain follower left in sync_tasks, then all
// immutable followers have finished.
// When this happens we need to purge the live index of any records that exist
// before the current immutable tip.
// Note: to prevent a data race when multiple nodes are syncing, we probably
// want to put a gap in this, so that there are X slots of overlap
// between the live chain and immutable chain. This gap should be
// a parameter.
if self.sync_tasks.len() == 1 {
if let Err(error) = roll_forward::purge_live_index(self.immutable_tip_slot).await {
error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
}
}
}

error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!");
Expand Down
1 change: 1 addition & 0 deletions catalyst-gateway/bin/src/db/index/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub(crate) mod certs;
pub(crate) mod cip36;
pub(crate) mod rbac509;
pub(crate) mod roll_forward;
pub(crate) mod txi;
pub(crate) mod txo;

Expand Down
314 changes: 314 additions & 0 deletions catalyst-gateway/bin/src/db/index/block/roll_forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
//! Immutable Roll Forward logic.
use std::{collections::HashSet, sync::Arc};

use futures::StreamExt;

use crate::{
db::index::{block::CassandraSession, queries::purge},
settings::Settings,
};

/// Purge Data from Live Index
pub(crate) async fn purge_live_index(purge_slot: u64) -> anyhow::Result<()> {
let persistent = false; // get volatile session
let Some(session) = CassandraSession::get(persistent) else {
anyhow::bail!("Failed to acquire db session");
};

// Purge data up to this slot
let purge_to_slot: num_bigint::BigInt = purge_slot
.saturating_sub(Settings::purge_slot_buffer())
.into();

let txn_hashes = purge_txi_by_hash(&session, &purge_to_slot).await?;
purge_chain_root_for_role0_key(&session, &purge_to_slot).await?;
purge_chain_root_for_stake_address(&session, &purge_to_slot).await?;
purge_chain_root_for_txn_id(&session, &txn_hashes).await?;
purge_cip36_registration(&session, &purge_to_slot).await?;
purge_cip36_registration_for_vote_key(&session, &purge_to_slot).await?;
purge_cip36_registration_invalid(&session, &purge_to_slot).await?;
purge_rbac509_registration(&session, &purge_to_slot).await?;
purge_stake_registration(&session, &purge_to_slot).await?;
purge_txo_ada(&session, &purge_to_slot).await?;
purge_txo_assets(&session, &purge_to_slot).await?;
purge_unstaked_txo_ada(&session, &purge_to_slot).await?;
purge_unstaked_txo_assets(&session, &purge_to_slot).await?;

Ok(())
}

/// Purge data from `chain_root_for_role0_key`.
async fn purge_chain_root_for_role0_key(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::chain_root_for_role0_key::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `chain_root_for_stake_address`.
async fn purge_chain_root_for_stake_address(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::chain_root_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `chain_root_for_txn_id`.
async fn purge_chain_root_for_txn_id(
session: &Arc<CassandraSession>, txn_hashes: &HashSet<Vec<u8>>,
) -> anyhow::Result<()> {
use purge::chain_root_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if txn_hashes.contains(&params.transaction_id) {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `cip36_registration`.
async fn purge_cip36_registration(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `cip36_registration_for_vote_key`.
async fn purge_cip36_registration_for_vote_key(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `cip36_registration_invalid`.
async fn purge_cip36_registration_invalid(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `rbac509_registration`.
async fn purge_rbac509_registration(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `stake_registration`.
async fn purge_stake_registration(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `txi_by_hash`.
async fn purge_txi_by_hash(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<HashSet<Vec<u8>>> {
use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
let mut txn_hashes: HashSet<Vec<u8>> = HashSet::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
if &primary_key.2 <= purge_to_slot {
let params: Params = primary_key.into();
txn_hashes.insert(params.txn_hash.clone());
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(txn_hashes)
}

/// Purge data from `txo_ada`.
async fn purge_txo_ada(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `txo_assets`.
async fn purge_txo_assets(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `unstaked_txo_ada`.
async fn purge_unstaked_txo_ada(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
if &primary_key.2 <= purge_to_slot {
let params: Params = primary_key.into();
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}

/// Purge data from `unstaked_txo_assets`.
async fn purge_unstaked_txo_assets(
session: &Arc<CassandraSession>, purge_to_slot: &num_bigint::BigInt,
) -> anyhow::Result<()> {
use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};

// Get all keys
let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
// Filter
let mut delete_params: Vec<Params> = Vec::new();
while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
let params: Params = primary_key.into();
if &params.slot_no <= purge_to_slot {
delete_params.push(params);
}
}
// Delete filtered keys
DeleteQuery::execute(session, delete_params).await?;
Ok(())
}
Loading

0 comments on commit 4b3ba29

Please sign in to comment.