Skip to content

Commit

Permalink
feat: add dispute manager eventual
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Oct 17, 2023
1 parent 93e9c95 commit ecd9751
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 11 deletions.
138 changes: 138 additions & 0 deletions common/src/attestations/dispute_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use alloy_primitives::Address;
use eventuals::{timer, Eventual, EventualExt};
use log::warn;
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;

use crate::network_subgraph::NetworkSubgraph;

pub fn dispute_manager(
network_subgraph: &'static NetworkSubgraph,
graph_network_id: u64,
interval: Duration,
) -> Eventual<Address> {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct DisputeManagerResponse {
graph_network: Option<GraphNetwork>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetwork {
dispute_manager: Address,
}

timer(interval).map_with_retry(
move |_| async move {
let response = network_subgraph
.query::<DisputeManagerResponse>(&json!({
"query": r#"
query network($id: ID!) {
graphNetwork(id: $id) {
disputeManager
}
}
"#,
"variables": {
"id": graph_network_id
}
}))
.await
.map_err(|e| e.to_string())?;

if let Some(errors) = response.errors {
warn!(
"Errors encountered querying the dispute manager for network {}: {}",
graph_network_id,
errors
.into_iter()
.map(|e| e.message)
.collect::<Vec<_>>()
.join(", ")
);
}

response
.data
.and_then(|data| data.graph_network)
.map(|network| network.dispute_manager)
.ok_or_else(|| {
format!("Network {} not found in network subgraph", graph_network_id)
})
},
move |err: String| {
warn!(
"Failed to query dispute manager for network {}: {}",
graph_network_id, err,
);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0))
},
)
}

#[cfg(test)]
mod test {
use serde_json::json;
use wiremock::{
matchers::{method, path},
Mock, MockServer, ResponseTemplate,
};

use crate::{
prelude::NetworkSubgraph,
test_vectors::{self, DISPUTE_MANAGER_ADDRESS},
};

use super::*;

async fn setup_mock_network_subgraph() -> (&'static NetworkSubgraph, MockServer) {
// Set up a mock network subgraph
let mock_server = MockServer::start().await;
let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint(
&mock_server.uri(),
&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT,
);
let network_subgraph = NetworkSubgraph::new(
Some(&mock_server.uri()),
Some(&test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT),
network_subgraph_endpoint.as_ref(),
);

// Mock result for current epoch requests
mock_server
.register(
Mock::given(method("POST"))
.and(path(format!(
"/subgraphs/id/{}",
*test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT
)))
.respond_with(ResponseTemplate::new(200).set_body_json(
json!({ "data": { "graphNetwork": { "disputeManager": *DISPUTE_MANAGER_ADDRESS }}}),
)),
)
.await;

(Box::leak(Box::new(network_subgraph)), mock_server)
}

#[test_log::test(tokio::test)]
async fn test_parses_dispute_manager_from_network_subgraph_correctly() {
let (network_subgraph, _mock_server) = setup_mock_network_subgraph().await;

let dispute_manager = dispute_manager(network_subgraph, 1, Duration::from_secs(60));

assert_eq!(
dispute_manager.value().await.unwrap(),
*DISPUTE_MANAGER_ADDRESS
);
}
}
1 change: 1 addition & 0 deletions common/src/attestations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

pub mod signer;
pub mod signers;
pub mod dispute_manager;
11 changes: 7 additions & 4 deletions common/src/attestations/signers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use alloy_primitives::Address;
use ethers_core::types::U256;
use eventuals::{Eventual, EventualExt};
use eventuals::{join, Eventual, EventualExt};
use log::warn;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -16,7 +16,7 @@ pub fn attestation_signers(
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
indexer_mnemonic: String,
chain_id: U256,
dispute_manager: Address,
dispute_manager: Eventual<Address>,
) -> Eventual<HashMap<Address, AttestationSigner>> {
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
Box::leak(Box::new(Mutex::new(HashMap::new())));
Expand All @@ -25,7 +25,7 @@ pub fn attestation_signers(

// Whenever the indexer's active or recently closed allocations change, make sure
// we have attestation signers for all of them
indexer_allocations.map(move |allocations| {
join((indexer_allocations, dispute_manager)).map(move |(allocations, dispute_manager)| {
let indexer_mnemonic = indexer_mnemonic.clone();
async move {
let mut signers = attestation_signers_map.lock().await;
Expand Down Expand Up @@ -72,12 +72,15 @@ mod tests {
#[tokio::test]
async fn test_attestation_signers_update_with_allocations() {
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
let (mut dispute_manager_writer, dispute_manager) = Eventual::<Address>::new();

dispute_manager_writer.write(*DISPUTE_MANAGER_ADDRESS);

let signers = attestation_signers(
allocations,
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
U256::from(1),
*DISPUTE_MANAGER_ADDRESS,
dispute_manager,
);
let mut signers = signers.subscribe();

Expand Down
4 changes: 3 additions & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub mod prelude {
pub use super::allocations::{
monitor::indexer_allocations, Allocation, AllocationStatus, SubgraphDeployment,
};
pub use super::attestations::{signer::AttestationSigner, signers::attestation_signers};
pub use super::attestations::{
dispute_manager::dispute_manager, signer::AttestationSigner, signers::attestation_signers,
};
pub use super::network_subgraph::NetworkSubgraph;
}
17 changes: 11 additions & 6 deletions service/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use alloy_primitives::Address;
use alloy_sol_types::eip712_domain;
use axum::Server;
use dotenvy::dotenv;
Expand All @@ -10,7 +9,9 @@ use std::{net::SocketAddr, str::FromStr, time::Duration};
use toolshed::thegraph::DeploymentId;
use tracing::info;

use indexer_common::prelude::{attestation_signers, indexer_allocations, NetworkSubgraph};
use indexer_common::prelude::{
attestation_signers, dispute_manager, indexer_allocations, NetworkSubgraph,
};

use util::{package_version, shutdown_signal};

Expand Down Expand Up @@ -86,13 +87,17 @@ async fn main() -> Result<(), std::io::Error> {
Duration::from_secs(config.network_subgraph.allocation_syncing_interval),
);

// TODO: Chain ID should be a config
let graph_network_id = 1;

let dispute_manager =
dispute_manager(network_subgraph, graph_network_id, Duration::from_secs(60));

let attestation_signers = attestation_signers(
indexer_allocations.clone(),
config.ethereum.mnemonic.clone(),
// TODO: Chain ID should be a config
U256::from(1),
// TODO: Dispute manager address should be a config
Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(),
U256::from(graph_network_id),
dispute_manager,
);

// Establish Database connection necessary for serving indexer management
Expand Down

0 comments on commit ecd9751

Please sign in to comment.