diff --git a/.sqlx/query-4c857a980b1eddc4fa85ccad223762d85614c249e06180e0b8dda8e14eff93f4.json b/.sqlx/query-4c857a980b1eddc4fa85ccad223762d85614c249e06180e0b8dda8e14eff93f4.json new file mode 100644 index 00000000..1737fd9d --- /dev/null +++ b/.sqlx/query-4c857a980b1eddc4fa85ccad223762d85614c249e06180e0b8dda8e14eff93f4.json @@ -0,0 +1,68 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT * FROM scalar_tap_ravs WHERE last = true;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sender_address", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "signature", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 3, + "name": "timestamp_ns", + "type_info": "Numeric" + }, + { + "ordinal": 4, + "name": "value_aggregate", + "type_info": "Numeric" + }, + { + "ordinal": 5, + "name": "last", + "type_info": "Bool" + }, + { + "ordinal": 6, + "name": "final", + "type_info": "Bool" + }, + { + "ordinal": 7, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 8, + "name": "updated_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "4c857a980b1eddc4fa85ccad223762d85614c249e06180e0b8dda8e14eff93f4" +} diff --git a/.sqlx/query-661b961e286d8591d1eaaf29d3bdf5748c361f3d4c4136b8d5b3a3c3af5fcc9e.json b/.sqlx/query-661b961e286d8591d1eaaf29d3bdf5748c361f3d4c4136b8d5b3a3c3af5fcc9e.json new file mode 100644 index 00000000..65352b48 --- /dev/null +++ b/.sqlx/query-661b961e286d8591d1eaaf29d3bdf5748c361f3d4c4136b8d5b3a3c3af5fcc9e.json @@ -0,0 +1,68 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT * FROM scalar_tap_ravs WHERE last = true;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sender_address", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "signature", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 3, + "name": "timestamp_ns", + "type_info": "Numeric" + }, + { + "ordinal": 4, + "name": "value_aggregate", + "type_info": "Numeric" + }, + { + "ordinal": 5, + "name": "last", + "type_info": "Bool" + }, + { + "ordinal": 6, + "name": "final", + "type_info": "Bool" + }, + { + "ordinal": 7, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 8, + "name": "updated_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "661b961e286d8591d1eaaf29d3bdf5748c361f3d4c4136b8d5b3a3c3af5fcc9e" +} diff --git a/Cargo.lock b/Cargo.lock index 788b3948..5e7a632b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -364,7 +364,7 @@ dependencies = [ "lru", "parking_lot", "pin-project 1.1.8", - "reqwest 0.12.12", + "reqwest 0.12.9", "schnellru", "serde", "serde_json", @@ -390,7 +390,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower 0.5.2", + "tower 0.5.1", "tracing", ] @@ -431,12 +431,12 @@ dependencies = [ "alloy-transport-ws", "futures", "pin-project 1.1.8", - "reqwest 0.12.12", + "reqwest 0.12.9", "serde", "serde_json", "tokio", "tokio-stream", - "tower 0.5.2", + "tower 0.5.1", "tracing", "url", "wasmtimer", @@ -690,7 +690,7 @@ dependencies = [ "serde_json", "thiserror 2.0.3", "tokio", - "tower 0.5.2", + "tower 0.5.1", "tracing", "url", "wasmtimer", @@ -704,9 +704,9 @@ checksum = 
"2ed40eb1e1265b2911512f6aa1dcece9702d078f5a646730c45e39e2be00ac1c" dependencies = [ "alloy-json-rpc", "alloy-transport", - "reqwest 0.12.12", + "reqwest 0.12.9", "serde_json", - "tower 0.5.2", + "tower 0.5.1", "tracing", "url", ] @@ -750,9 +750,9 @@ dependencies = [ [[package]] name = "alloy-trie" -version = "0.7.6" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a5fd8fea044cc9a8c8a50bb6f28e31f0385d820f116c5b98f6f4e55d6e5590b" +checksum = "6917c79e837aa7b77b7a6dae9f89cbe15313ac161c4d3cfaf8909ef21f3d22d8" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -830,9 +830,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.95" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" [[package]] name = "anymap3" @@ -1445,7 +1445,7 @@ dependencies = [ "sync_wrapper 1.0.2", "tokio", "tokio-tungstenite", - "tower 0.5.2", + "tower 0.5.1", "tower-layer", "tower-service", "tracing", @@ -1489,7 +1489,7 @@ dependencies = [ "mime", "pin-project-lite", "serde", - "tower 0.5.2", + "tower 0.5.1", "tower-layer", "tower-service", ] @@ -1676,7 +1676,17 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97493a391b4b18ee918675fb8663e53646fd09321c58b46afa04e8ce2499c869" dependencies = [ - "bon-macros", + "bon-macros 2.3.0", + "rustversion", +] + +[[package]] +name = "bon" +version = "3.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe7acc34ff59877422326db7d6f2d845a582b16396b6b08194942bf34c6528ab" +dependencies = [ + "bon-macros 3.3.2", "rustversion", ] @@ -1693,6 +1703,21 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "bon-macros" +version = "3.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4159dd617a7fbc9be6a692fe69dc2954f8e6bb6bb5e4d7578467441390d77fd0" +dependencies = [ + "darling", + "ident_case", + "prettyplease", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.90", +] + [[package]] name = "borsh" version = "1.5.3" @@ -2187,7 +2212,7 @@ dependencies = [ "graphql 0.2.0", "itertools 0.12.1", "lazy_static", - "nom", + "nom 5.1.3", "num-bigint 0.2.6", "num-traits", "serde_json", @@ -2551,9 +2576,9 @@ checksum = "c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" [[package]] name = "env_logger" -version = "0.11.6" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" dependencies = [ "env_filter", "log", @@ -2626,17 +2651,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "fastrlp" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce8dba4714ef14b8274c371879b175aa55b16b30f269663f19d576f380018dc4" -dependencies = [ - "arrayvec 0.7.6", - "auto_impl", - "bytes", -] - [[package]] name = "ff" version = "0.13.0" @@ -2901,13 +2915,13 @@ dependencies = [ "once_cell", "prost", "prost-types", - "reqwest 0.12.12", + "reqwest 0.12.9", "secret-vault-value", "serde", "serde_json", "tokio", "tonic", - "tower 0.5.2", + "tower 0.5.1", "tower-layer", "tower-util", "tracing", @@ -3122,6 +3136,10 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash 0.8.11", + "allocator-api2", +] [[package]] name = "hashbrown" @@ -3137,11 +3155,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.10.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ - "hashbrown 0.15.2", + "hashbrown 0.14.5", ] [[package]] @@ -3702,7 +3720,7 @@ dependencies = [ "indexer-attestation", "indexer-query", "indexer-watcher", - "reqwest 0.12.12", + "reqwest 0.12.9", "serde", "serde_json", "test-assets", @@ -3757,8 +3775,8 @@ dependencies = [ "lazy_static", "pin-project 1.1.8", "prometheus", - "reqwest 0.12.12", - "rstest", + "reqwest 0.12.9", + "rstest 0.23.0", "serde", "serde_json", "sqlx", @@ -3771,7 +3789,7 @@ dependencies = [ "tokio-test", "tokio-util", "tonic", - "tower 0.5.2", + "tower 0.5.1", "tower-http", "tower-service", "tower-test", @@ -3791,6 +3809,7 @@ dependencies = [ "async-trait", "axum", "bigdecimal", + "bon 3.3.2", "clap", "eventuals", "futures", @@ -3800,12 +3819,14 @@ dependencies = [ "indexer-config", "indexer-monitor", "indexer-query", + "indexer-tap-agent", "indexer-watcher", "jsonrpsee", "lazy_static", "prometheus", "ractor", - "reqwest 0.12.12", + "reqwest 0.12.9", + "rstest 0.24.0", "ruint", "serde", "serde_json", @@ -4176,6 +4197,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ + "cc", "pkg-config", "vcpkg", ] @@ -4371,6 +4393,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -4467,6 +4495,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nonempty" version = "0.7.0" @@ -4640,9 +4678,9 @@ dependencies = [ [[package]] name = "nybbles" -version = "0.2.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95f06be0417d97f81fe4e5c86d7d01b392655a9cac9c19a848aa033e18937b23" +checksum = "8983bb634df7248924ee0c4c3a749609b5abcb082c28fffe3254b3eb3602b307" dependencies = [ "alloy-rlp", "const-hex", @@ -5364,7 +5402,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dac85875229e230f661ef8b435643b2befbc5d422eaf732ee308eb8ecae829c" dependencies = [ "async-trait", - "bon", + "bon 2.3.0", "dashmap", "futures", "once_cell", @@ -5580,9 +5618,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.12" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "async-compression", "base64 0.22.1", @@ -5622,7 +5660,6 @@ dependencies = [ "tokio-native-tls", "tokio-rustls 0.26.0", "tokio-util", - "tower 0.5.2", "tower-service", "url", 
"wasm-bindgen", @@ -5739,7 +5776,19 @@ checksum = "0a2c585be59b6b5dd66a9d2084aa1d8bd52fbdb806eafdeffb52791147862035" dependencies = [ "futures", "futures-timer", - "rstest_macros", + "rstest_macros 0.23.0", + "rustc_version 0.4.1", +] + +[[package]] +name = "rstest" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03e905296805ab93e13c1ec3a03f4b6c4f35e9498a3d5fa96dc626d22c03cd89" +dependencies = [ + "futures-timer", + "futures-util", + "rstest_macros 0.24.0", "rustc_version 0.4.1", ] @@ -5761,20 +5810,36 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rstest_macros" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef0053bbffce09062bee4bcc499b0fbe7a57b879f1efe088d6d8d4c7adcdef9b" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version 0.4.1", + "syn 2.0.90", + "unicode-ident", +] + [[package]] name = "ruint" -version = "1.12.4" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5ef8fb1dd8de3870cb8400d51b4c2023854bbafd5431a3ac7e7317243e22d2f" +checksum = "2c3cc4c2511671f327125da14133d0c5c5d137f006a1017a16f557bc85b16286" dependencies = [ "alloy-rlp", "ark-ff 0.3.0", "ark-ff 0.4.2", "bytes", - "fastrlp 0.3.1", - "fastrlp 0.4.0", + "fastrlp", "num-bigint 0.4.6", - "num-integer", "num-traits", "parity-scale-codec", "primitive-types", @@ -6203,18 +6268,18 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "serde" -version = "1.0.217" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", @@ -6295,9 +6360,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.12.0" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" dependencies = [ "base64 0.22.1", "chrono", @@ -6313,9 +6378,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.12.0" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" +checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" dependencies = [ "darling", "proc-macro2", @@ -6522,11 +6587,21 @@ dependencies = [ "der", ] +[[package]] +name = "sqlformat" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" +dependencies = [ + "nom 7.1.3", + "unicode_categories", +] + [[package]] name = "sqlx" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4410e73b3c0d8442c5f99b425d7a435b5ee0ae4167b3196771dd3f7a01be745f" +checksum = 
"93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" dependencies = [ "sqlx-core", "sqlx-macros", @@ -6537,34 +6612,40 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a007b6936676aa9ab40207cde35daab0a04b823be8ae004368c0793b96a61e0" +checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" dependencies = [ + "atoi", "bigdecimal", + "byteorder", "bytes", "chrono", "crc", "crossbeam-queue", "either", "event-listener", + "futures-channel", "futures-core", "futures-intrusive", "futures-io", "futures-util", - "hashbrown 0.15.2", + "hashbrown 0.14.5", "hashlink", + "hex", "indexmap 2.7.0", "log", "memchr", "once_cell", + "paste", "percent-encoding", "rust_decimal", "serde", "serde_json", "sha2", "smallvec", - "thiserror 2.0.3", + "sqlformat", + "thiserror 1.0.69", "tokio", "tokio-stream", "tracing", @@ -6574,9 +6655,9 @@ dependencies = [ [[package]] name = "sqlx-macros" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3112e2ad78643fef903618d78cf0aec1cb3134b019730edb039b69eaf531f310" +checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" dependencies = [ "proc-macro2", "quote", @@ -6587,9 +6668,9 @@ dependencies = [ [[package]] name = "sqlx-macros-core" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e9f90acc5ab146a99bf5061a7eb4976b573f560bc898ef3bf8435448dd5e7ad" +checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" dependencies = [ "dotenvy", "either", @@ -6613,9 +6694,9 @@ dependencies = [ [[package]] name = "sqlx-mysql" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233" +checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" dependencies = [ "atoi", "base64 0.22.1", @@ -6651,7 +6732,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.3", + "thiserror 1.0.69", "tracing", "uuid", "whoami", @@ -6659,9 +6740,9 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613" +checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" dependencies = [ "atoi", "base64 0.22.1", @@ -6674,6 +6755,7 @@ dependencies = [ "etcetera", "futures-channel", "futures-core", + "futures-io", "futures-util", "hex", "hkdf", @@ -6693,7 +6775,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.3", + "thiserror 1.0.69", "tracing", "uuid", "whoami", @@ -6701,9 +6783,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f85ca71d3a5b24e64e1d08dd8fe36c6c95c339a896cc33068148906784620540" +checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" dependencies = [ "atoi", "chrono", @@ -6849,9 +6931,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.33.0" +version = "0.33.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "948512566b1895f93b1592c7574baeb2de842f224f2aab158799ecadb8ebbb46" +checksum = 
"4fc858248ea01b66f19d8e8a6d55f41deaf91e9d495246fd01368d99935c6c01" dependencies = [ "core-foundation-sys", "libc", @@ -7013,7 +7095,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "reqwest 0.12.12", + "reqwest 0.12.9", "syn 2.0.90", "sysinfo", "uzers", @@ -7040,7 +7122,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80bff1bad9a7c3b210876b5601460cab05d8fa6e8f52481472427ed18bc33975" dependencies = [ "async-trait", - "reqwest 0.12.12", + "reqwest 0.12.9", "serde", "serde_json", "thiserror 1.0.69", @@ -7172,9 +7254,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.42.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -7273,9 +7355,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", @@ -7390,14 +7472,14 @@ dependencies = [ [[package]] name = "tower" -version = "0.5.2" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "sync_wrapper 1.0.2", + "sync_wrapper 0.1.2", "tokio", "tower-layer", "tower-service", @@ -7472,7 +7554,7 @@ dependencies = [ "http 1.1.0", "pin-project 1.1.8", "thiserror 2.0.3", - "tower 0.5.2", + "tower 0.5.1", "tracing", ] @@ -7696,6 +7778,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "unreachable" version = "1.0.0" diff --git a/crates/dips/src/lib.rs b/crates/dips/src/lib.rs index da9a5a6e..2c369b87 100644 --- a/crates/dips/src/lib.rs +++ b/crates/dips/src/lib.rs @@ -22,9 +22,8 @@ pub mod server; pub mod store; use store::AgreementStore; -use uuid::Uuid; - use thiserror::Error; +use uuid::Uuid; sol! 
{ // EIP712 encoded bytes, ABI - ethers @@ -264,9 +263,10 @@ pub async fn validate_and_cancel_agreement( #[cfg(test)] mod test { - use std::sync::Arc; - - use std::time::{Duration, SystemTime, UNIX_EPOCH}; + use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, + }; use thegraph_core::{ alloy::{ @@ -276,12 +276,12 @@ mod test { }, attestation::eip712_domain, }; - - use crate::{CancellationRequest, DipsError}; - use crate::{IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata}; use uuid::Uuid; pub use crate::store::{AgreementStore, InMemoryAgreementStore}; + use crate::{ + CancellationRequest, DipsError, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata, + }; #[tokio::test] async fn test_validate_and_create_agreement() -> anyhow::Result<()> { diff --git a/crates/dips/src/server.rs b/crates/dips/src/server.rs index 24bab8df..f9245400 100644 --- a/crates/dips/src/server.rs +++ b/crates/dips/src/server.rs @@ -3,15 +3,16 @@ use std::{str::FromStr, sync::Arc, time::Duration}; -use crate::{ - proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement, - validate_and_create_agreement, DipsError, -}; use anyhow::anyhow; use async_trait::async_trait; use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address}; use uuid::Uuid; +use crate::{ + proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement, + validate_and_create_agreement, DipsError, +}; + #[derive(Debug)] pub struct DipsServer { pub agreement_store: Arc, diff --git a/crates/service/src/database/dips.rs b/crates/service/src/database/dips.rs index 2709ec03..b90bd367 100644 --- a/crates/service/src/database/dips.rs +++ b/crates/service/src/database/dips.rs @@ -4,11 +4,9 @@ use anyhow::bail; use axum::async_trait; use build_info::chrono::Utc; - use indexer_dips::{ store::AgreementStore, SignedCancellationRequest, SignedIndexingAgreementVoucher, }; - use sqlx::PgPool; use thegraph_core::alloy::rlp::Decodable; use uuid::Uuid; diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index 23a7ca5e..21455758 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -38,9 +38,7 @@ pub async fn request_handler( let attestable = response .headers() .get(GRAPH_ATTESTABLE) - .map_or(false, |value| { - value.to_str().map(|value| value == "true").unwrap_or(false) - }); + .is_some_and(|value| value.to_str().map(|value| value == "true").unwrap_or(false)); let graph_indexed = response.headers().get(GRAPH_INDEXED).cloned(); let body = response diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index 194b112e..9d20f0d9 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -6,6 +6,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::anyhow; use axum::{extract::Request, serve, ServiceExt}; +use clap::Parser; use indexer_config::{Config, DipsConfig, GraphNodeConfig, SubgraphConfig}; use indexer_dips::{ proto::graphprotocol::indexer::dips::agreement_service_server::{ @@ -19,14 +20,13 @@ use reqwest::Url; use tap_core::tap_eip712_domain; use tokio::{net::TcpListener, signal}; use tower_http::normalize_path::NormalizePath; +use tracing::info; use crate::{ cli::Cli, database::{self, dips::PsqlAgreementStore}, metrics::serve_metrics, }; -use clap::Parser; -use tracing::info; mod release; mod router; diff --git a/crates/service/src/tap/checks/sender_balance_check.rs 
b/crates/service/src/tap/checks/sender_balance_check.rs index c9704dcc..a7f1708f 100644 --- a/crates/service/src/tap/checks/sender_balance_check.rs +++ b/crates/service/src/tap/checks/sender_balance_check.rs @@ -40,7 +40,7 @@ impl Check for SenderBalanceCheck { // `tap-agent`. if !escrow_accounts_snapshot .get_balance_for_sender(receipt_sender) - .map_or(false, |balance| balance > U256::ZERO) + .is_ok_and(|balance| balance > U256::ZERO) { return Err(CheckError::Failed(anyhow!( "Receipt sender `{}` does not have a sufficient balance", diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index b4be9171..7cb728af 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -8,6 +8,9 @@ publish = false name = "indexer-tap-agent" path = "src/main.rs" +[features] +test = ["dep:test-assets"] + [dependencies] indexer-monitor = { path = "../monitor" } indexer-watcher = { path = "../watcher" } @@ -45,6 +48,8 @@ ractor = { version = "0.14", features = [ ], default-features = false } tap_aggregator.workspace = true futures = { version = "0.3.30", default-features = false } +bon = "3.3" +test-assets = { path = "../test-assets" , optional=true} [dev-dependencies] tempfile = "3.8.0" @@ -52,3 +57,6 @@ wiremock.workspace = true wiremock-grpc = "0.0.3-alpha3" test-assets = { path = "../test-assets" } test-log = { version = "0.2.12", default-features = false } +bon = "3.3" +rstest = "0.24.0" +indexer-tap-agent = { path = ".", features = ["test"] } diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index de8219ca..bb5fefee 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -965,7 +965,7 @@ impl Actor for SenderAccount { tracing::error!("SenderAllocation doesn't have a name"); return Ok(()); }; - let Some(allocation_id) = allocation_id.split(':').last() else { + let Some(allocation_id) = allocation_id.split(':').next_back() else { tracing::error!(%allocation_id, "Could not extract allocation_id from name"); return Ok(()); }; @@ -1003,7 +1003,7 @@ impl Actor for SenderAccount { tracing::error!("SenderAllocation doesn't have a name"); return Ok(()); }; - let Some(allocation_id) = allocation_id.split(':').last() else { + let Some(allocation_id) = allocation_id.split(':').next_back() else { tracing::error!(%allocation_id, "Could not extract allocation_id from name"); return Ok(()); }; @@ -1048,33 +1048,25 @@ impl SenderAccount { pub mod tests { use std::{ collections::{HashMap, HashSet}, - sync::{atomic::AtomicU32, Arc}, + sync::atomic::AtomicU32, time::{Duration, SystemTime, UNIX_EPOCH}, }; - use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; + use indexer_monitor::EscrowAccounts; use ractor::{call, Actor, ActorRef, ActorStatus}; - use reqwest::Url; use serde_json::json; use sqlx::PgPool; use test_assets::{ flush_messages, ALLOCATION_ID_0, ALLOCATION_ID_1, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, }; - use thegraph_core::alloy::{ - hex::ToHexExt, - primitives::{Address, U256}, - }; - use tokio::sync::{ - watch::{self, Sender}, - Notify, - }; + use thegraph_core::alloy::{hex::ToHexExt, primitives::U256}; use wiremock::{ matchers::{body_string_contains, method}, - Mock, MockGuard, MockServer, ResponseTemplate, + Mock, MockServer, ResponseTemplate, }; - use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage}; + use super::SenderAccountMessage; use crate::{ agent::{ sender_account::ReceiptFees, 
sender_allocation::SenderAllocationMessage, @@ -1082,8 +1074,8 @@ pub mod tests { }, assert_not_triggered, assert_triggered, test::{ - actors::{create_mock_sender_allocation, MockSenderAllocation, TestableActor}, - create_rav, get_grpc_url, store_rav_with_options, INDEXER, TAP_EIP712_DOMAIN_SEPARATOR, + actors::{create_mock_sender_allocation, MockSenderAllocation}, + create_rav, create_sender_account, store_rav_with_options, }, }; @@ -1132,17 +1124,17 @@ pub mod tests { } pub static PREFIX_ID: AtomicU32 = AtomicU32::new(0); - const DUMMY_URL: &str = "http://localhost:1234"; const TRIGGER_VALUE: u128 = 500; const ESCROW_VALUE: u128 = 1000; const BUFFER_DURATION: Duration = Duration::from_millis(100); const RECEIPT_LIMIT: u64 = 10000; const RETRY_DURATION: Duration = Duration::from_millis(1000); - async fn mock_escrow_subgraph() -> (MockServer, MockGuard) { - let mock_ecrow_subgraph_server: MockServer = MockServer::start().await; - let _mock_ecrow_subgraph = mock_ecrow_subgraph_server - .register_as_scoped( + #[rstest::fixture] + async fn mock_escrow_subgraph() -> MockServer { + let mock_escrow_subgraph_server: MockServer = MockServer::start().await; + mock_escrow_subgraph_server + .register( Mock::given(method("POST")) .and(body_string_contains("TapTransactions")) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": { @@ -1153,95 +1145,15 @@ pub mod tests { }))), ) .await; - (mock_ecrow_subgraph_server, _mock_ecrow_subgraph) - } - - async fn create_sender_account( - pgpool: PgPool, - initial_allocation: HashSet
, - rav_request_trigger_value: u128, - max_amount_willing_to_lose_grt: u128, - escrow_subgraph_endpoint: &str, - network_subgraph_endpoint: &str, - rav_request_receipt_limit: u64, - ) -> ( - ActorRef, - Arc, - String, - Sender, - ) { - let config = Box::leak(Box::new(super::SenderAccountConfig { - rav_request_buffer: BUFFER_DURATION, - max_amount_willing_to_lose_grt, - trigger_value: rav_request_trigger_value, - rav_request_timeout: Duration::default(), - rav_request_receipt_limit, - indexer_address: INDEXER.1, - escrow_polling_interval: Duration::default(), - tap_sender_timeout: Duration::from_secs(30), - })); - - let network_subgraph = Box::leak(Box::new( - SubgraphClient::new( - reqwest::Client::new(), - None, - DeploymentDetails::for_query_url(network_subgraph_endpoint).unwrap(), - ) - .await, - )); - let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new( - reqwest::Client::new(), - None, - DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), - ) - .await, - )); - let (escrow_accounts_tx, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); - escrow_accounts_tx - .send(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )) - .expect("Failed to update escrow_accounts channel"); - - // Start a new mock aggregator server for this test - - let prefix = format!( - "test-{}", - PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) - ); - - let args = SenderAccountArgs { - config, - pgpool, - sender_id: SENDER.1, - escrow_accounts: escrow_accounts_rx, - indexer_allocations: watch::channel(initial_allocation).1, - escrow_subgraph, - network_subgraph, - domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), - sender_aggregator_endpoint: Url::parse(&get_grpc_url().await).unwrap(), - allocation_ids: HashSet::new(), - prefix: Some(prefix.clone()), - retry_interval: RETRY_DURATION, - }; - - let actor = TestableActor::new(SenderAccount); - let notify = actor.notify.clone(); - - let (sender, _) = Actor::spawn(Some(prefix.clone()), actor, args) - .await - .unwrap(); - - // flush all messages - flush_messages(¬ify).await; - - (sender, notify, prefix, escrow_accounts_tx) + mock_escrow_subgraph_server } + #[rstest::rstest] #[sqlx::test(migrations = "../../migrations")] - async fn test_update_allocation_ids(pgpool: PgPool) { + async fn test_update_allocation_ids( + #[ignore] pgpool: PgPool, + #[future(awt)] mock_escrow_subgraph: MockServer, + ) { // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1263,18 +1175,16 @@ pub mod tests { ) .await; - let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; - - let (sender_account, notify, prefix, _) = create_sender_account( - pgpool, - HashSet::new(), - TRIGGER_VALUE, - TRIGGER_VALUE, - &mock_escrow_subgraph_server.uri(), - &mock_server.uri(), - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, prefix, _) = create_sender_account() + .pgpool(pgpool) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE) + .escrow_subgraph_endpoint(&mock_escrow_subgraph.uri()) + .network_subgraph_endpoint(&mock_server.uri()) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; // we expect it to create a sender allocation sender_account @@ -1328,8 +1238,12 @@ pub mod tests { assert!(actor_ref.is_none()); } + #[rstest::rstest] #[sqlx::test(migrations = "../../migrations")] - async fn 
test_new_allocation_id(pgpool: PgPool) { + async fn test_new_allocation_id( + #[ignore] pgpool: PgPool, + #[future(awt)] mock_escrow_subgraph: MockServer, + ) { // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1351,18 +1265,16 @@ pub mod tests { ) .await; - let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; - - let (sender_account, notify, prefix, _) = create_sender_account( - pgpool, - HashSet::new(), - TRIGGER_VALUE, - TRIGGER_VALUE, - &mock_escrow_subgraph_server.uri(), - &mock_server.uri(), - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, prefix, _) = create_sender_account() + .pgpool(pgpool) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE) + .escrow_subgraph_endpoint(&mock_escrow_subgraph.uri()) + .network_subgraph_endpoint(&mock_server.uri()) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; // we expect it to create a sender allocation sender_account @@ -1442,16 +1354,14 @@ pub mod tests { #[sqlx::test(migrations = "../../migrations")] async fn test_update_receipt_fees_no_rav(pgpool: PgPool) { - let (sender_account, _, prefix, _) = create_sender_account( - pgpool, - HashSet::new(), - TRIGGER_VALUE, - TRIGGER_VALUE, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, _, prefix, _) = create_sender_account() + .pgpool(pgpool) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; // create a fake sender allocation let (triggered_rav_request, _, _) = create_mock_sender_allocation( @@ -1477,16 +1387,14 @@ pub mod tests { #[sqlx::test(migrations = "../../migrations")] async fn test_update_receipt_fees_trigger_rav(pgpool: PgPool) { - let (sender_account, notify, prefix, _) = create_sender_account( - pgpool, - HashSet::new(), - TRIGGER_VALUE, - TRIGGER_VALUE, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, prefix, _) = create_sender_account() + .pgpool(pgpool) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; // create a fake sender allocation let (triggered_rav_request, _, _) = create_mock_sender_allocation( @@ -1523,16 +1431,14 @@ pub mod tests { #[sqlx::test(migrations = "../../migrations")] async fn test_counter_greater_limit_trigger_rav(pgpool: PgPool) { - let (sender_account, notify, prefix, _) = create_sender_account( - pgpool, - HashSet::new(), - TRIGGER_VALUE, - TRIGGER_VALUE, - DUMMY_URL, - DUMMY_URL, - 2, - ) - .await; + let (sender_account, notify, prefix, _) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE) + .rav_request_receipt_limit(2) + .call() + .await; // create a fake sender allocation let (triggered_rav_request, _, _) = create_mock_sender_allocation( @@ -1575,19 +1481,21 @@ pub mod tests { assert_triggered!(&triggered_rav_request); } + #[rstest::rstest] #[sqlx::test(migrations = "../../migrations")] - async fn test_remove_sender_account(pgpool: PgPool) { - let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; - let (sender_account, _, prefix, _) = create_sender_account( - 
pgpool, - vec![ALLOCATION_ID_0].into_iter().collect(), - TRIGGER_VALUE, - TRIGGER_VALUE, - &mock_escrow_subgraph_server.uri(), - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + async fn test_remove_sender_account( + #[ignore] pgpool: PgPool, + #[future(awt)] mock_escrow_subgraph: MockServer, + ) { + let (sender_account, _, prefix, _) = create_sender_account() + .pgpool(pgpool) + .initial_allocation(vec![ALLOCATION_ID_0].into_iter().collect()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE + 1000) + .escrow_subgraph_endpoint(&mock_escrow_subgraph.uri()) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; // check if allocation exists let sender_allocation_id = format!("{}:{}:{}", prefix.clone(), SENDER.1, ALLOCATION_ID_0); @@ -1627,16 +1535,14 @@ pub mod tests { .await .unwrap(); - let (sender_account, _notify, _, _) = create_sender_account( - pgpool.clone(), - HashSet::new(), - TRIGGER_VALUE, - TRIGGER_VALUE, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, _notify, _, _) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE + 1000) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap(); assert!(deny); @@ -1647,16 +1553,14 @@ pub mod tests { // we set to zero to block the sender, no matter the fee let max_unaggregated_fees_per_sender: u128 = 0; - let (sender_account, notify, prefix, _) = create_sender_account( - pgpool, - HashSet::new(), - TRIGGER_VALUE, - max_unaggregated_fees_per_sender, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, prefix, _) = create_sender_account() + .pgpool(pgpool) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(max_unaggregated_fees_per_sender) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; let (triggered_rav_request, next_value, _) = create_mock_sender_allocation( prefix, @@ -1696,16 +1600,14 @@ pub mod tests { let max_unaggregated_fees_per_sender: u128 = 1000; // Making sure no RAV is going to be triggered during the test - let (sender_account, notify, _, _) = create_sender_account( - pgpool.clone(), - HashSet::new(), - u128::MAX, - max_unaggregated_fees_per_sender, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, _, _) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(u128::MAX) + .max_amount_willing_to_lose_grt(max_unaggregated_fees_per_sender) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; macro_rules! 
update_receipt_fees { ($value:expr) => { @@ -1794,16 +1696,14 @@ pub mod tests { .await .unwrap(); - let (sender_account, _notify, _, _) = create_sender_account( - pgpool.clone(), - HashSet::new(), - TRIGGER_VALUE, - u128::MAX, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, _notify, _, _) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(u128::MAX) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap(); assert!(deny); @@ -1826,16 +1726,14 @@ pub mod tests { let trigger_rav_request = ESCROW_VALUE * 2; // initialize with no trigger value and no max receipt deny - let (sender_account, notify, prefix, _) = create_sender_account( - pgpool.clone(), - HashSet::new(), - trigger_rav_request, - u128::MAX, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, prefix, _) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(trigger_rav_request) + .max_amount_willing_to_lose_grt(u128::MAX) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; let (mock_sender_allocation, next_rav_value) = MockSenderAllocation::new_with_next_rav_value(sender_account.clone()); @@ -1933,16 +1831,15 @@ pub mod tests { .await .unwrap(); - let (sender_account, notify, _, escrow_accounts_tx) = create_sender_account( - pgpool.clone(), - HashSet::new(), - TRIGGER_VALUE, - u128::MAX, - &mock_server.uri(), - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, _, escrow_accounts_tx) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(u128::MAX) + .escrow_subgraph_endpoint(&mock_server.uri()) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap(); assert!(!deny, "should start unblocked"); @@ -1989,16 +1886,14 @@ pub mod tests { .await .unwrap(); - let (sender_account, notify, _, escrow_accounts_tx) = create_sender_account( - pgpool.clone(), - HashSet::new(), - TRIGGER_VALUE, - u128::MAX, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, _, escrow_accounts_tx) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(u128::MAX) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap(); assert!(!deny, "should start unblocked"); @@ -2037,16 +1932,14 @@ pub mod tests { // we set to 1 to block the sender on a really low value let max_unaggregated_fees_per_sender: u128 = 1; - let (sender_account, notify, prefix, _) = create_sender_account( - pgpool, - HashSet::new(), - TRIGGER_VALUE, - max_unaggregated_fees_per_sender, - DUMMY_URL, - DUMMY_URL, - RECEIPT_LIMIT, - ) - .await; + let (sender_account, notify, prefix, _) = create_sender_account() + .pgpool(pgpool) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(max_unaggregated_fees_per_sender) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .call() + .await; let (mock_sender_allocation, _, next_unaggregated_fees) = 
MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone()); diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 94797979..b9506107 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -243,7 +243,7 @@ impl Actor for SenderAccountsManager { tracing::error!("SenderAllocation doesn't have a name"); return Ok(()); }; - let Some(sender_id) = sender_id.split(':').last() else { + let Some(sender_id) = sender_id.split(':').next_back() else { tracing::error!(%sender_id, "Could not extract sender_id from name"); return Ok(()); }; @@ -598,115 +598,55 @@ async fn handle_notification( #[cfg(test)] mod tests { - use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::Duration, - }; + use std::collections::{HashMap, HashSet}; use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; - use ractor::{concurrency::JoinHandle, Actor, ActorRef, ActorStatus}; + use ractor::{Actor, ActorRef, ActorStatus}; use reqwest::Url; use ruint::aliases::U256; use sqlx::{postgres::PgListener, PgPool}; use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use thegraph_core::alloy::hex::ToHexExt; - use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Notify}; - - use super::{ - new_receipts_watcher, SenderAccountsManager, SenderAccountsManagerArgs, - SenderAccountsManagerMessage, State, + use tokio::sync::{ + mpsc::{self, error::TryRecvError}, + watch, }; + + use super::{new_receipts_watcher, SenderAccountsManagerMessage, State}; use crate::{ agent::{ - sender_account::{tests::PREFIX_ID, SenderAccountConfig, SenderAccountMessage}, + sender_account::{tests::PREFIX_ID, SenderAccountMessage}, sender_accounts_manager::{handle_notification, NewReceiptNotification}, }, test::{ actors::{DummyActor, MockSenderAccount, MockSenderAllocation, TestableActor}, - create_rav, create_received_receipt, get_grpc_url, store_rav, store_receipt, - ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER_2, TAP_EIP712_DOMAIN_SEPARATOR, + create_rav, create_received_receipt, create_sender_accounts_manager, get_grpc_url, + get_sender_account_config, store_rav, store_receipt, ALLOCATION_ID_0, ALLOCATION_ID_1, + INDEXER, SENDER_2, TAP_EIP712_DOMAIN_SEPARATOR, }, }; - const DUMMY_URL: &str = "http://localhost:1234"; - async fn get_subgraph_client() -> &'static SubgraphClient { Box::leak(Box::new( SubgraphClient::new( reqwest::Client::new(), None, - DeploymentDetails::for_query_url(DUMMY_URL).unwrap(), + DeploymentDetails::for_query_url(&get_grpc_url().await).unwrap(), ) .await, )) } - fn get_config() -> &'static SenderAccountConfig { - Box::leak(Box::new(SenderAccountConfig { - rav_request_buffer: Duration::from_millis(1), - max_amount_willing_to_lose_grt: 0, - trigger_value: 100, - rav_request_timeout: Duration::from_millis(1), - rav_request_receipt_limit: 1000, - indexer_address: INDEXER.1, - escrow_polling_interval: Duration::default(), - tap_sender_timeout: Duration::from_secs(30), - })) - } - - async fn create_sender_accounts_manager( - pgpool: PgPool, - ) -> ( - String, - Arc, - (ActorRef, JoinHandle<()>), - ) { - let config = get_config(); - - let (_allocations_tx, allocations_rx) = watch::channel(HashMap::new()); - let escrow_subgraph = get_subgraph_client().await; - let network_subgraph = get_subgraph_client().await; - - let (_, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); - - // Start a new mock 
aggregator server for this test - let prefix = format!( - "test-{}", - PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) - ); - let args = SenderAccountsManagerArgs { - config, - domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), - pgpool, - indexer_allocations: allocations_rx, - escrow_accounts: escrow_accounts_rx, - escrow_subgraph, - network_subgraph, - sender_aggregator_endpoints: HashMap::from([ - (SENDER.1, Url::parse(&get_grpc_url().await).unwrap()), - (SENDER_2.1, Url::parse(&get_grpc_url().await).unwrap()), - ]), - prefix: Some(prefix.clone()), - }; - let actor = TestableActor::new(SenderAccountsManager); - let notify = actor.notify.clone(); - ( - prefix, - notify, - Actor::spawn(None, actor, args).await.unwrap(), - ) - } - #[sqlx::test(migrations = "../../migrations")] async fn test_create_sender_accounts_manager(pgpool: PgPool) { - let (_, _, (actor, join_handle)) = create_sender_accounts_manager(pgpool).await; + let (_, _, (actor, join_handle)) = + create_sender_accounts_manager().pgpool(pgpool).call().await; actor.stop_and_wait(None, None).await.unwrap(); join_handle.await.unwrap(); } async fn create_state(pgpool: PgPool) -> (String, State) { - let config = get_config(); + let config = get_sender_account_config(); let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect(); let escrow_accounts = EscrowAccounts::new(HashMap::new(), senders_to_signers); @@ -762,7 +702,8 @@ mod tests { #[sqlx::test(migrations = "../../migrations")] async fn test_update_sender_allocation(pgpool: PgPool) { - let (prefix, notify, (actor, join_handle)) = create_sender_accounts_manager(pgpool).await; + let (prefix, notify, (actor, join_handle)) = + create_sender_accounts_manager().pgpool(pgpool).call().await; actor .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( diff --git a/crates/tap-agent/src/agent/sender_allocation.rs b/crates/tap-agent/src/agent/sender_allocation.rs index 0c0f938a..5d102287 100644 --- a/crates/tap-agent/src/agent/sender_allocation.rs +++ b/crates/tap-agent/src/agent/sender_allocation.rs @@ -149,7 +149,7 @@ pub struct SenderAllocationArgs { pub enum SenderAllocationMessage { NewReceipt(NewReceiptNotification), TriggerRAVRequest, - #[cfg(test)] + #[cfg(any(test, feature = "test"))] GetUnaggregatedReceipts(ractor::RpcReplyPort), } @@ -264,6 +264,7 @@ impl Actor for SenderAllocation { "New SenderAllocation message" ); let unaggregated_fees = &mut state.unaggregated_fees; + match message { SenderAllocationMessage::NewReceipt(notification) => { let NewReceiptNotification { @@ -311,7 +312,6 @@ impl Actor for SenderAllocation { Err(anyhow!("Unaggregated fee equals zero")) }; let rav_response = (state.unaggregated_fees, rav_result); - // encapsulate inanother okay, unwrap after and send the result over here state .sender_account_ref .cast(SenderAccountMessage::UpdateReceiptFees( @@ -319,7 +319,7 @@ impl Actor for SenderAllocation { ReceiptFees::RavRequestResponse(rav_response), ))?; } - #[cfg(test)] + #[cfg(any(test, feature = "test"))] SenderAllocationMessage::GetUnaggregatedReceipts(reply) => { if !reply.is_closed() { let _ = reply.send(*unaggregated_fees); @@ -410,7 +410,6 @@ impl SenderAllocationState { self.tap_manager.remove_obsolete_receipts().await?; let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; - let res = sqlx::query!( r#" SELECT @@ -607,7 +606,6 @@ impl SenderAllocationState { ); } })?; - let rav_response_time = rav_response_time_start.elapsed(); RAV_RESPONSE_TIME 
.with_label_values(&[&self.sender.to_string()]) @@ -860,17 +858,22 @@ impl SenderAllocationState { pub mod tests { use std::{ collections::HashMap, + future::Future, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; + use bigdecimal::ToPrimitive; use futures::future::join_all; use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; use ractor::{call, cast, Actor, ActorRef, ActorStatus}; use ruint::aliases::U256; use serde_json::json; use sqlx::PgPool; - use tap_aggregator::grpc::{tap_aggregator_client::TapAggregatorClient, RavResponse}; + use tap_aggregator::{ + grpc::{tap_aggregator_client::TapAggregatorClient, RavResponse}, + server::run_server, + }; use tap_core::receipt::{ checks::{Check, CheckError, CheckList, CheckResult}, state::Checking, @@ -899,11 +902,13 @@ pub mod tests { }, test::{ actors::{create_mock_sender_account, TestableActor}, - create_rav, create_received_receipt, get_grpc_url, store_invalid_receipt, store_rav, - store_receipt, INDEXER, + create_rav, create_received_receipt, get_grpc_url, store_batch_receipts, + store_invalid_receipt, store_rav, store_receipt, INDEXER, }, }; + const RECEIPT_LIMIT: u64 = 1000; + async fn mock_escrow_subgraph() -> (MockServer, MockGuard) { let mock_ecrow_subgraph_server: MockServer = MockServer::start().await; let _mock_ecrow_subgraph = mock_ecrow_subgraph_server @@ -925,6 +930,7 @@ pub mod tests { pgpool: PgPool, sender_aggregator_endpoint: String, escrow_subgraph_endpoint: &str, + rav_request_receipt_limit: u64, sender_account: Option>, ) -> SenderAllocationArgs { let escrow_subgraph = Box::leak(Box::new( @@ -969,23 +975,26 @@ pub mod tests { sender_aggregator, config: super::AllocationConfig { timestamp_buffer_ns: 1, - rav_request_receipt_limit: 1000, + rav_request_receipt_limit, indexer_address: INDEXER.1, escrow_polling_interval: Duration::from_millis(1000), }, } } + #[bon::builder] async fn create_sender_allocation( pgpool: PgPool, - sender_aggregator_endpoint: String, + sender_aggregator_endpoint: Option, escrow_subgraph_endpoint: &str, + #[builder(default = 1000)] rav_request_receipt_limit: u64, sender_account: Option>, ) -> (ActorRef, Arc) { let args = create_sender_allocation_args( pgpool, - sender_aggregator_endpoint, + sender_aggregator_endpoint.unwrap_or(get_grpc_url().await), escrow_subgraph_endpoint, + rav_request_receipt_limit, sender_account, ) .await; @@ -1009,13 +1018,13 @@ pub mod tests { .unwrap(); } - let (sender_allocation, _notify) = create_sender_allocation( - pgpool.clone(), - get_grpc_url().await, - &mock_escrow_subgraph_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, _notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri()) + .sender_account(sender_account) + .sender_aggregator_endpoint(get_grpc_url().await) + .call() + .await; // Get total_unaggregated_fees let total_unaggregated_fees = call!( @@ -1052,13 +1061,13 @@ pub mod tests { .unwrap(); } - let (sender_allocation, _notify) = create_sender_allocation( - pgpool.clone(), - get_grpc_url().await, - &mock_escrow_subgraph_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, _notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri()) + .sender_account(sender_account) + .sender_aggregator_endpoint(get_grpc_url().await) + .call() + .await; // Get total_unaggregated_fees let total_unaggregated_fees = call!( @@ -1096,13 +1105,13 @@ pub mod tests { let 
(mut message_receiver, sender_account) = create_mock_sender_account().await; - let (sender_allocation, notify) = create_sender_allocation( - pgpool.clone(), - get_grpc_url().await, - &mock_escrow_subgraph_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri()) + .sender_account(sender_account) + .sender_aggregator_endpoint(get_grpc_url().await) + .call() + .await; // should validate with id less than last_id cast!( @@ -1175,6 +1184,7 @@ pub mod tests { .await; // Add receipts to the database. + for i in 0..10 { let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, i.into()); store_receipt(&pgpool, receipt.signed_receipt()) @@ -1186,17 +1196,16 @@ pub mod tests { .await .unwrap(); } - let (mut message_receiver, sender_account) = create_mock_sender_account().await; // Create a sender_allocation. - let (sender_allocation, notify) = create_sender_allocation( - pgpool.clone(), - get_grpc_url().await, - &mock_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .escrow_subgraph_endpoint(&mock_server.uri()) + .sender_account(sender_account) + .sender_aggregator_endpoint(get_grpc_url().await) + .call() + .await; // Trigger a RAV request manually and wait for updated fees. sender_allocation @@ -1232,7 +1241,7 @@ pub mod tests { ALLOCATION_ID_0, UnaggregatedReceipts { last_id: 0, - value: 45u128, + value: 45, counter: 0, }, ); @@ -1244,19 +1253,142 @@ pub mod tests { )); } + async fn execute( + pgpool: PgPool, + amount_of_receipts: u64, + populate: impl FnOnce(PgPool) -> Fut, + ) where + Fut: Future, + { + // Start a TAP aggregator server. + let (handle, aggregator_endpoint) = run_server( + 0, + SIGNER.0.clone(), + vec![SIGNER.1].into_iter().collect(), + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + 1000 * 1024, + 1000 * 1024, + 1, + ) + .await + .unwrap(); + + // Start a mock graphql server using wiremock + let mock_server = MockServer::start().await; + + // Mock result for TAP redeem txs for (allocation, sender) pair. + mock_server + .register( + Mock::given(method("POST")) + .and(body_string_contains("transactions")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({ "data": { "transactions": []}})), + ), + ) + .await; + + populate(pgpool.clone()).await; + + let (mut message_receiver, sender_account) = create_mock_sender_account().await; + + // Create a sender_allocation. + let (sender_allocation, notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .sender_aggregator_endpoint("http://".to_owned() + &aggregator_endpoint.to_string()) + .escrow_subgraph_endpoint(&mock_server.uri()) + .rav_request_receipt_limit(2000) + .sender_account(sender_account) + .call() + .await; + + // Trigger a RAV request manually and wait for updated fees. + sender_allocation + .cast(SenderAllocationMessage::TriggerRAVRequest) + .unwrap(); + + flush_messages(¬ify).await; + + let total_unaggregated_fees = call!( + sender_allocation, + SenderAllocationMessage::GetUnaggregatedReceipts + ) + .unwrap(); + + // Check that the unaggregated fees are correct. 
+ assert_eq!(total_unaggregated_fees.value, 0u128); + + let startup_msg = message_receiver.recv().await.unwrap(); + let expected_value: u128 = ((amount_of_receipts.to_u128().expect("should be u128") + - 1u128) + * (amount_of_receipts.to_u128().expect("should be u128"))) + / 2; + assert_eq!( + startup_msg, + SenderAccountMessage::UpdateReceiptFees( + ALLOCATION_ID_0, + ReceiptFees::UpdateValue(UnaggregatedReceipts { + value: expected_value, + last_id: amount_of_receipts, + counter: amount_of_receipts, + }) + ) + ); + + // Stop the TAP aggregator server. + handle.abort(); + // handle.stop().unwrap(); + // handle.stopped().await; + } + + #[sqlx::test(migrations = "../../migrations")] + async fn test_several_receipts_rav_request(pgpool: PgPool) { + const AMOUNT_OF_RECEIPTS: u64 = 1000; + execute(pgpool, AMOUNT_OF_RECEIPTS, |pgpool| async move { + // Add receipts to the database. + + for i in 0..AMOUNT_OF_RECEIPTS { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, i.into()); + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + }) + .await; + } + + #[sqlx::test(migrations = "../../migrations")] + async fn test_several_receipts_batch_insert_rav_request(pgpool: PgPool) { + // Add batch receipts to the database. + const AMOUNT_OF_RECEIPTS: u64 = 1000; + execute(pgpool, AMOUNT_OF_RECEIPTS, |pgpool| async move { + // Add receipts to the database. + let mut receipts: Vec> = Vec::with_capacity(1000); + for i in 0..AMOUNT_OF_RECEIPTS { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, i.into()); + receipts.push(receipt); + } + let res = store_batch_receipts(&pgpool, receipts).await; + assert!(res.is_ok()); + }) + .await; + } + #[sqlx::test(migrations = "../../migrations")] async fn test_close_allocation_no_pending_fees(pgpool: PgPool) { let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let (mut message_receiver, sender_account) = create_mock_sender_account().await; // create allocation - let (sender_allocation, _notify) = create_sender_allocation( - pgpool.clone(), - get_grpc_url().await, - &mock_escrow_subgraph_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, _notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri()) + .sender_account(sender_account) + .sender_aggregator_endpoint(get_grpc_url().await) + .call() + .await; sender_allocation.stop_and_wait(None, None).await.unwrap(); @@ -1323,13 +1455,16 @@ pub mod tests { let (_, sender_account) = create_mock_sender_account().await; // create allocation - let (sender_allocation, _notify) = create_sender_allocation( - pgpool.clone(), - format!("http://[::1]:{}", mock_aggregator.address().port()), - &mock_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, _notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .sender_aggregator_endpoint(format!( + "http://[::1]:{}", + mock_aggregator.address().port() + )) + .escrow_subgraph_endpoint(&mock_server.uri()) + .sender_account(sender_account) + .call() + .await; sender_allocation.stop_and_wait(None, None).await.unwrap(); @@ -1349,6 +1484,7 @@ pub mod tests { pgpool.clone(), get_grpc_url().await, &mock_escrow_subgraph_server.uri(), + RECEIPT_LIMIT, None, ) .await; @@ -1376,6 +1512,7 @@ pub mod tests { pgpool.clone(), get_grpc_url().await, &mock_escrow_subgraph_server.uri(), + RECEIPT_LIMIT, None, ) .await; @@ -1409,6 +1546,7 @@ pub mod tests { 
pgpool.clone(), get_grpc_url().await, &mock_escrow_subgraph_server.uri(), + RECEIPT_LIMIT, None, ) .await; @@ -1442,6 +1580,7 @@ pub mod tests { pgpool.clone(), get_grpc_url().await, &mock_escrow_subgraph_server.uri(), + RECEIPT_LIMIT, None, ) .await; @@ -1477,6 +1616,7 @@ pub mod tests { pgpool.clone(), get_grpc_url().await, &mock_escrow_subgraph_server.uri(), + RECEIPT_LIMIT, None, ) .await; @@ -1519,6 +1659,7 @@ pub mod tests { pgpool.clone(), get_grpc_url().await, &mock_escrow_subgraph_server.uri(), + RECEIPT_LIMIT, None, ) .await; @@ -1547,13 +1688,13 @@ pub mod tests { let (mut message_receiver, sender_account) = create_mock_sender_account().await; // Create a sender_allocation. - let (sender_allocation, notify) = create_sender_allocation( - pgpool.clone(), - get_grpc_url().await, - &mock_escrow_subgraph_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .sender_aggregator_endpoint(get_grpc_url().await) + .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri()) + .sender_account(sender_account) + .call() + .await; // Trigger a RAV request manually and wait for updated fees. // this should fail because there's no receipt with valid timestamp @@ -1634,13 +1775,13 @@ pub mod tests { let (mut message_receiver, sender_account) = create_mock_sender_account().await; - let (sender_allocation, notify) = create_sender_allocation( - pgpool.clone(), - get_grpc_url().await, - &mock_server.uri(), - Some(sender_account), - ) - .await; + let (sender_allocation, notify) = create_sender_allocation() + .pgpool(pgpool.clone()) + .sender_aggregator_endpoint(get_grpc_url().await) + .escrow_subgraph_endpoint(&mock_server.uri()) + .sender_account(sender_account) + .call() + .await; // Trigger a RAV request manually and wait for updated fees. 
// this should fail because there's no receipt with valid timestamp diff --git a/crates/tap-agent/src/lib.rs b/crates/tap-agent/src/lib.rs index d0f6452e..0a63214d 100644 --- a/crates/tap-agent/src/lib.rs +++ b/crates/tap-agent/src/lib.rs @@ -21,7 +21,6 @@ pub mod cli; pub mod database; pub mod metrics; pub mod tap; -pub mod tracker; - -#[cfg(test)] +#[cfg(any(test, feature = "test"))] pub mod test; +pub mod tracker; diff --git a/crates/tap-agent/src/tap/context/receipt.rs b/crates/tap-agent/src/tap/context/receipt.rs index f9418f43..b01e6112 100644 --- a/crates/tap-agent/src/tap/context/receipt.rs +++ b/crates/tap-agent/src/tap/context/receipt.rs @@ -274,14 +274,14 @@ mod test { range.contains(&received_receipt.signed_receipt().message.timestamp_ns) && (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (escrow_accounts_snapshot + && escrow_accounts_snapshot .get_sender_for_signer( &received_receipt .signed_receipt() .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) .unwrap(), ) - .map_or(false, |v| v == storage_adapter.sender)) + .is_ok_and(|v| v == storage_adapter.sender) }) .cloned() .collect(); @@ -340,14 +340,14 @@ mod test { .filter(|(_, received_receipt)| { if (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (escrow_accounts_snapshot + && escrow_accounts_snapshot .get_sender_for_signer( &received_receipt .signed_receipt() .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) .unwrap(), ) - .map_or(false, |v| v == storage_adapter.sender)) + .is_ok_and(|v| v == storage_adapter.sender) { !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) } else { diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index 405ea399..8dbec5c7 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -1,10 +1,21 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. 
// SPDX-License-Identifier: Apache-2.0 +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; + +use actors::TestableActor; +use anyhow::anyhow; use bigdecimal::num_bigint::BigInt; +use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; use lazy_static::lazy_static; +use ractor::{concurrency::JoinHandle, Actor, ActorRef}; +use reqwest::Url; use sqlx::{types::BigDecimal, PgPool}; -use std::net::SocketAddr; use tap_aggregator::server::run_server; use tap_core::{ rav::{ReceiptAggregateVoucher, SignedRAV}, @@ -12,16 +23,32 @@ use tap_core::{ signed_message::EIP712SignedMessage, tap_eip712_domain, }; -use test_assets::TAP_SIGNER as SIGNER; +use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use thegraph_core::alloy::{ - primitives::{address, hex::ToHexExt, Address}, + primitives::{address, hex::ToHexExt, Address, U256}, signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner}, sol_types::Eip712Domain, }; -use tokio::task::JoinHandle; pub const ALLOCATION_ID_0: Address = address!("abababababababababababababababababababab"); pub const ALLOCATION_ID_1: Address = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); +use tokio::sync::{ + watch::{self, Sender}, + Notify, +}; +use tracing::error; + +use crate::{ + agent::{ + sender_account::{ + SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage, + }, + sender_accounts_manager::{ + SenderAccountsManager, SenderAccountsManagerArgs, SenderAccountsManagerMessage, + }, + }, + tap::context::AdapterError, +}; lazy_static! { // pub static ref SENDER: (PrivateKeySigner, Address) = wallet(0); @@ -31,6 +58,192 @@ lazy_static! { tap_eip712_domain(1, Address::from([0x11u8; 20]),); } +pub static PREFIX_ID: AtomicU32 = AtomicU32::new(0); + +const TRIGGER_VALUE: u128 = 500; +const ESCROW_VALUE: u128 = 1000; +const BUFFER_DURATION: Duration = Duration::from_millis(100); +const RETRY_DURATION: Duration = Duration::from_millis(1000); +const RAV_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); +const TAP_SENDER_TIMEOUT: Duration = Duration::from_secs(30); + +const RAV_REQUEST_BUFFER: Duration = Duration::from_secs(60); +const ESCROW_POLLING_INTERVAL: Duration = Duration::from_secs(30); + +pub fn get_sender_account_config() -> &'static SenderAccountConfig { + Box::leak(Box::new(SenderAccountConfig { + rav_request_buffer: RAV_REQUEST_BUFFER, + max_amount_willing_to_lose_grt: TRIGGER_VALUE + 100, + trigger_value: TRIGGER_VALUE, + rav_request_timeout: Duration::from_secs(30), + rav_request_receipt_limit: 1000, + indexer_address: INDEXER.1, + escrow_polling_interval: ESCROW_POLLING_INTERVAL, + tap_sender_timeout: Duration::from_secs(63), + })) +} + +#[allow(clippy::too_many_arguments)] +#[bon::builder] +pub async fn create_sender_account( + pgpool: PgPool, + initial_allocation: HashSet
<Address>, rav_request_trigger_value: u128, + max_amount_willing_to_lose_grt: u128, + escrow_subgraph_endpoint: Option<&str>, + network_subgraph_endpoint: Option<&str>, + rav_request_receipt_limit: u64, + aggregator_endpoint: Option<Url>, +) -> ( + ActorRef<SenderAccountMessage>, + Arc<Notify>, + String, + Sender<EscrowAccounts>, +) { + let config = Box::leak(Box::new(SenderAccountConfig { + rav_request_buffer: BUFFER_DURATION, + max_amount_willing_to_lose_grt, + trigger_value: rav_request_trigger_value, + rav_request_timeout: RAV_REQUEST_TIMEOUT, + rav_request_receipt_limit, + indexer_address: INDEXER.1, + escrow_polling_interval: Duration::default(), + tap_sender_timeout: TAP_SENDER_TIMEOUT, + })); + + let network_subgraph = Box::leak(Box::new( + SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url( + network_subgraph_endpoint.unwrap_or(&get_grpc_url().await), + ) + .unwrap(), + ) + .await, + )); + let escrow_subgraph = Box::leak(Box::new( + SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url( + escrow_subgraph_endpoint.unwrap_or(&get_grpc_url().await), + ) + .unwrap(), + ) + .await, + )); + let (escrow_accounts_tx, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); + escrow_accounts_tx + .send(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .expect("Failed to update escrow_accounts channel"); + + let prefix = format!( + "test-{}", + PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + ); + + let args = SenderAccountArgs { + config, + pgpool, + sender_id: SENDER.1, + escrow_accounts: escrow_accounts_rx, + indexer_allocations: watch::channel(initial_allocation).1, + escrow_subgraph, + network_subgraph, + domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), + sender_aggregator_endpoint: aggregator_endpoint + .unwrap_or(Url::parse(&get_grpc_url().await).unwrap()), + allocation_ids: HashSet::new(), + prefix: Some(prefix.clone()), + retry_interval: RETRY_DURATION, + }; + + let actor = TestableActor::new(SenderAccount); + let notify = actor.notify.clone(); + + let (sender, _) = Actor::spawn(Some(prefix.clone()), actor, args) + .await + .unwrap(); + + // flush all messages + flush_messages(&notify).await; + + (sender, notify, prefix, escrow_accounts_tx) +} + +#[bon::builder] +pub async fn create_sender_accounts_manager( + pgpool: PgPool, + aggregator_endpoint: Option<Url>, + network_subgraph: Option<&str>, + escrow_subgraph: Option<&str>, +) -> ( + String, + Arc<Notify>, + (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>), +) { + let config = get_sender_account_config(); + let (_allocations_tx, allocations_rx) = watch::channel(HashMap::new()); + let escrow_subgraph = Box::leak(Box::new( + SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url(escrow_subgraph.unwrap_or(&get_grpc_url().await)) + .unwrap(), + ) + .await, + )); + let network_subgraph = Box::leak(Box::new( + SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url(network_subgraph.unwrap_or(&get_grpc_url().await)) + .unwrap(), + ) + .await, + )); + let (escrow_accounts_tx, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); + escrow_accounts_tx + .send(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .expect("Failed to update escrow_accounts channel"); + + let prefix = format!( + "test-{}", + PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + ); + let args =
SenderAccountsManagerArgs { + config, + domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), + pgpool, + indexer_allocations: allocations_rx, + escrow_accounts: escrow_accounts_rx, + escrow_subgraph, + network_subgraph, + sender_aggregator_endpoints: HashMap::from([ + ( + SENDER.1, + aggregator_endpoint.unwrap_or(Url::parse(&get_grpc_url().await).unwrap()), + ), + (SENDER_2.1, Url::parse("http://localhost:8000").unwrap()), + ]), + prefix: Some(prefix.clone()), + }; + let actor = TestableActor::new(SenderAccountsManager); + let notify = actor.notify.clone(); + ( + prefix, + notify, + Actor::spawn(None, actor, args).await.unwrap(), + ) +} + /// Fixture to generate a RAV using the wallet from `keys()` pub fn create_rav( allocation_id: Address, @@ -100,6 +313,64 @@ pub async fn store_receipt(pgpool: &PgPool, signed_receipt: &SignedReceipt) -> a Ok(id) } +pub async fn store_batch_receipts( + pgpool: &PgPool, + receipts: Vec<ReceiptWithState<Checking>>, +) -> Result<(), AdapterError> { + let receipts_len = receipts.len(); + let mut signers = Vec::with_capacity(receipts_len); + let mut signatures = Vec::with_capacity(receipts_len); + let mut allocation_ids = Vec::with_capacity(receipts_len); + let mut timestamps = Vec::with_capacity(receipts_len); + let mut nonces = Vec::with_capacity(receipts_len); + let mut values = Vec::with_capacity(receipts_len); + + for receipt in receipts { + let receipt = receipt.signed_receipt(); + signers.push( + receipt + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap() + .encode_hex(), + ); + signatures.push(receipt.signature.as_bytes().to_vec()); + allocation_ids.push(receipt.message.allocation_id.encode_hex().to_string()); + timestamps.push(BigDecimal::from(receipt.message.timestamp_ns)); + nonces.push(BigDecimal::from(receipt.message.nonce)); + values.push(BigDecimal::from(receipt.message.value)); + } + let _ = sqlx::query!( + r#"INSERT INTO scalar_tap_receipts ( + signer_address, + signature, + allocation_id, + timestamp_ns, + nonce, + value + ) SELECT * FROM UNNEST( + $1::CHAR(40)[], + $2::BYTEA[], + $3::CHAR(40)[], + $4::NUMERIC(20)[], + $5::NUMERIC(20)[], + $6::NUMERIC(40)[] + )"#, + &signers, + &signatures, + &allocation_ids, + &timestamps, + &nonces, + &values, + ) + .execute(pgpool) + .await + .map_err(|e| { + error!("Failed to store receipt: {}", e); + anyhow!(e) + }); + Ok(()) +} + pub async fn store_invalid_receipt( pgpool: &PgPool, signed_receipt: &SignedReceipt, diff --git a/crates/tap-agent/tests/sender_account_manager_test.rs b/crates/tap-agent/tests/sender_account_manager_test.rs new file mode 100644 index 00000000..6f84ba89 --- /dev/null +++ b/crates/tap-agent/tests/sender_account_manager_test.rs @@ -0,0 +1,133 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
+// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashSet, str::FromStr}; + +use indexer_tap_agent::{ + agent::{ + sender_account::SenderAccountMessage, sender_accounts_manager::SenderAccountsManagerMessage, + }, + test::{ + create_received_receipt, create_sender_accounts_manager, get_grpc_url, store_receipt, + ALLOCATION_ID_0, + }, +}; +use ractor::ActorRef; +use reqwest::Url; +use serde_json::json; +use sqlx::PgPool; +use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; +use wiremock::{ + matchers::{body_string_contains, method}, + Mock, MockServer, ResponseTemplate, +}; + +const TRIGGER_VALUE: u128 = 100; + +// This test should ensure the full flow starting from +// sender account manager layer to work, up to closing an allocation +#[sqlx::test(migrations = "../../migrations")] +async fn sender_account_manager_layer_test(pgpool: PgPool) { + let mock_network_subgraph_server: MockServer = MockServer::start().await; + mock_network_subgraph_server + .register( + Mock::given(method("POST")) + .and(body_string_contains("ClosedAllocations")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": { + "meta": { + "block": { + "number": 1, + "hash": "hash", + "timestamp": 1 + } + }, + "allocations": [ + {"id": *ALLOCATION_ID_0 } + ] + } + }))), + ) + .await; + + let mock_escrow_subgraph_server: MockServer = MockServer::start().await; + mock_escrow_subgraph_server + .register(Mock::given(method("POST")).respond_with( + ResponseTemplate::new(200).set_body_json(json!({ "data": { + "transactions": [], + } + })), + )) + .await; + + let (prefix, notify, (actor, join_handle)) = create_sender_accounts_manager() + .pgpool(pgpool.clone()) + .aggregator_endpoint(Url::from_str(&get_grpc_url().await).unwrap()) + .network_subgraph(&mock_network_subgraph_server.uri()) + .escrow_subgraph(&mock_escrow_subgraph_server.uri()) + .call() + .await; + + actor + .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( + vec![SENDER.1].into_iter().collect(), + )) + .unwrap(); + flush_messages(&notify).await; + + // verify that the sender account was created + let actor_ref = + ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), SENDER.1)); + assert!(actor_ref.is_some()); + + let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, 1, 1, TRIGGER_VALUE - 10); + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + + // we expect it to create a sender allocation + actor_ref + .clone() + .unwrap() + .cast(SenderAccountMessage::UpdateAllocationIds( + vec![ALLOCATION_ID_0].into_iter().collect(), + )) + .unwrap(); + flush_messages(&notify).await; + + // try to delete sender allocation_id + actor_ref + .clone() + .unwrap() + .cast(SenderAccountMessage::UpdateAllocationIds(HashSet::new())) + .unwrap(); + + actor_ref.unwrap().stop_children_and_wait(None, None).await; + + // this closes the accounts manager's sender accounts + actor + .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( + HashSet::new(), + )) + .unwrap(); + + flush_messages(&notify).await; + // verify that it was removed + let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix, SENDER.1)); + assert!(actor_ref.is_none()); + + // verify the RAV is marked as last + let rav_marked_as_last = sqlx::query!( + r#" + SELECT * FROM scalar_tap_ravs WHERE last = true; + "#, + ) + .fetch_all(&pgpool) + .await + .expect("Should not fail to fetch from scalar_tap_ravs"); + + assert!(!rav_marked_as_last.is_empty()); + + // safely stop the manager + actor.stop_and_wait(None, None).await.unwrap(); +
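// also wait for the manager's join handle so the test only returns after its spawned task has fully shut down +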
join_handle.await.unwrap(); +} diff --git a/crates/tap-agent/tests/sender_account_test.rs b/crates/tap-agent/tests/sender_account_test.rs new file mode 100644 index 00000000..4add1098 --- /dev/null +++ b/crates/tap-agent/tests/sender_account_test.rs @@ -0,0 +1,102 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashSet, str::FromStr, sync::atomic::AtomicU32}; + +use indexer_tap_agent::{ + agent::sender_account::SenderAccountMessage, + test::{create_received_receipt, create_sender_account, get_grpc_url, store_receipt}, +}; +use ractor::concurrency::Duration; +use reqwest::Url; +use serde_json::json; +use sqlx::PgPool; +use test_assets::{ALLOCATION_ID_0, TAP_SIGNER as SIGNER}; +use wiremock::{ + matchers::{body_string_contains, method}, + Mock, MockServer, ResponseTemplate, +}; + +pub static PREFIX_ID: AtomicU32 = AtomicU32::new(0); +const TRIGGER_VALUE: u128 = 500; +const RECEIPT_LIMIT: u64 = 10000; + +// This test should ensure the full flow starting from +// sender account layer to work, up to closing an allocation +#[sqlx::test(migrations = "../../migrations")] +async fn sender_account_layer_test(pgpool: PgPool) { + let mock_server = MockServer::start().await; + let mock_escrow_subgraph_server: MockServer = MockServer::start().await; + mock_escrow_subgraph_server + .register(Mock::given(method("POST")).respond_with( + ResponseTemplate::new(200).set_body_json(json!({ "data": { + "transactions": [], + } + })), + )) + .await; + + let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, 1, 1, TRIGGER_VALUE - 100); + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + + let (sender_account, notify, _, _) = create_sender_account() + .pgpool(pgpool.clone()) + .initial_allocation(HashSet::new()) + .rav_request_trigger_value(TRIGGER_VALUE) + .max_amount_willing_to_lose_grt(TRIGGER_VALUE + 1000) + .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri()) + .network_subgraph_endpoint(&mock_server.uri()) + .rav_request_receipt_limit(RECEIPT_LIMIT) + .aggregator_endpoint(Url::from_str(&get_grpc_url().await).unwrap()) + .call() + .await; + + // we expect it to create a sender allocation + sender_account + .cast(SenderAccountMessage::UpdateAllocationIds( + vec![ALLOCATION_ID_0].into_iter().collect(), + )) + .unwrap(); + notify.notified().await; + + mock_server + .register( + Mock::given(method("POST")) + .and(body_string_contains("ClosedAllocations")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": { + "meta": { + "block": { + "number": 1, + "hash": "hash", + "timestamp": 1 + } + }, + "allocations": [ + {"id": *ALLOCATION_ID_0 } + ] + } + }))), + ) + .await; + + // try to delete sender allocation_id + sender_account + .cast(SenderAccountMessage::UpdateAllocationIds(HashSet::new())) + .unwrap(); + + sender_account + .stop_children_and_wait(None, Some(Duration::from_secs(10))) + .await; + + let rav_marked_as_last = sqlx::query!( + r#" + SELECT * FROM scalar_tap_ravs WHERE last = true; + "#, + ) + .fetch_all(&pgpool) + .await + .expect("Should not fail to fetch from scalar_tap_ravs"); + assert!(!rav_marked_as_last.is_empty()); +} diff --git a/crates/tap-agent/tests/tap_agent_test.rs b/crates/tap-agent/tests/tap_agent_test.rs new file mode 100644 index 00000000..73c6ef0e --- /dev/null +++ b/crates/tap-agent/tests/tap_agent_test.rs @@ -0,0 +1,175 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::HashMap, + str::FromStr, + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; + +use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; +use indexer_tap_agent::{ + agent::{ + sender_account::{SenderAccountConfig, SenderAccountMessage}, + sender_accounts_manager::{ + SenderAccountsManager, SenderAccountsManagerArgs, SenderAccountsManagerMessage, + }, + sender_allocation::SenderAllocationMessage, + }, + test::{actors::TestableActor, create_received_receipt, get_grpc_url, store_batch_receipts}, +}; +use ractor::{call, concurrency::JoinHandle, Actor, ActorRef}; +use reqwest::Url; +use serde_json::json; +use sqlx::PgPool; +use tap_core::receipt::{state::Checking, ReceiptWithState}; +use test_assets::{ + assert_while_retry, flush_messages, ALLOCATION_ID_0, ALLOCATION_ID_1, ALLOCATION_ID_2, + ESCROW_ACCOUNTS_BALANCES, ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS, INDEXER_ADDRESS, + INDEXER_ALLOCATIONS, TAP_EIP712_DOMAIN, TAP_SENDER, TAP_SIGNER, +}; +use thegraph_core::alloy::primitives::Address; +use tokio::sync::{watch, Notify}; +use wiremock::{matchers::method, Mock, MockServer, ResponseTemplate}; + +pub async fn start_agent( + pgpool: PgPool, +) -> ( + String, + Arc<Notify>, + (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>), +) { + let escrow_subgraph_mock_server: MockServer = MockServer::start().await; + escrow_subgraph_mock_server + .register(Mock::given(method("POST")).respond_with( + ResponseTemplate::new(200).set_body_json(json!({ "data": { + "transactions": [], + } + })), + )) + .await; + + let network_subgraph_mock_server = MockServer::start().await; + + let (_escrow_tx, escrow_accounts) = watch::channel(EscrowAccounts::new( + ESCROW_ACCOUNTS_BALANCES.clone(), + ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.clone(), + )); + let (_dispute_tx, _dispute_manager) = watch::channel(Address::ZERO); + + let (_allocations_tx, indexer_allocations1) = watch::channel(INDEXER_ALLOCATIONS.clone()); + + let sender_aggregator_endpoints: HashMap<_, _> = + vec![(TAP_SENDER.1, Url::from_str(&get_grpc_url().await).unwrap())] + .into_iter() + .collect(); + + let http_client = reqwest::Client::new(); + + let network_subgraph = Box::leak(Box::new( + SubgraphClient::new( + http_client.clone(), + None, + DeploymentDetails::for_query_url(&network_subgraph_mock_server.uri()).unwrap(), + ) + .await, + )); + + let escrow_subgraph = Box::leak(Box::new( + SubgraphClient::new( + http_client.clone(), + None, + DeploymentDetails::for_query_url(&escrow_subgraph_mock_server.uri()).unwrap(), + ) + .await, + )); + + let config = Box::leak(Box::new(SenderAccountConfig { + rav_request_buffer: Duration::from_millis(500), + max_amount_willing_to_lose_grt: 50, + trigger_value: 150, + rav_request_timeout: Duration::from_secs(60), + rav_request_receipt_limit: 10, + indexer_address: INDEXER_ADDRESS, + escrow_polling_interval: Duration::from_secs(10), + tap_sender_timeout: Duration::from_secs(30), + })); + pub static PREFIX_ID: AtomicU32 = AtomicU32::new(0); + let prefix = format!( + "test-{}", + PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + ); + let args = SenderAccountsManagerArgs { + config, + domain_separator: TAP_EIP712_DOMAIN.clone(), + pgpool, + indexer_allocations: indexer_allocations1, + escrow_accounts, + escrow_subgraph, + network_subgraph, + sender_aggregator_endpoints: sender_aggregator_endpoints.clone(), + prefix: Some(prefix.clone()), + }; + + let actor = TestableActor::new(SenderAccountsManager); + let notify = actor.notify.clone(); + ( + prefix, + notify,
Actor::spawn(None, actor, args).await.unwrap(), ) +} + +#[sqlx::test(migrations = "../../migrations")] +async fn test_start_tap_agent(pgpool: PgPool) { + let (prefix, notify, (_actor_ref, _handle)) = start_agent(pgpool.clone()).await; + flush_messages(&notify).await; + + // verify that the sender account was created + let actor_ref = + ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), TAP_SENDER.1)); + + assert!(actor_ref.is_some()); + + // Add batch receipts to the database. + const AMOUNT_OF_RECEIPTS: u64 = 3000; + let allocations = [ALLOCATION_ID_0, ALLOCATION_ID_1, ALLOCATION_ID_2]; + let mut receipts: Vec<ReceiptWithState<Checking>> = + Vec::with_capacity(AMOUNT_OF_RECEIPTS as usize); + for i in 0..AMOUNT_OF_RECEIPTS { + // This would select the 3 defined allocations in order + let allocation_selected = (i % 3) as usize; + let receipt = create_received_receipt( + allocations.get(allocation_selected).unwrap(), + &TAP_SIGNER.0, + i, + i + 1, + i.into(), + ); + receipts.push(receipt); + } + let res = store_batch_receipts(&pgpool, receipts).await; + assert!(res.is_ok()); + let sender_allocation_ref = ActorRef::<SenderAllocationMessage>::where_is(format!( + "{}:{}:{}", + prefix.clone(), + TAP_SENDER.1, + ALLOCATION_ID_0, + )) + .unwrap(); + + assert_while_retry!( + { + let total_unaggregated_fees = call!( + sender_allocation_ref, + SenderAllocationMessage::GetUnaggregatedReceipts + ) + .unwrap(); + total_unaggregated_fees.value == 0u128 + }, + "Unaggregated fees", + Duration::from_secs(10), + Duration::from_millis(50) + ); +}