Skip to content

Commit

Permalink
fix: periodically pull ats addr
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jan 13, 2025
1 parent 313b24b commit 58c589f
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 47 deletions.
5 changes: 0 additions & 5 deletions packages/infra/client/echo/.dockerignore

This file was deleted.

2 changes: 1 addition & 1 deletion packages/infra/client/echo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ RUN \
cd packages/infra/client && \
RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin pegboard-echo-server && \
mkdir -p /app/dist && \
mv /app/packages/infra/client/target/x86_64-unknown-linux-musl/release/pegboard-echo-server /app/dist/pegboard-echo-server
mv /app/target/x86_64-unknown-linux-musl/release/pegboard-echo-server /app/dist/pegboard-echo-server

# Create non-root user and group
RUN useradd -m -d /home/nonroot -s /bin/sh nonroot
Expand Down
2 changes: 2 additions & 0 deletions packages/infra/client/echo/Dockerfile.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
!Cargo.toml
!packages
!resources/legacy/proto
!sdks/api/full/rust/Cargo.toml
!sdks/api/full/rust/src
4 changes: 3 additions & 1 deletion packages/infra/client/manager/src/actor/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ impl Actor {

// Shuffle based on hash
let mut addresses = ctx
.pull_addresses
.pull_addr_handler
.addresses(ctx.config())
.await?
.iter()
.map(|addr| {
Ok(
Expand Down
10 changes: 5 additions & 5 deletions packages/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ use uuid::Uuid;
use crate::{
actor::Actor,
event_sender::EventSender,
metrics, runner,
metrics,
pull_addr_handler::PullAddrHandler,
runner,
utils::{self, sql::SqliteConnectionExt},
};

Expand Down Expand Up @@ -75,8 +77,7 @@ pub struct Ctx {
pool: SqlitePool,
tx: Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
event_sender: EventSender,
// Cached addresses (should be sorted beforehand)
pub(crate) pull_addresses: Vec<String>,
pub(crate) pull_addr_handler: PullAddrHandler,

pub(crate) actors: RwLock<HashMap<Uuid, Arc<Actor>>>,
isolate_runner: RwLock<Option<runner::Handle>>,
Expand All @@ -88,7 +89,6 @@ impl Ctx {
system: SystemInfo,
pool: SqlitePool,
tx: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
pull_addresses: Vec<String>,
) -> Arc<Self> {
Arc::new(Ctx {
config,
Expand All @@ -97,7 +97,7 @@ impl Ctx {
pool,
tx: Mutex::new(tx),
event_sender: EventSender::new(),
pull_addresses,
pull_addr_handler: PullAddrHandler::new(),

actors: RwLock::new(HashMap::new()),
isolate_runner: RwLock::new(None),
Expand Down
2 changes: 2 additions & 0 deletions packages/infra/client/manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod ctx;
#[cfg(feature = "test")]
pub mod event_sender;
#[cfg(feature = "test")]
pub mod pull_addr_handler;
#[cfg(feature = "test")]
mod metrics;
#[cfg(feature = "test")]
mod runner;
Expand Down
6 changes: 2 additions & 4 deletions packages/infra/client/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod actor;
mod ctx;
mod event_sender;
mod metrics;
mod pull_addr_handler;
mod runner;
mod system_info;
mod utils;
Expand Down Expand Up @@ -191,9 +192,6 @@ async fn run(init: Init, first: bool) -> Result<()> {
// Start metrics server
let metrics_thread = tokio::spawn(metrics::run_standalone(init.config.client.metrics.port()));

// Fetch ATS ips
let pull_addresses = utils::fetch_pull_addresses(&init.config).await?;

tracing::info!("connecting to pegboard ws: {}", &init.url);

// Connect to WS
Expand All @@ -207,7 +205,7 @@ async fn run(init: Init, first: bool) -> Result<()> {

tracing::info!("connected to pegboard ws");

let ctx = Ctx::new(init.config, init.system, init.pool, tx, pull_addresses);
let ctx = Ctx::new(init.config, init.system, init.pool, tx);

tokio::try_join!(
async { metrics_thread.await?.map_err(Into::into) },
Expand Down
60 changes: 60 additions & 0 deletions packages/infra/client/manager/src/pull_addr_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::time::{Duration, Instant};

use crate::utils::ApiResponse;
use anyhow::*;
use pegboard_config::{Addresses, Client};
use tokio::sync::RwLock;

/// Duration between pulling addresses again.
const PULL_INTERVAL: Duration = Duration::from_secs(3 * 60);

pub struct PullAddrHandler {
last_pull: RwLock<Option<Instant>>,
addresses: RwLock<Vec<String>>,
}

impl PullAddrHandler {
pub fn new() -> Self {
PullAddrHandler {
last_pull: RwLock::new(None),
addresses: RwLock::new(Vec::new()),
}
}

pub async fn addresses(&self, client: &Client) -> Result<Vec<String>> {
match &*client.images.pull_addresses() {
Addresses::Dynamic { fetch_endpoint } => {
let mut last_pull_guard = self.last_pull.write().await;

if last_pull_guard
.map(|x| x.elapsed() > PULL_INTERVAL)
.unwrap_or(true)
{
let mut addr_guard = self.addresses.write().await;

let mut addresses = reqwest::get(fetch_endpoint.clone())
.await?
.error_for_status()?
.json::<ApiResponse>()
.await?
.servers
.into_iter()
.filter_map(|server| server.lan_ip)
.map(|vlan_ip| format!("http://{vlan_ip}:8080"))
.collect::<Vec<_>>();

// Always sort the addresses so the list is deterministic
addresses.sort();

*addr_guard = addresses.clone();
*last_pull_guard = Some(Instant::now());

Ok(addresses)
} else {
Ok(self.addresses.read().await.clone())
}
}
Addresses::Static(addresses) => Ok(addresses.clone()),
}
}
}
29 changes: 4 additions & 25 deletions packages/infra/client/manager/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
}

#[derive(Deserialize)]
struct ApiResponse {
servers: Vec<ApiServer>,
pub(crate) struct ApiResponse {
pub(crate) servers: Vec<ApiServer>,
}

#[derive(Deserialize)]
struct ApiServer {
lan_ip: Option<Ipv4Addr>,
pub(crate) struct ApiServer {
pub(crate) lan_ip: Option<Ipv4Addr>,
}

pub async fn init_fdb_config(config: &Config) -> Result<()> {
Expand Down Expand Up @@ -272,27 +272,6 @@ pub async fn init_fdb_config(config: &Config) -> Result<()> {
Ok(())
}

pub async fn fetch_pull_addresses(config: &Config) -> Result<Vec<String>> {
let mut addresses = match &*config.client.images.pull_addresses() {
Addresses::Dynamic { fetch_endpoint } => reqwest::get(fetch_endpoint.clone())
.await?
.error_for_status()?
.json::<ApiResponse>()
.await?
.servers
.into_iter()
.filter_map(|server| server.lan_ip)
.map(|vlan_ip| format!("http://{vlan_ip}:8080"))
.collect::<Vec<_>>(),
Addresses::Static(addresses) => addresses.clone(),
};

// Always sort the addresses so the list is deterministic
addresses.sort();

Ok(addresses)
}

pub fn now() -> i64 {
time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
Expand Down
5 changes: 1 addition & 4 deletions packages/infra/client/manager/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,6 @@ pub async fn start_client(
// Init FDB config files
utils::init_fdb_config(&config).await.unwrap();

// Fetch ATS addresses
let pull_addresses = utils::fetch_pull_addresses(&config).await.unwrap();

// Build WS connection URL
let mut url = Url::parse("ws://127.0.0.1").unwrap();
url.set_port(Some(port)).unwrap();
Expand All @@ -362,7 +359,7 @@ pub async fn start_client(

tracing::info!("connected");

let ctx = Ctx::new(config, system, pool, tx, pull_addresses);
let ctx = Ctx::new(config, system, pool, tx);

// Share reference
{
Expand Down
4 changes: 2 additions & 2 deletions packages/services/pegboard/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ const CLIENT_LOST_THRESHOLD_MS: i64 = util::duration::minutes(2);
/// How long to wait after creating and not receiving a starting state before forcibly stopping actor.
const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30);
/// How long to wait after stopping and not receiving a stop state before manually setting actor as
/// stopped.
/// lost.
const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30);
/// How long to wait after stopped and not receiving an exit state before manually setting actor as
/// exited.
/// lost.
const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5);

#[derive(sqlx::FromRow)]
Expand Down

0 comments on commit 58c589f

Please sign in to comment.