Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dynamic blocklist #1008

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: unify blocklist
Theodus committed Dec 16, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit e2f278a26e3764121f5520ad241b949840734c19
47 changes: 24 additions & 23 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//! The Graph Gateway configuration.
use std::{
collections::{BTreeMap, HashSet},
path::{Path, PathBuf},
@@ -26,9 +24,9 @@ pub struct Config {
#[serde(default)]
pub api_keys: Option<ApiKeys>,
pub attestations: AttestationConfig,
/// List of indexer addresses to block. This should only be used temprorarily.
/// Blocklist applying to indexers.
#[serde(default)]
pub blocked_indexers: BTreeMap<Address, BlockedIndexer>,
pub blocklist: Vec<BlocklistEntry>,
/// Chain aliases
#[serde(default)]
pub chain_aliases: BTreeMap<String, String>,
@@ -53,9 +51,6 @@ pub struct Config {
pub trusted_indexers: Vec<TrustedIndexer>,
/// Check payment state of client (disable for testnets)
pub payment_required: bool,
/// POI blocklist
#[serde(default)]
pub poi_blocklist: Vec<BlockedPoi>,
/// public API port
pub port_api: u16,
/// private metrics port
@@ -96,11 +91,28 @@ pub enum ApiKeys {
Fixed(Vec<ApiKey>),
}

#[derive(Deserialize)]
pub struct BlockedIndexer {
/// empty array blocks on all deployments
pub deployments: Vec<DeploymentId>,
pub reason: String,
#[derive(Clone, Deserialize, Serialize)]
#[serde(untagged)]
pub enum BlocklistEntry {
Poi {
deployment: DeploymentId,
info: BlocklistInfo,
public_poi: B256,
block: BlockNumber,
},
Other {
deployment: DeploymentId,
info: BlocklistInfo,
indexer: Address,
},
}

#[derive(Clone, Deserialize, Serialize)]
pub struct BlocklistInfo {
/// Example query (should be minimal to reproduce bad response)
query: Option<String>,
/// Bad query response, from the above query executed on indexers with this blocked PoI
bad_query_response: Option<String>,
}

/// Attestation configuration.
@@ -171,17 +183,6 @@ pub struct Receipts {
pub verifier: Address,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BlockedPoi {
pub public_poi: B256,
pub deployment: DeploymentId,
pub block_number: BlockNumber,
/// Example query (should be minimal to reproduce bad response)
pub query: Option<String>,
/// Bad query response, from the above query executed on indexers with this blocked PoI
pub bad_query_response: Option<String>,
}

/// Load the configuration from a JSON file.
pub fn load_from_file(path: &Path) -> anyhow::Result<Config> {
let config_content = std::fs::read_to_string(path)?;
14 changes: 9 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ use budgets::{Budgeter, USD};
use chains::Chains;
use client_query::context::Context;
use config::{ApiKeys, ExchangeRateProvider};
use headers::ContentType;
use indexer_client::IndexerClient;
use indexing_performance::IndexingPerformance;
use middleware::{
@@ -52,6 +53,7 @@ use thegraph_core::{
alloy::{dyn_abi::Eip712Domain, primitives::ChainId, signers::local::PrivateKeySigner},
attestation,
};
use thegraph_headers::HttpBuilderExt;
use tokio::{net::TcpListener, signal::unix::SignalKind, sync::watch};
use tower_http::cors::{self, CorsLayer};
use tracing_subscriber::{prelude::*, EnvFilter};
@@ -112,11 +114,10 @@ async fn main() {
let mut network = network::service::spawn(
http_client.clone(),
network_subgraph_client,
conf.blocklist.clone(),
conf.min_indexer_version,
conf.min_graph_node_version,
conf.blocked_indexers,
indexer_host_blocklist,
conf.poi_blocklist.clone(),
);
let indexing_perf = IndexingPerformance::new(network.clone());
network.wait_until_ready().await;
@@ -157,7 +158,7 @@ async fn main() {
reporter,
};

let poi_blocklist: &'static str = serde_json::to_string(&conf.poi_blocklist).unwrap().leak();
let blocklist: &'static str = serde_json::to_string(&conf.blocklist).unwrap().leak();

// Host metrics on a separate server with a port that isn't open to public requests.
tokio::spawn(async move {
@@ -226,8 +227,11 @@ async fn main() {
.route(
"/blocklist",
routing::get(move || async move {
let headers = [(reqwest::header::CONTENT_TYPE, "application/json")];
(headers, poi_blocklist)
axum::http::Response::builder()
.status(StatusCode::OK)
.header_typed(ContentType::json())
.body(blocklist.to_string())
.unwrap()
}),
)
.nest("/api", api);
27 changes: 9 additions & 18 deletions src/network/indexer_processing.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use custom_debug::CustomDebug;
use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId};
use tracing::Instrument;
use url::Url;

use crate::{
config::BlockedIndexer,
errors::UnavailableReason,
network::{indexing_progress::IndexingProgressResolver, service::InternalState},
};
@@ -191,29 +190,21 @@ async fn process_indexer_indexings(
state: &InternalState,
url: &Url,
indexings: HashMap<DeploymentId, IndexingRawInfo>,
blocklist: Option<&BlockedIndexer>,
blocklist: Option<&HashSet<DeploymentId>>,
) -> HashMap<DeploymentId, Result<ResolvedIndexingInfo, UnavailableReason>> {
let mut indexer_indexings: HashMap<DeploymentId, Result<IndexingInfo<(), ()>, _>> = indexings
.into_iter()
.map(|(id, info)| (id, Ok(info.into())))
.collect();

match blocklist {
None => (),
Some(blocklist) if blocklist.deployments.is_empty() => {
for entry in indexer_indexings.values_mut() {
*entry = Err(UnavailableReason::Blocked(blocklist.reason.clone()));
}
}
Some(blocklist) => {
for deployment in &blocklist.deployments {
indexer_indexings.insert(
*deployment,
Err(UnavailableReason::Blocked(blocklist.reason.clone())),
);
}
if let Some(blocklist) = blocklist {
for deployment in blocklist {
indexer_indexings.insert(
*deployment,
Err(UnavailableReason::Blocked("missing data".to_string())),
);
}
};
}

// ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2
let status_url = url.join("status").unwrap();
57 changes: 33 additions & 24 deletions src/network/service.rs
Original file line number Diff line number Diff line change
@@ -3,15 +3,15 @@
//! query processing pipeline
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{HashMap, HashSet},
time::Duration,
};

use ipnetwork::IpNetwork;
use semver::Version;
use thegraph_core::{
alloy::primitives::{Address, BlockNumber},
DeploymentId, IndexerId, SubgraphId,
DeploymentId, IndexerId, ProofOfIndexing, SubgraphId,
};
use tokio::{sync::watch, time::MissedTickBehavior};

@@ -28,10 +28,7 @@ use super::{
version_filter::{MinimumVersionRequirements, VersionFilter},
DeploymentError, SubgraphError,
};
use crate::{
config::{BlockedIndexer, BlockedPoi},
errors::UnavailableReason,
};
use crate::{config::BlocklistEntry, errors::UnavailableReason};

/// Subgraph resolution information returned by the [`NetworkService`].
pub struct ResolvedSubgraphInfo {
@@ -164,28 +161,40 @@ impl NetworkService {
pub fn spawn(
http: reqwest::Client,
subgraph_client: SubgraphClient,
blocklist: Vec<BlocklistEntry>,
min_indexer_service_version: Version,
min_graph_node_version: Version,
indexer_blocklist: BTreeMap<Address, BlockedIndexer>,
indexer_host_blocklist: HashSet<IpNetwork>,
poi_blocklist: Vec<BlockedPoi>,
) -> NetworkService {
let poi_blocklist = poi_blocklist
.iter()
.map(|entry| &entry.deployment)
.collect::<HashSet<_>>()
.into_iter()
.map(|deployment| {
(
*deployment,
let mut poi_blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>> =
Default::default();
let mut indexer_blocklist: HashMap<Address, HashSet<DeploymentId>> = Default::default();
for entry in blocklist {
match entry {
BlocklistEntry::Poi {
deployment,
block,
public_poi,
..
} => {
poi_blocklist
.iter()
.filter(|entry| &entry.deployment == deployment)
.map(|entry| (entry.block_number, entry.public_poi.into()))
.collect::<Vec<_>>(),
)
})
.collect();
.entry(deployment)
.or_default()
.push((block, public_poi.into()));
}
BlocklistEntry::Other {
deployment,
indexer,
..
} => {
indexer_blocklist
.entry(indexer)
.or_default()
.insert(deployment);
}
};
}

let internal_state = InternalState {
indexer_blocklist,
indexer_host_filter: HostFilter::new(indexer_host_blocklist)
@@ -207,7 +216,7 @@ pub fn spawn(
}

pub struct InternalState {
pub indexer_blocklist: BTreeMap<Address, BlockedIndexer>,
pub indexer_blocklist: HashMap<Address, HashSet<DeploymentId>>,
pub indexer_host_filter: HostFilter,
pub indexer_version_filter: VersionFilter,
pub indexer_poi_filer: PoiFilter,