Skip to content

Commit

Permalink
fix(torii): add erc20 patch for eth mainnet token
Browse files Browse the repository at this point in the history
commit-id:93263be6
  • Loading branch information
lambda-0x committed Dec 3, 2024
1 parent cd17676 commit 938d50f
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 17 deletions.
22 changes: 19 additions & 3 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ pub enum FetchDataResult {
None,
}

impl FetchDataResult {
pub fn block_id(&self) -> Option<BlockId> {
match self {
FetchDataResult::Range(range) => Some(BlockId::Number(range.latest_block_number)),
FetchDataResult::Pending(_pending) => Some(BlockId::Tag(BlockTag::Pending)),
// we dont require block_id when result is none, we return None
FetchDataResult::None => None,
}
}
}

#[derive(Debug)]
pub struct FetchRangeResult {
// (block_number, transaction_hash) -> events
Expand Down Expand Up @@ -269,11 +280,16 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
info!(target: LOG_TARGET, "Syncing reestablished.");
}

let block_id = fetch_result.block_id();
match self.process(fetch_result).await {
Ok(_) => {
self.db.flush().await?;
self.db.apply_cache_diff().await?;
self.db.execute().await?;
// Its only `None` when `FetchDataResult::None` in which case
// we don't need to flush or apply cache diff
if let Some(block_id) = block_id {
self.db.flush().await?;
self.db.apply_cache_diff(block_id).await?;
self.db.execute().await?;
}
},
Err(e) => {
error!(target: LOG_TARGET, error = %e, "Processing fetched data.");
Expand Down
39 changes: 37 additions & 2 deletions crates/torii/core/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::{debug, trace, warn};

use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::executor::LOG_TARGET;
use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256};
use crate::types::ContractType;
use crate::utils::fetch_content_from_ipfs;
Expand Down Expand Up @@ -46,6 +47,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
pub async fn apply_balance_diff(
&mut self,
apply_balance_diff: ApplyBalanceDiffQuery,
provider: Arc<P>,
) -> Result<()> {
let erc_cache = apply_balance_diff.erc_cache;
for ((contract_type, id_str), balance) in erc_cache.iter() {
Expand All @@ -66,6 +68,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
contract_address,
token_id,
balance,
Arc::clone(&provider),
apply_balance_diff.block_id,
)
.await
.with_context(|| "Failed to apply balance diff in apply_cache_diff")?;
Expand All @@ -83,6 +87,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
contract_address,
token_id,
balance,
Arc::clone(&provider),
apply_balance_diff.block_id,
)
.await
.with_context(|| "Failed to apply balance diff in apply_cache_diff")?;
Expand All @@ -93,13 +99,16 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
Ok(())
}

#[allow(clippy::too_many_arguments)]
pub async fn apply_balance_diff_helper(
&mut self,
id: &str,
account_address: &str,
contract_address: &str,
token_id: &str,
balance_diff: &I256,
provider: Arc<P>,
block_id: BlockId,
) -> Result<()> {
let tx = &mut self.transaction;
let balance: Option<(String,)> =
Expand All @@ -116,9 +125,35 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {

if balance_diff.is_negative {
if balance < balance_diff.value {
dbg!(&balance_diff, balance, id);
// HACK: ideally we should never hit this case. But ETH on starknet mainnet didn't
// emit transfer events properly so they are broken. For those cases
// we manually fetch the balance of the address using RPC

let current_balance = provider
.call(
FunctionCall {
contract_address: Felt::from_str(contract_address).unwrap(),
entry_point_selector: get_selector_from_name("balanceOf").unwrap(),
calldata: vec![Felt::from_str(account_address).unwrap()],
},
block_id,
)
.await
.with_context(|| format!("Failed to fetch balance for id: {}", id))?;

let current_balance =
cainome::cairo_serde::U256::cairo_deserialize(&current_balance, 0).unwrap();

warn!(
target: LOG_TARGET,
id = id,
"Invalid transfer event detected, overriding balance by querying RPC directly"
);
// override the balance from onchain data
balance = U256::from_words(current_balance.low, current_balance.high);
} else {
balance -= balance_diff.value;
}
balance -= balance_diff.value;
} else {
balance += balance_diff.value;
}
Expand Down
5 changes: 4 additions & 1 deletion crates/torii/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct DeleteEntityQuery {
#[derive(Debug, Clone)]
pub struct ApplyBalanceDiffQuery {
pub erc_cache: HashMap<(ContractType, String), I256>,
pub block_id: BlockId,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -606,7 +607,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
QueryType::ApplyBalanceDiff(apply_balance_diff) => {
debug!(target: LOG_TARGET, "Applying balance diff.");
let instant = Instant::now();
self.apply_balance_diff(apply_balance_diff).await?;
self.apply_balance_diff(apply_balance_diff, self.provider.clone()).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff.");
}
QueryType::RegisterErc721Token(register_erc721_token) => {
Expand Down Expand Up @@ -678,6 +679,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {

self.register_tasks.spawn(async move {
let permit = semaphore.acquire().await.unwrap();
let span = tracing::span!(tracing::Level::INFO, "contract_address_span", contract_address = %register_erc721_token.contract_address);
let _enter = span.enter();

let result = Self::process_register_erc721_token_query(
register_erc721_token,
Expand Down
3 changes: 2 additions & 1 deletion crates/torii/core/src/processors/erc20_legacy_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
_block_number: u64,
block_number: u64,
block_timestamp: u64,
event_id: &str,
event: &Event,
Expand All @@ -59,6 +59,7 @@ where
world.provider(),
block_timestamp,
event_id,
block_number,
)
.await?;
debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer");
Expand Down
3 changes: 2 additions & 1 deletion crates/torii/core/src/processors/erc20_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
_block_number: u64,
block_number: u64,
block_timestamp: u64,
event_id: &str,
event: &Event,
Expand All @@ -59,6 +59,7 @@ where
world.provider(),
block_timestamp,
event_id,
block_number,
)
.await?;
debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer");
Expand Down
14 changes: 11 additions & 3 deletions crates/torii/core/src/processors/erc721_legacy_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
&self,
_world: &WorldContractReader<P>,
db: &mut Sql,
_block_number: u64,
block_number: u64,
block_timestamp: u64,
event_id: &str,
event: &Event,
Expand All @@ -51,8 +51,16 @@ where
let token_id = U256Cainome::cairo_deserialize(&event.data, 2)?;
let token_id = U256::from_words(token_id.low, token_id.high);

db.handle_erc721_transfer(token_address, from, to, token_id, block_timestamp, event_id)
.await?;
db.handle_erc721_transfer(
token_address,
from,
to,
token_id,
block_timestamp,
event_id,
block_number,
)
.await?;
debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer");

Ok(())
Expand Down
14 changes: 11 additions & 3 deletions crates/torii/core/src/processors/erc721_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
&self,
_world: &WorldContractReader<P>,
db: &mut Sql,
_block_number: u64,
block_number: u64,
block_timestamp: u64,
event_id: &str,
event: &Event,
Expand All @@ -51,8 +51,16 @@ where
let token_id = U256Cainome::cairo_deserialize(&event.keys, 3)?;
let token_id = U256::from_words(token_id.low, token_id.high);

db.handle_erc721_transfer(token_address, from, to, token_id, block_timestamp, event_id)
.await?;
db.handle_erc721_transfer(
token_address,
from,
to,
token_id,
block_timestamp,
event_id,
block_number,
)
.await?;
debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer");

Ok(())
Expand Down
11 changes: 8 additions & 3 deletions crates/torii/core/src/sql/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl Sql {
provider: &P,
block_timestamp: u64,
event_id: &str,
block_number: u64,
) -> Result<()> {
// contract_address
let token_id = felt_to_sql_string(&contract_address);
Expand Down Expand Up @@ -66,10 +67,11 @@ impl Sql {
self.local_cache.erc_cache.entry((ContractType::ERC20, to_balance_id)).or_default();
*to_balance += I256::from(amount);
}
let block_id = BlockId::Number(block_number);

if self.local_cache.erc_cache.len() >= 100000 {
self.flush().await.with_context(|| "Failed to flush in handle_erc20_transfer")?;
self.apply_cache_diff().await?;
self.apply_cache_diff(block_id).await?;
}

Ok(())
Expand All @@ -84,6 +86,7 @@ impl Sql {
token_id: U256,
block_timestamp: u64,
event_id: &str,
block_number: u64,
) -> Result<()> {
// contract_address:id
let actual_token_id = token_id;
Expand Down Expand Up @@ -127,10 +130,11 @@ impl Sql {
.or_default();
*to_balance += I256::from(1u8);
}
let block_id = BlockId::Number(block_number);

if self.local_cache.erc_cache.len() >= 100000 {
self.flush().await.with_context(|| "Failed to flush in handle_erc721_transfer")?;
self.apply_cache_diff().await?;
self.apply_cache_diff(block_id).await?;
}

Ok(())
Expand Down Expand Up @@ -272,7 +276,7 @@ impl Sql {
Ok(())
}

pub async fn apply_cache_diff(&mut self) -> Result<()> {
pub async fn apply_cache_diff(&mut self, block_id: BlockId) -> Result<()> {
if !self.local_cache.erc_cache.is_empty() {
self.executor.send(QueryMessage::new(
"".to_string(),
Expand All @@ -282,6 +286,7 @@ impl Sql {
&mut self.local_cache.erc_cache,
HashMap::with_capacity(64),
),
block_id,
}),
))?;
}
Expand Down

0 comments on commit 938d50f

Please sign in to comment.