Skip to content

Commit

Permalink
Merge pull request #2086 from input-output-hk/sfa/2067/create_view_fo…
Browse files Browse the repository at this point in the history
…r_registrations_monitoring

Create view for registrations monitoring
  • Loading branch information
sfauvel authored Nov 5, 2024
2 parents f67adf1 + b490869 commit df88acf
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/runbook/registrations-monitoring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

In order to set the epoch of Era changes, it is required to know the
distribution of the stakes that run a compatible Signer node. There is a SQL
query for that.
view for that.

```sh
$> sqlite3 -table -batch \
$DATA_STORES_DIRECTORY/monitoring.sqlite3 \
< stake_signer_version.sql
`select epoch, version, total_epoch_stakes, stakes_version, stakes_ratio, pool_count from signer_registration_summary;`
```

The variable `$DATA_STORES_DIRECTORY` should point to the directory where the
Expand Down
44 changes: 0 additions & 44 deletions docs/runbook/registrations-monitoring/stake_signer_version.sql

This file was deleted.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.5.101"
version = "0.5.102"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
49 changes: 49 additions & 0 deletions mithril-aggregator/src/event_store/database/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,54 @@ group by action, date;
create index metric_date_index on event(date(json_extract(content, '$.content.date')));
"#,
),
SqlMigration::new(
3,
r#"
create view signer_registration_summary as with
signer_version as (
select
json_extract(content, '$.content.party_id') as party_id,
json_extract(content, '$.headers.signer-node-version') as node_version,
json_extract(content, '$.headers.epoch') as epoch,
json_extract(content, '$.content.stake') as stakes
from event
where action='register_signer'
order by created_at desc
),
stakes_version as (
select
party_id,
case
when instr(signer_version.node_version, '+') > 0
then substr(signer_version.node_version, 0, instr(signer_version.node_version, '+'))
else signer_version.node_version
end as version,
stakes,
cast(epoch as int) as epoch
from signer_version
group by party_id, epoch
),
summed_stakes_version as (
select
epoch,
version,
party_id,
sum(stakes) over (partition by epoch) as total_epoch_stakes,
sum(stakes) over (partition by epoch, version) as stakes_version
from stakes_version
order by epoch desc
)
select
epoch,
version,
total_epoch_stakes,
stakes_version,
printf('%02d %%', round((stakes_version * 100) / (total_epoch_stakes * 1.0))) as stakes_ratio,
count(party_id) as pool_count
from summed_stakes_version
group by epoch, version
order by epoch desc, version desc;
"#,
),
]
}
178 changes: 178 additions & 0 deletions mithril-aggregator/src/event_store/database/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,182 @@ mod tests {
assert!(result.contains(&("2024-10-30".to_string(), "metric_1".to_string(), 16)));
}
}

mod signer_registration_summary {
use std::sync::Arc;

use crate::event_store::{
database::test_helper::event_store_db_connection, TransmitterService,
};
use crate::test_tools::TestLogger;
use mithril_common::entities::Stake;
use mithril_common::{entities::SignerWithStake, test_utils::fake_data, StdResult};
use sqlite::ConnectionThreadSafe;

use super::{EventMessage, EventPersister};

/// Insert a signer registration event in the database.
fn insert_registration_event(
persister: &EventPersister,
epoch: &str,
party_id: &str,
stake: Stake,
signer_node_version: &str,
) {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));

let signers = fake_data::signers_with_stakes(1);
let signer = SignerWithStake {
party_id: party_id.to_string(),
stake,
..signers[0].clone()
};

let _ = transmitter_service.send_signer_registration_event(
"Test",
&signer,
Some(signer_node_version.to_string()),
epoch,
);

let message: EventMessage = rx.try_recv().unwrap();

let _event = persister.persist(message).unwrap();
}

#[derive(PartialEq)]
struct StakeSignerVersion {
epoch: i64,
version: String,
total_epoch_stakes: i64,
stakes_version: i64,
stakes_ratio: String,
pool_count: i64,
}
impl StakeSignerVersion {
fn new(
epoch: i64,
version: &str,
total_epoch_stakes: i64,
stakes_version: i64,
stakes_ratio: &str,
pool_count: i64,
) -> Self {
Self {
epoch,
version: version.to_string(),
total_epoch_stakes,
stakes_version,
stakes_ratio: stakes_ratio.to_string(),
pool_count,
}
}
}

fn get_all_registrations(
connection: Arc<ConnectionThreadSafe>,
) -> StdResult<Vec<StakeSignerVersion>> {
let query = "select
epoch,
version,
total_epoch_stakes,
stakes_version,
stakes_ratio,
pool_count
from signer_registration_summary;";
let mut statement = connection.prepare(query)?;
let mut result = Vec::new();
while let Ok(sqlite::State::Row) = statement.next() {
result.push(StakeSignerVersion::new(
statement.read::<i64, _>("epoch")?,
&statement.read::<String, _>("version")?,
statement.read::<i64, _>("total_epoch_stakes")?,
statement.read::<i64, _>("stakes_version")?,
&statement.read::<String, _>("stakes_ratio")?,
statement.read::<i64, _>("pool_count")?,
));
}

Ok(result)
}

#[test]
fn retrieved_node_version() {
let connection = Arc::new(event_store_db_connection().unwrap());
let persister = EventPersister::new(connection.clone());

insert_registration_event(&persister, "3", "A", 15, "0.2.234");
insert_registration_event(&persister, "4", "A", 15, "15.24.32");
insert_registration_event(&persister, "5", "A", 15, "0.4.789+ef0c28a");

let result = get_all_registrations(connection).unwrap();

assert!(result.contains(&StakeSignerVersion::new(3, "0.2.234", 15, 15, "100 %", 1)));
assert!(result.contains(&StakeSignerVersion::new(4, "15.24.32", 15, 15, "100 %", 1)));
assert!(result.contains(&StakeSignerVersion::new(5, "0.4.789", 15, 15, "100 %", 1)));
}

#[test]
fn retrieved_total_by_epoch() {
let connection = Arc::new(event_store_db_connection().unwrap());
let persister = EventPersister::new(connection.clone());

insert_registration_event(&persister, "8", "A", 20, "1.0.2");
insert_registration_event(&persister, "8", "B", 15, "1.0.2");
insert_registration_event(&persister, "9", "A", 56, "1.0.2");
insert_registration_event(&persister, "9", "B", 31, "1.0.2");
let result = get_all_registrations(connection).unwrap();

assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 35, 35, "100 %", 2)));
assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 87, 87, "100 %", 2)));
}

#[test]
fn retrieved_percentage_per_version() {
let connection = Arc::new(event_store_db_connection().unwrap());
let persister = EventPersister::new(connection.clone());

insert_registration_event(&persister, "8", "A", 90, "1.0.2");
insert_registration_event(&persister, "8", "B", 30, "1.0.2");
insert_registration_event(&persister, "8", "C", 80, "1.0.4");
let result = get_all_registrations(connection).unwrap();

assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 200, 120, "60 %", 2)));
assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 200, 80, "40 %", 1)));
}

#[test]
fn retrieved_percentage_per_epoch() {
let connection = Arc::new(event_store_db_connection().unwrap());
let persister = EventPersister::new(connection.clone());

insert_registration_event(&persister, "8", "A", 6, "1.0.2");
insert_registration_event(&persister, "8", "B", 4, "1.0.4");
insert_registration_event(&persister, "9", "A", 28, "1.0.2");
insert_registration_event(&persister, "9", "B", 12, "1.0.4");
let result = get_all_registrations(connection).unwrap();

assert!(result.contains(&StakeSignerVersion::new(8, "1.0.2", 10, 6, "60 %", 1)));
assert!(result.contains(&StakeSignerVersion::new(8, "1.0.4", 10, 4, "40 %", 1)));
assert!(result.contains(&StakeSignerVersion::new(9, "1.0.2", 40, 28, "70 %", 1)));
assert!(result.contains(&StakeSignerVersion::new(9, "1.0.4", 40, 12, "30 %", 1)));
}

#[test]
fn with_multi_registrations_for_an_epoch_only_the_last_recorded_one_is_retained() {
let connection = Arc::new(event_store_db_connection().unwrap());
let persister = EventPersister::new(connection.clone());

insert_registration_event(&persister, "8", "A", 6, "1.0.2");
insert_registration_event(&persister, "8", "A", 8, "1.0.2");
insert_registration_event(&persister, "8", "A", 10, "1.0.4");
insert_registration_event(&persister, "8", "A", 7, "1.0.3");

let result = get_all_registrations(connection).unwrap();

assert!(result.contains(&StakeSignerVersion::new(8, "1.0.3", 7, 7, "100 %", 1)));
assert!(result.len() == 1);
}
}
}
27 changes: 26 additions & 1 deletion mithril-aggregator/src/event_store/transmitter_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use slog::{warn, Logger};
use std::fmt::Debug;
use tokio::sync::mpsc::UnboundedSender;

use mithril_common::logging::LoggerExtensions;
use mithril_common::{entities::SignerWithStake, logging::LoggerExtensions};

use super::EventMessage;

Expand Down Expand Up @@ -69,4 +69,29 @@ impl TransmitterService<EventMessage> {
error_msg
})
}

/// Send signer registration event.
pub fn send_signer_registration_event(
&self,
source: &str,
signer_with_stake: &SignerWithStake,
signer_node_version: Option<String>,
epoch_str: &str,
) -> Result<(), String> {
let mut headers: Vec<(&str, &str)> = match signer_node_version.as_ref() {
Some(version) => vec![("signer-node-version", version)],
None => Vec::new(),
};

if !epoch_str.is_empty() {
headers.push(("epoch", epoch_str));
}

self.send_event_message::<SignerWithStake>(
source,
"register_signer",
signer_with_stake,
headers,
)
}
}
21 changes: 7 additions & 14 deletions mithril-aggregator/src/http_server/routes/signer_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,37 +137,30 @@ mod handlers {
}
};

let mut headers: Vec<(&str, &str)> = match signer_node_version.as_ref() {
Some(version) => vec![("signer-node-version", version)],
None => Vec::new(),
};

let epoch_str = fetch_epoch_header_value(epoch_service, &logger).await;
if !epoch_str.is_empty() {
headers.push(("epoch", epoch_str.as_str()));
}

match signer_registerer
.register_signer(registration_epoch, &signer)
.await
{
Ok(signer_with_stake) => {
let _ = event_transmitter.send_event_message(
let _ = event_transmitter.send_signer_registration_event(
"HTTP::signer_register",
"register_signer",
&signer_with_stake,
headers,
signer_node_version,
epoch_str.as_str(),
);

Ok(reply::empty(StatusCode::CREATED))
}
Err(SignerRegistrationError::ExistingSigner(signer_with_stake)) => {
debug!(logger, "register_signer::already_registered");
let _ = event_transmitter.send_event_message(

let _ = event_transmitter.send_signer_registration_event(
"HTTP::signer_register",
"register_signer",
&signer_with_stake,
headers,
signer_node_version,
epoch_str.as_str(),
);
Ok(reply::empty(StatusCode::CREATED))
}
Expand Down

0 comments on commit df88acf

Please sign in to comment.