From 928f85c3384e35afd8694822169e16a55839ef14 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 14 May 2024 10:57:27 -0400 Subject: [PATCH] await tx completion --- Cargo.lock | 259 ++++++++++++++++++++++++++++++++++++++++++------ Cargo.toml | 5 +- src/main.rs | 45 ++++++--- src/receipts.rs | 4 +- 4 files changed, 267 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4c911f..4758a42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,7 +1130,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "syn 2.0.63", @@ -1192,7 +1192,7 @@ checksum = "e79e5973c26d4baf0ce55520bd732314328cabe53193286671b47144145b9649" dependencies = [ "chrono", "ethers-core", - "reqwest", + "reqwest 0.11.27", "semver 1.0.23", "serde", "serde_json", @@ -1217,7 +1217,7 @@ dependencies = [ "futures-locks", "futures-util", "instant", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "thiserror", @@ -1244,12 +1244,12 @@ dependencies = [ "futures-timer", "futures-util", "hashers", - "http", + "http 0.2.12", "instant", "jsonwebtoken", "once_cell", "pin-project", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "thiserror", @@ -1612,7 +1612,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.1.0", "indexmap 2.2.6", "slab", "tokio", @@ -1703,6 +1722,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1710,7 +1740,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1736,9 +1789,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1750,6 +1803,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.4", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1757,8 +1830,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.28", "rustls", "tokio", "tokio-rustls", @@ -1771,12 +1844,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.28", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -2889,12 +2998,12 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", - "hyper-tls", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", @@ -2904,7 +3013,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2919,7 +3028,49 @@ dependencies = [ "wasm-bindgen-futures", "web-sys", "webpki-roots", - "winreg", + "winreg 0.50.0", +] + +[[package]] +name = "reqwest" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.4.4", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-tls 0.6.0", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 2.1.2", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.52.0", ] [[package]] @@ -3087,6 +3238,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -3601,7 +3768,7 @@ dependencies = [ "fs2", "hex", "once_cell", - "reqwest", + "reqwest 0.11.27", "semver 1.0.23", "serde", "serde_json", @@ -3686,7 +3853,8 @@ dependencies = [ "chrono", "ethers", "rdkafka", - "reqwest", + "reqwest 0.11.27", + "reqwest 0.12.4", "serde", "serde_json", "serde_with", @@ -3722,9 +3890,9 @@ dependencies = [ [[package]] name = "thegraph-core" -version = "0.2.3" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04bc4de6918f6436f78300b2f4a974cbcaa869dc63d47f492e160da99986a076" +checksum = "cff46d3b5fbf4a774c59ef9fdc1010faff8e721d0422be39126f6acd5cefbe5d" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -3732,7 +3900,7 @@ dependencies = [ "ethers-core", "indoc", "lazy_static", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "serde_with", @@ -3744,12 +3912,12 @@ dependencies = [ [[package]] name = "thegraph-graphql-http" -version = "0.1.1" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2145a61657c371657e8540bfeb5fae38f5c4495880d6b78415c5eae34364ea9f" +checksum = "c5b7231f969a5168347a7efac9f92cc83a4ada280238139ade4d82d45360b99c" dependencies = [ "async-trait", - "reqwest", + "reqwest 0.12.4", "serde", "serde_json", "thiserror", @@ -3973,6 +4141,28 @@ dependencies = [ "winnow 0.6.8", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -3985,6 +4175,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4061,7 +4252,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", "rand", @@ -4509,6 +4700,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "ws_stream_wasm" version = "0.7.4" diff --git a/Cargo.toml b/Cargo.toml index 3d9480d..a82e54b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,13 @@ rdkafka = { version = "0.36.0", features = [ "gssapi", "tracing", ] } -reqwest = { version = "0.11.27", features = ["json"] } +reqwest = { version = "0.12.4" } +reqwest_old = { package = "reqwest", version = "0.11.27" } serde = { version = "1.0.190", features = ["derive"] } serde_json = { version = "1.0.108", features = ["raw_value"] } serde_with = "3.4.0" snmalloc-rs = "0.3.4" -thegraph-core = { version = "0.2.3", features = ["subgraph-client"] } +thegraph-core = { version = "0.4.1", features = ["subgraph-client"] } tokio = { version = "1.32.0", default-features = false, features = [ "macros", "rt-multi-thread", diff --git a/src/main.rs b/src/main.rs index 7ef4cbc..927e0ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,11 +43,13 @@ async fn main() -> anyhow::Result<()> { LocalWallet::from_bytes(config.secret_key.as_slice())?.with_chain_id(config.chain_id); let sender_address = wallet.address(); tracing::info!(%sender_address); - let http_client = reqwest::ClientBuilder::new() - .tcp_nodelay(true) - .timeout(Duration::from_secs(10)) - .build()?; - let provider = Provider::new(Http::new_with_client(config.rpc_url.clone(), http_client)); + + let provider = Provider::new(Http::new_with_client( + config.rpc_url.clone(), + reqwest_old::ClientBuilder::new() + .timeout(Duration::from_secs(10)) + .build()?, + )); let provider = Arc::new(SignerMiddleware::new(provider, wallet)); let contract = Escrow::new( ethers::abi::Address::from(config.escrow_contract.0 .0), @@ -85,6 +87,7 @@ async fn main() -> anyhow::Result<()> { } }; receivers.extend(escrow_accounts.keys()); + tracing::debug!(receivers = receivers.len()); let debts = debts.borrow(); let adjustments: Vec<(Address, u128)> = receivers @@ -118,12 +121,30 @@ async fn main() -> anyhow::Result<()> { let amounts: Vec = adjustments.iter().map(|(_, a)| U256::from(*a)).collect(); let tx = contract.deposit_many(receivers, amounts); - let result = tx.send().await; - if let Err(contract_call_err) = result { - let revert = contract_call_err.decode_contract_revert::(); - tracing::error!(%contract_call_err, ?revert); - continue; + let pending = match tx.send().await { + Ok(pending) => pending, + Err(contract_call_err) => { + let revert = contract_call_err.decode_contract_revert::(); + tracing::error!(%contract_call_err, ?revert); + continue; + } + }; + let completion = match pending.await { + Ok(completion) => completion, + Err(pending_tx_err) => { + tracing::error!(%pending_tx_err); + continue; + } + }; + if let Some(latest_block) = completion.and_then(|c| c.block_number) { + escrow_subgraph = SubgraphClient::builder( + escrow_subgraph.http_client, + escrow_subgraph.subgraph_url, + ) + .with_subgraph_latest_block(latest_block.as_u64()) + .build(); } + tracing::info!("adjustments complete"); } } @@ -162,7 +183,7 @@ async fn active_indexers( id: Address, } Ok(network_subgraph - .paginated_query::(query) + .paginated_query::(query, 200) .await .map_err(|err| anyhow!(err))? .into_iter() @@ -206,7 +227,7 @@ async fn escrow_accounts( id: Address, } Ok(escrow_subgraph - .paginated_query::(query) + .paginated_query::(query, 200) .await .map_err(|err| anyhow!(err))? .into_iter() diff --git a/src/receipts.rs b/src/receipts.rs index dbd4d8a..bb9e276 100644 --- a/src/receipts.rs +++ b/src/receipts.rs @@ -34,15 +34,13 @@ pub async fn track_receipts( let mut consumer: StreamConsumer = client_config.create()?; consumer.subscribe(&[&config.topic])?; - let (tx, mut rx) = watch::channel(Default::default()); + let (tx, rx) = watch::channel(Default::default()); tokio::spawn(async move { if let Err(kafka_consumer_err) = process_messages(&mut consumer, db, tx, graph_env).await { tracing::error!(%kafka_consumer_err); } }); - rx.wait_for(|debts: &HashMap| !debts.is_empty()) - .await?; Ok(rx) }