Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gustavo/generic manager #584

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 89 additions & 90 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ axum = { version = "0.7.9", default-features = false, features = [
"http1",
"http2",
] }
tokio = "1.40"
tokio = "1"
prometheus = "0.13.3"
anyhow = { version = "1.0.72" }
thiserror = "1.0.49"
Expand Down Expand Up @@ -52,22 +52,22 @@ uuid = { version = "1.11.0", features = ["v7"] }
tracing = { version = "0.1.40", default-features = false }
bigdecimal = "0.4.3"
build-info = "0.0.39"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false }
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1fc51a3", default-features = false }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1fc51a3", default-features = false }
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
"ansi",
], default-features = false }
thegraph-core = { version = "0.10.0", features = [
thegraph-core = { git = "https://github.com/edgeandnode/toolshed", rev = "a1d0509", features = [
"attestation",
"alloy-eip712",
"alloy-sol-types",
"alloy-rlp",
"alloy-signers",
"alloy-signer-local",
"alloy-signer-mnemonic",
"serde"
"serde",
] }
thegraph-graphql-http = { version = "0.3.2", features = ["reqwest"] }
graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
Expand Down
16 changes: 8 additions & 8 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ pub mod server;
pub mod store;

use store::AgreementStore;
use uuid::Uuid;

use thiserror::Error;
use uuid::Uuid;

sol! {
// EIP712 encoded bytes, ABI - ethers
Expand Down Expand Up @@ -264,9 +263,10 @@ pub async fn validate_and_cancel_agreement(

#[cfg(test)]
mod test {
use std::sync::Arc;

use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use thegraph_core::{
alloy::{
Expand All @@ -276,12 +276,12 @@ mod test {
},
attestation::eip712_domain,
};

use crate::{CancellationRequest, DipsError};
use crate::{IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata};
use uuid::Uuid;

pub use crate::store::{AgreementStore, InMemoryAgreementStore};
use crate::{
CancellationRequest, DipsError, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
};

#[tokio::test]
async fn test_validate_and_create_agreement() -> anyhow::Result<()> {
Expand Down
9 changes: 5 additions & 4 deletions crates/dips/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@

use std::{str::FromStr, sync::Arc, time::Duration};

use anyhow::anyhow;
use async_trait::async_trait;
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
use uuid::Uuid;

use crate::{
proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement,
validate_and_create_agreement, DipsError,
};
use anyhow::anyhow;
use async_trait::async_trait;
use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address};
use uuid::Uuid;

#[derive(Debug)]
pub struct DipsServer {
Expand Down
2 changes: 0 additions & 2 deletions crates/service/src/database/dips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
use anyhow::bail;
use axum::async_trait;
use build_info::chrono::Utc;

use indexer_dips::{
store::AgreementStore, SignedCancellationRequest, SignedIndexingAgreementVoucher,
};

use sqlx::PgPool;
use thegraph_core::alloy::rlp::Decodable;
use uuid::Uuid;
Expand Down
11 changes: 6 additions & 5 deletions crates/service/src/middleware/auth/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use axum::{
};
use tap_core::{
manager::{adapters::ReceiptStore, Manager},
rav::ReceiptAggregateVoucher,
receipt::{Context, SignedReceipt},
};
use tower_http::auth::AsyncAuthorizeRequest;
Expand All @@ -30,7 +31,7 @@ use crate::{error::IndexerServiceError, middleware::prometheus_metrics::MetricLa
///
/// Requires SignedReceipt, MetricLabels and Arc<Context> extensions
pub fn tap_receipt_authorize<T, B>(
tap_manager: Arc<Manager<T>>,
tap_manager: Arc<Manager<T, SignedReceipt, ReceiptAggregateVoucher>>,
failed_receipt_metric: &'static prometheus::CounterVec,
) -> impl AsyncAuthorizeRequest<
B,
Expand All @@ -40,7 +41,7 @@ pub fn tap_receipt_authorize<T, B>(
> + Clone
+ Send
where
T: ReceiptStore + Sync + Send + 'static,
T: ReceiptStore<SignedReceipt> + Sync + Send + 'static,
B: Send,
{
move |request: Request<B>| {
Expand Down Expand Up @@ -91,7 +92,7 @@ mod tests {
receipt::{
checks::{Check, CheckError, CheckList, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
},
};
use test_assets::{
Expand Down Expand Up @@ -133,11 +134,11 @@ mod tests {

struct MyCheck;
#[async_trait::async_trait]
impl Check for MyCheck {
impl Check<SignedReceipt> for MyCheck {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
if receipt.signed_receipt().message.nonce == FAILED_NONCE {
Err(CheckError::Failed(anyhow::anyhow!("Failed")))
Expand Down
4 changes: 2 additions & 2 deletions crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};

use anyhow::anyhow;
use axum::{extract::Request, serve, ServiceExt};
use clap::Parser;
use indexer_config::{Config, DipsConfig, GraphNodeConfig, SubgraphConfig};
use indexer_dips::{
proto::graphprotocol::indexer::dips::agreement_service_server::{
Expand All @@ -19,14 +20,13 @@ use reqwest::Url;
use tap_core::tap_eip712_domain;
use tokio::{net::TcpListener, signal};
use tower_http::normalize_path::NormalizePath;
use tracing::info;

use crate::{
cli::Cli,
database::{self, dips::PsqlAgreementStore},
metrics::serve_metrics,
};
use clap::Parser;
use tracing::info;

mod release;
mod router;
Expand Down
4 changes: 2 additions & 2 deletions crates/service/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use indexer_allocation::Allocation;
use indexer_monitor::EscrowAccounts;
use receipt_store::{DatabaseReceipt, InnerContext};
use sqlx::PgPool;
use tap_core::receipt::checks::ReceiptCheck;
use tap_core::receipt::{checks::ReceiptCheck, SignedReceipt};
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
use tokio::sync::{
mpsc::{self, Sender},
Expand Down Expand Up @@ -48,7 +48,7 @@ impl IndexerTapContext {
escrow_accounts: Receiver<EscrowAccounts>,
timestamp_error_tolerance: Duration,
receipt_max_value: u128,
) -> Vec<ReceiptCheck> {
) -> Vec<ReceiptCheck<SignedReceipt>> {
vec![
Arc::new(AllocationEligible::new(indexer_allocations)),
Arc::new(SenderBalanceCheck::new(escrow_accounts)),
Expand Down
6 changes: 3 additions & 3 deletions crates/service/src/tap/checks/allocation_eligible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use indexer_allocation::Allocation;
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};
use thegraph_core::alloy::primitives::Address;
use tokio::sync::watch::Receiver;
Expand All @@ -25,11 +25,11 @@ impl AllocationEligible {
}
}
#[async_trait::async_trait]
impl Check for AllocationEligible {
impl Check<SignedReceipt> for AllocationEligible {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let allocation_id = receipt.signed_receipt().message.allocation_id;
if !self
Expand Down
6 changes: 3 additions & 3 deletions crates/service/src/tap/checks/deny_list_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use sqlx::{postgres::PgListener, PgPool};
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};
use thegraph_core::alloy::primitives::Address;

Expand Down Expand Up @@ -153,11 +153,11 @@ impl DenyListCheck {
}

#[async_trait::async_trait]
impl Check for DenyListCheck {
impl Check<SignedReceipt> for DenyListCheck {
async fn check(
&self,
ctx: &tap_core::receipt::Context,
_: &ReceiptWithState<Checking>,
_: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let Sender(receipt_sender) = ctx
.get::<Sender>()
Expand Down
12 changes: 7 additions & 5 deletions crates/service/src/tap/checks/receipt_max_val_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct ReceiptMaxValueCheck {
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};

impl ReceiptMaxValueCheck {
Expand All @@ -19,11 +19,11 @@ impl ReceiptMaxValueCheck {
}

#[async_trait::async_trait]
impl Check for ReceiptMaxValueCheck {
impl Check<SignedReceipt> for ReceiptMaxValueCheck {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let receipt_value = receipt.signed_receipt().message.value;

Expand Down Expand Up @@ -54,7 +54,9 @@ mod tests {
use super::*;
use crate::tap::Eip712Domain;

fn create_signed_receipt_with_custom_value(value: u128) -> ReceiptWithState<Checking> {
fn create_signed_receipt_with_custom_value(
value: u128,
) -> ReceiptWithState<Checking, SignedReceipt> {
let index: u32 = 0;
let wallet: PrivateKeySigner = MnemonicBuilder::<English>::default()
.phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about")
Expand Down Expand Up @@ -86,7 +88,7 @@ mod tests {
&wallet,
)
.unwrap();
ReceiptWithState::<Checking>::new(receipt)
ReceiptWithState::new(receipt)
}

const RECEIPT_LIMIT: u128 = 10;
Expand Down
6 changes: 3 additions & 3 deletions crates/service/src/tap/checks/sender_balance_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use indexer_monitor::EscrowAccounts;
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};
use thegraph_core::alloy::primitives::U256;
use tokio::sync::watch::Receiver;
Expand All @@ -24,11 +24,11 @@ impl SenderBalanceCheck {
}

#[async_trait::async_trait]
impl Check for SenderBalanceCheck {
impl Check<SignedReceipt> for SenderBalanceCheck {
async fn check(
&self,
ctx: &tap_core::receipt::Context,
_: &ReceiptWithState<Checking>,
_: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let escrow_accounts_snapshot = self.escrow_accounts.borrow();

Expand Down
14 changes: 8 additions & 6 deletions crates/service/src/tap/checks/timestamp_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct TimestampCheck {
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};

impl TimestampCheck {
Expand All @@ -23,11 +23,11 @@ impl TimestampCheck {
}

#[async_trait::async_trait]
impl Check for TimestampCheck {
impl Check<SignedReceipt> for TimestampCheck {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let timestamp_now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand All @@ -52,7 +52,9 @@ mod tests {
use std::time::{Duration, SystemTime};

use tap_core::{
receipt::{checks::Check, state::Checking, Context, Receipt, ReceiptWithState},
receipt::{
checks::Check, state::Checking, Context, Receipt, ReceiptWithState, SignedReceipt,
},
signed_message::EIP712SignedMessage,
tap_eip712_domain,
};
Expand All @@ -66,7 +68,7 @@ mod tests {

fn create_signed_receipt_with_custom_timestamp(
timestamp_ns: u64,
) -> ReceiptWithState<Checking> {
) -> ReceiptWithState<Checking, SignedReceipt> {
let index: u32 = 0;
let wallet: PrivateKeySigner = MnemonicBuilder::<English>::default()
.phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about")
Expand All @@ -89,7 +91,7 @@ mod tests {
&wallet,
)
.unwrap();
ReceiptWithState::<Checking>::new(receipt)
ReceiptWithState::new(receipt)
}

#[tokio::test]
Expand Down
10 changes: 7 additions & 3 deletions crates/service/src/tap/checks/value_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use sqlx::{
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
Context, ReceiptWithState,
Context, ReceiptWithState, SignedReceipt,
};
use thegraph_core::DeploymentId;

Expand Down Expand Up @@ -303,8 +303,12 @@ impl MinimumValue {
}

#[async_trait::async_trait]
impl Check for MinimumValue {
async fn check(&self, ctx: &Context, receipt: &ReceiptWithState<Checking>) -> CheckResult {
impl Check<SignedReceipt> for MinimumValue {
async fn check(
&self,
ctx: &Context,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let agora_query = ctx
.get()
.ok_or(CheckError::Failed(anyhow!("Could not find agora query")))?;
Expand Down
Loading
Loading