From c6af361065ecbfdb9c0443c7cf2a49da20f91a47 Mon Sep 17 00:00:00 2001 From: Ertugrul Aypek Date: Sun, 12 Jan 2025 15:57:25 +0100 Subject: [PATCH] pre-lookup addresses and use static resolver --- deploy/stage/common-values-iris-mpc.yaml | 2 +- iris-mpc-store/src/s3_importer.rs | 2 +- iris-mpc/src/bin/server.rs | 88 ++++++++++++++++-------- 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/deploy/stage/common-values-iris-mpc.yaml b/deploy/stage/common-values-iris-mpc.yaml index 461bb8636..572f11407 100644 --- a/deploy/stage/common-values-iris-mpc.yaml +++ b/deploy/stage/common-values-iris-mpc.yaml @@ -1,4 +1,4 @@ -image: "ghcr.io/worldcoin/iris-mpc:5dc76bf10d27aceaf57bda771c4470b4be461d17" +image: "ghcr.io/worldcoin/iris-mpc:05d4b8bf587826573163c6ef5d6145a2a12cf9f2" environment: stage replicaCount: 1 diff --git a/iris-mpc-store/src/s3_importer.rs b/iris-mpc-store/src/s3_importer.rs index a4b5622ed..58cfcefdf 100644 --- a/iris-mpc-store/src/s3_importer.rs +++ b/iris-mpc-store/src/s3_importer.rs @@ -19,7 +19,7 @@ const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::() * 2 + MASK_CODE_LENGTH * mem::size_of::() * 2 + mem::size_of::(); // 75 KB -const MAX_RANGE_SIZE: usize = 100; // Download chunks in sub-chunks of 100 elements = 7.5 MB +const MAX_RANGE_SIZE: usize = 200; // Download chunks in sub-chunks of 100 elements = 7.5 MB pub struct S3StoredIris { #[allow(dead_code)] diff --git a/iris-mpc/src/bin/server.rs b/iris-mpc/src/bin/server.rs index d818d896d..358e95431 100644 --- a/iris-mpc/src/bin/server.rs +++ b/iris-mpc/src/bin/server.rs @@ -5,7 +5,7 @@ use aws_sdk_s3::{config::Builder as S3ConfigBuilder, Client as S3Client}; use aws_sdk_sns::{types::MessageAttributeValue, Client as SNSClient}; use aws_sdk_sqs::{config::Region, Client}; use aws_smithy_experimental::hyper_1_0::{CryptoMode, HyperClientBuilder}; -use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns, ResolveDnsError}; +use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns}; use axum::{response::IntoResponse, routing::get, Router}; use clap::Parser; use eyre::{eyre, Context}; @@ -60,7 +60,7 @@ use std::{ net::IpAddr, panic, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, LazyLock, Mutex, }, time, @@ -672,66 +672,95 @@ async fn main() -> eyre::Result<()> { } struct StaticResolver { - resolver: Arc, + ips: Arc>, + current: Arc, } impl StaticResolver { - fn new() -> Self { - let resolver = - TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()); - StaticResolver { - resolver: Arc::new(resolver), + fn new(ips: Vec) -> Self { + assert!( + !ips.is_empty(), + "StaticResolver requires at least one IP address." + ); + Self { + ips: Arc::new(ips), + current: Arc::new(AtomicUsize::new(0)), } } } impl Debug for StaticResolver { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + // Load the current index atomically + let current_index = self.current.load(Ordering::SeqCst); f.debug_struct("StaticResolver") - .field("resolver", &"Arc") + .field("ips", &self.ips) + .field("current_index", ¤t_index) .finish() } } impl Clone for StaticResolver { fn clone(&self) -> Self { - StaticResolver { - resolver: Arc::clone(&self.resolver), + Self { + ips: Arc::clone(&self.ips), + current: Arc::clone(&self.current), } } } impl ResolveDns for StaticResolver { fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> { - let resolver = Arc::clone(&self.resolver); - let hostname = _name.to_string(); - - // Create the async block that performs DNS resolution + let current_index = self.current.fetch_add(1, Ordering::SeqCst); + let index = current_index % self.ips.len(); + let selected_ip = self.ips[index]; let future = async move { - match resolver.lookup_ip(&hostname).await { - Ok(lookup_result) => { - let ips: Vec = lookup_result.iter().collect(); - tracing::info!("Resolved host {} to {:?}", hostname, ips); - Ok(ips) - } - Err(e) => Err(ResolveDnsError::new(format!( - "Failed to resolve {}: {}", - hostname, e - ))), - } + tracing::info!("Returning IP {:?} for host {}", selected_ip, _name); + Ok(vec![selected_ip]) }; - - // Wrap the future into DnsFuture DnsFuture::new(Box::pin(future)) } } +async fn resolve_export_bucket_ips(host: String) -> eyre::Result> { + let mut all_ips = vec![]; + let mut resolver_opts = ResolverOpts::default(); + resolver_opts.positive_max_ttl = Some(time::Duration::from_millis(10)); + let resolver = TokioAsyncResolver::tokio(ResolverConfig::default(), resolver_opts); + loop { + // Check if we've collected enough unique IPs + if all_ips.len() >= 10 { + break; + } + match resolver.lookup_ip(&host).await { + Ok(lookup_result) => { + let ips: Vec = lookup_result.iter().collect(); + tracing::info!("Resolved {:?} for host {}", ips, host); + for ip in lookup_result.iter() { + // Attempt to insert the IP into the HashSet + if !all_ips.contains(&ip) { + all_ips.push(ip); + tracing::info!("Added IP {:?} for host {}", ip, host); + } + } + } + Err(e) => { + tracing::error!("Failed to resolve host {}: {}", host, e); + } + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + Ok(all_ips) +} + async fn server_main(config: Config) -> eyre::Result<()> { let shutdown_handler = Arc::new(ShutdownHandler::new( config.shutdown_last_results_sync_timeout_secs, )); shutdown_handler.wait_for_shutdown_signal().await; + let shares_bucket_host = format!("{}.s3.{}.amazonaws.com", config.shares_bucket_name, REGION); + let shares_bucket_ips = resolve_export_bucket_ips(shares_bucket_host); // Load batch_size config *CURRENT_BATCH_SIZE.lock().unwrap() = config.max_batch_size; let max_sync_lookback: usize = config.max_batch_size * 2; @@ -753,7 +782,8 @@ async fn server_main(config: Config) -> eyre::Result<()> { // Increase S3 retries to 5 // let resolver = Resolver::new(ResolverConfig::default(), // ResolverOpts::default()).unwrap(); - let static_resolver = StaticResolver::new(); + + let static_resolver = StaticResolver::new(shares_bucket_ips.await?); let client = HyperClientBuilder::new() .crypto_mode(CryptoMode::Ring) .build_with_resolver(static_resolver);