From 1263fb784100daaa77902e54d184ab8b7d75fe1d Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Thu, 2 Jan 2025 14:25:56 +0000 Subject: [PATCH] refactor: dips grpc (#529) --- .github/workflows/license_headers_check.yml | 1 + .github/workflows/tests.yml | 8 + Cargo.lock | 81 ++- Cargo.toml | 6 +- Dockerfile.indexer-service-rs | 5 +- Dockerfile.indexer-tap-agent | 2 + crates/config/maximal-config-example.toml | 3 +- crates/config/src/config.rs | 23 +- crates/dips/Cargo.toml | 10 + crates/dips/build.rs | 12 + crates/dips/proto/dips.proto | 59 ++ crates/dips/src/lib.rs | 191 ++++-- .../src/proto/graphprotocol.indexer.dips.rs | 621 ++++++++++++++++++ crates/dips/src/proto/mod.rs | 8 + crates/dips/src/server.rs | 87 +++ crates/dips/src/store.rs | 56 ++ crates/service/Cargo.toml | 1 + crates/service/src/database/dips.rs | 57 +- crates/service/src/routes/dips.rs | 335 ---------- crates/service/src/routes/mod.rs | 1 - crates/service/src/service.rs | 63 +- crates/service/src/service/router.rs | 32 +- crates/test-assets/src/lib.rs | 1 - 23 files changed, 1188 insertions(+), 475 deletions(-) create mode 100644 crates/dips/build.rs create mode 100644 crates/dips/proto/dips.proto create mode 100644 crates/dips/src/proto/graphprotocol.indexer.dips.rs create mode 100644 crates/dips/src/proto/mod.rs create mode 100644 crates/dips/src/server.rs create mode 100644 crates/dips/src/store.rs delete mode 100644 crates/service/src/routes/dips.rs diff --git a/.github/workflows/license_headers_check.yml b/.github/workflows/license_headers_check.yml index c4bd8363..1e588284 100644 --- a/.github/workflows/license_headers_check.yml +++ b/.github/workflows/license_headers_check.yml @@ -29,4 +29,5 @@ jobs: -ignore '.github/workflows/*.yaml' \ -ignore '.github/*.yaml' \ -ignore 'migrations/*.sql' \ + -ignore 'crates/dips/src/proto/*' \ . diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1582eddb..fe79ea4a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -50,6 +50,8 @@ jobs: - name: Run sccache-cache uses: mozilla-actions/sccache-action@054db53350805f83040bf3e6e9b8cf5a139aa7c9 # v0.0.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Install sqlx run: cargo install sqlx-cli --no-default-features --features postgres - name: Run the test sqlx migrations @@ -78,6 +80,8 @@ jobs: - name: Run sccache-cache uses: mozilla-actions/sccache-action@054db53350805f83040bf3e6e9b8cf5a139aa7c9 # v0.0.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - run: | rustup component add clippy # Temporarily allowing dead-code, while denying all other warnings @@ -116,6 +120,8 @@ jobs: echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Run sccache-cache uses: mozilla-actions/sccache-action@054db53350805f83040bf3e6e9b8cf5a139aa7c9 # v0.0.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} @@ -166,6 +172,8 @@ jobs: echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV if: ${{ !startsWith(github.head_ref, 'renovate/') }} + - name: Install protobuf compiler + run: apt-get update && apt-get install protobuf-compiler -y - name: Run sccache-cache uses: mozilla-actions/sccache-action@054db53350805f83040bf3e6e9b8cf5a139aa7c9 # v0.0.7 if: ${{ !startsWith(github.head_ref, 'renovate/') }} diff --git a/Cargo.lock b/Cargo.lock index 31c359f6..49ccb9b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2679,6 +2679,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.35" @@ -3329,7 +3335,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -3675,8 +3681,16 @@ version = "0.1.0" dependencies = [ "alloy-rlp", "anyhow", + "async-trait", + "base64 0.22.1", + "prost", + "prost-types", "thegraph-core", "thiserror 1.0.69", + "tokio", + "tonic", + "tonic-build", + "uuid", ] [[package]] @@ -3760,6 +3774,7 @@ dependencies = [ "tokio", "tokio-test", "tokio-util", + "tonic", "tower 0.5.2", "tower-http", "tower-service", @@ -4402,6 +4417,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.12" @@ -4905,6 +4926,16 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.7.0", +] + [[package]] name = "pharos" version = "0.5.3" @@ -5036,6 +5067,16 @@ dependencies = [ "yansi", ] +[[package]] +name = "prettyplease" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" +dependencies = [ + "proc-macro2", + "syn 2.0.90", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -5168,6 +5209,27 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.90", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.13.3" @@ -7299,6 +7361,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.7", "http 1.1.0", "http-body 1.0.1", @@ -7321,6 +7384,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.90", +] + [[package]] name = "tower" version = "0.4.13" @@ -7987,7 +8064,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fd64e46c..35855d4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ tokio = "1.40" prometheus = "0.13.3" anyhow = { version = "1.0.72" } thiserror = "1.0.49" -async-trait = "0.1.72" +async-trait = "0.1.83" eventuals = "0.6.7" base64 = "0.22.1" reqwest = { version = "0.12", features = [ @@ -75,3 +75,7 @@ bip39 = "2.0.0" rstest = "0.23.0" wiremock = "0.6.1" typed-builder = "0.20.0" +tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] } +tonic-build = { version = "0.12.3", features = ["prost"] } +prost = "0.13.3" +prost-types = "0.13.3" diff --git a/Dockerfile.indexer-service-rs b/Dockerfile.indexer-service-rs index a6518f27..39eb32d8 100644 --- a/Dockerfile.indexer-service-rs +++ b/Dockerfile.indexer-service-rs @@ -6,6 +6,9 @@ COPY . . # Force SQLx to use the offline mode to statically check the database queries against # the prepared files in the `.sqlx` directory. ENV SQLX_OFFLINE=true + +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler && rm -rf /var/lib/apt/lists/* RUN cargo build --release --bin indexer-service-rs ######################################################################################## @@ -13,7 +16,7 @@ RUN cargo build --release --bin indexer-service-rs FROM debian:bookworm-slim RUN apt-get update && apt-get install -y --no-install-recommends \ - openssl ca-certificates \ + openssl ca-certificates protobuf-compiler \ && rm -rf /var/lib/apt/lists/* COPY --from=build /root/target/release/indexer-service-rs /usr/local/bin/indexer-service-rs diff --git a/Dockerfile.indexer-tap-agent b/Dockerfile.indexer-tap-agent index 306944b3..3663c9d4 100644 --- a/Dockerfile.indexer-tap-agent +++ b/Dockerfile.indexer-tap-agent @@ -6,6 +6,8 @@ COPY . . # Force SQLx to use the offline mode to statically check the database queries against # the prepared files in the `.sqlx` directory. ENV SQLX_OFFLINE=true +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler && rm -rf /var/lib/apt/lists/* RUN cargo build --release --bin indexer-tap-agent ######################################################################################## diff --git a/crates/config/maximal-config-example.toml b/crates/config/maximal-config-example.toml index d5e84783..d96d114a 100644 --- a/crates/config/maximal-config-example.toml +++ b/crates/config/maximal-config-example.toml @@ -150,5 +150,6 @@ max_receipts_per_request = 10000 0x0123456789abcdef0123456789abcdef01234567 = "https://other.example.com/aggregate-receipts" [dips] +host = "0.0.0.0" +port = "7601" allowed_payers = ["0x3333333333333333333333333333333333333333"] - diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 3315dfe6..44b71150 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -387,10 +387,23 @@ pub struct TapConfig { #[derive(Debug, Deserialize)] #[cfg_attr(test, derive(PartialEq))] pub struct DipsConfig { + pub host: String, + pub port: String, pub allowed_payers: Vec
, pub cancellation_time_tolerance: Option, } +impl Default for DipsConfig { + fn default() -> Self { + DipsConfig { + host: "0.0.0.0".to_string(), + port: "7601".to_string(), + allowed_payers: vec![], + cancellation_time_tolerance: None, + } + } +} + impl TapConfig { pub fn get_trigger_value(&self) -> u128 { let grt_wei = self.max_amount_willing_to_lose_grt.get_value(); @@ -420,11 +433,11 @@ pub struct RavRequestConfig { #[cfg(test)] mod tests { - use std::{env, fs, path::PathBuf}; + use std::{env, fs, path::PathBuf, str::FromStr}; use figment::value::Uncased; use sealed_test::prelude::*; - use thegraph_core::alloy::primitives::address; + use thegraph_core::alloy::primitives::{Address, FixedBytes}; use tracing_test::traced_test; use super::{DatabaseConfig, SHARED_PREFIX}; @@ -448,8 +461,10 @@ mod tests { ) .unwrap(); max_config.dips = Some(crate::DipsConfig { - allowed_payers: vec![address!("3333333333333333333333333333333333333333")], - cancellation_time_tolerance: None, + allowed_payers: vec![Address( + FixedBytes::<20>::from_str("0x3333333333333333333333333333333333333333").unwrap(), + )], + ..Default::default() }); let max_config_file: Config = toml::from_str( diff --git a/crates/dips/Cargo.toml b/crates/dips/Cargo.toml index 55dc8f9d..049fd23b 100644 --- a/crates/dips/Cargo.toml +++ b/crates/dips/Cargo.toml @@ -8,3 +8,13 @@ thiserror.workspace = true anyhow.workspace = true alloy-rlp = "0.3.10" thegraph-core.workspace = true +tonic.workspace = true +async-trait.workspace = true +prost.workspace = true +prost-types.workspace = true +uuid.workspace = true +base64.workspace = true +tokio.workspace = true + +[build-dependencies] +tonic-build = { workspace = true } diff --git a/crates/dips/build.rs b/crates/dips/build.rs new file mode 100644 index 00000000..1d761845 --- /dev/null +++ b/crates/dips/build.rs @@ -0,0 +1,12 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +fn main() { + println!("cargo:rerun-if-changed=proto"); + tonic_build::configure() + .out_dir("src/proto") + .include_file("mod.rs") + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["proto/dips.proto"], &["proto"]) + .expect("Failed to compile dips proto(s)"); +} diff --git a/crates/dips/proto/dips.proto b/crates/dips/proto/dips.proto new file mode 100644 index 00000000..400c484a --- /dev/null +++ b/crates/dips/proto/dips.proto @@ -0,0 +1,59 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package graphprotocol.indexer.dips; + +service AgreementService { + rpc CreateAgreement(CreateAgreementRequest) returns (CreateAgreementResponse); + rpc CancelAgreement(CancelAgreementRequest) returns (AgreementCanellationResponse); + rpc GetAgreementById(GetAgreementByIdRequest) returns (GetAgreementByIdResponse); + rpc GetPrice(PriceRequest) returns (PriceResponse); +} + +message GetAgreementByIdRequest { + +} + +message GetAgreementByIdResponse { + +} + +message CreateAgreementRequest { + string id = 1; + bytes signed_voucher = 2; +} + +message CancelAgreementRequest { + string id = 1; + bytes signed_voucher = 2; +} + +message CreateAgreementResponse { + string uuid = 1; +} + +message AgreementCanellationResponse { + string uuid = 1; +} + +message PriceRequest { + ProtocolNetwork protocol = 1; + string chain_id = 2; +} + +message PriceResponse { + optional Price price = 1; +} + +message Price { + string price_per_block = 1; + string chain_id = 2; + ProtocolNetwork protocol = 3; +} + +enum ProtocolNetwork { + UNKNOWN = 0; + EVM = 1; +} diff --git a/crates/dips/src/lib.rs b/crates/dips/src/lib.rs index 1bd55f82..da9a5a6e 100644 --- a/crates/dips/src/lib.rs +++ b/crates/dips/src/lib.rs @@ -3,9 +3,11 @@ use std::{ str::FromStr, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; +use alloy_rlp::Decodable; use thegraph_core::alloy::{ core::primitives::Address, primitives::PrimitiveSignature as Signature, @@ -14,6 +16,14 @@ use thegraph_core::alloy::{ sol, sol_types::{Eip712Domain, SolStruct}, }; + +pub mod proto; +pub mod server; +pub mod store; + +use store::AgreementStore; +use uuid::Uuid; + use thiserror::Error; sol! { @@ -77,14 +87,34 @@ sol! { } } -#[derive(Error, Debug, PartialEq)] -pub enum AgreementVoucherValidationError { +#[derive(Error, Debug)] +pub enum DipsError { + // agreement cration #[error("signature is not valid, error: {0}")] InvalidSignature(String), #[error("payer {0} not authorised")] PayerNotAuthorised(Address), #[error("voucher payee {actual} does not match the expected address {expected}")] UnexpectedPayee { expected: Address, actual: Address }, + // cancellation + #[error("cancelled_by is expected to match the signer")] + UnexpectedSigner, + #[error("signer {0} not authorised")] + SignerNotAuthorised(Address), + #[error("cancellation request has expired")] + ExpiredRequest, + // misc + #[error("rlp (de)serialisation failed")] + RlpSerializationError(#[from] alloy_rlp::Error), + #[error("unknown error: {0}")] + UnknownError(#[from] anyhow::Error), +} + +// TODO: send back messages +impl From for tonic::Status { + fn from(_val: DipsError) -> Self { + tonic::Status::internal("unknown errr") + } } impl IndexingAgreementVoucher { @@ -109,22 +139,22 @@ impl SignedIndexingAgreementVoucher { domain: &Eip712Domain, expected_payee: &Address, allowed_payers: impl AsRef<[Address]>, - ) -> Result<(), AgreementVoucherValidationError> { + ) -> Result<(), DipsError> { let sig = Signature::from_str(&self.signature.to_string()) - .map_err(|err| AgreementVoucherValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; let payer = sig .recover_address_from_prehash(&self.voucher.eip712_signing_hash(domain)) - .map_err(|err| AgreementVoucherValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; if allowed_payers.as_ref().is_empty() || !allowed_payers.as_ref().iter().any(|addr| addr.eq(&payer)) { - return Err(AgreementVoucherValidationError::PayerNotAuthorised(payer)); + return Err(DipsError::PayerNotAuthorised(payer)); } if !self.voucher.payee.eq(expected_payee) { - return Err(AgreementVoucherValidationError::UnexpectedPayee { + return Err(DipsError::UnexpectedPayee { expected: *expected_payee, actual: self.voucher.payee, }); @@ -141,18 +171,6 @@ impl SignedIndexingAgreementVoucher { } } -#[derive(Error, Debug, PartialEq)] -pub enum CancellationRequestValidationError { - #[error("signature is not valid, error: {0}")] - InvalidSignature(String), - #[error("cancelled_by is expected to match the signer")] - UnexpectedSigner, - #[error("signer {0} not authorised")] - SignerNotAuthorised(Address), - #[error("cancellation request has expired")] - ExpiredRequest, -} - impl CancellationRequest { pub fn sign( &self, @@ -174,30 +192,28 @@ impl SignedCancellationRequest { &self, domain: &Eip712Domain, time_tolerance: Duration, - ) -> Result<(), CancellationRequestValidationError> { + ) -> Result<(), DipsError> { let sig = Signature::from_str(&self.signature.to_string()) - .map_err(|err| CancellationRequestValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; let signer = sig .recover_address_from_prehash(&self.request.eip712_signing_hash(domain)) - .map_err(|err| CancellationRequestValidationError::InvalidSignature(err.to_string()))?; + .map_err(|err| DipsError::InvalidSignature(err.to_string()))?; if signer.ne(&self.request.cancellled_by) { - return Err(CancellationRequestValidationError::UnexpectedSigner); + return Err(DipsError::UnexpectedSigner); } if signer.ne(&self.request.payer) && signer.ne(&self.request.payee) { - return Err(CancellationRequestValidationError::SignerNotAuthorised( - signer, - )); + return Err(DipsError::SignerNotAuthorised(signer)); } let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .map_err(|_| CancellationRequestValidationError::ExpiredRequest)? + .map_err(|_| DipsError::ExpiredRequest)? .as_secs(); if now - self.request.timestamp >= time_tolerance.as_secs() { - return Err(CancellationRequestValidationError::ExpiredRequest); + return Err(DipsError::ExpiredRequest); } Ok(()) @@ -210,8 +226,46 @@ impl SignedCancellationRequest { } } +pub async fn validate_and_create_agreement( + store: Arc, + domain: &Eip712Domain, + id: Uuid, + expected_payee: &Address, + allowed_payers: impl AsRef<[Address]>, + voucher: Vec, +) -> Result { + let voucher = SignedIndexingAgreementVoucher::decode(&mut voucher.as_ref())?; + let metadata = SubgraphIndexingVoucherMetadata::decode(&mut voucher.voucher.metadata.as_ref())?; + + voucher.validate(domain, expected_payee, allowed_payers)?; + + store + .create_agreement(id, voucher, metadata.protocolNetwork.to_string()) + .await?; + + Ok(id) +} + +pub async fn validate_and_cancel_agreement( + store: Arc, + domain: &Eip712Domain, + id: Uuid, + agreement: Vec, + time_tolerance: Duration, +) -> Result { + let voucher = SignedCancellationRequest::decode(&mut agreement.as_ref())?; + + voucher.validate(domain, time_tolerance)?; + + store.cancel_agreement(id, voucher).await?; + + Ok(id) +} + #[cfg(test)] mod test { + use std::sync::Arc; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; use thegraph_core::{ @@ -223,10 +277,64 @@ mod test { attestation::eip712_domain, }; - use crate::{ - AgreementVoucherValidationError, CancellationRequest, CancellationRequestValidationError, - IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata, - }; + use crate::{CancellationRequest, DipsError}; + use crate::{IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata}; + use uuid::Uuid; + + pub use crate::store::{AgreementStore, InMemoryAgreementStore}; + + #[tokio::test] + async fn test_validate_and_create_agreement() -> anyhow::Result<()> { + let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string(); + let payee = PrivateKeySigner::random(); + let payee_addr = payee.address(); + let payer = PrivateKeySigner::random(); + let payer_addr = payer.address(); + + let metadata = SubgraphIndexingVoucherMetadata { + pricePerBlock: U256::from(10000_u64), + protocolNetwork: FixedBytes::left_padding_from("arbitrum-one".as_bytes()), + chainId: FixedBytes::left_padding_from("mainnet".as_bytes()), + deployment_ipfs_hash: deployment_id, + }; + + let voucher = IndexingAgreementVoucher { + payer: payer_addr, + payee: payee_addr, + service: Address(FixedBytes::ZERO), + maxInitialAmount: U256::from(10000_u64), + maxOngoingAmountPerEpoch: U256::from(10000_u64), + deadline: 1000, + maxEpochsPerCollection: 1000, + minEpochsPerCollection: 1000, + durationEpochs: 1000, + metadata: alloy_rlp::encode(metadata).into(), + }; + let domain = eip712_domain(0, Address::ZERO); + + let voucher = voucher.sign(&domain, payer)?; + let rlp_voucher = alloy_rlp::encode(voucher.clone()); + let id = Uuid::now_v7(); + + let store = Arc::new(InMemoryAgreementStore::default()); + + let actual_id = super::validate_and_create_agreement( + store.clone(), + &domain, + id, + &payee_addr, + vec![payer_addr], + rlp_voucher, + ) + .await + .unwrap(); + + let actual = store.get_by_id(actual_id).await.unwrap(); + + let actual_voucher = actual.unwrap(); + assert_eq!(voucher, actual_voucher); + Ok(()) + } #[test] fn voucher_signature_verification() { @@ -259,8 +367,11 @@ mod test { let domain = eip712_domain(0, Address::ZERO); let signed = voucher.sign(&domain, payer).unwrap(); assert_eq!( - signed.validate(&domain, &payee_addr, vec![]).unwrap_err(), - AgreementVoucherValidationError::PayerNotAuthorised(voucher.payer) + signed + .validate(&domain, &payee_addr, vec![]) + .unwrap_err() + .to_string(), + DipsError::PayerNotAuthorised(voucher.payer).to_string() ); assert!(signed .validate(&domain, &payee_addr, vec![payer_addr]) @@ -303,7 +414,7 @@ mod test { signed .validate(&domain, &payee_addr, vec![payer_addr]) .unwrap_err(), - AgreementVoucherValidationError::PayerNotAuthorised(_) + DipsError::PayerNotAuthorised(_) )); } @@ -331,7 +442,7 @@ mod test { name: &'a str, signer: PrivateKeySigner, timestamp: u64, - error: Option, + error: Option, } let cases: Vec = vec![ @@ -351,17 +462,13 @@ mod test { name: "invalid signer", signer: other_signer.clone(), timestamp: now, - error: Some(CancellationRequestValidationError::SignerNotAuthorised( - other_signer.address(), - )), + error: Some(DipsError::SignerNotAuthorised(other_signer.address())), }, Case { name: "expired timestamp", signer: payee, timestamp: 100, - error: Some(CancellationRequestValidationError::SignerNotAuthorised( - other_signer.address(), - )), + error: Some(DipsError::SignerNotAuthorised(other_signer.address())), }, ]; diff --git a/crates/dips/src/proto/graphprotocol.indexer.dips.rs b/crates/dips/src/proto/graphprotocol.indexer.dips.rs new file mode 100644 index 00000000..57fce10e --- /dev/null +++ b/crates/dips/src/proto/graphprotocol.indexer.dips.rs @@ -0,0 +1,621 @@ +// This file is @generated by prost-build. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct GetAgreementByIdRequest {} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct GetAgreementByIdResponse {} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CreateAgreementRequest { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "2")] + pub signed_voucher: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CancelAgreementRequest { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "2")] + pub signed_voucher: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CreateAgreementResponse { + #[prost(string, tag = "1")] + pub uuid: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AgreementCanellationResponse { + #[prost(string, tag = "1")] + pub uuid: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PriceRequest { + #[prost(enumeration = "ProtocolNetwork", tag = "1")] + pub protocol: i32, + #[prost(string, tag = "2")] + pub chain_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PriceResponse { + #[prost(message, optional, tag = "1")] + pub price: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Price { + #[prost(string, tag = "1")] + pub price_per_block: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub chain_id: ::prost::alloc::string::String, + #[prost(enumeration = "ProtocolNetwork", tag = "3")] + pub protocol: i32, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ProtocolNetwork { + Unknown = 0, + Evm = 1, +} +impl ProtocolNetwork { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unknown => "UNKNOWN", + Self::Evm => "EVM", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UNKNOWN" => Some(Self::Unknown), + "EVM" => Some(Self::Evm), + _ => None, + } + } +} +/// Generated client implementations. +pub mod agreement_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct AgreementServiceClient { + inner: tonic::client::Grpc, + } + impl AgreementServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl AgreementServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> AgreementServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + AgreementServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn create_agreement( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/CreateAgreement", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "CreateAgreement", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn cancel_agreement( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/CancelAgreement", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "CancelAgreement", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_agreement_by_id( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/GetAgreementById", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "GetAgreementById", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_price( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.indexer.dips.AgreementService/GetPrice", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.indexer.dips.AgreementService", + "GetPrice", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod agreement_service_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with AgreementServiceServer. + #[async_trait] + pub trait AgreementService: std::marker::Send + std::marker::Sync + 'static { + async fn create_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn cancel_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_agreement_by_id( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_price( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct AgreementServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl AgreementServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for AgreementServiceServer + where + T: AgreementService, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/graphprotocol.indexer.dips.AgreementService/CreateAgreement" => { + #[allow(non_camel_case_types)] + struct CreateAgreementSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for CreateAgreementSvc { + type Response = super::CreateAgreementResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::create_agreement(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CreateAgreementSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/graphprotocol.indexer.dips.AgreementService/CancelAgreement" => { + #[allow(non_camel_case_types)] + struct CancelAgreementSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for CancelAgreementSvc { + type Response = super::AgreementCanellationResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::cancel_agreement(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = CancelAgreementSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/graphprotocol.indexer.dips.AgreementService/GetAgreementById" => { + #[allow(non_camel_case_types)] + struct GetAgreementByIdSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for GetAgreementByIdSvc { + type Response = super::GetAgreementByIdResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_agreement_by_id( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetAgreementByIdSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/graphprotocol.indexer.dips.AgreementService/GetPrice" => { + #[allow(non_camel_case_types)] + struct GetPriceSvc(pub Arc); + impl< + T: AgreementService, + > tonic::server::UnaryService + for GetPriceSvc { + type Response = super::PriceResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_price(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetPriceSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for AgreementServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "graphprotocol.indexer.dips.AgreementService"; + impl tonic::server::NamedService for AgreementServiceServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/crates/dips/src/proto/mod.rs b/crates/dips/src/proto/mod.rs new file mode 100644 index 00000000..8edb4be2 --- /dev/null +++ b/crates/dips/src/proto/mod.rs @@ -0,0 +1,8 @@ +// This file is @generated by prost-build. +pub mod graphprotocol { + pub mod indexer { + pub mod dips { + include!("graphprotocol.indexer.dips.rs"); + } + } +} diff --git a/crates/dips/src/server.rs b/crates/dips/src/server.rs new file mode 100644 index 00000000..24bab8df --- /dev/null +++ b/crates/dips/src/server.rs @@ -0,0 +1,87 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{str::FromStr, sync::Arc, time::Duration}; + +use crate::{ + proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement, + validate_and_create_agreement, DipsError, +}; +use anyhow::anyhow; +use async_trait::async_trait; +use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address}; +use uuid::Uuid; + +#[derive(Debug)] +pub struct DipsServer { + pub agreement_store: Arc, + pub expected_payee: Address, + pub allowed_payers: Vec
, + pub domain: Eip712Domain, + pub cancel_voucher_time_tolerance: Duration, +} + +#[async_trait] +impl agreement_service_server::AgreementService for DipsServer { + async fn create_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let CreateAgreementRequest { id, signed_voucher } = request.into_inner(); + let uid = Uuid::from_str(&id).map_err(|_| { + Into::::into(DipsError::from(anyhow!("failed to parse uuid"))) + })?; + + validate_and_create_agreement( + self.agreement_store.clone(), + &self.domain, + uid, + &self.expected_payee, + &self.allowed_payers, + signed_voucher, + ) + .await + .map_err(Into::::into)?; + + Ok(tonic::Response::new(CreateAgreementResponse { + uuid: uid.to_string(), + })) + } + + async fn cancel_agreement( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let CancelAgreementRequest { id, signed_voucher } = request.into_inner(); + let uid = Uuid::from_str(&id).map_err(|_| { + Into::::into(DipsError::from(anyhow!("failed to parse uuid"))) + })?; + + validate_and_cancel_agreement( + self.agreement_store.clone(), + &self.domain, + uid, + signed_voucher, + self.cancel_voucher_time_tolerance, + ) + .await + .map_err(Into::::into)?; + + Ok(tonic::Response::new(AgreementCanellationResponse { + uuid: uid.to_string(), + })) + } + async fn get_agreement_by_id( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + todo!() + } + + async fn get_price( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + todo!() + } +} diff --git a/crates/dips/src/store.rs b/crates/dips/src/store.rs new file mode 100644 index 00000000..7453b484 --- /dev/null +++ b/crates/dips/src/store.rs @@ -0,0 +1,56 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashMap; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::{SignedCancellationRequest, SignedIndexingAgreementVoucher}; + +#[async_trait] +pub trait AgreementStore: Sync + Send + std::fmt::Debug { + async fn get_by_id(&self, id: Uuid) -> anyhow::Result>; + async fn create_agreement( + &self, + id: Uuid, + agreement: SignedIndexingAgreementVoucher, + protocol: String, + ) -> anyhow::Result<()>; + async fn cancel_agreement( + &self, + id: Uuid, + signed_cancellation: SignedCancellationRequest, + ) -> anyhow::Result; +} + +#[derive(Default, Debug)] +pub struct InMemoryAgreementStore { + pub data: tokio::sync::RwLock>, +} + +#[async_trait] +impl AgreementStore for InMemoryAgreementStore { + async fn get_by_id(&self, id: Uuid) -> anyhow::Result> { + Ok(self.data.try_read()?.get(&id).cloned()) + } + async fn create_agreement( + &self, + id: Uuid, + agreement: SignedIndexingAgreementVoucher, + _protocol: String, + ) -> anyhow::Result<()> { + self.data.try_write()?.insert(id, agreement.clone()); + + Ok(()) + } + async fn cancel_agreement( + &self, + id: Uuid, + _signed_cancellation: SignedCancellationRequest, + ) -> anyhow::Result { + self.data.try_write()?.remove(&id); + + Ok(id) + } +} diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index 167e19c2..bd673cc9 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -56,6 +56,7 @@ cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3ed34ca" } bip39.workspace = true tower = "0.5.1" pin-project = "1.1.7" +tonic.workspace = true [dev-dependencies] hex-literal = "0.4.1" diff --git a/crates/service/src/database/dips.rs b/crates/service/src/database/dips.rs index eb667764..2709ec03 100644 --- a/crates/service/src/database/dips.rs +++ b/crates/service/src/database/dips.rs @@ -1,66 +1,21 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; - use anyhow::bail; use axum::async_trait; use build_info::chrono::Utc; -use indexer_dips::{SignedCancellationRequest, SignedIndexingAgreementVoucher}; + +use indexer_dips::{ + store::AgreementStore, SignedCancellationRequest, SignedIndexingAgreementVoucher, +}; + use sqlx::PgPool; use thegraph_core::alloy::rlp::Decodable; use uuid::Uuid; -#[async_trait] -pub trait AgreementStore: Sync + Send { - async fn get_by_id(&self, id: Uuid) -> anyhow::Result>; - async fn create_agreement( - &self, - id: Uuid, - agreement: SignedIndexingAgreementVoucher, - protocol: String, - ) -> anyhow::Result<()>; - async fn cancel_agreement( - &self, - id: Uuid, - signed_cancellation: SignedCancellationRequest, - ) -> anyhow::Result; -} - -#[derive(Default)] -pub struct InMemoryAgreementStore { - pub data: tokio::sync::RwLock>, -} - -#[async_trait] -impl AgreementStore for InMemoryAgreementStore { - async fn get_by_id(&self, id: Uuid) -> anyhow::Result> { - Ok(self.data.try_read()?.get(&id).cloned()) - } - async fn create_agreement( - &self, - id: Uuid, - agreement: SignedIndexingAgreementVoucher, - _protocol: String, - ) -> anyhow::Result<()> { - self.data.try_write()?.insert(id, agreement.clone()); - - Ok(()) - } - async fn cancel_agreement( - &self, - id: Uuid, - _signed_cancellation: SignedCancellationRequest, - ) -> anyhow::Result { - self.data.try_write()?.remove(&id); - - Ok(id) - } -} - #[derive(Debug)] pub struct PsqlAgreementStore { - pool: PgPool, + pub pool: PgPool, } #[async_trait] diff --git a/crates/service/src/routes/dips.rs b/crates/service/src/routes/dips.rs deleted file mode 100644 index d9014dfa..00000000 --- a/crates/service/src/routes/dips.rs +++ /dev/null @@ -1,335 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::{str::FromStr, sync::Arc, time::Duration}; - -use anyhow::bail; -use async_graphql::{Context, EmptySubscription, FieldResult, Object, Schema, SimpleObject}; -use base64::{engine::general_purpose::STANDARD, Engine}; -use indexer_config::{BlockchainConfig, DipsConfig}; -use indexer_dips::{ - SignedCancellationRequest, SignedIndexingAgreementVoucher, SubgraphIndexingVoucherMetadata, -}; -use thegraph_core::{ - alloy::{primitives::Address, rlp::Decodable, sol_types::Eip712Domain}, - attestation::eip712_domain, -}; -use uuid::Uuid; - -use crate::database::dips::AgreementStore; - -pub type DipsSchema = Schema; -pub type DipsStore = Arc; - -pub fn build_schema( - indexer_address: Address, - DipsConfig { - allowed_payers, - cancellation_time_tolerance, - }: &DipsConfig, - BlockchainConfig { - chain_id, - receipts_verifier_address, - }: &BlockchainConfig, - agreement_store: DipsStore, - prices: Vec, -) -> DipsSchema { - Schema::build( - AgreementQuery {}, - AgreementMutation { - expected_payee: indexer_address, - allowed_payers: allowed_payers.clone(), - domain: eip712_domain(*chain_id as u64, *receipts_verifier_address), - cancel_voucher_time_tolerance: cancellation_time_tolerance - .unwrap_or(Duration::from_secs(5)), - }, - EmptySubscription, - ) - .data(agreement_store) - .data(prices) - .finish() -} - -pub enum NetworkProtocol { - ArbitrumMainnet, -} - -impl FromStr for NetworkProtocol { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - let p = match s { - "arbitrum-mainnet" => NetworkProtocol::ArbitrumMainnet, - _ => bail!("unknown network protocol"), - }; - - Ok(p) - } -} - -#[derive(SimpleObject, Debug, Clone, PartialEq)] -pub struct Agreement { - pub id: String, - pub signed_payload: String, - pub protocol_network: String, -} - -#[derive(SimpleObject, Debug, Clone, PartialEq)] -pub struct Cancellation { - pub signed_payload: String, - pub protocol_network: String, -} - -impl TryInto for (Uuid, SignedIndexingAgreementVoucher) { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let signed_payload = STANDARD.encode(self.1.encode_vec()); - let metadata = self.1.voucher.metadata; - let metadata: SubgraphIndexingVoucherMetadata = - SubgraphIndexingVoucherMetadata::decode(&mut metadata.as_ref())?; - - Ok(Agreement { - id: self.0.to_string(), - signed_payload, - protocol_network: metadata.protocolNetwork.to_string(), - }) - } -} - -impl TryInto for Agreement { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let rlp_bytes = STANDARD.decode(self.signed_payload)?; - let signed_voucher = SignedIndexingAgreementVoucher::decode(&mut rlp_bytes.as_ref())?; - - Ok(signed_voucher) - } -} - -#[derive(SimpleObject, Debug, Clone)] -pub struct Price { - price_per_block: String, - chain_id: String, - protocol_network: String, -} - -#[derive(Debug)] -pub struct AgreementQuery {} - -#[Object] -impl AgreementQuery { - pub async fn get_agreement_by_id<'a>( - &self, - ctx: &'a Context<'_>, - id: String, - ) -> FieldResult> { - let store: &Arc = ctx.data()?; - let id = Uuid::from_str(&id)?; - - match store - .get_by_id(id) - .await - .map_err(async_graphql::Error::from)? - { - Some(a) => (id, a) - .try_into() - .map(Some) - .map_err(async_graphql::Error::from), - None => Ok(None), - } - } - - pub async fn get_price<'a>( - &self, - ctx: &'a Context<'_>, - protocol_network: String, - chain_id: String, - ) -> FieldResult> { - let prices: &Vec = ctx.data()?; - - let p = prices - .iter() - .find(|p| p.protocol_network.eq(&protocol_network) && p.chain_id.eq(&chain_id)); - - Ok(p.cloned()) - } - - pub async fn get_all_prices<'a>(&self, ctx: &'a Context<'_>) -> FieldResult> { - let prices: &Vec = ctx.data()?; - - Ok(prices.clone()) - } -} - -#[derive(Debug)] -pub struct AgreementMutation { - pub expected_payee: Address, - pub allowed_payers: Vec
, - pub domain: Eip712Domain, - pub cancel_voucher_time_tolerance: Duration, -} - -#[Object] -impl AgreementMutation { - // create_agreements returns the UUID under which the agreement was stored. - pub async fn create_agreement<'a>( - &self, - ctx: &'a Context<'_>, - // uuid v7 that this agreement should use. - id: String, - // data should be the signed voucher, eip712 signed, rlp and base64 encoded. - signed_voucher: String, - ) -> FieldResult { - let store: &Arc = ctx.data()?; - let uid = Uuid::from_str(&id)?; - - validate_and_create_agreement( - store.clone(), - &self.domain, - uid, - &self.expected_payee, - &self.allowed_payers, - signed_voucher, - ) - .await - .map_err(async_graphql::Error::from)?; - - Ok(id) - } - - pub async fn cancel_agreement<'a>( - &self, - ctx: &'a Context<'_>, - id: String, - // data should be the signed voucher, eip712 signed, rlp and base64 encoded. - signed_request: String, - ) -> FieldResult { - let store: &Arc = ctx.data()?; - let uid = Uuid::from_str(&id)?; - - validate_and_cancel_agreement( - store.clone(), - &self.domain, - uid, - signed_request.clone(), - self.cancel_voucher_time_tolerance, - ) - .await - .map_err(async_graphql::Error::from)?; - - Ok(id) - } -} - -async fn validate_and_create_agreement( - store: Arc, - domain: &Eip712Domain, - id: Uuid, - expected_payee: &Address, - allowed_payers: impl AsRef<[Address]>, - agreement: String, -) -> anyhow::Result { - let rlp_bs = STANDARD.decode(agreement.clone())?; - let voucher = SignedIndexingAgreementVoucher::decode(&mut rlp_bs.as_ref())?; - let metadata = SubgraphIndexingVoucherMetadata::decode(&mut voucher.voucher.metadata.as_ref())?; - - voucher.validate(domain, expected_payee, allowed_payers)?; - - store - .create_agreement(id, voucher, metadata.protocolNetwork.to_string()) - .await?; - - Ok(id) -} - -async fn validate_and_cancel_agreement( - store: Arc, - domain: &Eip712Domain, - id: Uuid, - agreement: String, - time_tolerance: Duration, -) -> anyhow::Result { - let rlp_bs = STANDARD.decode(agreement.clone())?; - let voucher = SignedCancellationRequest::decode(&mut rlp_bs.as_ref())?; - - voucher.validate(domain, time_tolerance)?; - - store.cancel_agreement(id, voucher).await?; - - Ok(id) -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use base64::{engine::general_purpose::STANDARD, Engine}; - use indexer_dips::{IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata}; - use thegraph_core::{ - alloy::{ - primitives::{Address, FixedBytes, U256}, - rlp, - signers::local::PrivateKeySigner, - }, - attestation::eip712_domain, - }; - use uuid::Uuid; - - use crate::database::dips::{AgreementStore, InMemoryAgreementStore}; - - #[tokio::test] - async fn test_validate_and_create_agreement() -> anyhow::Result<()> { - let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string(); - let payee = PrivateKeySigner::random(); - let payee_addr = payee.address(); - let payer = PrivateKeySigner::random(); - let payer_addr = payer.address(); - - let metadata = SubgraphIndexingVoucherMetadata { - pricePerBlock: U256::from(10000_u64), - protocolNetwork: FixedBytes::left_padding_from("arbitrum-one".as_bytes()), - chainId: FixedBytes::left_padding_from("mainnet".as_bytes()), - deployment_ipfs_hash: deployment_id, - }; - - let voucher = IndexingAgreementVoucher { - payer: payer_addr, - payee: payee_addr, - service: Address(FixedBytes::ZERO), - maxInitialAmount: U256::from(10000_u64), - maxOngoingAmountPerEpoch: U256::from(10000_u64), - deadline: 1000, - maxEpochsPerCollection: 1000, - minEpochsPerCollection: 1000, - durationEpochs: 1000, - metadata: rlp::encode(metadata).into(), - }; - let domain = eip712_domain(0, Address::ZERO); - - let voucher = voucher.sign(&domain, payer)?; - let rlp_voucher = rlp::encode(voucher.clone()); - let b64 = STANDARD.encode(rlp_voucher); - let id = Uuid::now_v7(); - - let store = Arc::new(InMemoryAgreementStore::default()); - - let actual_id = super::validate_and_create_agreement( - store.clone(), - &domain, - id, - &payee_addr, - vec![payer_addr], - b64, - ) - .await - .unwrap(); - - let actual = store.get_by_id(actual_id).await.unwrap(); - - let actual_voucher = actual.unwrap(); - assert_eq!(voucher, actual_voucher); - Ok(()) - } -} diff --git a/crates/service/src/routes/mod.rs b/crates/service/src/routes/mod.rs index c68fef04..c9a4a50a 100644 --- a/crates/service/src/routes/mod.rs +++ b/crates/service/src/routes/mod.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub mod cost; -pub mod dips; mod health; mod request_handler; mod static_subgraph; diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index eaa0615a..194b112e 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -1,12 +1,18 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{net::SocketAddr, time::Duration}; +use core::time; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::anyhow; use axum::{extract::Request, serve, ServiceExt}; -use clap::Parser; -use indexer_config::{Config, GraphNodeConfig, SubgraphConfig}; +use indexer_config::{Config, DipsConfig, GraphNodeConfig, SubgraphConfig}; +use indexer_dips::{ + proto::graphprotocol::indexer::dips::agreement_service_server::{ + AgreementService, AgreementServiceServer, + }, + server::DipsServer, +}; use indexer_monitor::{DeploymentDetails, SubgraphClient}; use release::IndexerServiceRelease; use reqwest::Url; @@ -14,7 +20,13 @@ use tap_core::tap_eip712_domain; use tokio::{net::TcpListener, signal}; use tower_http::normalize_path::NormalizePath; -use crate::{cli::Cli, database, metrics::serve_metrics}; +use crate::{ + cli::Cli, + database::{self, dips::PsqlAgreementStore}, + metrics::serve_metrics, +}; +use clap::Parser; +use tracing::info; mod release; mod router; @@ -89,16 +101,16 @@ pub async fn run() -> anyhow::Result<()> { ); let host_and_port = config.service.host_and_port; + let indexer_address = config.indexer.indexer_address; let router = ServiceRouter::builder() - .database(database) - .domain_separator(domain_separator) + .database(database.clone()) + .domain_separator(domain_separator.clone()) .graph_node(config.graph_node) .http_client(http_client) .release(release) .indexer(config.indexer) .service(config.service) - .dips(config.dips) .blockchain(config.blockchain) .timestamp_buffer_secs(config.tap.rav_request.timestamp_buffer_secs) .network_subgraph(network_subgraph, config.subgraphs.network) @@ -111,6 +123,36 @@ pub async fn run() -> anyhow::Result<()> { address = %host_and_port, "Serving requests", ); + if let Some(dips) = config.dips.as_ref() { + let DipsConfig { + host, + port, + allowed_payers, + cancellation_time_tolerance, + } = dips; + + let addr = format!("{}:{}", host, port) + .parse() + .expect("invalid dips host port"); + + let dips = DipsServer { + agreement_store: Arc::new(PsqlAgreementStore { pool: database }), + expected_payee: indexer_address, + allowed_payers: allowed_payers.clone(), + domain: domain_separator, + cancel_voucher_time_tolerance: cancellation_time_tolerance + .unwrap_or(time::Duration::from_secs(60 * 5)), + }; + + info!("starting dips grpc server on {}", addr); + + tokio::spawn(async move { + info!("starting dips grpc server on {}", addr); + + start_dips_server(addr, dips).await; + }); + } + let listener = TcpListener::bind(&host_and_port) .await .expect("Failed to bind to indexer-service port"); @@ -123,6 +165,13 @@ pub async fn run() -> anyhow::Result<()> { .with_graceful_shutdown(shutdown_handler()) .await?) } +async fn start_dips_server(addr: SocketAddr, service: impl AgreementService) { + tonic::transport::Server::builder() + .add_service(AgreementServiceServer::new(service)) + .serve(addr) + .await + .expect("unable to start dips grpc"); +} async fn create_subgraph_client( http_client: reqwest::Client, diff --git a/crates/service/src/service/router.rs b/crates/service/src/service/router.rs index 8768c7bb..69caa62d 100644 --- a/crates/service/src/service/router.rs +++ b/crates/service/src/service/router.rs @@ -13,8 +13,8 @@ use axum::{ }; use governor::{clock::QuantaInstant, middleware::NoOpMiddleware}; use indexer_config::{ - BlockchainConfig, DipsConfig, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, - NetworkSubgraphConfig, ServiceConfig, ServiceTapConfig, + BlockchainConfig, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig, + ServiceConfig, ServiceTapConfig, }; use indexer_monitor::{ attestation_signers, deployment_to_allocation, dispute_manager, escrow_accounts, @@ -38,7 +38,6 @@ use typed_builder::TypedBuilder; use super::{release::IndexerServiceRelease, GraphNodeState}; use crate::{ - database::dips::{AgreementStore, InMemoryAgreementStore}, metrics::{FAILED_RECEIPT, HANDLER_HISTOGRAM}, middleware::{ allocation_middleware, attestation_middleware, @@ -47,11 +46,7 @@ use crate::{ sender_middleware, signer_middleware, AllocationState, AttestationState, PrometheusMetricsMiddlewareLayer, SenderState, }, - routes::{ - self, - dips::{self, Price}, - health, request_handler, static_subgraph_request_handler, - }, + routes::{self, health, request_handler, static_subgraph_request_handler}, tap::IndexerTapContext, wallet::public_key, }; @@ -74,8 +69,6 @@ pub struct ServiceRouter { service: ServiceConfig, blockchain: BlockchainConfig, timestamp_buffer_secs: Duration, - #[builder(default)] - dips: Option, // either provide subgraph or watcher #[builder(default, setter(transform = @@ -133,24 +126,6 @@ impl ServiceRouter { // STATUS let post_status = post(routes::status); - // DIPS - let agreement_store: Arc = Arc::new(InMemoryAgreementStore::default()); - let prices: Vec = vec![]; - - let dips = match self.dips.as_ref() { - Some(dips_config) => { - let schema = dips::build_schema( - indexer_address, - dips_config, - &self.blockchain, - agreement_store, - prices, - ); - Router::new().route(DEFAULT_ROUTE, post_service(GraphQL::new(schema))) - } - None => Router::new(), - }; - // Monitor the indexer's own allocations // if not provided, create monitor from subgraph let allocations = match (self.allocations, self.network_subgraph.as_ref()) { @@ -407,7 +382,6 @@ impl ServiceRouter { .nest("/version", version) .nest("/escrow", serve_escrow_subgraph) .nest("/network", serve_network_subgraph) - .nest("/dips", dips) .route( "/subgraph/health/:deployment_id", get(health).with_state(graphnode_state.clone()), diff --git a/crates/test-assets/src/lib.rs b/crates/test-assets/src/lib.rs index a475d3fa..9d7328f1 100644 --- a/crates/test-assets/src/lib.rs +++ b/crates/test-assets/src/lib.rs @@ -70,7 +70,6 @@ macro_rules! assert_while_retry { // - (createdAtEpoch-1, 1) // // Using https://github.com/graphprotocol/indexer/blob/f8786c979a8ed0fae93202e499f5ce25773af473/packages/indexer-common/src/allocations/keys.ts#L41-L71 - pub const ESCROW_QUERY_RESPONSE: &str = r#" { "data": {