From cd231bd9215a479fc941230500f3874a26b3a806 Mon Sep 17 00:00:00 2001 From: sfauvel Date: Wed, 20 Nov 2024 12:14:54 +0100 Subject: [PATCH] Remove previous stake distribution before inserting a new one for an epoch --- .../query/stake_pool/delete_stake_pool.rs | 9 +- .../query/stake_pool/get_stake_pool.rs | 58 ++++--- .../database/repository/stake_pool_store.rs | 6 + .../src/database/tests/stake_pool.rs | 155 +++++++++++++++++- 4 files changed, 195 insertions(+), 33 deletions(-) diff --git a/mithril-signer/src/database/query/stake_pool/delete_stake_pool.rs b/mithril-signer/src/database/query/stake_pool/delete_stake_pool.rs index d77d78664fa..5c7f19c0b38 100644 --- a/mithril-signer/src/database/query/stake_pool/delete_stake_pool.rs +++ b/mithril-signer/src/database/query/stake_pool/delete_stake_pool.rs @@ -1,6 +1,6 @@ use sqlite::Value; -use mithril_common::entities::Epoch; +use mithril_common::{entities::Epoch, StdResult}; use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; use crate::database::record::StakePool; @@ -37,6 +37,13 @@ impl DeleteStakePoolQuery { Self { condition } } + + /// Create the SQL query to delete the given Epoch. + pub fn by_epoch(epoch: Epoch) -> StdResult { + let condition = WhereCondition::new("epoch = ?*", vec![Value::Integer(epoch.try_into()?)]); + + Ok(Self { condition }) + } } #[cfg(test)] diff --git a/mithril-signer/src/database/query/stake_pool/get_stake_pool.rs b/mithril-signer/src/database/query/stake_pool/get_stake_pool.rs index 7289651622b..032e48ca465 100644 --- a/mithril-signer/src/database/query/stake_pool/get_stake_pool.rs +++ b/mithril-signer/src/database/query/stake_pool/get_stake_pool.rs @@ -18,6 +18,13 @@ impl GetStakePoolQuery { Ok(Self { condition }) } + + #[cfg(test)] + pub(crate) fn all() -> Self { + Self { + condition: WhereCondition::default(), + } + } } impl Query for GetStakePoolQuery { @@ -37,41 +44,38 @@ impl Query for GetStakePoolQuery { #[cfg(test)] mod tests { - use crate::database::test_helper::{insert_stake_pool, main_db_connection}; - use mithril_persistence::sqlite::ConnectionExtensions; - use super::*; + use crate::database::{query::InsertOrReplaceStakePoolQuery, test_helper::main_db_connection}; + use mithril_persistence::sqlite::ConnectionExtensions; #[test] - fn test_get_stake_pools() { + fn test_query_sorts_the_return_stake_pool_by_epoch_stack_and_stake_pool_id() { let connection = main_db_connection().unwrap(); - insert_stake_pool(&connection, &[1, 2, 3]).unwrap(); - - let mut cursor = connection - .fetch(GetStakePoolQuery::by_epoch(Epoch(1)).unwrap()) + connection + .apply(InsertOrReplaceStakePoolQuery::many(vec![ + ("pool-A".to_string(), Epoch(1), 1500), + ("pool-D".to_string(), Epoch(2), 1250), + ("pool-B".to_string(), Epoch(1), 1000), + ("pool-E".to_string(), Epoch(1), 1600), + ("pool-C".to_string(), Epoch(1), 1600), + ])) .unwrap(); - let stake_pool = cursor.next().expect("Should have a stake pool 'pool3'."); - assert_eq!( - ("pool3".to_string(), Epoch(1), 1200), - (stake_pool.stake_pool_id, stake_pool.epoch, stake_pool.stake) - ); - assert_eq!(2, cursor.count()); - - let mut cursor = connection - .fetch(GetStakePoolQuery::by_epoch(Epoch(3)).unwrap()) - .unwrap(); + let stake_pool_in_database: Vec = + connection.fetch_collect(GetStakePoolQuery::all()).unwrap(); - let stake_pool = cursor.next().expect("Should have a stake pool 'pool2'."); assert_eq!( - ("pool2".to_string(), Epoch(3), 1190), - (stake_pool.stake_pool_id, stake_pool.epoch, stake_pool.stake) + vec![ + ("pool-C".to_string(), Epoch(1), 1600), + ("pool-E".to_string(), Epoch(1), 1600), + ("pool-A".to_string(), Epoch(1), 1500), + ("pool-B".to_string(), Epoch(1), 1000), + ("pool-D".to_string(), Epoch(2), 1250), + ], + stake_pool_in_database + .into_iter() + .map(|s| (s.stake_pool_id, s.epoch, s.stake)) + .collect::>() ); - assert_eq!(2, cursor.count()); - - let cursor = connection - .fetch(GetStakePoolQuery::by_epoch(Epoch(5)).unwrap()) - .unwrap(); - assert_eq!(0, cursor.count()); } } diff --git a/mithril-signer/src/database/repository/stake_pool_store.rs b/mithril-signer/src/database/repository/stake_pool_store.rs index afcf62f9e49..d412e94558d 100644 --- a/mithril-signer/src/database/repository/stake_pool_store.rs +++ b/mithril-signer/src/database/repository/stake_pool_store.rs @@ -42,6 +42,12 @@ impl StakeStorer for StakePoolStore { epoch: Epoch, stakes: StakeDistribution, ) -> StdResult> { + // We should create a transaction including delete and insert but it's not possible + // with the current implementation because the connection is shared. + self.connection + .apply(DeleteStakePoolQuery::by_epoch(epoch)?) + .with_context(|| format!("delete stakes failure, epoch: {epoch}"))?; + let pools: Vec = self .connection .fetch_collect(InsertOrReplaceStakePoolQuery::many( diff --git a/mithril-signer/src/database/tests/stake_pool.rs b/mithril-signer/src/database/tests/stake_pool.rs index c1495d2496d..c514b1bd727 100644 --- a/mithril-signer/src/database/tests/stake_pool.rs +++ b/mithril-signer/src/database/tests/stake_pool.rs @@ -25,18 +25,163 @@ mod request { #[tokio::test] async fn retrieve_returns_stake_distribution() { - let stake_distribution_to_retrieve = - StakeDistribution::from([("pool-123".to_string(), 123)]); + let stake_distribution_epoch_100 = + StakeDistribution::from([("pool-A".to_string(), 1000), ("pool-B".to_string(), 1200)]); + let stake_distribution_epoch_200 = StakeDistribution::from([ + ("pool-A".to_string(), 2500), + ("pool-B".to_string(), 2000), + ("pool-C".to_string(), 2600), + ]); let connection = main_db_connection().unwrap(); let store = StakePoolStore::new(Arc::new(connection), None); store - .save_stakes(Epoch(1), stake_distribution_to_retrieve.clone()) + .save_stakes(Epoch(100), stake_distribution_epoch_100.clone()) + .await + .unwrap(); + store + .save_stakes(Epoch(200), stake_distribution_epoch_200.clone()) .await .unwrap(); - let stake_distribution = store.retrieve(Epoch(1)).await.unwrap(); + { + let stake_distribution_in_database = store.retrieve(Epoch(100)).await.unwrap().unwrap(); - assert_eq!(stake_distribution, Some(stake_distribution_to_retrieve)); + assert_eq!(2, stake_distribution_in_database.len()); + assert_eq!(1000, stake_distribution_in_database["pool-A"]); + assert_eq!(1200, stake_distribution_in_database["pool-B"]); + } + + { + let stake_distribution_in_database = store.retrieve(Epoch(200)).await.unwrap().unwrap(); + + assert_eq!(3, stake_distribution_in_database.len()); + assert_eq!(2500, stake_distribution_in_database["pool-A"]); + assert_eq!(2000, stake_distribution_in_database["pool-B"]); + assert_eq!(2600, stake_distribution_in_database["pool-C"]); + } + } + + #[tokio::test] + async fn save_stake_distribution_return_inserted_records() { + let epoch = Epoch(100); + + let connection = main_db_connection().unwrap(); + let store = StakePoolStore::new(Arc::new(connection), None); + + { + let stake_distribution = StakeDistribution::from([ + ("pool-A".to_string(), 1000), + ("pool-B".to_string(), 1200), + ]); + + let save_result = store + .save_stakes(epoch, stake_distribution.clone()) + .await + .unwrap(); + + assert_eq!(stake_distribution, save_result.unwrap()); + } + + { + let stake_distribution = StakeDistribution::from([ + ("pool-A".to_string(), 2000), + ("pool-C".to_string(), 2300), + ]); + + let save_result = store + .save_stakes(epoch, stake_distribution.clone()) + .await + .unwrap(); + + assert_eq!(stake_distribution, save_result.unwrap()); + } + } + + #[tokio::test] + async fn save_stake_distribution_replace_all_stake_for_the_epoch() { + let epoch = Epoch(100); + + let connection = main_db_connection().unwrap(); + let store = StakePoolStore::new(Arc::new(connection), None); + + { + let stake_distribution = StakeDistribution::from([ + ("pool-A".to_string(), 1000), + ("pool-B".to_string(), 1200), + ]); + store + .save_stakes(epoch, stake_distribution.clone()) + .await + .unwrap(); + + let stake_distribution_in_database = store.retrieve(epoch).await.unwrap().unwrap(); + + assert_eq!(2, stake_distribution_in_database.len()); + assert_eq!(1000, stake_distribution_in_database["pool-A"]); + assert_eq!(1200, stake_distribution_in_database["pool-B"]); + } + + { + let stake_distribution = StakeDistribution::from([ + ("pool-B".to_string(), 2000), + ("pool-C".to_string(), 2300), + ]); + store + .save_stakes(epoch, stake_distribution.clone()) + .await + .unwrap(); + + let stake_distribution_in_database = store.retrieve(epoch).await.unwrap().unwrap(); + + assert_eq!(2, stake_distribution_in_database.len()); + assert_eq!(2000, stake_distribution_in_database["pool-B"]); + assert_eq!(2300, stake_distribution_in_database["pool-C"]); + } + } + + #[tokio::test] + async fn save_stake_distribution_do_not_change_other_epoch() { + let connection = main_db_connection().unwrap(); + let store = StakePoolStore::new(Arc::new(connection), None); + + let stake_distribution_99 = StakeDistribution::from([("pool-A".to_string(), 50)]); + store + .save_stakes(Epoch(99), stake_distribution_99.clone()) + .await + .unwrap(); + + let stake_distribution_100 = StakeDistribution::from([("pool-A".to_string(), 1000)]); + store + .save_stakes(Epoch(100), stake_distribution_100.clone()) + .await + .unwrap(); + + let stake_distribution_101 = StakeDistribution::from([("pool-A".to_string(), 5000)]); + store + .save_stakes(Epoch(101), stake_distribution_101.clone()) + .await + .unwrap(); + + { + let stake_distribution_100_updated = + StakeDistribution::from([("pool-A".to_string(), 1111)]); + store + .save_stakes(Epoch(100), stake_distribution_100_updated.clone()) + .await + .unwrap(); + + let stake_distribution_in_database = store.retrieve(Epoch(100)).await.unwrap().unwrap(); + assert_eq!( + stake_distribution_100_updated, + stake_distribution_in_database + ); + + let stake_distribution_in_database = store.retrieve(Epoch(99)).await.unwrap().unwrap(); + assert_eq!(stake_distribution_99, stake_distribution_in_database); + + let stake_distribution_in_database = store.retrieve(Epoch(101)).await.unwrap().unwrap(); + assert_eq!(stake_distribution_101, stake_distribution_in_database); + } } }