Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subgraph Composition: Support multiple subgraph datasources #5731

Open
wants to merge 11 commits into
base: krishna/sgc-multiple-sg-sources-fix
Choose a base branch
from
Open
4 changes: 2 additions & 2 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use graph::{
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
};
use prost::Message;
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::sync::Arc;

use crate::adapter::TriggerFilter;
Expand Down Expand Up @@ -272,7 +272,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<ArweaveBlock>, Error> {
todo!()
}
Expand Down
4 changes: 2 additions & 2 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use graph::components::adapter::ChainId;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::convert::TryFrom;
use std::sync::Arc;

Expand Down Expand Up @@ -200,7 +200,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<Block>, Error> {
todo!()
}
Expand Down
19 changes: 9 additions & 10 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use graph::{
},
};
use prost::Message;
use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};
use std::future::Future;
use std::iter::FromIterator;
use std::sync::Arc;
Expand Down Expand Up @@ -747,7 +747,7 @@ pub struct TriggersAdapter {
async fn fetch_unique_blocks_from_cache(
logger: &Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
block_numbers: BTreeSet<BlockNumber>,
) -> (Vec<Arc<ExtendedBlockPtr>>, Vec<i32>) {
// Load blocks from the cache
let blocks_map = chain_store
Expand Down Expand Up @@ -795,7 +795,7 @@ async fn fetch_unique_blocks_from_cache(
async fn load_blocks<F, Fut>(
logger: &Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
block_numbers: BTreeSet<BlockNumber>,
fetch_missing: F,
) -> Result<Vec<BlockFinality>>
where
Expand Down Expand Up @@ -843,7 +843,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn load_block_ptrs_by_numbers(
&self,
logger: Logger,
block_numbers: HashSet<BlockNumber>,
block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<BlockFinality>> {
match &*self.chain_client {
ChainClient::Firehose(endpoints) => {
Expand Down Expand Up @@ -1200,7 +1200,6 @@ mod tests {
use graph::{slog, tokio};

use super::*;
use std::collections::HashSet;
use std::sync::Arc;

// Helper function to create test blocks
Expand All @@ -1224,7 +1223,7 @@ mod tests {
let block = create_test_block(1, "block1");
chain_store.blocks.insert(1, vec![block.clone()]);

let block_numbers: HashSet<_> = vec![1].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand All @@ -1246,7 +1245,7 @@ mod tests {
.blocks
.insert(1, vec![block1.clone(), block2.clone()]);

let block_numbers: HashSet<_> = vec![1].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand All @@ -1266,7 +1265,7 @@ mod tests {
let block = create_test_block(1, "block1");
chain_store.blocks.insert(1, vec![block.clone()]);

let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1, 2].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand All @@ -1287,7 +1286,7 @@ mod tests {
chain_store.blocks.insert(1, vec![block1.clone()]);
chain_store.blocks.insert(2, vec![block2.clone()]);

let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1, 2].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand Down Expand Up @@ -1316,7 +1315,7 @@ mod tests {
.blocks
.insert(2, vec![block2a.clone(), block2b.clone()]);

let block_numbers: HashSet<_> = vec![1, 2, 3].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1, 2, 3].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand Down
4 changes: 2 additions & 2 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use graph::{
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
};
use prost::Message;
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::sync::Arc;

use crate::adapter::TriggerFilter;
Expand Down Expand Up @@ -328,7 +328,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<Block>> {
unimplemented!()
}
Expand Down
4 changes: 2 additions & 2 deletions chain/starknet/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use graph::{
slog::o,
};
use prost::Message;
use std::{collections::HashSet, sync::Arc};
use std::{collections::BTreeSet, sync::Arc};

use crate::{
adapter::TriggerFilter,
Expand Down Expand Up @@ -375,7 +375,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<StarknetBlock>, Error> {
unimplemented!()
}
Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use graph::{
};
use graph_runtime_wasm::module::ToAscPtr;
use lazy_static::__Deref;
use std::{collections::HashSet, sync::Arc};
use std::{collections::BTreeSet, sync::Arc};

use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges};

Expand Down Expand Up @@ -139,7 +139,7 @@ impl blockchain::TriggersAdapter<Chain> for TriggersAdapter {
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<Block>, Error> {
unimplemented!()
}
Expand Down
1 change: 1 addition & 0 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ where
.iter()
.map(|handler| handler.entity.clone())
.collect(),
manifest_idx: ds.manifest_idx,
})
.collect::<Vec<_>>();

Expand Down
Loading
Loading