Skip to content

Commit

Permalink
feat: use manifest from network subgraph (#647)
Browse files Browse the repository at this point in the history
graphprotocol/graph-network-subgraph#266 allows
us to stop loading manifests from IPFS, and instead get the necessary
data from the network subgraph.
  • Loading branch information
Theodus authored Apr 15, 2024
1 parent 5010d64 commit de3b53f
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 176 deletions.
46 changes: 0 additions & 46 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ semver = { version = "1.0", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.115", features = ["raw_value"] }
serde_with = "3.7.0"
serde_yaml = "0.9"
siphasher = "1.0.1"
thegraph-core = "0.3.0"
thegraph-graphql-http = "0.2.0"
Expand Down
1 change: 0 additions & 1 deletion gateway-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ secp256k1.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde_with.workspace = true
serde_yaml.workspace = true
siphasher.workspace = true
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "c179dfe" }
thegraph-core = { workspace = true, features = ["subgraph-client"] }
Expand Down
32 changes: 0 additions & 32 deletions gateway-framework/src/ipfs.rs

This file was deleted.

1 change: 0 additions & 1 deletion gateway-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub mod config;
pub mod errors;
pub mod graphql;
pub mod ip_blocker;
pub mod ipfs;
pub mod json;
pub mod network;
pub mod reporting;
Expand Down
16 changes: 15 additions & 1 deletion gateway-framework/src/network/network_subgraph.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use alloy_primitives::Address;
use alloy_primitives::{Address, BlockNumber};
use eventuals::{self, Eventual, EventualExt as _, EventualWriter, Ptr};
use serde::Deserialize;
use serde_with::serde_as;
Expand All @@ -18,6 +18,15 @@ pub struct Subgraph {
pub versions: Vec<SubgraphVersion>,
}

#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Manifest {
pub network: Option<String>,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub start_block: Option<BlockNumber>,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubgraphVersion {
Expand All @@ -31,6 +40,7 @@ pub struct SubgraphDeployment {
pub id: DeploymentId,
#[serde(rename = "indexerAllocations")]
pub allocations: Vec<Allocation>,
pub manifest: Option<Manifest>,
#[serde(default)]
pub transferred_to_l2: bool,
}
Expand Down Expand Up @@ -110,6 +120,10 @@ impl Client {
versions(orderBy: version, orderDirection: asc) {{
subgraphDeployment {{
ipfsHash
manifest {{
network
startBlock
}}
indexerAllocations(
first: 100
orderBy: createdAt, orderDirection: asc
Expand Down
94 changes: 10 additions & 84 deletions gateway-framework/src/topology/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ use std::{
};

use alloy_primitives::Address;
use anyhow::anyhow;
use eventuals::{Eventual, EventualExt, Ptr};
use futures::future::join_all;
use gateway_common::types::Indexing;
use itertools::Itertools;
use serde::Deserialize;
use thegraph_core::types::{DeploymentId, SubgraphId};
use tokio::sync::{Mutex, RwLock};
use tokio::sync::Mutex;
use url::Url;

use crate::{ip_blocker::IpBlocker, ipfs, network::network_subgraph};
use crate::{ip_blocker::IpBlocker, network::network_subgraph};

/// Deployment manifest information needed for the gateway to work.
pub struct Manifest {
Expand Down Expand Up @@ -59,7 +57,7 @@ pub struct Subgraph {

pub struct Deployment {
pub id: DeploymentId,
pub manifest: Arc<Manifest>,
pub manifest: Manifest,
/// An indexer may have multiple active allocations on a deployment. We collapse them into a
/// single logical allocation using the largest allocation ID and sum of the allocated tokens.
pub indexers: HashMap<Address, Arc<Indexer>>,
Expand Down Expand Up @@ -87,19 +85,14 @@ pub struct GraphNetwork {
impl GraphNetwork {
pub async fn new(
subgraphs: Eventual<Ptr<Vec<network_subgraph::Subgraph>>>,
ipfs: Arc<ipfs::Client>,
ip_blocker: IpBlocker,
) -> Self {
let cache: &'static RwLock<IpfsCache> = Box::leak(Box::new(RwLock::new(IpfsCache {
ipfs,
manifests: HashMap::new(),
})));
let ip_blocker: &'static Mutex<IpBlocker> = Box::leak(Box::new(ip_blocker.into()));

// Create a lookup table for subgraphs, keyed by their ID.
// Invalid URL indexers are filtered out. See: 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
let subgraphs = subgraphs.map(move |subgraphs| async move {
Ptr::new(Self::subgraphs(&subgraphs, cache, ip_blocker).await)
Ptr::new(Self::subgraphs(&subgraphs, ip_blocker).await)
});

// Create a lookup table for deployments, keyed by their ID (which is also their IPFS hash).
Expand Down Expand Up @@ -137,7 +130,6 @@ impl GraphNetwork {

async fn subgraphs(
subgraphs: &[network_subgraph::Subgraph],
cache: &'static RwLock<IpfsCache>,
ip_blocker: &'static Mutex<IpBlocker>,
) -> HashMap<SubgraphId, Subgraph> {
join_all(subgraphs.iter().map(|subgraph| async move {
Expand All @@ -146,7 +138,7 @@ impl GraphNetwork {
subgraph
.versions
.iter()
.map(|version| Self::deployment(subgraphs, version, cache, ip_blocker)),
.map(|version| Self::deployment(subgraphs, version, ip_blocker)),
)
.await
.into_iter()
Expand All @@ -167,11 +159,14 @@ impl GraphNetwork {
async fn deployment(
subgraphs: &[network_subgraph::Subgraph],
version: &network_subgraph::SubgraphVersion,
cache: &'static RwLock<IpfsCache>,
ip_blocker: &'static Mutex<IpBlocker>,
) -> Option<Arc<Deployment>> {
let id = version.subgraph_deployment.id;
let manifest = IpfsCache::manifest(cache, &version.subgraph_deployment.id).await?;
let manifest = version.subgraph_deployment.manifest.as_ref()?;
let manifest = Manifest {
network: manifest.network.as_ref()?.clone(),
min_block: manifest.start_block.unwrap_or(0),
};
let subgraphs = subgraphs
.iter()
.filter(|subgraph| {
Expand Down Expand Up @@ -262,72 +257,3 @@ impl GraphNetwork {
.cloned()
}
}

struct IpfsCache {
ipfs: Arc<ipfs::Client>,
manifests: HashMap<DeploymentId, Arc<Manifest>>,
}

impl IpfsCache {
async fn manifest(cache: &RwLock<Self>, deployment: &DeploymentId) -> Option<Arc<Manifest>> {
let read = cache.read().await;
if let Some(manifest) = read.manifests.get(deployment) {
return Some(manifest.clone());
}
let ipfs = read.ipfs.clone();
drop(read);

let manifest = match Self::cat_manifest(&ipfs, deployment).await {
Ok(manifest) => Arc::new(manifest),
Err(manifest_err) => {
tracing::warn!(%deployment, %manifest_err);
return None;
}
};

let mut write = cache.write().await;
write.manifests.insert(*deployment, manifest.clone());
Some(manifest)
}

async fn cat_manifest(
ipfs: &ipfs::Client,
deployment: &DeploymentId,
) -> anyhow::Result<Manifest> {
// Subgraph manifest schema:
// https://github.com/graphprotocol/graph-node/blob/master/docs/subgraph-manifest.md
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ManifestSrc {
data_sources: Vec<DataSource>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct DataSource {
network: String,
source: EthereumContractSource,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EthereumContractSource {
start_block: Option<u64>,
}

let payload = ipfs.cat(&deployment.to_string()).await?;
let manifest = serde_yaml::from_str::<ManifestSrc>(&payload)?;
let min_block = manifest
.data_sources
.iter()
.map(|data_source| data_source.source.start_block.unwrap_or(0))
.min()
.unwrap_or(0);
// We are assuming that all `dataSource.network` fields are identical.
let network = manifest
.data_sources
.into_iter()
.map(|data_source| data_source.network)
.next()
.ok_or_else(|| anyhow!("Network not found"))?;
Ok(Manifest { network, min_block })
}
}
3 changes: 0 additions & 3 deletions graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ alloy-primitives.workspace = true
alloy-sol-types.workspace = true
anyhow.workspace = true
axum = { workspace = true, features = ["tokio", "http1"] }
axum-extra = "0.9.3"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3ed34ca" }
custom_debug = "0.6.1"
Expand Down Expand Up @@ -36,7 +35,6 @@ semver.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde_with.workspace = true
serde_yaml.workspace = true
simple-rate-limiter = "1.0"
snmalloc-rs = "0.3"
thegraph-core = { workspace = true, features = [
Expand All @@ -50,7 +48,6 @@ toolshed.workspace = true
tower = "0.4.13"
tower-http = { version = "0.5.2", features = ["cors"] }
tracing.workspace = true
tracing-subscriber.workspace = true
url = "2.5.0"
uuid = { version = "1.8", default-features = false, features = ["v4"] }

Expand Down
4 changes: 0 additions & 4 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ pub struct Config {
/// timeout, so setting this to 5 for example would result in a 100s worst case response time
/// for a client query.
pub indexer_selection_retry_limit: usize,
/// IPFS endpoint with access to the subgraph files
#[debug(with = Display::fmt)]
#[serde_as(as = "DisplayFromStr")]
pub ipfs: Url,
/// File path of CSV containing rows of `IpNetwork,Country`
pub ip_blocker_db: Option<PathBuf>,
/// IP rate limit in requests per second
Expand Down
Loading

0 comments on commit de3b53f

Please sign in to comment.