diff --git a/Cargo.lock b/Cargo.lock index bd1a7984..86b73546 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3744,6 +3744,16 @@ dependencies = [ "thegraph-core", ] +[[package]] +name = "indexer-receipt" +version = "0.1.0" +dependencies = [ + "anyhow", + "tap_core", + "tap_graph", + "thegraph-core", +] + [[package]] name = "indexer-service-rs" version = "1.4.1" @@ -3772,6 +3782,7 @@ dependencies = [ "indexer-dips", "indexer-monitor", "indexer-query", + "indexer-receipt", "insta", "lazy_static", "pin-project 1.1.8", @@ -3821,6 +3832,7 @@ dependencies = [ "indexer-config", "indexer-monitor", "indexer-query", + "indexer-receipt", "indexer-tap-agent", "indexer-watcher", "jsonrpsee", @@ -6997,7 +7009,7 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tap_aggregator" version = "0.4.1" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=dbae001#dbae001f55e48d95a7298417c366f69a711fa1b7" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=e5546a6#e5546a617ec2efcdbf09c465848b609856bef96c" dependencies = [ "alloy", "anyhow", @@ -7026,7 +7038,7 @@ dependencies = [ [[package]] name = "tap_core" version = "3.0.1" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=dbae001#dbae001f55e48d95a7298417c366f69a711fa1b7" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=e5546a6#e5546a617ec2efcdbf09c465848b609856bef96c" dependencies = [ "alloy", "anyhow", @@ -7043,7 +7055,7 @@ dependencies = [ [[package]] name = "tap_eip712_message" version = "0.1.0" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=dbae001#dbae001f55e48d95a7298417c366f69a711fa1b7" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=e5546a6#e5546a617ec2efcdbf09c465848b609856bef96c" dependencies = [ "alloy", "serde", @@ -7053,7 +7065,7 @@ dependencies = [ [[package]] name = "tap_graph" version = "0.2.0" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=dbae001#dbae001f55e48d95a7298417c366f69a711fa1b7" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=e5546a6#e5546a617ec2efcdbf09c465848b609856bef96c" dependencies = [ "alloy", "rand", @@ -7066,7 +7078,7 @@ dependencies = [ [[package]] name = "tap_receipt" version = "0.1.0" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=dbae001#dbae001f55e48d95a7298417c366f69a711fa1b7" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=e5546a6#e5546a617ec2efcdbf09c465848b609856bef96c" dependencies = [ "alloy", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 7080796b..d81debc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/attestation", "crates/config", "crates/dips", + "crates/indexer-receipt", "crates/monitor", "crates/query", "crates/service", @@ -84,12 +85,12 @@ tonic-build = "0.12.3" [patch.crates-io.tap_core] git = "https://github.com/semiotic-ai/timeline-aggregation-protocol" -rev = "dbae001" +rev = "e5546a6" [patch.crates-io.tap_aggregator] git = "https://github.com/semiotic-ai/timeline-aggregation-protocol" -rev = "dbae001" +rev = "e5546a6" [patch.crates-io.tap_graph] git = "https://github.com/semiotic-ai/timeline-aggregation-protocol" -rev = "dbae001" +rev = "e5546a6" diff --git a/crates/indexer-receipt/Cargo.toml b/crates/indexer-receipt/Cargo.toml new file mode 100644 index 00000000..8007d150 --- /dev/null +++ b/crates/indexer-receipt/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "indexer-receipt" +version = "0.1.0" +edition = "2021" + +[dependencies] +tap_core.workspace = true +tap_graph.workspace = true +thegraph-core.workspace = true +anyhow.workspace = true diff --git a/crates/indexer-receipt/src/lib.rs b/crates/indexer-receipt/src/lib.rs new file mode 100644 index 00000000..c1a79b3b --- /dev/null +++ b/crates/indexer-receipt/src/lib.rs @@ -0,0 +1,175 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use tap_core::{ + receipt::{ + rav::{Aggregate, AggregationError}, + WithUniqueId, WithValueAndTimestamp, + }, + signed_message::SignatureBytes, +}; +use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::Signature}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TapReceipt { + V1(tap_graph::SignedReceipt), + V2(tap_graph::v2::SignedReceipt), +} + +impl Aggregate for tap_graph::ReceiptAggregateVoucher { + fn aggregate_receipts( + receipts: &[tap_core::receipt::ReceiptWithState< + tap_core::receipt::state::Checked, + TapReceipt, + >], + previous_rav: Option>, + ) -> Result { + if receipts.is_empty() { + return Err(AggregationError::NoValidReceiptsForRavRequest); + } + let receipts: Vec<_> = receipts + .iter() + .map(|receipt| { + receipt + .signed_receipt() + .get_v1_receipt() + .cloned() + .ok_or(anyhow!("Receipt is not v1")) + }) + .collect::>() + .map_err(AggregationError::Other)?; + let allocation_id = receipts[0].message.allocation_id; + tap_graph::ReceiptAggregateVoucher::aggregate_receipts( + allocation_id, + receipts.as_slice(), + previous_rav, + ) + } +} + +impl Aggregate for tap_graph::v2::ReceiptAggregateVoucher { + fn aggregate_receipts( + receipts: &[tap_core::receipt::ReceiptWithState< + tap_core::receipt::state::Checked, + TapReceipt, + >], + previous_rav: Option>, + ) -> Result { + if receipts.is_empty() { + return Err(AggregationError::NoValidReceiptsForRavRequest); + } + let receipts: Vec<_> = receipts + .iter() + .map(|receipt| { + receipt + .signed_receipt() + .get_v2_receipt() + .cloned() + .ok_or(anyhow!("Receipt is not v2")) + }) + .collect::>() + .map_err(AggregationError::Other)?; + let allocation_id = receipts[0].message.allocation_id; + let payer = receipts[0].message.payer; + let data_service = receipts[0].message.data_service; + let service_provider = receipts[0].message.service_provider; + + tap_graph::v2::ReceiptAggregateVoucher::aggregate_receipts( + allocation_id, + payer, + data_service, + service_provider, + receipts.as_slice(), + previous_rav, + ) + } +} + +impl TapReceipt { + pub fn as_v1(self) -> Option { + match self { + TapReceipt::V1(receipt) => Some(receipt), + _ => None, + } + } + + pub fn as_v2(self) -> Option { + match self { + TapReceipt::V2(receipt) => Some(receipt), + _ => None, + } + } + + pub fn get_v1_receipt(&self) -> Option<&tap_graph::SignedReceipt> { + match self { + TapReceipt::V1(receipt) => Some(receipt), + _ => None, + } + } + + pub fn get_v2_receipt(&self) -> Option<&tap_graph::v2::SignedReceipt> { + match self { + TapReceipt::V2(receipt) => Some(receipt), + _ => None, + } + } + + pub fn allocation_id(&self) -> Address { + match self { + TapReceipt::V1(receipt) => receipt.message.allocation_id, + TapReceipt::V2(receipt) => receipt.message.allocation_id, + } + } + + pub fn signature(&self) -> Signature { + match self { + TapReceipt::V1(receipt) => receipt.signature, + TapReceipt::V2(receipt) => receipt.signature, + } + } + + pub fn nonce(&self) -> u64 { + match self { + TapReceipt::V1(receipt) => receipt.message.nonce, + TapReceipt::V2(receipt) => receipt.message.nonce, + } + } + + pub fn recover_signer( + &self, + domain_separator: &Eip712Domain, + ) -> Result { + match self { + TapReceipt::V1(receipt) => receipt.recover_signer(domain_separator), + TapReceipt::V2(receipt) => receipt.recover_signer(domain_separator), + } + } +} + +impl WithValueAndTimestamp for TapReceipt { + fn value(&self) -> u128 { + match self { + TapReceipt::V1(receipt) => receipt.value(), + TapReceipt::V2(receipt) => receipt.value(), + } + } + + fn timestamp_ns(&self) -> u64 { + match self { + TapReceipt::V1(receipt) => receipt.timestamp_ns(), + TapReceipt::V2(receipt) => receipt.timestamp_ns(), + } + } +} + +impl WithUniqueId for TapReceipt { + type Output = SignatureBytes; + + fn unique_id(&self) -> Self::Output { + match self { + TapReceipt::V1(receipt) => receipt.unique_id(), + TapReceipt::V2(receipt) => receipt.unique_id(), + } + } +} diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index 08e0fe4e..870b04e2 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -13,6 +13,7 @@ indexer-allocation = { path = "../allocation" } indexer-config = { path = "../config" } indexer-dips = { path = "../dips" } indexer-query = { path = "../query" } +indexer-receipt = { path = "../indexer-receipt" } anyhow = { workspace = true } prometheus = { workspace = true } reqwest = { workspace = true } diff --git a/crates/service/src/middleware/auth/tap.rs b/crates/service/src/middleware/auth/tap.rs index 554a0bff..c6062e9e 100644 --- a/crates/service/src/middleware/auth/tap.rs +++ b/crates/service/src/middleware/auth/tap.rs @@ -20,7 +20,6 @@ use tap_core::{ manager::{adapters::ReceiptStore, Manager}, receipt::Context, }; -use tap_graph::ReceiptAggregateVoucher; use tower_http::auth::AsyncAuthorizeRequest; use crate::{ @@ -33,7 +32,7 @@ use crate::{ /// /// Requires TapReceipt, MetricLabels and Arc extensions pub fn tap_receipt_authorize( - tap_manager: Arc>, + tap_manager: Arc>, failed_receipt_metric: &'static prometheus::CounterVec, ) -> impl AsyncAuthorizeRequest< B, diff --git a/crates/service/src/tap.rs b/crates/service/src/tap.rs index 2c13b614..47bc91c9 100644 --- a/crates/service/src/tap.rs +++ b/crates/service/src/tap.rs @@ -22,11 +22,10 @@ use crate::tap::checks::{ }; mod checks; -mod receipt; mod receipt_store; +pub use ::indexer_receipt::TapReceipt; pub use checks::value_check::AgoraQuery; -pub use receipt::TapReceipt; pub type CheckingReceipt = ReceiptWithState; diff --git a/crates/service/src/tap/receipt.rs b/crates/service/src/tap/receipt.rs deleted file mode 100644 index 61cf4918..00000000 --- a/crates/service/src/tap/receipt.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use tap_core::{ - receipt::{WithUniqueId, WithValueAndTimestamp}, - signed_message::SignatureBytes, -}; -use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::Signature}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum TapReceipt { - V1(tap_graph::SignedReceipt), - V2(tap_graph::v2::SignedReceipt), -} - -impl TapReceipt { - pub fn allocation_id(&self) -> Address { - match self { - TapReceipt::V1(receipt) => receipt.message.allocation_id, - TapReceipt::V2(receipt) => receipt.message.allocation_id, - } - } - - pub fn signature(&self) -> Signature { - match self { - TapReceipt::V1(receipt) => receipt.signature, - TapReceipt::V2(receipt) => receipt.signature, - } - } - - pub fn nonce(&self) -> u64 { - match self { - TapReceipt::V1(receipt) => receipt.message.nonce, - TapReceipt::V2(receipt) => receipt.message.nonce, - } - } - - pub fn recover_signer( - &self, - domain_separator: &Eip712Domain, - ) -> Result { - match self { - TapReceipt::V1(receipt) => receipt.recover_signer(domain_separator), - TapReceipt::V2(receipt) => receipt.recover_signer(domain_separator), - } - } -} - -impl WithValueAndTimestamp for TapReceipt { - fn value(&self) -> u128 { - match self { - TapReceipt::V1(receipt) => receipt.value(), - TapReceipt::V2(receipt) => receipt.value(), - } - } - - fn timestamp_ns(&self) -> u64 { - match self { - TapReceipt::V1(receipt) => receipt.timestamp_ns(), - TapReceipt::V2(receipt) => receipt.timestamp_ns(), - } - } -} - -impl WithUniqueId for TapReceipt { - type Output = SignatureBytes; - - fn unique_id(&self) -> Self::Output { - match self { - TapReceipt::V1(receipt) => receipt.unique_id(), - TapReceipt::V2(receipt) => receipt.unique_id(), - } - } -} diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index 3649159f..4744ff56 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -17,6 +17,7 @@ indexer-watcher = { path = "../watcher" } indexer-allocation = { path = "../allocation" } indexer-config = { path = "../config" } indexer-query = { path = "../query" } +indexer-receipt = { path = "../indexer-receipt" } anyhow.workspace = true async-trait.workspace = true sqlx.workspace = true diff --git a/crates/tap-agent/src/agent/sender_allocation.rs b/crates/tap-agent/src/agent/sender_allocation.rs index bc4b6647..2f4aa318 100644 --- a/crates/tap-agent/src/agent/sender_allocation.rs +++ b/crates/tap-agent/src/agent/sender_allocation.rs @@ -22,11 +22,11 @@ use tap_core::{ checks::{Check, CheckList}, rav::AggregationError, state::Failed, - Context, ReceiptWithState, + Context, ReceiptWithState, WithValueAndTimestamp, }, signed_message::Eip712SignedMessage, }; -use tap_graph::{ReceiptAggregateVoucher, SignedRav, SignedReceipt}; +use tap_graph::{ReceiptAggregateVoucher, SignedRav}; use thegraph_core::alloy::{hex::ToHexExt, primitives::Address, sol_types::Eip712Domain}; use thiserror::Error; use tokio::sync::watch::Receiver; @@ -45,7 +45,7 @@ use crate::{ checks::{AllocationId, Signature}, TapAgentContext, }, - signers_trimmed, + signers_trimmed, TapReceipt, }, }; @@ -93,12 +93,14 @@ pub enum RavError { #[error("All receipts are invalid")] AllReceiptsInvalid, + #[error("Receipt is not legacy")] + ReceiptNotCompatible, + #[error(transparent)] Other(#[from] anyhow::Error), } -type TapManager = - tap_core::manager::Manager; +type TapManager = tap_core::manager::Manager; /// Manages unaggregated fees and the TAP lifecyle for a specific (allocation, sender) pair. pub struct SenderAllocation; @@ -353,7 +355,7 @@ impl SenderAllocationState { config, }: SenderAllocationArgs, ) -> anyhow::Result { - let required_checks: Vec + Send + Sync>> = vec![ + let required_checks: Vec + Send + Sync>> = vec![ Arc::new( AllocationId::new( config.indexer_address, @@ -565,12 +567,12 @@ impl SenderAllocationState { // Obtain min/max timestamps to define query let min_timestamp = invalid_receipts .iter() - .map(|receipt| receipt.signed_receipt().message.timestamp_ns) + .map(|receipt| receipt.signed_receipt().timestamp_ns()) .min() .expect("invalid receipts should not be empty"); let max_timestamp = invalid_receipts .iter() - .map(|receipt| receipt.signed_receipt().message.timestamp_ns) + .map(|receipt| receipt.signed_receipt().timestamp_ns()) .max() .expect("invalid receipts should not be empty"); let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; @@ -594,8 +596,13 @@ impl SenderAllocationState { (Ok(expected_rav), ..) => { let valid_receipts: Vec<_> = valid_receipts .into_iter() - .map(|r| r.signed_receipt().clone()) - .collect(); + .map(|r| { + r.signed_receipt() + .clone() + .as_v1() + .ok_or(RavError::ReceiptNotCompatible) + }) + .collect::>()?; let rav_request = AggregatorRequest::new(valid_receipts, previous_rav); @@ -731,7 +738,7 @@ impl SenderAllocationState { async fn store_invalid_receipts( &mut self, - receipts: &[ReceiptWithState], + receipts: &[ReceiptWithState], ) -> anyhow::Result<()> { let reciepts_len = receipts.len(); let mut reciepts_signers = Vec::with_capacity(reciepts_len); @@ -743,7 +750,10 @@ impl SenderAllocationState { let mut error_logs = Vec::with_capacity(reciepts_len); for received_receipt in receipts.iter() { - let receipt = received_receipt.signed_receipt(); + let receipt = match received_receipt.signed_receipt() { + TapReceipt::V1(receipt) => receipt, + TapReceipt::V2(_) => unimplemented!("V2 not supported"), + }; let allocation_id = receipt.message.allocation_id; let encoded_signature = receipt.signature.as_bytes().to_vec(); let receipt_error = received_receipt.clone().error().to_string(); @@ -802,7 +812,7 @@ impl SenderAllocationState { let fees = receipts .iter() - .map(|receipt| receipt.signed_receipt().message.value) + .map(|receipt| receipt.signed_receipt().value()) .sum(); self.invalid_receipts_fees.value = self @@ -874,6 +884,7 @@ pub mod tests { use bigdecimal::ToPrimitive; use futures::future::join_all; use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; + use indexer_receipt::TapReceipt; use ractor::{call, cast, Actor, ActorRef, ActorStatus}; use ruint::aliases::U256; use serde_json::json; @@ -883,7 +894,6 @@ pub mod tests { checks::{Check, CheckError, CheckList, CheckResult}, Context, }; - use tap_graph::SignedReceipt; use test_assets::{ flush_messages, ALLOCATION_ID_0, TAP_EIP712_DOMAIN as TAP_EIP712_DOMAIN_SEPARATOR, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, @@ -1574,7 +1584,7 @@ pub mod tests { struct FailingCheck; #[async_trait::async_trait] - impl Check for FailingCheck { + impl Check for FailingCheck { async fn check( &self, _: &tap_core::receipt::Context, diff --git a/crates/tap-agent/src/tap/context/checks/allocation_id.rs b/crates/tap-agent/src/tap/context/checks/allocation_id.rs index 651b0c2d..d264fe63 100644 --- a/crates/tap-agent/src/tap/context/checks/allocation_id.rs +++ b/crates/tap-agent/src/tap/context/checks/allocation_id.rs @@ -8,11 +8,10 @@ use indexer_monitor::SubgraphClient; use indexer_query::{tap_transactions, TapTransactions}; use indexer_watcher::new_watcher; use tap_core::receipt::checks::{Check, CheckError, CheckResult}; -use tap_graph::SignedReceipt; use thegraph_core::alloy::primitives::Address; use tokio::sync::watch::Receiver; -use crate::tap::CheckingReceipt; +use crate::tap::{CheckingReceipt, TapReceipt}; pub struct AllocationId { tap_allocation_redeemed: Receiver, @@ -45,13 +44,13 @@ impl AllocationId { } #[async_trait::async_trait] -impl Check for AllocationId { +impl Check for AllocationId { async fn check( &self, _: &tap_core::receipt::Context, receipt: &CheckingReceipt, ) -> CheckResult { - let allocation_id = receipt.signed_receipt().message.allocation_id; + let allocation_id = receipt.signed_receipt().allocation_id(); // TODO: Remove the if block below? Each TAP Monitor is specific to an allocation // ID. So the receipts that are received here should already have been filtered by // allocation ID. diff --git a/crates/tap-agent/src/tap/context/checks/signature.rs b/crates/tap-agent/src/tap/context/checks/signature.rs index 9eb49935..41a10ff5 100644 --- a/crates/tap-agent/src/tap/context/checks/signature.rs +++ b/crates/tap-agent/src/tap/context/checks/signature.rs @@ -4,11 +4,10 @@ use anyhow::anyhow; use indexer_monitor::EscrowAccounts; use tap_core::receipt::checks::{Check, CheckError, CheckResult}; -use tap_graph::SignedReceipt; use thegraph_core::alloy::{primitives::U256, sol_types::Eip712Domain}; use tokio::sync::watch::Receiver; -use crate::tap::CheckingReceipt; +use crate::tap::{CheckingReceipt, TapReceipt}; pub struct Signature { domain_separator: Eip712Domain, @@ -25,7 +24,7 @@ impl Signature { } #[async_trait::async_trait] -impl Check for Signature { +impl Check for Signature { async fn check( &self, _: &tap_core::receipt::Context, diff --git a/crates/tap-agent/src/tap/context/receipt.rs b/crates/tap-agent/src/tap/context/receipt.rs index 0d179701..c1f20eb7 100644 --- a/crates/tap-agent/src/tap/context/receipt.rs +++ b/crates/tap-agent/src/tap/context/receipt.rs @@ -8,6 +8,7 @@ use std::{ }; use bigdecimal::{num_bigint::ToBigInt, ToPrimitive}; +use indexer_receipt::TapReceipt; use sqlx::{postgres::types::PgRange, types::BigDecimal}; use tap_core::manager::adapters::{safe_truncate_receipts, ReceiptDelete, ReceiptRead}; use tap_graph::{Receipt, SignedReceipt}; @@ -69,7 +70,7 @@ fn rangebounds_to_pgrange>(range: R) -> PgRange } #[async_trait::async_trait] -impl ReceiptRead for TapAgentContext { +impl ReceiptRead for TapAgentContext { type AdapterError = AdapterError; async fn retrieve_receipts_in_timestamp_range + Send>( @@ -145,7 +146,7 @@ impl ReceiptRead for TapAgentContext { signature, }; - Ok(CheckingReceipt::new(signed_receipt)) + Ok(CheckingReceipt::new(TapReceipt::V1(signed_receipt))) }) .collect::, AdapterError>>()?; @@ -198,7 +199,10 @@ mod test { use indexer_monitor::EscrowAccounts; use lazy_static::lazy_static; use sqlx::PgPool; - use tap_core::manager::adapters::{ReceiptDelete, ReceiptRead}; + use tap_core::{ + manager::adapters::{ReceiptDelete, ReceiptRead}, + receipt::{WithUniqueId, WithValueAndTimestamp}, + }; use test_assets::{ ALLOCATION_ID_0, ALLOCATION_ID_1, TAP_EIP712_DOMAIN as TAP_EIP712_DOMAIN_SEPARATOR, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, @@ -246,8 +250,8 @@ mod test { .clone(); assert_eq!( - received_receipt.signed_receipt().unique_hash(), - retrieved_receipt.signed_receipt().unique_hash(), + received_receipt.signed_receipt().unique_id(), + retrieved_receipt.signed_receipt().unique_id(), ); } @@ -266,8 +270,8 @@ mod test { let received_receipt_vec: Vec<_> = received_receipt_vec .iter() .filter(|received_receipt| { - range.contains(&received_receipt.signed_receipt().message.timestamp_ns) - && (received_receipt.signed_receipt().message.allocation_id + range.contains(&received_receipt.signed_receipt().timestamp_ns()) + && (received_receipt.signed_receipt().allocation_id() == storage_adapter.allocation_id) && escrow_accounts_snapshot .get_sender_for_signer( @@ -286,7 +290,7 @@ mod test { .retrieve_receipts_in_timestamp_range(range, None) .await? .into_iter() - .map(|r| r.signed_receipt().unique_hash()) + .map(|r| r.signed_receipt().unique_id()) .collect::>(); // Check length @@ -298,8 +302,7 @@ mod test { // Checking that the receipts in recovered_received_receipt_vec are the same as // the ones in received_receipt_vec assert!(received_receipt_vec.iter().all(|received_receipt| { - recovered_received_receipt_vec - .contains(&received_receipt.signed_receipt().unique_hash()) + recovered_received_receipt_vec.contains(&received_receipt.signed_receipt().unique_id()) })); Ok(()) } @@ -333,7 +336,7 @@ mod test { let received_receipt_vec: Vec<_> = received_receipt_vec .iter() .filter(|(_, received_receipt)| { - if (received_receipt.signed_receipt().message.allocation_id + if (received_receipt.signed_receipt().allocation_id() == storage_adapter.allocation_id) && escrow_accounts_snapshot .get_sender_for_signer( @@ -344,7 +347,7 @@ mod test { ) .is_ok_and(|v| v == storage_adapter.sender) { - !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) + !range.contains(&received_receipt.signed_receipt().timestamp_ns()) } else { true } @@ -397,14 +400,13 @@ mod test { }, signature, }; - signed_receipt.unique_hash() + signed_receipt.unique_id() }) .collect(); // Check values recovered_received_receipt_set contains values received_receipt_vec assert!(received_receipt_vec.iter().all(|(_, received_receipt)| { - recovered_received_receipt_set - .contains(&received_receipt.signed_receipt().unique_hash()) + recovered_received_receipt_set.contains(&received_receipt.signed_receipt().unique_id()) })); // Removing all the receipts in the DB diff --git a/crates/tap-agent/src/tap/mod.rs b/crates/tap-agent/src/tap/mod.rs index e5042802..598a9b28 100644 --- a/crates/tap-agent/src/tap/mod.rs +++ b/crates/tap-agent/src/tap/mod.rs @@ -1,13 +1,12 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +pub use ::indexer_receipt::TapReceipt; use indexer_monitor::EscrowAccounts; use tap_core::receipt::{state::Checking, ReceiptWithState}; -use tap_graph::SignedReceipt; use thegraph_core::alloy::{hex::ToHexExt, primitives::Address}; use tokio::sync::watch::Receiver; - -pub type CheckingReceipt = ReceiptWithState; +pub type CheckingReceipt = ReceiptWithState; pub mod context; diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index 7924942c..f2ce0089 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -12,6 +12,7 @@ use actors::TestableActor; use anyhow::anyhow; use bigdecimal::num_bigint::BigInt; use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; +use indexer_receipt::TapReceipt; use lazy_static::lazy_static; use ractor::{concurrency::JoinHandle, Actor, ActorRef}; use reqwest::Url; @@ -275,10 +276,20 @@ pub fn create_received_receipt( signer_wallet, ) .unwrap(); - CheckingReceipt::new(receipt) + CheckingReceipt::new(indexer_receipt::TapReceipt::V1(receipt)) } -pub async fn store_receipt(pgpool: &PgPool, signed_receipt: &SignedReceipt) -> anyhow::Result { +pub async fn store_receipt(pgpool: &PgPool, signed_receipt: &TapReceipt) -> anyhow::Result { + match signed_receipt { + TapReceipt::V1(signed_receipt) => store_receipt_v1(pgpool, signed_receipt).await, + TapReceipt::V2(_) => unimplemented!("V2 not supported"), + } +} + +pub async fn store_receipt_v1( + pgpool: &PgPool, + signed_receipt: &SignedReceipt, +) -> anyhow::Result { let encoded_signature = signed_receipt.signature.as_bytes().to_vec(); let record = sqlx::query!( @@ -318,7 +329,10 @@ pub async fn store_batch_receipts( let mut values = Vec::with_capacity(receipts_len); for receipt in receipts { - let receipt = receipt.signed_receipt(); + let receipt = match receipt.signed_receipt() { + TapReceipt::V1(receipt) => receipt, + TapReceipt::V2(_) => unimplemented!("V2 receipts not supported"), + }; signers.push( receipt .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) @@ -364,6 +378,16 @@ pub async fn store_batch_receipts( } pub async fn store_invalid_receipt( + pgpool: &PgPool, + signed_receipt: &TapReceipt, +) -> anyhow::Result { + match signed_receipt { + TapReceipt::V1(signed_receipt) => store_invalid_receipt_v1(pgpool, signed_receipt).await, + TapReceipt::V2(_) => unimplemented!("V2 not supported"), + } +} + +pub async fn store_invalid_receipt_v1( pgpool: &PgPool, signed_receipt: &SignedReceipt, ) -> anyhow::Result {