feat: add ats prewarm to pegboard #1816

Open · wants to merge 1 commit into 01-09-fix_periodically_pull_ats_addr
1 change: 1 addition & 0 deletions packages/api/actor/Cargo.toml
@@ -44,6 +44,7 @@ game-namespace-get.workspace = true
game-namespace-resolve-name-id.workspace = true
game-resolve-name-id.workspace = true
game-version-get.workspace = true
pegboard.workspace = true
region-recommend.workspace = true
rivet-config.workspace = true
rivet-env.workspace = true
2 changes: 1 addition & 1 deletion packages/api/actor/src/route/actors.rs
@@ -670,7 +670,7 @@ async fn list_actors_inner(
_watch_index: WatchIndexQuery,
query: ListQuery,
) -> GlobalResult<models::ActorListActorsResponse> {
let CheckOutput { game_id, env_id } = ctx
let CheckOutput { env_id, .. } = ctx
.auth()
.check(
ctx.op_ctx(),
91 changes: 66 additions & 25 deletions packages/api/actor/src/route/builds.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use api_helper::{anchor::WatchIndexQuery, ctx::Ctx};
use futures_util::{StreamExt, TryStreamExt};
use rivet_api::models;
use rivet_convert::{ApiInto, ApiTryInto};
use rivet_operation::prelude::*;
@@ -34,25 +35,25 @@ pub async fn get(
)
.await?;

let builds_res = op!([ctx] build_get {
build_ids: vec![build_id.into()],
})
.await?;
let builds_res = ctx
.op(build::ops::get::Input {
build_ids: vec![build_id],
})
.await?;
let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND);
ensure_with!(unwrap!(build.env_id).as_uuid() == env_id, BUILD_NOT_FOUND);
ensure_with!(
unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id,
BUILD_NOT_FOUND
);

let uploads_res = op!([ctx] upload_get {
upload_ids: builds_res
.builds
.iter()
.filter_map(|build| build.upload_id)
.collect::<Vec<_>>(),
upload_ids: vec![build.upload_id.into()],
})
.await?;
let upload = unwrap!(uploads_res.uploads.first());

let build = models::ActorBuild {
id: unwrap!(build.build_id).as_uuid(),
id: build.build_id,
name: build.display_name.clone(),
created_at: timestamp::to_string(build.create_ts)?,
content_length: upload.content_length.api_try_into()?,
@@ -253,14 +254,17 @@ pub async fn patch_tags(
}
}

let build_res = ctx
let builds_res = ctx
.op(build::ops::get::Input {
build_ids: vec![build_id],
})
.await?;
let build = unwrap_with!(build_res.builds.first(), BUILD_NOT_FOUND);
let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND);

ensure_with!(unwrap!(build.env_id) == env_id, BUILD_NOT_FOUND);
ensure_with!(
unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id,
BUILD_NOT_FOUND
);

ctx.op(build::ops::patch_tags::Input {
build_id,
@@ -366,7 +370,7 @@ pub async fn create_build_deprecated(
body: models::ServersCreateBuildRequest,
) -> GlobalResult<models::ServersCreateBuildResponse> {
let global = build_global_query_compat(&ctx, game_id, env_id).await?;
let build_res = create_build(
let builds_res = create_build(
ctx,
models::ActorPrepareBuildRequest {
compression: body.compression.map(|c| match c {
@@ -388,18 +392,18 @@

let (image_presigned_request, image_presigned_requests) = if !multipart_upload {
(
Some(Box::new(unwrap!(build_res
Some(Box::new(unwrap!(builds_res
.presigned_requests
.into_iter()
.next()))),
None,
)
} else {
(None, Some(build_res.presigned_requests))
(None, Some(builds_res.presigned_requests))
};

Ok(models::ServersCreateBuildResponse {
build: build_res.build,
build: builds_res.build,
image_presigned_request,
image_presigned_requests,
})
@@ -424,20 +428,57 @@ pub async fn complete_build(
)
.await?;

let build_res = op!([ctx] build_get {
build_ids: vec![build_id.into()],
})
.await?;
let build = unwrap_with!(build_res.builds.first(), BUILD_NOT_FOUND);
let builds_res = ctx
.op(build::ops::get::Input {
build_ids: vec![build_id],
})
.await?;
let build = unwrap_with!(builds_res.builds.first(), BUILD_NOT_FOUND);

ensure_with!(unwrap!(build.env_id).as_uuid() == env_id, BUILD_NOT_FOUND);
ensure_with!(
unwrap_with!(build.env_id, BUILD_NOT_FOUND) == env_id,
BUILD_NOT_FOUND
);

op!([ctx] @dont_log_body upload_complete {
upload_id: build.upload_id,
upload_id: Some(build.upload_id.into()),
bucket: None,
})
.await?;

// Prewarm all datacenters for pegboard
{
let default_cluster_id = ctx.config().server()?.rivet.default_cluster_id()?;

let datacenters_res = ctx
.op(cluster::ops::datacenter::list::Input {
cluster_ids: vec![default_cluster_id],
})
.await?;
let cluster = unwrap!(datacenters_res.clusters.first());

futures_util::stream::iter(cluster.datacenter_ids.iter().cloned())
.map(|datacenter_id| {
let ctx = ctx.clone();
async move {
ctx.signal(pegboard::workflows::PrewarmImage {
image_id: build_id,
image_artifact_url_stub: ds::util::image_artifact_url_stub(
ctx.config(),
build.upload_id,
&build::utils::file_name(build.kind, build.compression),
)?,
})
.tag("datacenter_id", datacenter_id)
.send()
.await
}
})
.buffer_unordered(16)
.try_collect::<Vec<_>>()
.await?;
}

Ok(json!({}))
}

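The prewarm block above fans a `pegboard::workflows::PrewarmImage` signal out to every datacenter in the default cluster. The signal's definition is not part of this diff; a minimal sketch of what it might look like, assuming chirp-workflow's signal attribute macro and taking the field names from the `builds.rs` call site:

```rust
use uuid::Uuid;

// Hypothetical sketch (not in this diff). The attribute name follows
// chirp-workflow conventions and is an assumption; the fields mirror the
// call site in builds.rs.
#[signal("pegboard_prewarm_image")]
pub struct PrewarmImage {
    /// Build to warm in the datacenter's pull-through cache.
    pub image_id: Uuid,
    /// Path suffix appended to each ATS address to form the artifact URL.
    pub image_artifact_url_stub: String,
}
```

The `.tag("datacenter_id", datacenter_id)` call presumably routes each copy of the signal to the pegboard client workflow for that datacenter, which then forwards it to the connected manager as the `protocol::ToClient::PrewarmImage` message handled in `ctx.rs` below.
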
2 changes: 2 additions & 0 deletions packages/common/chirp-workflow/core/src/ctx/api.rs
@@ -17,6 +17,8 @@ use crate::{
workflow::{Workflow, WorkflowInput},
};

// NOTE: Clonable because of inner arcs
#[derive(Clone)]
pub struct ApiCtx {
ray_id: Uuid,
name: String,
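
The new `Clone` derive exists so the API context can be captured by value in concurrent futures, as the prewarm fan-out in `builds.rs` does with `ctx.clone()`. A tiny, self-contained illustration of the pattern (not repo code):

```rust
use std::sync::Arc;

// Illustrative only: a context that is cheap to clone because its fields are
// Arc-backed handles, mirroring the "inner arcs" note on ApiCtx.
#[derive(Clone)]
struct Ctx {
    shared: Arc<String>,
}

#[tokio::main]
async fn main() {
    let ctx = Ctx {
        shared: Arc::new("state".into()),
    };

    let ctx2 = ctx.clone(); // bumps the Arc refcount, no deep copy
    tokio::spawn(async move {
        println!("task sees: {}", ctx2.shared);
    })
    .await
    .unwrap();

    println!("caller still owns: {}", ctx.shared);
}
```
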
62 changes: 7 additions & 55 deletions packages/infra/client/manager/src/actor/setup.rs
@@ -1,26 +1,22 @@
use std::{
collections::HashMap,
hash::{DefaultHasher, Hasher},
path::{Path, PathBuf},
process::Stdio,
result::Result::{Err, Ok},
};

use anyhow::*;
use futures_util::Stream;
use futures_util::StreamExt;
use indoc::indoc;
use pegboard::protocol;
use pegboard_config::isolate_runner::actor as actor_config;
use rand::Rng;
use rand::{prelude::SliceRandom, SeedableRng};
use serde_json::json;
use tokio::{
fs::{self, File},
io::{AsyncReadExt, AsyncWriteExt},
process::Command,
};
use url::Url;
use uuid::Uuid;

use super::{oci_config, Actor};
@@ -71,63 +67,19 @@ impl Actor {
Ok(())
}

/// Deterministically shuffles a list of available ATS URL's to download the image from based on the image
// ID and attempts to download from each URL until success.
async fn fetch_image_stream(
&self,
ctx: &Ctx,
) -> Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
// Get hash from image id
let mut hasher = DefaultHasher::new();
hasher.write(self.config.image.id.as_bytes());
let hash = hasher.finish();

let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(hash);

// Shuffle based on hash
let mut addresses = ctx
.pull_addr_handler
.addresses(ctx.config())
.await?
.iter()
.map(|addr| {
Ok(
Url::parse(&format!("{addr}{}", self.config.image.artifact_url_stub))
.context("failed to build artifact url")?
.to_string(),
)
})
.collect::<Result<Vec<_>>>()?;
addresses.shuffle(&mut rng);

// Add fallback url to the end if one is set
if let Some(fallback_artifact_url) = &self.config.image.fallback_artifact_url {
addresses.push(fallback_artifact_url.clone());
}

let mut iter = addresses.into_iter();
while let Some(artifact_url) = iter.next() {
match reqwest::get(&artifact_url)
.await
.and_then(|res| res.error_for_status())
{
Ok(res) => return Ok(res.bytes_stream()),
Err(err) => {
tracing::warn!(actor_id=?self.actor_id, "failed to start download from {artifact_url}: {err}");
}
}
}

bail!("artifact url could not be resolved");
}

pub async fn download_image(&self, ctx: &Ctx) -> Result<()> {
tracing::info!(actor_id=?self.actor_id, "downloading artifact");

let actor_path = ctx.actor_path(self.actor_id);
let fs_path = actor_path.join("fs");

let mut stream = self.fetch_image_stream(ctx).await?;
let mut stream = utils::fetch_image_stream(
ctx,
self.config.image.id,
&self.config.image.artifact_url_stub,
self.config.image.fallback_artifact_url.as_deref(),
)
.await?;

match self.config.image.kind {
protocol::ImageKind::DockerImage => {
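
`download_image` now calls a shared `utils::fetch_image_stream` helper in place of the private method removed above. The helper itself is not shown in this diff; the sketch below assumes the removed logic moved into `utils` essentially unchanged, with the actor-specific fields turned into parameters (the `crate::ctx::Ctx` path and exact signature are inferred from the call site):

```rust
use std::{
    hash::{DefaultHasher, Hasher},
    result::Result::{Err, Ok},
};

use anyhow::*;
use futures_util::Stream;
use rand::{prelude::SliceRandom, SeedableRng};
use url::Url;
use uuid::Uuid;

use crate::ctx::Ctx;

/// Deterministically shuffles the available ATS URLs based on the image id and
/// tries each one until a download starts (sketch of the relocated helper).
pub async fn fetch_image_stream(
    ctx: &Ctx,
    image_id: Uuid,
    artifact_url_stub: &str,
    fallback_artifact_url: Option<&str>,
) -> Result<impl Stream<Item = reqwest::Result<bytes::Bytes>>> {
    // Seed an RNG from the image id so every node orders the ATS servers the
    // same way for a given image, spreading load while keeping cache locality.
    let mut hasher = DefaultHasher::new();
    hasher.write(image_id.as_bytes());
    let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(hasher.finish());

    let mut addresses = ctx
        .pull_addr_handler
        .addresses(ctx.config())
        .await?
        .iter()
        .map(|addr| {
            Ok(Url::parse(&format!("{addr}{artifact_url_stub}"))
                .context("failed to build artifact url")?
                .to_string())
        })
        .collect::<Result<Vec<_>>>()?;
    addresses.shuffle(&mut rng);

    // Try the fallback URL last if one is configured
    if let Some(fallback_artifact_url) = fallback_artifact_url {
        addresses.push(fallback_artifact_url.to_string());
    }

    for artifact_url in addresses {
        match reqwest::get(&artifact_url)
            .await
            .and_then(|res| res.error_for_status())
        {
            Ok(res) => return Ok(res.bytes_stream()),
            Err(err) => {
                tracing::warn!("failed to start download from {artifact_url}: {err}");
            }
        }
    }

    bail!("artifact url could not be resolved");
}
```
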
13 changes: 10 additions & 3 deletions packages/infra/client/manager/src/ctx.rs
@@ -319,7 +319,16 @@ impl Ctx {
self.process_command(command).await?;
}
}
protocol::ToClient::FetchStateRequest {} => todo!(),
protocol::ToClient::PrewarmImage {
image_id,
image_artifact_url_stub,
} => {
let self2 = self.clone();

tokio::spawn(async move {
utils::prewarm_image(&self2, image_id, &image_artifact_url_stub).await
});
}
}

Ok(())
@@ -521,8 +530,6 @@
let actor = actor.clone();
let self2 = self.clone();
tokio::spawn(async move {
use std::result::Result::Err;

if let Err(err) = actor.observe(&self2).await {
tracing::error!(actor_id=?row.actor_id, ?err, "observe failed");
}
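
`utils::prewarm_image` is spawned here but not included in the diff. A plausible sketch, assuming it reuses the relocated `fetch_image_stream` helper (sketched above) and simply drains the artifact stream so the datacenter's ATS cache already holds the image before any actor requests it:

```rust
use anyhow::*;
use futures_util::StreamExt;
use uuid::Uuid;

use crate::ctx::Ctx;

/// Hypothetical sketch: warm the pull-through cache by downloading the image
/// once and discarding the bytes. Errors are logged rather than propagated,
/// since prewarming is best-effort.
pub async fn prewarm_image(ctx: &Ctx, image_id: Uuid, image_artifact_url_stub: &str) {
    tracing::info!(?image_id, "prewarming image");

    let res = async {
        // No fallback URL: only the ATS cache path is worth warming.
        let mut stream =
            fetch_image_stream(ctx, image_id, image_artifact_url_stub, None).await?;

        // Drain the stream; ATS caches the object as it proxies the request.
        while let Some(chunk) = stream.next().await {
            chunk?;
        }

        Ok(())
    }
    .await;

    if let Err(err) = res {
        tracing::warn!(?image_id, ?err, "image prewarm failed");
    } else {
        tracing::info!(?image_id, "image prewarm complete");
    }
}
```
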
4 changes: 2 additions & 2 deletions packages/infra/client/manager/src/lib.rs
@@ -9,10 +9,10 @@ mod ctx;
#[cfg(feature = "test")]
pub mod event_sender;
#[cfg(feature = "test")]
pub mod pull_addr_handler;
#[cfg(feature = "test")]
mod metrics;
#[cfg(feature = "test")]
pub mod pull_addr_handler;
#[cfg(feature = "test")]
mod runner;
#[cfg(feature = "test")]
pub mod system_info;
2 changes: 1 addition & 1 deletion packages/infra/client/manager/src/main.rs
@@ -27,7 +27,7 @@ mod utils;

use ctx::Ctx;

const PROTOCOL_VERSION: u16 = 1;
const PROTOCOL_VERSION: u16 = 2;

#[derive(Clone)]
struct Init {