Skip to content

Commit

Permalink
feat(user): migrate pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
ABCxFF committed Jan 28, 2025
1 parent 71bf25a commit 62f4b39
Show file tree
Hide file tree
Showing 19 changed files with 154 additions and 545 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions packages/api/auth/src/route/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ pub async fn complete(
.await?;

// Send user update to hub
msg!([ctx] user::msg::update(user_ent.user_id) {
user_id: Some(user_ent.user_id.into()),
})
.await?;
(*ctx).msg(::user::workflows::user::Update {})
.tag("user_id", user_ent.user_id)
.send()
.await?;

Ok(models::AuthIdentityCompleteEmailVerificationResponse {
status: models::AuthCompleteStatus::LinkedAccountAdded,
Expand Down
21 changes: 16 additions & 5 deletions packages/api/auth/src/route/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,23 @@ async fn fallback_user(
AccessKind::Public | AccessKind::Private => {
// Register new user
let user_id = Uuid::new_v4();
msg!([ctx] user::msg::create(user_id) -> user::msg::create_complete {
user_id: Some(user_id.into()),
namespace_id: None,
display_name: None,
})
let mut creation_sub = chirp_workflow::compat::subscribe::<
::user::workflows::user::CreateComplete, _
>(&ctx, ("user_id", user_id)).await?;

chirp_workflow::compat::workflow(
&ctx,
::user::workflows::user::Input {
user_id,
display_name: None,
}
)
.await?
.tag("user_id", user_id)
.dispatch()
.await?;

creation_sub.next().await?;

user_id
}
Expand Down
26 changes: 2 additions & 24 deletions packages/api/cloud/src/route/games/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,10 @@ const MAX_BANNER_UPLOAD_SIZE: i64 = util::file_size::megabytes(10) as i64;
// MARK: GET /games
pub async fn list(
ctx: Ctx<Auth>,
watch_index: WatchIndexQuery,
_watch_index: WatchIndexQuery,
) -> GlobalResult<models::CloudGamesGetGamesResponse> {
let accessible_games = ctx.auth().accessible_games(ctx.op_ctx()).await?;

// Wait for an update if needed
let update_ts = if let Some(anchor) = watch_index.to_consumer()? {
// Error if a cloud token tries to watch this endpoint, game update
// messages for teams aren't implemented
if let Some(user_id) = accessible_games.user_id {
let game_update_sub = tail_anchor!([ctx, anchor] user::msg::game_update(user_id));

util::macros::select_with_timeout!({
event = game_update_sub => {
let event = event?;

event.msg_ts()
}
})
} else {
bail_with!(API_FORBIDDEN, reason = "Cloud token cannot watch `/games`");
}
} else {
Default::default()
};
let update_ts = update_ts.unwrap_or_else(util::timestamp::now);

let games = fetch::game::summaries(ctx.op_ctx(), accessible_games.game_ids).await?;
let groups = fetch::group::summaries(
ctx.op_ctx(),
Expand All @@ -62,7 +40,7 @@ pub async fn list(
Ok(models::CloudGamesGetGamesResponse {
games,
groups,
watch: WatchResponse::new_as_model(update_ts),
watch: WatchResponse::new_as_model(util::timestamp::now()),
})
}

Expand Down
2 changes: 1 addition & 1 deletion packages/api/group/src/fetch/identity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use api_helper::ctx::Ctx;
use proto::{
backend::{self, pkg::*},
backend,
common,
};
use rivet_operation::prelude::*;
Expand Down
Loading

0 comments on commit 62f4b39

Please sign in to comment.