diff --git a/packages/infra/client/echo/.dockerignore b/packages/infra/client/echo/.dockerignore deleted file mode 100644 index 8c96672144..0000000000 --- a/packages/infra/client/echo/.dockerignore +++ /dev/null @@ -1,5 +0,0 @@ -/target -**/target -.dockerignore -Dockerfile - diff --git a/packages/infra/client/echo/Dockerfile b/packages/infra/client/echo/Dockerfile index 3f317cd993..c6a0198457 100644 --- a/packages/infra/client/echo/Dockerfile +++ b/packages/infra/client/echo/Dockerfile @@ -9,7 +9,7 @@ RUN \ cd packages/infra/client && \ RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin pegboard-echo-server && \ mkdir -p /app/dist && \ - mv /app/packages/infra/client/target/x86_64-unknown-linux-musl/release/pegboard-echo-server /app/dist/pegboard-echo-server + mv /app/target/x86_64-unknown-linux-musl/release/pegboard-echo-server /app/dist/pegboard-echo-server # Create non-root user and group RUN useradd -m -d /home/nonroot -s /bin/sh nonroot diff --git a/packages/infra/client/echo/Dockerfile.dockerignore b/packages/infra/client/echo/Dockerfile.dockerignore index cd737a51e2..0096277683 100644 --- a/packages/infra/client/echo/Dockerfile.dockerignore +++ b/packages/infra/client/echo/Dockerfile.dockerignore @@ -4,3 +4,5 @@ !Cargo.toml !packages !resources/legacy/proto +!sdks/api/full/rust/Cargo.toml +!sdks/api/full/rust/src diff --git a/packages/infra/client/manager/src/actor/setup.rs b/packages/infra/client/manager/src/actor/setup.rs index ddaf1703d8..c0fc249994 100644 --- a/packages/infra/client/manager/src/actor/setup.rs +++ b/packages/infra/client/manager/src/actor/setup.rs @@ -86,7 +86,9 @@ impl Actor { // Shuffle based on hash let mut addresses = ctx - .pull_addresses + .pull_addr_handler + .addresses(ctx.config()) + .await? .iter() .map(|addr| { Ok( diff --git a/packages/infra/client/manager/src/ctx.rs b/packages/infra/client/manager/src/ctx.rs index 0d3572a026..b17e33334c 100644 --- a/packages/infra/client/manager/src/ctx.rs +++ b/packages/infra/client/manager/src/ctx.rs @@ -37,7 +37,9 @@ use uuid::Uuid; use crate::{ actor::Actor, event_sender::EventSender, - metrics, runner, + metrics, + pull_addr_handler::PullAddrHandler, + runner, utils::{self, sql::SqliteConnectionExt}, }; @@ -75,8 +77,7 @@ pub struct Ctx { pool: SqlitePool, tx: Mutex>, Message>>, event_sender: EventSender, - // Cached addresses (should be sorted beforehand) - pub(crate) pull_addresses: Vec, + pub(crate) pull_addr_handler: PullAddrHandler, pub(crate) actors: RwLock>>, isolate_runner: RwLock>, @@ -88,7 +89,6 @@ impl Ctx { system: SystemInfo, pool: SqlitePool, tx: SplitSink>, Message>, - pull_addresses: Vec, ) -> Arc { Arc::new(Ctx { config, @@ -97,7 +97,7 @@ impl Ctx { pool, tx: Mutex::new(tx), event_sender: EventSender::new(), - pull_addresses, + pull_addr_handler: PullAddrHandler::new(), actors: RwLock::new(HashMap::new()), isolate_runner: RwLock::new(None), diff --git a/packages/infra/client/manager/src/lib.rs b/packages/infra/client/manager/src/lib.rs index 3d03a054a0..26dec15754 100644 --- a/packages/infra/client/manager/src/lib.rs +++ b/packages/infra/client/manager/src/lib.rs @@ -9,6 +9,8 @@ mod ctx; #[cfg(feature = "test")] pub mod event_sender; #[cfg(feature = "test")] +pub mod pull_addr_handler; +#[cfg(feature = "test")] mod metrics; #[cfg(feature = "test")] mod runner; diff --git a/packages/infra/client/manager/src/main.rs b/packages/infra/client/manager/src/main.rs index aa6d7c20ef..6bfb96d2fc 100644 --- a/packages/infra/client/manager/src/main.rs +++ b/packages/infra/client/manager/src/main.rs @@ -20,6 +20,7 @@ mod actor; mod ctx; mod event_sender; mod metrics; +mod pull_addr_handler; mod runner; mod system_info; mod utils; @@ -191,9 +192,6 @@ async fn run(init: Init, first: bool) -> Result<()> { // Start metrics server let metrics_thread = tokio::spawn(metrics::run_standalone(init.config.client.metrics.port())); - // Fetch ATS ips - let pull_addresses = utils::fetch_pull_addresses(&init.config).await?; - tracing::info!("connecting to pegboard ws: {}", &init.url); // Connect to WS @@ -207,7 +205,7 @@ async fn run(init: Init, first: bool) -> Result<()> { tracing::info!("connected to pegboard ws"); - let ctx = Ctx::new(init.config, init.system, init.pool, tx, pull_addresses); + let ctx = Ctx::new(init.config, init.system, init.pool, tx); tokio::try_join!( async { metrics_thread.await?.map_err(Into::into) }, diff --git a/packages/infra/client/manager/src/pull_addr_handler.rs b/packages/infra/client/manager/src/pull_addr_handler.rs new file mode 100644 index 0000000000..404d88de35 --- /dev/null +++ b/packages/infra/client/manager/src/pull_addr_handler.rs @@ -0,0 +1,60 @@ +use std::time::{Duration, Instant}; + +use crate::utils::ApiResponse; +use anyhow::*; +use pegboard_config::{Addresses, Client}; +use tokio::sync::RwLock; + +/// Duration between pulling addresses again. +const PULL_INTERVAL: Duration = Duration::from_secs(3 * 60 * 60); + +pub struct PullAddrHandler { + last_pull: RwLock>, + addresses: RwLock>, +} + +impl PullAddrHandler { + pub fn new() -> Self { + PullAddrHandler { + last_pull: RwLock::new(None), + addresses: RwLock::new(Vec::new()), + } + } + + pub async fn addresses(&self, client: &Client) -> Result> { + match &*client.images.pull_addresses() { + Addresses::Dynamic { fetch_endpoint } => { + let mut last_pull_guard = self.last_pull.write().await; + + if last_pull_guard + .map(|x| x.elapsed() > PULL_INTERVAL) + .unwrap_or(true) + { + let mut addr_guard = self.addresses.write().await; + + let mut addresses = reqwest::get(fetch_endpoint.clone()) + .await? + .error_for_status()? + .json::() + .await? + .servers + .into_iter() + .filter_map(|server| server.lan_ip) + .map(|vlan_ip| format!("http://{vlan_ip}:8080")) + .collect::>(); + + // Always sort the addresses so the list is deterministic + addresses.sort(); + + *addr_guard = addresses.clone(); + *last_pull_guard = Some(Instant::now()); + + Ok(addresses) + } else { + Ok(self.addresses.read().await.clone()) + } + } + Addresses::Static(addresses) => Ok(addresses.clone()), + } + } +} diff --git a/packages/infra/client/manager/src/utils/mod.rs b/packages/infra/client/manager/src/utils/mod.rs index b78a36bf14..c075c91959 100644 --- a/packages/infra/client/manager/src/utils/mod.rs +++ b/packages/infra/client/manager/src/utils/mod.rs @@ -227,13 +227,13 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { } #[derive(Deserialize)] -struct ApiResponse { - servers: Vec, +pub(crate) struct ApiResponse { + pub(crate) servers: Vec, } #[derive(Deserialize)] -struct ApiServer { - lan_ip: Option, +pub(crate) struct ApiServer { + pub(crate) lan_ip: Option, } pub async fn init_fdb_config(config: &Config) -> Result<()> { @@ -272,27 +272,6 @@ pub async fn init_fdb_config(config: &Config) -> Result<()> { Ok(()) } -pub async fn fetch_pull_addresses(config: &Config) -> Result> { - let mut addresses = match &*config.client.images.pull_addresses() { - Addresses::Dynamic { fetch_endpoint } => reqwest::get(fetch_endpoint.clone()) - .await? - .error_for_status()? - .json::() - .await? - .servers - .into_iter() - .filter_map(|server| server.lan_ip) - .map(|vlan_ip| format!("http://{vlan_ip}:8080")) - .collect::>(), - Addresses::Static(addresses) => addresses.clone(), - }; - - // Always sort the addresses so the list is deterministic - addresses.sort(); - - Ok(addresses) -} - pub fn now() -> i64 { time::SystemTime::now() .duration_since(time::UNIX_EPOCH) diff --git a/packages/infra/client/manager/tests/common.rs b/packages/infra/client/manager/tests/common.rs index 802849093a..a43f80484f 100644 --- a/packages/infra/client/manager/tests/common.rs +++ b/packages/infra/client/manager/tests/common.rs @@ -337,9 +337,6 @@ pub async fn start_client( // Init FDB config files utils::init_fdb_config(&config).await.unwrap(); - // Fetch ATS addresses - let pull_addresses = utils::fetch_pull_addresses(&config).await.unwrap(); - // Build WS connection URL let mut url = Url::parse("ws://127.0.0.1").unwrap(); url.set_port(Some(port)).unwrap(); @@ -362,7 +359,7 @@ pub async fn start_client( tracing::info!("connected"); - let ctx = Ctx::new(config, system, pool, tx, pull_addresses); + let ctx = Ctx::new(config, system, pool, tx); // Share reference { diff --git a/packages/services/pegboard/standalone/gc/src/lib.rs b/packages/services/pegboard/standalone/gc/src/lib.rs index 7cfe1e9b56..126a69dad0 100644 --- a/packages/services/pegboard/standalone/gc/src/lib.rs +++ b/packages/services/pegboard/standalone/gc/src/lib.rs @@ -9,10 +9,10 @@ const CLIENT_LOST_THRESHOLD_MS: i64 = util::duration::minutes(2); /// How long to wait after creating and not receiving a starting state before forcibly stopping actor. const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30); /// How long to wait after stopping and not receiving a stop state before manually setting actor as -/// stopped. +/// lost. const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30); /// How long to wait after stopped and not receiving an exit state before manually setting actor as -/// exited. +/// lost. const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5); #[derive(sqlx::FromRow)]