diff --git a/packages/api/actor/Cargo.toml b/packages/api/actor/Cargo.toml index af149d993..3a18e4413 100644 --- a/packages/api/actor/Cargo.toml +++ b/packages/api/actor/Cargo.toml @@ -44,6 +44,7 @@ game-namespace-get.workspace = true game-namespace-resolve-name-id.workspace = true game-resolve-name-id.workspace = true game-version-get.workspace = true +pegboard.workspace = true region-recommend.workspace = true rivet-config.workspace = true rivet-env.workspace = true diff --git a/packages/api/actor/src/route/actors.rs b/packages/api/actor/src/route/actors.rs index 4bdde8dfd..476359d85 100644 --- a/packages/api/actor/src/route/actors.rs +++ b/packages/api/actor/src/route/actors.rs @@ -670,7 +670,7 @@ async fn list_actors_inner( _watch_index: WatchIndexQuery, query: ListQuery, ) -> GlobalResult { - let CheckOutput { game_id, env_id } = ctx + let CheckOutput { env_id, .. } = ctx .auth() .check( ctx.op_ctx(), diff --git a/packages/api/actor/src/route/builds.rs b/packages/api/actor/src/route/builds.rs index 673bbaccc..55fd5ae36 100644 --- a/packages/api/actor/src/route/builds.rs +++ b/packages/api/actor/src/route/builds.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use api_helper::{anchor::WatchIndexQuery, ctx::Ctx}; +use futures_util::{StreamExt, TryStreamExt}; use rivet_api::models; use rivet_convert::{ApiInto, ApiTryInto}; use rivet_operation::prelude::*; @@ -34,25 +35,25 @@ pub async fn get( ) .await?; - let builds_res = op!([ctx] build_get { - build_ids: vec![build_id.into()], - }) - .await?; + let builds_res = ctx + .op(build::ops::get::Input { + build_ids: vec![build_id], + }) + .await?; let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND); - ensure_with!(unwrap!(build.env_id).as_uuid() == env_id, BUILD_NOT_FOUND); + ensure_with!( + unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id, + BUILD_NOT_FOUND + ); let uploads_res = op!([ctx] upload_get { - upload_ids: builds_res - .builds - .iter() - .filter_map(|build| build.upload_id) - .collect::>(), + upload_ids: vec![build.upload_id.into()], }) .await?; let upload = unwrap!(uploads_res.uploads.first()); let build = models::ActorBuild { - id: unwrap!(build.build_id).as_uuid(), + id: build.build_id, name: build.display_name.clone(), created_at: timestamp::to_string(build.create_ts)?, content_length: upload.content_length.api_try_into()?, @@ -253,14 +254,17 @@ pub async fn patch_tags( } } - let build_res = ctx + let builds_res = ctx .op(build::ops::get::Input { build_ids: vec![build_id], }) .await?; - let build = unwrap_with!(build_res.builds.first(), BUILD_NOT_FOUND); + let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND); - ensure_with!(unwrap!(build.env_id) == env_id, BUILD_NOT_FOUND); + ensure_with!( + unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id, + BUILD_NOT_FOUND + ); ctx.op(build::ops::patch_tags::Input { build_id, @@ -366,7 +370,7 @@ pub async fn create_build_deprecated( body: models::ServersCreateBuildRequest, ) -> GlobalResult { let global = build_global_query_compat(&ctx, game_id, env_id).await?; - let build_res = create_build( + let builds_res = create_build( ctx, models::ActorPrepareBuildRequest { compression: body.compression.map(|c| match c { @@ -388,18 +392,18 @@ pub async fn create_build_deprecated( let (image_presigned_request, image_presigned_requests) = if !multipart_upload { ( - Some(Box::new(unwrap!(build_res + Some(Box::new(unwrap!(builds_res .presigned_requests .into_iter() .next()))), None, ) } else { - (None, Some(build_res.presigned_requests)) + (None, Some(builds_res.presigned_requests)) }; Ok(models::ServersCreateBuildResponse { - build: build_res.build, + build: builds_res.build, image_presigned_request, image_presigned_requests, }) @@ -424,20 +428,57 @@ pub async fn complete_build( ) .await?; - let build_res = op!([ctx] build_get { - build_ids: vec![build_id.into()], - }) - .await?; - let build = unwrap_with!(build_res.builds.first(), BUILD_NOT_FOUND); + let builds_res = ctx + .op(build::ops::get::Input { + build_ids: vec![build_id], + }) + .await?; + let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND); - ensure_with!(unwrap!(build.env_id).as_uuid() == env_id, BUILD_NOT_FOUND); + ensure_with!( + unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id, + BUILD_NOT_FOUND + ); op!([ctx] @dont_log_body upload_complete { - upload_id: build.upload_id, + upload_id: Some(build.upload_id.into()), bucket: None, }) .await?; + // Prewarm all datacenters for pegboard + { + let default_cluster_id = ctx.config().server()?.rivet.default_cluster_id()?; + + let datacenters_res = ctx + .op(cluster::ops::datacenter::list::Input { + cluster_ids: vec![default_cluster_id], + }) + .await?; + let cluster = unwrap!(datacenters_res.clusters.first()); + + futures_util::stream::iter(cluster.datacenter_ids.iter().cloned()) + .map(|datacenter_id| { + let ctx = ctx.clone(); + async move { + ctx.signal(pegboard::workflows::PrewarmImage { + image_id: build_id, + image_artifact_url_stub: ds::util::image_artifact_url_stub( + ctx.config(), + build.upload_id, + &build::utils::file_name(build.kind, build.compression), + )?, + }) + .tag("datacenter_id", datacenter_id) + .send() + .await + } + }) + .buffer_unordered(16) + .try_collect::>() + .await?; + } + Ok(json!({})) } diff --git a/packages/common/chirp-workflow/core/src/ctx/api.rs b/packages/common/chirp-workflow/core/src/ctx/api.rs index 45620a03b..c04a4c289 100644 --- a/packages/common/chirp-workflow/core/src/ctx/api.rs +++ b/packages/common/chirp-workflow/core/src/ctx/api.rs @@ -17,6 +17,8 @@ use crate::{ workflow::{Workflow, WorkflowInput}, }; +// NOTE: Clonable because of inner arcs +#[derive(Clone)] pub struct ApiCtx { ray_id: Uuid, name: String, diff --git a/packages/infra/client/manager/src/actor/setup.rs b/packages/infra/client/manager/src/actor/setup.rs index c0fc24999..bb3410a40 100644 --- a/packages/infra/client/manager/src/actor/setup.rs +++ b/packages/infra/client/manager/src/actor/setup.rs @@ -1,26 +1,22 @@ use std::{ collections::HashMap, - hash::{DefaultHasher, Hasher}, path::{Path, PathBuf}, process::Stdio, result::Result::{Err, Ok}, }; use anyhow::*; -use futures_util::Stream; use futures_util::StreamExt; use indoc::indoc; use pegboard::protocol; use pegboard_config::isolate_runner::actor as actor_config; use rand::Rng; -use rand::{prelude::SliceRandom, SeedableRng}; use serde_json::json; use tokio::{ fs::{self, File}, io::{AsyncReadExt, AsyncWriteExt}, process::Command, }; -use url::Url; use uuid::Uuid; use super::{oci_config, Actor}; @@ -71,63 +67,19 @@ impl Actor { Ok(()) } - /// Deterministically shuffles a list of available ATS URL's to download the image from based on the image - // ID and attempts to download from each URL until success. - async fn fetch_image_stream( - &self, - ctx: &Ctx, - ) -> Result>> { - // Get hash from image id - let mut hasher = DefaultHasher::new(); - hasher.write(self.config.image.id.as_bytes()); - let hash = hasher.finish(); - - let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(hash); - - // Shuffle based on hash - let mut addresses = ctx - .pull_addr_handler - .addresses(ctx.config()) - .await? - .iter() - .map(|addr| { - Ok( - Url::parse(&format!("{addr}{}", self.config.image.artifact_url_stub)) - .context("failed to build artifact url")? - .to_string(), - ) - }) - .collect::>>()?; - addresses.shuffle(&mut rng); - - // Add fallback url to the end if one is set - if let Some(fallback_artifact_url) = &self.config.image.fallback_artifact_url { - addresses.push(fallback_artifact_url.clone()); - } - - let mut iter = addresses.into_iter(); - while let Some(artifact_url) = iter.next() { - match reqwest::get(&artifact_url) - .await - .and_then(|res| res.error_for_status()) - { - Ok(res) => return Ok(res.bytes_stream()), - Err(err) => { - tracing::warn!(actor_id=?self.actor_id, "failed to start download from {artifact_url}: {err}"); - } - } - } - - bail!("artifact url could not be resolved"); - } - pub async fn download_image(&self, ctx: &Ctx) -> Result<()> { tracing::info!(actor_id=?self.actor_id, "downloading artifact"); let actor_path = ctx.actor_path(self.actor_id); let fs_path = actor_path.join("fs"); - let mut stream = self.fetch_image_stream(ctx).await?; + let mut stream = utils::fetch_image_stream( + ctx, + self.config.image.id, + &self.config.image.artifact_url_stub, + self.config.image.fallback_artifact_url.as_deref(), + ) + .await?; match self.config.image.kind { protocol::ImageKind::DockerImage => { diff --git a/packages/infra/client/manager/src/ctx.rs b/packages/infra/client/manager/src/ctx.rs index b17e33334..2ce6e71b7 100644 --- a/packages/infra/client/manager/src/ctx.rs +++ b/packages/infra/client/manager/src/ctx.rs @@ -319,7 +319,16 @@ impl Ctx { self.process_command(command).await?; } } - protocol::ToClient::FetchStateRequest {} => todo!(), + protocol::ToClient::PrewarmImage { + image_id, + image_artifact_url_stub, + } => { + let self2 = self.clone(); + + tokio::spawn(async move { + utils::prewarm_image(&self2, image_id, &image_artifact_url_stub).await + }); + } } Ok(()) @@ -521,8 +530,6 @@ impl Ctx { let actor = actor.clone(); let self2 = self.clone(); tokio::spawn(async move { - use std::result::Result::Err; - if let Err(err) = actor.observe(&self2).await { tracing::error!(actor_id=?row.actor_id, ?err, "observe failed"); } diff --git a/packages/infra/client/manager/src/lib.rs b/packages/infra/client/manager/src/lib.rs index 26dec1575..e2dd40510 100644 --- a/packages/infra/client/manager/src/lib.rs +++ b/packages/infra/client/manager/src/lib.rs @@ -9,10 +9,10 @@ mod ctx; #[cfg(feature = "test")] pub mod event_sender; #[cfg(feature = "test")] -pub mod pull_addr_handler; -#[cfg(feature = "test")] mod metrics; #[cfg(feature = "test")] +pub mod pull_addr_handler; +#[cfg(feature = "test")] mod runner; #[cfg(feature = "test")] pub mod system_info; diff --git a/packages/infra/client/manager/src/main.rs b/packages/infra/client/manager/src/main.rs index 6bfb96d2f..e3b7f3d6b 100644 --- a/packages/infra/client/manager/src/main.rs +++ b/packages/infra/client/manager/src/main.rs @@ -27,7 +27,7 @@ mod utils; use ctx::Ctx; -const PROTOCOL_VERSION: u16 = 1; +const PROTOCOL_VERSION: u16 = 2; #[derive(Clone)] struct Init { diff --git a/packages/infra/client/manager/src/utils/mod.rs b/packages/infra/client/manager/src/utils/mod.rs index c075c9195..e94fe49c9 100644 --- a/packages/infra/client/manager/src/utils/mod.rs +++ b/packages/infra/client/manager/src/utils/mod.rs @@ -1,16 +1,21 @@ use std::{ + hash::{DefaultHasher, Hasher}, net::Ipv4Addr, path::Path, + result::Result::{Err, Ok}, time::{self, Duration}, }; use anyhow::*; +use futures_util::Stream; use indoc::indoc; use notify::{ event::{AccessKind, AccessMode}, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, }; use pegboard::protocol; +use pegboard_config::{Addresses, Config}; +use rand::{prelude::SliceRandom, SeedableRng}; use serde::Deserialize; use sqlx::{ migrate::MigrateDatabase, @@ -21,8 +26,10 @@ use tokio::{ fs, sync::mpsc::{channel, Receiver}, }; +use url::Url; +use uuid::Uuid; -use pegboard_config::{Addresses, Config}; +use crate::ctx::Ctx; pub mod sql; @@ -227,13 +234,13 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { } #[derive(Deserialize)] -pub(crate) struct ApiResponse { - pub(crate) servers: Vec, +pub struct ApiResponse { + pub servers: Vec, } #[derive(Deserialize)] -pub(crate) struct ApiServer { - pub(crate) lan_ip: Option, +pub struct ApiServer { + pub lan_ip: Option, } pub async fn init_fdb_config(config: &Config) -> Result<()> { @@ -328,3 +335,68 @@ pub async fn wait_for_write>(path: P) -> Result<()> { Ok(()) } + +/// Deterministically shuffles a list of available ATS URL's to download the image from based on the image +// ID and attempts to download from each URL until success. +pub async fn fetch_image_stream( + ctx: &Ctx, + image_id: Uuid, + image_artifact_url_stub: &str, + image_fallback_artifact_url: Option<&str>, +) -> Result>> { + // Get hash from image id + let mut hasher = DefaultHasher::new(); + hasher.write(image_id.as_bytes()); + let hash = hasher.finish(); + + let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(hash); + + // Shuffle based on hash + let mut addresses = ctx + .pull_addr_handler + .addresses(ctx.config()) + .await? + .iter() + .map(|addr| { + Ok(Url::parse(&format!("{addr}{}", image_artifact_url_stub)) + .context("failed to build artifact url")? + .to_string()) + }) + .collect::>>()?; + addresses.shuffle(&mut rng); + + // Add fallback url to the end if one is set + if let Some(fallback_artifact_url) = image_fallback_artifact_url { + addresses.push(fallback_artifact_url.to_string()); + } + + let mut iter = addresses.into_iter(); + while let Some(artifact_url) = iter.next() { + match reqwest::get(&artifact_url) + .await + .and_then(|res| res.error_for_status()) + { + Ok(res) => return Ok(res.bytes_stream()), + Err(err) => { + tracing::warn!( + ?image_id, + "failed to start download from {artifact_url}: {err}" + ); + } + } + } + + bail!("artifact url could not be resolved"); +} + +pub async fn prewarm_image(ctx: &Ctx, image_id: Uuid, image_artifact_url_stub: &str) { + tracing::debug!(?image_id, "prewarming"); + + match fetch_image_stream(ctx, image_id, image_artifact_url_stub, None).await { + Ok(_) => tracing::debug!(?image_id, "prewarm complete"), + Err(_) => tracing::warn!( + ?image_id, + "prewarm failed, artifact url could not be resolved" + ), + } +} diff --git a/packages/infra/client/manager/tests/common.rs b/packages/infra/client/manager/tests/common.rs index a43f80484..72d88d426 100644 --- a/packages/infra/client/manager/tests/common.rs +++ b/packages/infra/client/manager/tests/common.rs @@ -31,7 +31,7 @@ use tracing_subscriber::prelude::*; use url::Url; use uuid::Uuid; -pub const PROTOCOL_VERSION: u16 = 1; +pub const PROTOCOL_VERSION: u16 = 2; pub const ARTIFACTS_PORT: u16 = 1234; pub async fn send_packet( diff --git a/packages/services/build/src/types.rs b/packages/services/build/src/types.rs index c25159f88..aa28d5500 100644 --- a/packages/services/build/src/types.rs +++ b/packages/services/build/src/types.rs @@ -47,7 +47,7 @@ pub struct Build { pub create_ts: i64, pub kind: BuildKind, pub compression: BuildCompression, - pub tags: HashMap>, + pub tags: HashMap, } // TODO: Move to upload pkg when its converted to new ops diff --git a/packages/services/ds/src/util/mod.rs b/packages/services/ds/src/util/mod.rs index 84be4fa77..fab76129f 100644 --- a/packages/services/ds/src/util/mod.rs +++ b/packages/services/ds/src/util/mod.rs @@ -64,6 +64,17 @@ pub fn build_ds_hostname_and_path( } } +pub fn image_artifact_url_stub( + config: &rivet_config::Config, + upload_id: Uuid, + file_name: &str, +) -> GlobalResult { + Ok(format!( + "/s3-cache/{namespace}-bucket-build/{upload_id}/{file_name}", + namespace = config.server()?.rivet.namespace, + )) +} + /// Formats the port label to be used in Nomad and Pegboard. /// /// Prefixing this port ensure that the user defined port names don't interfere diff --git a/packages/services/ds/src/workflows/server/pegboard/mod.rs b/packages/services/ds/src/workflows/server/pegboard/mod.rs index e75cfce31..93132a6ba 100644 --- a/packages/services/ds/src/workflows/server/pegboard/mod.rs +++ b/packages/services/ds/src/workflows/server/pegboard/mod.rs @@ -642,7 +642,11 @@ async fn resolve_artifacts( }; Ok(ResolveArtifactsOutput { - artifact_url_stub, + artifact_url_stub: crate::util::image_artifact_url_stub( + ctx.config(), + input.build_upload_id, + &input.build_file_name, + )?, fallback_artifact_url, }) } diff --git a/packages/services/pegboard/src/protocol.rs b/packages/services/pegboard/src/protocol.rs index d1ddfc254..7d012c33d 100644 --- a/packages/services/pegboard/src/protocol.rs +++ b/packages/services/pegboard/src/protocol.rs @@ -20,9 +20,14 @@ pub enum PegboardProtocolError { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum ToClient { - Init { last_event_idx: i64 }, + Init { + last_event_idx: i64, + }, Commands(Vec), - FetchStateRequest {}, + PrewarmImage { + image_id: Uuid, + image_artifact_url_stub: String, + }, } impl ToClient { @@ -46,7 +51,6 @@ pub enum ToServer { system: crate::system_info::SystemInfo, }, Events(Vec), - FetchStateResponse {}, } impl ToServer { diff --git a/packages/services/pegboard/src/workflows/client.rs b/packages/services/pegboard/src/workflows/client.rs index 6a7f26088..5f6eaa8a2 100644 --- a/packages/services/pegboard/src/workflows/client.rs +++ b/packages/services/pegboard/src/workflows/client.rs @@ -4,7 +4,7 @@ use chirp_workflow::prelude::*; use futures_util::FutureExt; use nix::sys::signal::Signal; -use crate::{metrics, protocol}; +use crate::{metrics, protocol, workflows::PrewarmImage}; #[derive(Debug, Serialize, Deserialize)] pub struct Input { @@ -103,12 +103,22 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu } } } - protocol::ToServer::FetchStateResponse {} => todo!(), } } Main::Command(command) => { handle_commands(ctx, client_id, vec![command]).await?; } + Main::PrewarmImage(sig) => { + ctx.msg(ToWs { + client_id, + inner: protocol::ToClient::PrewarmImage { + image_id: sig.image_id, + image_artifact_url_stub: sig.image_artifact_url_stub, + }, + }) + .send() + .await?; + } Main::Drain(_) => { ctx.activity(SetDrainInput { client_id, @@ -633,6 +643,7 @@ join_signal!(Main { Command(protocol::Command), // Forwarded from the ws to this workflow Forward(protocol::ToServer), + PrewarmImage, Drain, Undrain, Destroy, diff --git a/packages/services/pegboard/src/workflows/datacenter.rs b/packages/services/pegboard/src/workflows/datacenter.rs index 893d2a942..12af825d3 100644 --- a/packages/services/pegboard/src/workflows/datacenter.rs +++ b/packages/services/pegboard/src/workflows/datacenter.rs @@ -1,7 +1,7 @@ use chirp_workflow::prelude::*; use futures_util::FutureExt; -use crate::protocol; +use crate::{protocol, workflows::PrewarmImage}; /// How long after last ping before not considering a client for allocation. const CLIENT_ELIGIBLE_THRESHOLD_MS: i64 = util::duration::seconds(10); @@ -17,8 +17,8 @@ pub async fn pegboard_datacenter(ctx: &mut WorkflowCtx, input: &Input) -> Global let datacenter_id = input.datacenter_id; async move { - match ctx.listen::().await? { - protocol::Command::StartActor { actor_id, config } => { + match ctx.listen::
().await? { + Main::Command(protocol::Command::StartActor { actor_id, config }) => { let client_id = ctx .activity(AllocateActorInput { datacenter_id, @@ -51,12 +51,12 @@ pub async fn pegboard_datacenter(ctx: &mut WorkflowCtx, input: &Input) -> Global .await?; } } - protocol::Command::SignalActor { + Main::Command(protocol::Command::SignalActor { actor_id, signal, persist_storage, ignore_future_state, - } => { + }) => { let client_id = ctx.activity(GetClientForActorInput { actor_id }).await?; if let Some(client_id) = client_id { @@ -77,6 +77,16 @@ pub async fn pegboard_datacenter(ctx: &mut WorkflowCtx, input: &Input) -> Global ); } } + Main::PrewarmImage(sig) => { + let client_id = ctx.activity(GetClientFromDcInput { datacenter_id }).await?; + + if let Some(client_id) = client_id { + // Forward signal to client + ctx.signal(sig).tag("client_id", client_id).send().await?; + } else { + tracing::error!(?datacenter_id, image_id=?sig.image_id, "failed to prewarm image"); + } + } } Ok(Loop::<()>::Continue) @@ -235,3 +245,32 @@ async fn get_client_for_actor( Ok(row.map(|(client_id,)| client_id)) } + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct GetClientFromDcInput { + datacenter_id: Uuid, +} + +#[activity(GetClientFromDc)] +async fn get_client_from_dc( + ctx: &ActivityCtx, + input: &GetClientFromDcInput, +) -> GlobalResult> { + let row = sql_fetch_optional!( + [ctx, (Uuid,)] + " + SELECT client_id + FROM db_pegboard.actors + WHERE datacenter_id = $1 + ", + input.datacenter_id, + ) + .await?; + + Ok(row.map(|(client_id,)| client_id)) +} + +join_signal!(Main { + Command(protocol::Command), + PrewarmImage, +}); diff --git a/packages/services/pegboard/src/workflows/mod.rs b/packages/services/pegboard/src/workflows/mod.rs index b236b9de3..7391759e5 100644 --- a/packages/services/pegboard/src/workflows/mod.rs +++ b/packages/services/pegboard/src/workflows/mod.rs @@ -1,2 +1,10 @@ +use chirp_workflow::prelude::*; + pub mod client; pub mod datacenter; + +#[signal("pegboard_prewarm_image")] +pub struct PrewarmImage { + pub image_id: Uuid, + pub image_artifact_url_stub: String, +}