feat: add ats prewarm to pegboard
MasterPtato committed Jan 10, 2025
1 parent 8d5d663 commit 6b8219e
Showing 17 changed files with 257 additions and 105 deletions.
1 change: 1 addition & 0 deletions packages/api/actor/Cargo.toml
@@ -44,6 +44,7 @@ game-namespace-get.workspace = true
game-namespace-resolve-name-id.workspace = true
game-resolve-name-id.workspace = true
game-version-get.workspace = true
pegboard.workspace = true
region-recommend.workspace = true
rivet-config.workspace = true
rivet-env.workspace = true
2 changes: 1 addition & 1 deletion packages/api/actor/src/route/actors.rs
@@ -670,7 +670,7 @@ async fn list_actors_inner(
_watch_index: WatchIndexQuery,
query: ListQuery,
) -> GlobalResult<models::ActorListActorsResponse> {
let CheckOutput { game_id, env_id } = ctx
let CheckOutput { env_id, .. } = ctx
.auth()
.check(
ctx.op_ctx(),
91 changes: 66 additions & 25 deletions packages/api/actor/src/route/builds.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use api_helper::{anchor::WatchIndexQuery, ctx::Ctx};
use futures_util::{StreamExt, TryStreamExt};
use rivet_api::models;
use rivet_convert::{ApiInto, ApiTryInto};
use rivet_operation::prelude::*;
@@ -34,25 +35,25 @@ pub async fn get(
)
.await?;

let builds_res = op!([ctx] build_get {
build_ids: vec![build_id.into()],
})
.await?;
let builds_res = ctx
.op(build::ops::get::Input {
build_ids: vec![build_id],
})
.await?;
let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND);
ensure_with!(unwrap!(build.env_id).as_uuid() == env_id, BUILD_NOT_FOUND);
ensure_with!(
unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id,
BUILD_NOT_FOUND
);

let uploads_res = op!([ctx] upload_get {
upload_ids: builds_res
.builds
.iter()
.filter_map(|build| build.upload_id)
.collect::<Vec<_>>(),
upload_ids: vec![build.upload_id.into()],
})
.await?;
let upload = unwrap!(uploads_res.uploads.first());

let build = models::ActorBuild {
id: unwrap!(build.build_id).as_uuid(),
id: build.build_id,
name: build.display_name.clone(),
created_at: timestamp::to_string(build.create_ts)?,
content_length: upload.content_length.api_try_into()?,
@@ -253,14 +254,17 @@ pub async fn patch_tags(
}
}

let build_res = ctx
let builds_res = ctx
.op(build::ops::get::Input {
build_ids: vec![build_id],
})
.await?;
let build = unwrap_with!(build_res.builds.first(), BUILD_NOT_FOUND);
let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND);

ensure_with!(unwrap!(build.env_id) == env_id, BUILD_NOT_FOUND);
ensure_with!(
unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id,
BUILD_NOT_FOUND
);

ctx.op(build::ops::patch_tags::Input {
build_id,
@@ -366,7 +370,7 @@ pub async fn create_build_deprecated(
body: models::ServersCreateBuildRequest,
) -> GlobalResult<models::ServersCreateBuildResponse> {
let global = build_global_query_compat(&ctx, game_id, env_id).await?;
let build_res = create_build(
let builds_res = create_build(
ctx,
models::ActorPrepareBuildRequest {
compression: body.compression.map(|c| match c {
@@ -388,18 +392,18 @@

let (image_presigned_request, image_presigned_requests) = if !multipart_upload {
(
Some(Box::new(unwrap!(build_res
Some(Box::new(unwrap!(builds_res
.presigned_requests
.into_iter()
.next()))),
None,
)
} else {
(None, Some(build_res.presigned_requests))
(None, Some(builds_res.presigned_requests))
};

Ok(models::ServersCreateBuildResponse {
build: build_res.build,
build: builds_res.build,
image_presigned_request,
image_presigned_requests,
})
@@ -424,20 +428,57 @@ pub async fn complete_build(
)
.await?;

let build_res = op!([ctx] build_get {
build_ids: vec![build_id.into()],
})
.await?;
let build = unwrap_with!(build_res.builds.first(), BUILD_NOT_FOUND);
let builds_res = ctx
.op(build::ops::get::Input {
build_ids: vec![build_id],
})
.await?;
let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND);

ensure_with!(unwrap!(build.env_id).as_uuid() == env_id, BUILD_NOT_FOUND);
ensure_with!(
unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id,
BUILD_NOT_FOUND
);

op!([ctx] @dont_log_body upload_complete {
upload_id: build.upload_id,
upload_id: Some(build.upload_id.into()),
bucket: None,
})
.await?;

// Prewarm all datacenters for pegboard
{
let default_cluster_id = ctx.config().server()?.rivet.default_cluster_id()?;

let datacenters_res = ctx
.op(cluster::ops::datacenter::list::Input {
cluster_ids: vec![default_cluster_id],
})
.await?;
let cluster = unwrap!(datacenters_res.clusters.first());

futures_util::stream::iter(cluster.datacenter_ids.iter().cloned())
.map(|datacenter_id| {
let ctx = ctx.clone();
async move {
ctx.signal(pegboard::workflows::PrewarmImage {
image_id: build_id,
image_artifact_url_stub: ds::util::image_artifact_url_stub(
ctx.config(),
build.upload_id,
&build::utils::file_name(build.kind, build.compression),
)?,
})
.tag("datacenter_id", datacenter_id)
.send()
.await
}
})
.buffer_unordered(16)
.try_collect::<Vec<_>>()
.await?;
}

Ok(json!({}))
}

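The prewarm block above fans a pegboard::workflows::PrewarmImage signal out to every datacenter in the default cluster, tagged by datacenter_id. The signal definition lives in the pegboard package and is not among the files loaded in this view; judging by the call site, it presumably carries just the build ID and the artifact URL stub, roughly as sketched below. The struct is hypothetical, and its registration with chirp-workflow (signal attribute, serde derives) is omitted.

```rust
use uuid::Uuid;

// Hypothetical sketch only: the real definition lives in the pegboard package, which is
// not among the files loaded in this view. Registration with chirp-workflow (e.g. a
// signal attribute and serde derives) is omitted; the fields mirror the call site above.
pub struct PrewarmImage {
    pub image_id: Uuid,
    pub image_artifact_url_stub: String,
}
```

Each datacenter's pegboard workflow presumably relays this to its connected clients as the ToClient::PrewarmImage protocol message handled further down in ctx.rs.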
2 changes: 2 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/api.rs
@@ -17,6 +17,8 @@ use crate::{
workflow::{Workflow, WorkflowInput},
};

// NOTE: Cloneable because of inner Arcs
#[derive(Clone)]
pub struct ApiCtx {
ray_id: Uuid,
name: String,
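ApiCtx gains Clone because the prewarm fan-out in builds.rs above moves a context handle into each per-datacenter future. A minimal, self-contained illustration of that pattern (not Rivet code; an Arc<String> stands in for the context, and only the futures-util crate is assumed), showing why a context that is just a bundle of Arcs can be cloned cheaply per future:

```rust
use std::sync::Arc;

use futures_util::{StreamExt, TryStreamExt};

// Each per-datacenter future takes its own handle; because the context is Arc-backed,
// every clone only bumps reference counts.
async fn fan_out(ctx: Arc<String>, datacenter_ids: Vec<u32>) -> std::io::Result<Vec<()>> {
    futures_util::stream::iter(datacenter_ids)
        .map(|dc_id| {
            let ctx = ctx.clone();
            async move {
                // Stand-in for ctx.signal(..).tag("datacenter_id", dc_id).send().await
                println!("prewarming datacenter {dc_id} via context {ctx}");
                Ok::<(), std::io::Error>(())
            }
        })
        // Up to 16 sends in flight at once, mirroring buffer_unordered(16) above.
        .buffer_unordered(16)
        .try_collect()
        .await
}
```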
62 changes: 7 additions & 55 deletions packages/infra/client/manager/src/actor/setup.rs
@@ -1,26 +1,22 @@
use std::{
collections::HashMap,
hash::{DefaultHasher, Hasher},
path::{Path, PathBuf},
process::Stdio,
result::Result::{Err, Ok},
};

use anyhow::*;
use futures_util::Stream;
use futures_util::StreamExt;
use indoc::indoc;
use pegboard::protocol;
use pegboard_config::isolate_runner::actor as actor_config;
use rand::Rng;
use rand::{prelude::SliceRandom, SeedableRng};
use serde_json::json;
use tokio::{
fs::{self, File},
io::{AsyncReadExt, AsyncWriteExt},
process::Command,
};
use url::Url;
use uuid::Uuid;

use super::{oci_config, Actor};
@@ -71,63 +67,19 @@ impl Actor {
Ok(())
}

/// Deterministically shuffles a list of available ATS URLs to download the image from based on the image
/// ID and attempts to download from each URL until success.
async fn fetch_image_stream(
&self,
ctx: &Ctx,
) -> Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
// Get hash from image id
let mut hasher = DefaultHasher::new();
hasher.write(self.config.image.id.as_bytes());
let hash = hasher.finish();

let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(hash);

// Shuffle based on hash
let mut addresses = ctx
.pull_addr_handler
.addresses(ctx.config())
.await?
.iter()
.map(|addr| {
Ok(
Url::parse(&format!("{addr}{}", self.config.image.artifact_url_stub))
.context("failed to build artifact url")?
.to_string(),
)
})
.collect::<Result<Vec<_>>>()?;
addresses.shuffle(&mut rng);

// Add fallback url to the end if one is set
if let Some(fallback_artifact_url) = &self.config.image.fallback_artifact_url {
addresses.push(fallback_artifact_url.clone());
}

let mut iter = addresses.into_iter();
while let Some(artifact_url) = iter.next() {
match reqwest::get(&artifact_url)
.await
.and_then(|res| res.error_for_status())
{
Ok(res) => return Ok(res.bytes_stream()),
Err(err) => {
tracing::warn!(actor_id=?self.actor_id, "failed to start download from {artifact_url}: {err}");
}
}
}

bail!("artifact url could not be resolved");
}

pub async fn download_image(&self, ctx: &Ctx) -> Result<()> {
tracing::info!(actor_id=?self.actor_id, "downloading artifact");

let actor_path = ctx.actor_path(self.actor_id);
let fs_path = actor_path.join("fs");

let mut stream = self.fetch_image_stream(ctx).await?;
let mut stream = utils::fetch_image_stream(
ctx,
self.config.image.id,
&self.config.image.artifact_url_stub,
self.config.image.fallback_artifact_url.as_deref(),
)
.await?;

match self.config.image.kind {
protocol::ImageKind::DockerImage => {
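The new call site delegates to utils::fetch_image_stream, but the manager's utils module is among the files that did not load in this view. Judging by the method removed above and the arguments now passed in, the shared helper presumably looks roughly like the sketch below; the exact signature, module path, and logging fields are assumptions, while the body mirrors the removed code.

```rust
use std::hash::{DefaultHasher, Hasher};

use anyhow::{bail, Context, Result};
use futures_util::Stream;
use rand::{prelude::SliceRandom, SeedableRng};
use url::Url;
use uuid::Uuid;

use crate::ctx::Ctx;

/// Deterministically shuffles the available ATS URLs based on the image ID and tries
/// each one until a download starts, falling back to the optional fallback URL.
pub async fn fetch_image_stream(
    ctx: &Ctx,
    image_id: Uuid,
    image_artifact_url_stub: &str,
    fallback_artifact_url: Option<&str>,
) -> Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
    // Hash the image ID so each image deterministically prefers a different ATS node
    let mut hasher = DefaultHasher::new();
    hasher.write(image_id.as_bytes());
    let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(hasher.finish());

    // Shuffle based on hash
    let mut addresses = ctx
        .pull_addr_handler
        .addresses(ctx.config())
        .await?
        .iter()
        .map(|addr| {
            Ok(Url::parse(&format!("{addr}{image_artifact_url_stub}"))
                .context("failed to build artifact url")?
                .to_string())
        })
        .collect::<Result<Vec<_>>>()?;
    addresses.shuffle(&mut rng);

    // Add fallback url to the end if one is set
    if let Some(fallback_artifact_url) = fallback_artifact_url {
        addresses.push(fallback_artifact_url.to_string());
    }

    for artifact_url in addresses {
        match reqwest::get(&artifact_url)
            .await
            .and_then(|res| res.error_for_status())
        {
            Ok(res) => return Ok(res.bytes_stream()),
            Err(err) => {
                tracing::warn!("failed to start download from {artifact_url}: {err}");
            }
        }
    }

    bail!("artifact url could not be resolved");
}
```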
13 changes: 10 additions & 3 deletions packages/infra/client/manager/src/ctx.rs
@@ -319,7 +319,16 @@ impl Ctx {
self.process_command(command).await?;
}
}
protocol::ToClient::FetchStateRequest {} => todo!(),
protocol::ToClient::PrewarmImage {
image_id,
image_artifact_url_stub,
} => {
let self2 = self.clone();

tokio::spawn(async move {
utils::prewarm_image(&self2, image_id, &image_artifact_url_stub).await
});
}
}

Ok(())
@@ -521,8 +530,6 @@
let actor = actor.clone();
let self2 = self.clone();
tokio::spawn(async move {
use std::result::Result::Err;

if let Err(err) = actor.observe(&self2).await {
tracing::error!(actor_id=?row.actor_id, ?err, "observe failed");
}
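utils::prewarm_image is likewise not shown in the loaded portion of the diff. Since ATS (Apache Traffic Server) acts as a pull-through cache, prewarming presumably amounts to requesting the artifact through the shuffled ATS addresses once and discarding the bytes, so the cache is populated before any actor needs the image. A rough sketch under that assumption, reusing the fetch_image_stream helper sketched earlier; names and log messages are assumed.

```rust
use futures_util::StreamExt;
use uuid::Uuid;

use crate::ctx::Ctx;

// Hypothetical sketch: pull the image through ATS once and drop the bytes so the
// datacenter's cache is warm before any actor requests it.
pub async fn prewarm_image(ctx: &Ctx, image_id: Uuid, image_artifact_url_stub: &str) {
    tracing::info!(?image_id, "prewarming image");

    let stream = match fetch_image_stream(ctx, image_id, image_artifact_url_stub, None).await {
        Ok(stream) => stream,
        Err(err) => {
            tracing::warn!(?image_id, ?err, "failed to start prewarm download");
            return;
        }
    };

    // Drain the stream and discard the bytes; ATS caches the object as it proxies it.
    futures_util::pin_mut!(stream);
    while let Some(chunk) = stream.next().await {
        if let Err(err) = chunk {
            tracing::warn!(?image_id, ?err, "prewarm download interrupted");
            return;
        }
    }

    tracing::info!(?image_id, "prewarm complete");
}
```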
4 changes: 2 additions & 2 deletions packages/infra/client/manager/src/lib.rs
@@ -9,10 +9,10 @@ mod ctx;
#[cfg(feature = "test")]
pub mod event_sender;
#[cfg(feature = "test")]
pub mod pull_addr_handler;
#[cfg(feature = "test")]
mod metrics;
#[cfg(feature = "test")]
pub mod pull_addr_handler;
#[cfg(feature = "test")]
mod runner;
#[cfg(feature = "test")]
pub mod system_info;
2 changes: 1 addition & 1 deletion packages/infra/client/manager/src/main.rs
@@ -27,7 +27,7 @@ mod utils;

use ctx::Ctx;

const PROTOCOL_VERSION: u16 = 1;
const PROTOCOL_VERSION: u16 = 2;

#[derive(Clone)]
struct Init {
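The PROTOCOL_VERSION bump from 1 to 2 accompanies the new manager-bound PrewarmImage message handled in ctx.rs above, where it replaces the previously unimplemented FetchStateRequest match arm. The pegboard protocol definition is not among the loaded files; the new variant presumably looks roughly like the sketch below, with field names mirroring the match arm and all other variants and serde derives elided.

```rust
use uuid::Uuid;

// Hypothetical sketch only: the actual enum lives in the pegboard protocol crate,
// which is not among the loaded files. Other variants are elided.
pub enum ToClient {
    PrewarmImage {
        image_id: Uuid,
        image_artifact_url_stub: String,
    },
    // ...existing variants elided...
}
```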
(The remaining changed files were not loaded in this view.)
