diff --git a/orb-supervisor/CHANGELOG.md b/orb-supervisor/CHANGELOG.md new file mode 100644 index 00000000..0593f0fd --- /dev/null +++ b/orb-supervisor/CHANGELOG.md @@ -0,0 +1,65 @@ +# Changelog + +## 0.4.1 + +### Added + ++ Private proxy for getting notified of service registration `org.worldcoin.OrbSupervisor1` ++ Version pinning for GitHub actions + +### Changed + ++ Upon booting, `orb-supervisor` permits `update-agent` to begin downloading + immediately without throttling, until a signup starts + +## 0.4.0 + +### Added + ++ Proxy for logind method `org.freedesktop.login1.Manager.ScheduleShutdown` + + enables `orb-core` and `update-agent` to shut down or restart the device without + needing to grant elevated privileges/suid + +## 0.3.0 + +`orb-supervisor` no longer shuts down `orb-core` immediately when an update happens +but waits until no new signups have been started for a while. + +### Added + ++ Upon receiving a `RequestUpdatePermission` request, `orb-supervisor` only shuts + down `orb-core` after 20 minutes of inactivity (meaning that no signups have been + performed for 20 minutes). This timer is reset every time a new signup starts. + Once the timer is up, `orb-supervisor` schedules `update-agent` to immediately run again. + +### Changed + ++ `orb-supervisor` now returns custom `MethodError`s to report why an update was denied, + bringing it more in line with DBus conventions. + +## 0.2.0 (October 20, 2022) + +`orb-supervisor`'s integration with systemd and journald is improved by using +journald conventions and writing directly to the journald socket. 
+ +### Added + ++ `orb-supervisor` detects if it's attached to an interactive TTY using `STDIN`: + + if not attached to a TTY, it will write to the journald socket + + if attached to a TTY, it will write to stdout/stderr ++ `orb-supervisor` identifies itself as `worldcoin-supervisor` using SYSLOG IDENT; + + use `journalctl -t worldcoin-supervisor` to filter journald entries + (`-u worldcoin-supervisor` however is still the preferred way); + +## 0.1.0 (August 31, 2022) + +This is the first release of `orb-supervisor`. + +### Added + ++ Expose dbus property `org.worldcoin.OrbSupervisor1.Manager.BackgroundDownloadsAllowed`; + + Tracks how much time has passed since the last + `org.worldcoin.OrbCore1.Signup.SignupStarted` event; ++ Expose dbus method `org.worldcoin.OrbSupervisor1.Manager.RequestUpdatePermission`; + + attempts to shut down `worldcoin-core.service` through + `org.freedesktop.systemd1.Manager.StopUnit`; diff --git a/orb-supervisor/Cargo.toml b/orb-supervisor/Cargo.toml new file mode 100644 index 00000000..e4924b11 --- /dev/null +++ b/orb-supervisor/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "orb-supervisor" +version = "0.4.1" +edition = "2021" + +[dependencies] +color-eyre = "0.6.3" +libc = "0.2.135" +listenfd = "1.0.0" +tokio = { version = "1.21.2", features = ["macros", "net", "rt-multi-thread"] } +tokio-stream = "0.1.11" +tracing = { version = "0.1.37", features = ["attributes"] } +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +zbus = { version = "3.9.0", default-features = false, features = ["tokio"] } +zbus_systemd = { version = "0.0.8", features = [ "systemd1", "login1" ] } +thiserror = "1.0.37" +futures = "0.3.24" +once_cell = "1.15.0" +tap = "1.0.1" +tracing-journald = "0.3.0" + +[dev-dependencies] +dbus-launch = "0.2.0" +tokio = { version = "1.25.0", features = ["sync", "test-util"] } diff --git a/orb-supervisor/PROCESS.md b/orb-supervisor/PROCESS.md new file mode 100644 index 00000000..35453b5c --- /dev/null +++ 
b/orb-supervisor/PROCESS.md @@ -0,0 +1,15 @@ +# Guideline for Supervised Process Development + +Examples of SuPr (**Su**pervised **Pr**ocess) development: +- update-agent +- orb-core +- fan-controller +- ... + +## Expectations + +Through signal_hook or otherwise, we expect components to adhere to UNIX signal best practices, specifically around shutdown signals. + +### Shutdown Flow + +The supervisor _decides_ it must shut down. The supervisor iterates over the list of supervised processes, reads their corresponding PID files, and issues a [SIGTERM](https://man7.org/linux/man-pages/man7/signal.7.html) to give the application **SOME DEFINED SECONDS** to shut down. After that time has elapsed, the supervisor re-reads the SuPr PID files and sends a [SIGKILL](https://man7.org/linux/man-pages/man7/signal.7.html). diff --git a/orb-supervisor/README.md b/orb-supervisor/README.md new file mode 100644 index 00000000..24055bbc --- /dev/null +++ b/orb-supervisor/README.md @@ -0,0 +1,77 @@ +# Orb Supervisor + +Orb supervisor is a central IPC server that coordinates device state and external UX across independent agents (binaries/processes). 
+ +## Table of Contents + +- Minimal viable product (MVP) +- Why (this is necessary) + - Managing device health + - Consistent UX + - Separation of concerns +- Relevant components + +## MVP + +### Initial release + +- supervisor running [tonic gRPC](https://github.com/hyperium/tonic) over UDS (Unix Domain Sockets) +- supervisor can broadcast shutdown message + - component apps (orb-core, update-agent) listen for broadcast and shut down +- supervisor can update SMD **through sub-process** +- supervisor can display front LED patterns +- IPC (InterProcess-Communication) client library supporting defaults for process shutdown handlers + - Set up the bidirectional communication + the listener for broadcast messages + +### Immediate follow-up release +- supervisor can play sounds +- supervisor can engage in bi-directional communication for signup permission with orb-core; orb-core must not run a signup if... + - an update is scheduled; + - the device is shutting down; + - the SSD is full (coordinate with @AI on signup extensions); +- fan-controller PoC + - spin fans up/down depending on temperature/temperature-analogs + - watch iops/sec on NVMe as an indicator of SSD temperature (can be replaced by reading out SMART data after kernel 5.10 is deployed) +- supervisor can update SMD **through nvidia-smd crate** + - Implement an Nvidia SMD parser as a crate (other people may want this) + +## Why this is necessary + +There are three reasons that make the orb supervisor necessary: + +1. Managing device health (heat, updates) +1. Consistent UX (updates w/ voice, LEDs) +1. Separation of concerns + +### Managing device health + +Device health must be ensured at all times, whether the device is updating or in the middle of a signup. Furthermore, you want this to be maximally isolated to avoid a scenario where, through a vulnerability in a monolithic application, an attacker acquires fan control and overheats the device. 
+ +> **Scenario**: _A non-security critical update is running in the background and writing large blobs of data to the NVMe SSD_ while _orb-core is running and signups are being performed. An attacker uses a vulnerability in the QR code processing to deadlock a thread. They then proceed to garble the incoming network traffic causing the download to be repeatedly retried and data to be constantly written to the SSD while thermal management is stuck in the blocked runtime. This can feasibly fry an Orb._ + +### Consistent UX + +By necessity, the update agent service must have heightened privileges. Under no circumstances can we extend these to the entire orb-core process. At the same time, the operator must receive feedback on the status of an update. For certain updates, orb-core will not run during the update. In this scenario there is currently no mechanism to give feedback to the operator. + +Thus, an independent service that owns UX is a necessary condition for operator feedback. + +### Separation of concerns + +Breaking components down allows us to: + ++ Reduce attack surfaces by restricting the responsibilities of privileged services; ++ Employ best patterns for the job (a fan monitoring service looks different from an update agent looks different from orb core); ++ Reduce engineering load (understanding a 500 LoC binary and finding bugs _is_ easier than in a 10k LoC monolith); ++ Run integration tests significantly more easily outside of complex runtimes. + +It is best industry practice to write dedicated services *where possible*, where coupling is low and where solutions already exist. This applies especially on a full Linux host and will reduce engineering load. 
+ +## Relevant components + ++ update agent ++ fan monitor & control ++ wifi management ++ UX controller, split into: + + Sound + + LED ++ library for basic and repeatable "component" diff --git a/orb-supervisor/src/consts.rs b/orb-supervisor/src/consts.rs new file mode 100644 index 00000000..46adb979 --- /dev/null +++ b/orb-supervisor/src/consts.rs @@ -0,0 +1,5 @@ +use tokio::time::Duration; + +pub const WORLDCOIN_CORE_UNIT_NAME: &str = "worldcoin-core.service"; +pub const DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP: Duration = + Duration::from_secs(20 * 60); diff --git a/orb-supervisor/src/interfaces/manager.rs b/orb-supervisor/src/interfaces/manager.rs new file mode 100644 index 00000000..1705a02b --- /dev/null +++ b/orb-supervisor/src/interfaces/manager.rs @@ -0,0 +1,286 @@ +//! [`Manager`] defines the `org.worldcoin.OrbSupervisor1.Manager` Dbus interface. +//! +//! It exposes the `BackgroundDownloadsAllowed` property, used by the update agent to decide +//! whether it may download updates, and the `RequestUpdatePermission` and `ScheduleShutdown` methods. + +use tokio::{ + sync::watch, + time::{Duration, Instant}, +}; +use tracing::{debug, info, instrument, warn}; +use zbus::{ + dbus_interface, fdo::Error as FdoError, Connection, DBusError, SignalContext, +}; +use zbus_systemd::{login1, systemd1}; + +use crate::shutdown::UnknownShutdownKind; + +/// The duration of time since the last "start signup" event that has to have passed +/// before the update agent is permitted to start a download. 
+pub const DEFAULT_DURATION_TO_ALLOW_DOWNLOADS: Duration = Duration::from_secs(20 * 60); + +pub const BACKGROUND_DOWNLOADS_ALLOWED_PROPERTY_NAME: &str = + "BackgroundDownloadsAllowed"; +pub const INTERFACE_NAME: &str = "org.worldcoin.OrbSupervisor1.Manager"; +pub const OBJECT_PATH: &str = "/org/worldcoin/OrbSupervisor1/Manager"; + +#[derive(Debug, DBusError)] +#[dbus_error(prefix = "org.worldcoin.OrbSupervisor1.Manager")] +pub enum BusError { + #[dbus_error(zbus_error)] + ZBus(zbus::Error), + UpdatesBlocked(String), + InvalidArgs(String), +} + +impl BusError { + fn updates_blocked(msg: impl Into<String>) -> Self { + Self::UpdatesBlocked(msg.into()) + } +} + +pub struct Manager { + duration_to_allow_downloads: Duration, + last_signup_event: watch::Sender<Instant>, + system_connection: Option<Connection>, +} + +impl Manager { + /// Constructs a new `Manager` instance. + #[allow(clippy::must_use_candidate)] + pub fn new() -> Self { + let duration_to_allow_downloads = DEFAULT_DURATION_TO_ALLOW_DOWNLOADS; + + // We subtract the DEFAULT_DURATION_TO_ALLOW_DOWNLOADS from the current time + // so that the first check on boot doesn't throttle + let (tx, _rx) = watch::channel( + Instant::now() + .checked_sub(duration_to_allow_downloads) + .unwrap_or_else(Instant::now), + ); + Self { + duration_to_allow_downloads, + last_signup_event: tx, + system_connection: None, + } + } + + #[must_use] + pub fn duration_to_allow_downloads( + self, + duration_to_allow_downloads: Duration, + ) -> Self { + Self { + duration_to_allow_downloads, + ..self + } + } + + #[allow(clippy::must_use_candidate)] + pub fn are_downloads_allowed(&self) -> bool { + self.last_signup_event.borrow().elapsed() >= self.duration_to_allow_downloads + } + + fn reset_last_signup_event(&mut self) { + self.last_signup_event.send_replace(Instant::now()); + } + + pub fn set_system_connection(&mut self, conn: zbus::Connection) { + self.system_connection.replace(conn); + } + + /// Resets the internal timer tracking the last signup event to the current 
time and emits a + /// `PropertiesChanged` signal for the `BackgroundDownloadsAllowed` property. + /// + /// # Errors + /// + /// The same as calling [`zbus::fdo::Properties::properties_changed`]. + pub async fn reset_last_signup_event_and_notify( + &mut self, + signal_context: &SignalContext<'_>, + ) -> zbus::Result<()> { + self.reset_last_signup_event(); + self.background_downloads_allowed_changed(signal_context) + .await + } +} + +impl Default for Manager { + fn default() -> Self { + Self::new() + } +} + +#[dbus_interface(name = "org.worldcoin.OrbSupervisor1.Manager")] +impl Manager { + #[dbus_interface(property, name = "BackgroundDownloadsAllowed")] + #[instrument( + fields( + dbus_interface = "org.worldcoin.OrbSupervisor1.Manager.BackgroundDownloadsAllowed" + ), + skip_all + )] + async fn background_downloads_allowed(&self) -> bool { + debug!( + millis = self.last_signup_event.borrow().elapsed().as_millis(), + "time since last signup event", + ); + self.are_downloads_allowed() + } + + #[dbus_interface(name = "RequestUpdatePermission")] + #[instrument( + name = "org.worldcoin.OrbSupervisor1.Manager.RequestUpdatePermission", + skip_all + )] + async fn request_update_permission(&self) -> Result<(), BusError> { + debug!("RequestUpdatePermission was called"); + let conn = self + .system_connection + .as_ref() + .expect("manager must be connected to system dbus"); + let systemd_proxy = systemd1::ManagerProxy::new(conn).await?; + // Spawn task to shut down worldcoin core + let mut shutdown_core_task = + crate::tasks::update::spawn_shutdown_worldcoin_core_timer( + systemd_proxy.clone(), + self.last_signup_event.subscribe(), + ); + // Wait for one second to see if worldcoin core is already shut down + match tokio::time::timeout(Duration::from_secs(1), &mut shutdown_core_task) + .await + { + Ok(Ok(Ok(()))) => { + debug!("worldcoin core shut down task returned in less than 1s, permitting update"); + Ok(()) + } + Ok(Ok(Err(e))) => { + warn!( + error = ?e, + "worldcoin core shutdown 
task returned with error in less than 1s; permitting update because of unclear status", + ); + Ok(()) + } + Ok(Err(e)) => { + warn!( + panic_msg = ?e, + "worldcoin core shutdown task panicked trying; permitting update because of unclear status", + ); + Ok(()) + } + Err(elapsed) => { + debug!(%elapsed, "shutting down worldcoin core takes longer than 1s; running in background and blocking update by returning a method error"); + let _detached_shutdown_task = + crate::tasks::update::spawn_start_update_agent_after_core_shutdown_task( + systemd_proxy, + shutdown_core_task, + ); + Err(BusError::updates_blocked( + "orb core is still running and will be shut down 20 minutes after the last \ + signup; supervisor will start update agent after", + )) + } + } + } + + #[dbus_interface(name = "ScheduleShutdown")] + #[instrument( + name = "org.worldcoin.OrbSupervisor1.Manager.ScheduleShutdown", + skip_all + )] + async fn schedule_shutdown(&self, kind: &str, when: u64) -> zbus::fdo::Result<()> { + debug!("ScheduleShutdown was called"); + let shutdown_request = + crate::shutdown::ScheduledShutdown::try_from_dbus((kind.to_owned(), when)) + .map_err(|err: UnknownShutdownKind| { + FdoError::InvalidArgs(format!("{err}")) + })? 
+ .ok_or_else(|| FdoError::InvalidArgs("empty string".to_owned()))?; + let conn = self + .system_connection + .as_ref() + .expect("manager must be connected to the system dbus"); + let logind_proxy = login1::ManagerProxy::new(conn).await?; + + let preemption_info = + crate::shutdown::schedule_shutdown(logind_proxy, shutdown_request.clone()) + .await?; + use crate::shutdown::PreemptionInfo as P; + match preemption_info { + P::NoExistingShutdown => { + info!("scheduled shutdown {shutdown_request:?}"); + } + P::PreemptedExistingShutdown(s) => { + warn!("preempting existing lower priority shutdown {s:?} with new shutdown {shutdown_request:?}"); + } + P::KeptExistingShutdown(s) => warn!( + "skipped scheduling shutdown {shutdown_request:?} due to existing higher priority shutdown {s:?}" + ), + }; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use zbus::Interface; + + use super::{Manager, DEFAULT_DURATION_TO_ALLOW_DOWNLOADS}; + + #[test] + fn manager_interface_name_matches_exported_const() { + assert_eq!(super::INTERFACE_NAME, &*Manager::name()); + } + + #[tokio::test] + async fn manager_background_downloads_allowed_property_matched_exported_const() { + let manager = Manager::new(); + assert!(manager + .get(super::BACKGROUND_DOWNLOADS_ALLOWED_PROPERTY_NAME) + .await + .is_some()); + } + + #[test] + fn downloads_are_allowed_on_startup() { + let manager = Manager::new() + .duration_to_allow_downloads(DEFAULT_DURATION_TO_ALLOW_DOWNLOADS); + + assert!(manager.are_downloads_allowed()); + } + + #[tokio::test(start_paused = true)] + async fn downloads_are_disallowed_if_last_signup_event_is_too_recent() { + let mut manager = Manager::new() + .duration_to_allow_downloads(DEFAULT_DURATION_TO_ALLOW_DOWNLOADS); + + manager.reset_last_signup_event(); + + tokio::time::advance(DEFAULT_DURATION_TO_ALLOW_DOWNLOADS / 2).await; + assert!(!manager.are_downloads_allowed()); + } + + #[tokio::test(start_paused = true)] + async fn downloads_are_allowed_if_last_signup_event_is_old_enough() { + let mut 
manager = Manager::new() + .duration_to_allow_downloads(DEFAULT_DURATION_TO_ALLOW_DOWNLOADS); + + manager.reset_last_signup_event(); + + tokio::time::advance(DEFAULT_DURATION_TO_ALLOW_DOWNLOADS * 2).await; + assert!(manager.are_downloads_allowed()); + } + + #[tokio::test(start_paused = true)] + async fn downloads_become_disallowed_after_reset() { + let mut manager = Manager::new() + .duration_to_allow_downloads(DEFAULT_DURATION_TO_ALLOW_DOWNLOADS); + manager.reset_last_signup_event(); + + tokio::time::advance(DEFAULT_DURATION_TO_ALLOW_DOWNLOADS * 2).await; + assert!(manager.are_downloads_allowed()); + + manager.reset_last_signup_event(); + assert!(!manager.are_downloads_allowed()); + } +} diff --git a/orb-supervisor/src/interfaces/mod.rs b/orb-supervisor/src/interfaces/mod.rs new file mode 100644 index 00000000..99d5fa78 --- /dev/null +++ b/orb-supervisor/src/interfaces/mod.rs @@ -0,0 +1,3 @@ +pub mod manager; + +pub use manager::Manager; diff --git a/orb-supervisor/src/lib.rs b/orb-supervisor/src/lib.rs new file mode 100644 index 00000000..59209814 --- /dev/null +++ b/orb-supervisor/src/lib.rs @@ -0,0 +1,15 @@ +#![warn(clippy::pedantic)] +#![allow( + clippy::missing_errors_doc, + clippy::module_name_repetitions, + clippy::ignored_unit_patterns, + clippy::items_after_statements +)] + +pub mod consts; +pub mod interfaces; +pub mod proxies; +pub mod shutdown; +pub mod startup; +pub mod tasks; +pub mod telemetry; diff --git a/orb-supervisor/src/main.rs b/orb-supervisor/src/main.rs new file mode 100644 index 00000000..8a2e2c1d --- /dev/null +++ b/orb-supervisor/src/main.rs @@ -0,0 +1,25 @@ +use color_eyre::eyre::WrapErr as _; +use orb_supervisor::{ + startup::{Application, Settings}, + telemetry::{self, ExecContext}, +}; +use tracing::debug; +use tracing_subscriber::filter::LevelFilter; + +#[tokio::main] +async fn main() -> color_eyre::Result<()> { + color_eyre::install()?; + telemetry::start::<ExecContext, _>(LevelFilter::INFO, std::io::stdout) + .wrap_err("failed to initialize 
tracing; bailing")?; + debug!("initialized telemetry"); + + let settings = Settings::default(); + debug!(?settings, "starting supervisor with settings"); + let application = Application::build(settings) + .await + .wrap_err("failed to build supervisor")?; + + application.run().await?; + + Ok(()) +} diff --git a/orb-supervisor/src/proxies/core.rs b/orb-supervisor/src/proxies/core.rs new file mode 100644 index 00000000..758e99b7 --- /dev/null +++ b/orb-supervisor/src/proxies/core.rs @@ -0,0 +1,17 @@ +//! Dbus proxies for interacting with orb core. + +use zbus::dbus_proxy; + +pub const SIGNUP_PROXY_DEFAULT_WELL_KNOWN_NAME: &str = "org.worldcoin.OrbCore1"; +pub const SIGNUP_PROXY_DEFAULT_OBJECT_PATH: &str = "/org/worldcoin/OrbCore1/Signup"; + +#[dbus_proxy( + interface = "org.worldcoin.OrbCore1.Signup", + gen_blocking = false, + default_service = "org.worldcoin.OrbCore1", + default_path = "/org/worldcoin/OrbCore1/Signup" +)] +pub trait Signup { + #[dbus_proxy(signal)] + fn signup_started(&self) -> Result<()>; +} diff --git a/orb-supervisor/src/proxies/mod.rs b/orb-supervisor/src/proxies/mod.rs new file mode 100644 index 00000000..f9dcffa7 --- /dev/null +++ b/orb-supervisor/src/proxies/mod.rs @@ -0,0 +1,24 @@ +use color_eyre::eyre::{eyre, Result, WrapErr as _}; +use futures::StreamExt; +use zbus::fdo::DBusProxy; +use zbus::Connection; + +pub mod core; + +/// Returns after `name` appears on dbus. 
+pub async fn wait_for_dbus_registration(conn: &Connection, name: &str) -> Result<()> { + let dbus = DBusProxy::new(conn) + .await + .wrap_err("failed to create org.freedesktop.DBus proxy object")?; + let mut name_changed = dbus + .receive_name_owner_changed() + .await + .wrap_err("failed to get NameOwnerChanged signal stream")?; + while let Some(c) = name_changed.next().await { + let a = c.args().wrap_err("failed to extract signal args")?; + if a.name == name && a.new_owner.is_some() { + return Ok(()); + } + } + Err(eyre!("NameOwnerChanged stream unexpectedly ended")) +} diff --git a/orb-supervisor/src/shutdown.rs b/orb-supervisor/src/shutdown.rs new file mode 100644 index 00000000..6c893bc1 --- /dev/null +++ b/orb-supervisor/src/shutdown.rs @@ -0,0 +1,154 @@ +use std::{cmp::Ordering, fmt::Display, str::FromStr}; + +use tracing::info; +use zbus_systemd::login1::{self}; +use Kind::{DryHalt, DryPoweroff, DryReboot, Halt, Poweroff, Reboot}; + +/// `ScheduledShutdown` represents the logind shutdown tuple as an argument +/// for the `org.freedesktop.login1.Manager.ScheduleShutdown` dbus method. +/// +/// `Option<ScheduledShutdown>` represents the return value of the +/// `org.freedesktop.login1.Manager.ScheduledShutdown` property. +/// +/// The priority of a scheduled shutdown is determined first by the kind (and +/// its presence), followed by the soonest (lowest `when` value). +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ScheduledShutdown { + pub kind: Kind, + pub when: u64, +} + +impl ScheduledShutdown { + /// `try_from_dbus` attempts to convert from the tuple returned by + /// `org.freedesktop.login1.Manager.ScheduledShutdown` into a + /// `Option<ScheduledShutdown>` instance. + /// + /// `org.freedesktop.login1.Manager.ScheduledShutdown` returns an empty + /// string for `kind` (and `0` for `when`) if there is no already scheduled + /// shutdown. 
In this case, we return `Ok(None)` + pub fn try_from_dbus( + (kind, when): (String, u64), + ) -> Result<Option<Self>, UnknownShutdownKind> { + let kind = match kind.parse::<Kind>() { + Ok(kind) => kind, + Err(KindParseErr::EmptyStr) => return Ok(None), + Err(KindParseErr::Unknown(err)) => return Err(err), + }; + + Ok(Some(Self { kind, when })) + } +} + +impl PartialOrd for ScheduledShutdown { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + match self.kind.cmp(&other.kind) { + // We want to prioritize smaller `when` values + Ordering::Equal => Some(self.when.cmp(&other.when).reverse()), + v => Some(v), + } + } +} + +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)] +pub enum Kind { + Poweroff = 6, + Reboot = 5, + Halt = 4, + DryPoweroff = 3, + DryReboot = 2, + DryHalt = 1, +} + +#[derive(thiserror::Error, Debug, Eq, PartialEq)] +#[error("unknown shutdown kind `{0}`")] +pub struct UnknownShutdownKind(String); + +#[derive(thiserror::Error, Debug, Eq, PartialEq)] +pub enum KindParseErr { + #[error(transparent)] + Unknown(#[from] UnknownShutdownKind), + #[error("empty string")] + EmptyStr, +} + +impl FromStr for Kind { + type Err = KindParseErr; + + fn from_str(value: &str) -> Result<Self, Self::Err> { + Ok(match value { + "" => return Err(KindParseErr::EmptyStr), + "dry-poweroff" => Kind::DryPoweroff, + "dry-reboot" => Kind::DryReboot, + "dry-halt" => Kind::DryHalt, + "poweroff" => Kind::Poweroff, + "reboot" => Kind::Reboot, + "halt" => Kind::Halt, + unknown => return Err(UnknownShutdownKind(unknown.to_owned()).into()), + }) + } +} + +impl Kind { + #[must_use] + pub fn as_str(&self) -> &'static str { + match self { + Poweroff => "poweroff", + Reboot => "reboot", + Halt => "halt", + DryPoweroff => "dry-poweroff", + DryReboot => "dry-reboot", + DryHalt => "dry-halt", + } + } +} + +impl Display for Kind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +/// The happy-path return value of [`schedule_shutdown`]. 
Describes whether there +/// were any existing scheduled shutdowns. +#[derive(Debug, Eq, PartialEq)] +pub enum PreemptionInfo { + /// Scheduled successfully, and there were no existing shutdowns to preempt. + NoExistingShutdown, + /// The shutdown that was requested was ignored in favor of an existing shutdown. + KeptExistingShutdown(ScheduledShutdown), + /// The shutdown that was requested preempted/replaced an existing shutdown. + PreemptedExistingShutdown(ScheduledShutdown), +} + +/// Schedules a shutdown using `proxy`. Will preempt a pre-existing shutdown +/// of lower priority. +#[allow(clippy::missing_panics_doc)] +pub async fn schedule_shutdown( + proxy: login1::ManagerProxy<'static>, + shutdown_req: ScheduledShutdown, +) -> zbus::Result<PreemptionInfo> { + let already_scheduled: Option<ScheduledShutdown> = { + info!("getting property `org.freedesktop.login1.Manager.ScheduledShutdown`"); + let raw_tuple = proxy.scheduled_shutdown().await?; + ScheduledShutdown::try_from_dbus(raw_tuple) + .map_err(|err| zbus::Error::Failure(format!("failed to parse currently scheduled shutdown: {err}")))? + }; + + let result = if let Some(already_scheduled) = already_scheduled { + if shutdown_req < already_scheduled { + return Ok(PreemptionInfo::KeptExistingShutdown(already_scheduled)); + } + PreemptionInfo::PreemptedExistingShutdown(already_scheduled) + } else { + PreemptionInfo::NoExistingShutdown + }; + + info!( + "calling `org.freedesktop.login1.Manager.ScheduleShutdown` to shutdown system" + ); + proxy + .schedule_shutdown(shutdown_req.kind.as_str().to_owned(), shutdown_req.when) + .await?; + + Ok(result) +} diff --git a/orb-supervisor/src/startup.rs b/orb-supervisor/src/startup.rs new file mode 100644 index 00000000..17066cf9 --- /dev/null +++ b/orb-supervisor/src/startup.rs @@ -0,0 +1,148 @@ +use color_eyre::eyre::WrapErr as _; +use futures::{future::TryFutureExt as _, FutureExt as _}; +use tracing::debug; +use zbus::{Connection, ConnectionBuilder}; + +use crate::{ + interfaces::{self, manager}, + proxies::core::{ + 
SIGNUP_PROXY_DEFAULT_OBJECT_PATH, SIGNUP_PROXY_DEFAULT_WELL_KNOWN_NAME, + }, + tasks, +}; + +pub const DBUS_WELL_KNOWN_NAME: &str = "org.worldcoin.OrbSupervisor1"; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed to establish connection to session dbus")] + EstablishSessionConnection(#[source] zbus::Error), + #[error("failed to establish connection to system dbus")] + EstablishSystemConnection(#[source] zbus::Error), + #[error("error occurred in zbus communication")] + Zbus(#[from] zbus::Error), + #[error("invalid session D-Bus address")] + SessionDbusAddress(#[source] zbus::Error), + #[error("error establishing a connection to the session D-Bus or registering an interface")] + SessionDbusConnection, +} + +#[derive(Clone, Debug)] +pub struct Settings { + pub session_dbus_path: Option<String>, + pub system_dbus_path: Option<String>, + pub manager_object_path: String, + pub signup_proxy_well_known_name: String, + pub signup_proxy_object_path: String, + pub well_known_name: String, +} + +impl Settings { + fn new() -> Self { + Self { + session_dbus_path: None, + system_dbus_path: None, + manager_object_path: manager::OBJECT_PATH.to_string(), + signup_proxy_well_known_name: SIGNUP_PROXY_DEFAULT_WELL_KNOWN_NAME + .to_string(), + signup_proxy_object_path: SIGNUP_PROXY_DEFAULT_OBJECT_PATH.to_string(), + well_known_name: DBUS_WELL_KNOWN_NAME.to_string(), + } + } +} + +impl Default for Settings { + fn default() -> Self { + Self::new() + } +} + +pub struct Application { + pub session_connection: Connection, + pub system_connection: Connection, + pub settings: Settings, +} + +impl Application { + /// Constructs an [`Application`] from [`Settings`]. + /// + /// This function also connects to the system and session D-Bus instances. 
+ /// + /// # Errors + /// + /// [`Application::build`] will return the following errors: + /// + /// + [`Error::SessionDbusAddress`], if the path to the socket holding the session D-Bus + /// instance was not understood (the path is conventionally stored in the environment variable + /// `$DBUS_SESSION_BUS_ADDRESS`, e.g. `unix:path=/run/user/1000/bus` and usually set by + /// systemd). + /// + [`Error::EstablishSessionConnection`], if an error occurred while trying to establish + /// a connection to the session D-Bus instance, or trying to register an interface with it. + /// + [`Error::EstablishSystemConnection`], if the connection to the system D-Bus failed. + /// + [`Error::Zbus`], if the system D-Bus address or the well-known name was not accepted. + pub async fn build(settings: Settings) -> Result<Self, Error> { + let system_builder = if let Some(path) = settings.system_dbus_path.as_deref() { + ConnectionBuilder::address(path)? + } else { + ConnectionBuilder::system()? + }; + let system_connection = system_builder + .name(settings.well_known_name.clone())? + .build() + .await + .map_err(Error::EstablishSystemConnection)?; + + debug!( + unique_bus_name = ?system_connection.unique_name(), + "system dbus assigned unique bus name", + ); + + let mut manager = interfaces::Manager::new(); + manager.set_system_connection(system_connection.clone()); + + let session_builder = if let Some(path) = settings.session_dbus_path.as_deref() + { + ConnectionBuilder::address(path) + } else { + ConnectionBuilder::session() + } + .map_err(Error::SessionDbusAddress)?; + + let session_connection = futures::future::ready( + session_builder + .name(settings.well_known_name.clone()) + .and_then(|builder| { + builder.serve_at(settings.manager_object_path.clone(), manager) + }), + ) + .and_then(ConnectionBuilder::build) + .await + .map_err(Error::EstablishSessionConnection)?; + + debug!( + unique_bus_name = ?session_connection.unique_name(), + "session dbus assigned unique bus name", + ); + + Ok(Self { + session_connection, + system_connection, + settings, + }) + } + + /// Runs `Application` by 
spawning its constituent tasks. + pub async fn run(self) -> color_eyre::Result<()> { + let signup_started_task = + tasks::spawn_signup_started_task(&self.settings, &self.session_connection) + .await?; + + let ((),) = tokio::try_join!( + // All tasks are joined here + signup_started_task.map(|e| e + .wrap_err("signup_started task aborted unexpectedly")? + .wrap_err("signup_started task exited with error")), + )?; + Ok(()) + } +} diff --git a/orb-supervisor/src/tasks/mod.rs b/orb-supervisor/src/tasks/mod.rs new file mode 100644 index 00000000..8d054a23 --- /dev/null +++ b/orb-supervisor/src/tasks/mod.rs @@ -0,0 +1,7 @@ +//! Tasks that make up the orb supervisor. + +pub mod signup_started; +pub mod update; + +pub use signup_started::spawn_signup_started_task; +pub use update::spawn_shutdown_worldcoin_core_timer; diff --git a/orb-supervisor/src/tasks/signup_started.rs b/orb-supervisor/src/tasks/signup_started.rs new file mode 100644 index 00000000..764a6ddf --- /dev/null +++ b/orb-supervisor/src/tasks/signup_started.rs @@ -0,0 +1,44 @@ +//! Listens for signup started signals from Orb Core. + +use tokio::task::JoinHandle; +use tokio_stream::StreamExt as _; + +use crate::{interfaces::Manager, proxies::core::SignupProxy, startup::Settings}; + +/// Spawns a task on the tokio runtime listening for `SignupStarted` D-Bus signals from Orb Core. +/// +/// When the task receives a `SignupStarted` signal it resets the timer of the `Manager` interface, +/// and then sends out a `PropertiesChanged` signal for the `BackgroundDownloadsAllowed` property. +/// +/// # Errors +/// +/// + [`zbus::Error`] if an error occurred while building a D-Bus proxy listening for signups from +/// `orb-core`. The errors are the same as those in [`zbus::ProxyBuilder`]. 
+pub async fn spawn_signup_started_task<'a>( + settings: &Settings, + connection: &'a zbus::Connection, +) -> zbus::Result<JoinHandle<zbus::Result<()>>> { + let signup_proxy = SignupProxy::builder(connection) + .destination(settings.signup_proxy_well_known_name.clone())? + .path(settings.signup_proxy_object_path.clone())? + .build() + .await?; + let mut signup_started = signup_proxy.receive_signup_started().await?; + let conn = connection.clone(); + + let manager_object_path = settings.manager_object_path.clone(); + let task_handle = tokio::spawn(async move { + while signup_started.next().await.is_some() { + let iface_ref = conn + .object_server() + .interface::<_, Manager>(manager_object_path.clone()) + .await?; + let mut iface = iface_ref.get_mut().await; + iface + .reset_last_signup_event_and_notify(iface_ref.signal_context()) + .await?; + } + Ok::<_, zbus::Error>(()) + }); + Ok(task_handle) +} diff --git a/orb-supervisor/src/tasks/update.rs b/orb-supervisor/src/tasks/update.rs new file mode 100644 index 00000000..11578c4d --- /dev/null +++ b/orb-supervisor/src/tasks/update.rs @@ -0,0 +1,233 @@ +use std::{convert::identity, time::Duration}; + +use futures::{StreamExt as _, TryFutureExt as _}; +use tokio::{ + task::JoinHandle, + time::{self, error::Elapsed, Instant}, +}; +use tracing::{debug, info, instrument, warn}; +use zbus_systemd::systemd1::{self, ManagerProxy}; + +use crate::consts::{ + DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP, WORLDCOIN_CORE_UNIT_NAME, +}; + +/// Calculates the instant that is `DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP` after the last signup event. 
+fn calculate_stop_deadline(last_signup_started_event: Instant) -> Instant { + last_signup_started_event + .checked_add(crate::consts::DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP) + .expect("`Instant` should always be able to represent the timescales of this codebase") +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("reached timeout while waiting for worldcoin core service to be stopped")] + Elapsed(#[from] Elapsed), + #[error( + "failed communicating over dbus; TODO: break this up into individual errors" + )] + Dbus(#[from] zbus::Error), +} + +/// Spawns a task that shuts down worldcoin core after enough time has passed. +#[must_use] +pub fn spawn_shutdown_worldcoin_core_timer( + proxy: ManagerProxy<'static>, + mut last_signup_started_event: tokio::sync::watch::Receiver, +) -> JoinHandle> { + tokio::spawn(async move { + let trigger_stop = time::sleep_until(calculate_stop_deadline( + *last_signup_started_event.borrow(), + )); + tokio::pin!(trigger_stop); + loop { + tokio::select!( + + // reset the trigger if a new signup has started + _ = last_signup_started_event.changed() => { + info!( + duration_s = DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP.as_secs(), + "new signup started, resetting timer", + ); + trigger_stop + .as_mut() + .reset(calculate_stop_deadline(*last_signup_started_event.borrow())); + }, + + () = &mut trigger_stop => { + break; + } + ); + } + info!("deadline reached, shutting down worldcoin core service"); + let worldcoin_core_timeout_stop = + get_worldcoin_core_timeout(proxy.clone()).await?; + stop_worldcoin_core(proxy.clone(), WORLDCOIN_CORE_UNIT_NAME, "replace").await?; + tokio::time::timeout( + worldcoin_core_timeout_stop, + has_worldcoin_core_stopped(proxy.clone()), + ) + .await + .map_err(From::from) + .and_then(identity) + }) +} + +#[must_use] +pub fn spawn_start_update_agent_after_core_shutdown_task( + proxy: systemd1::ManagerProxy<'static>, + shutdown_task: JoinHandle>, +) -> JoinHandle> { + tokio::spawn(async move { + match 
shutdown_task.await { + Ok(Ok(())) => info!("worldcoin core shutdown task completed"), + Ok(Err(e)) => { + warn!(error = ?e, "worldcoin core shutdown task returned with error"); + } + Err(e) => warn!(panic_msg = ?e, "worldcoin core shutdown task panicked"), + } + info!("calling `org.freedesktop.systemd1.Manager.StartUnit` to start update agent"); + proxy + .start_unit("worldcoin-update-agent.service".into(), "replace".into()) + .await + .map(|_| {}) + .map_err(Into::into) + }) +} +// #[instrument( +// name = "spawn_update_agent_after_core_stopped", +// skip_all, +// )] +// pub async fn spawn_start_update_agent_after_worldcoin_core_stopped_task( +// proxy: ManagerProxy<'static>, +// ) -> JoinHandle> { tokio::spawn(async move { let timeout_duration = +// get_worldcoin_core_timeout(proxy.clone()).await?; match tokio::time::timeout( timeout_duration, +// has_worldcoin_core_stopped(proxy), ).await { Ok(_) => todo!("worldcoin core stopped"), +// Err(elapsed) => { info!(error = %elapsed, "did not " } }; Ok(()) }) + +// } + +#[instrument(skip_all, err, ret(Debug))] +async fn get_worldcoin_core_timeout( + proxy: ManagerProxy<'static>, +) -> zbus::Result { + let worldcoin_core_service = proxy + .get_unit(WORLDCOIN_CORE_UNIT_NAME.to_string()) + .and_then(|worldcoin_core_object| async { + zbus_systemd::systemd1::ServiceProxy::builder(proxy.connection()) + .destination("org.freedesktop.systemd1")? + .path(worldcoin_core_object)? + .build() + .await + }) + .await?; + worldcoin_core_service + .timeout_stop_u_sec() + .map_ok(Duration::from_micros) + .await +} + +/// Reports if the worldcoin core systemd service has stopped. +/// +/// This function makes use of the fact that the first item produced by the `PropertyChangedStream` +/// is its current value. This is probably an implementation detail of zbus. 
+#[instrument(skip_all, err, ret)] +async fn has_worldcoin_core_stopped(proxy: ManagerProxy<'static>) -> Result<(), Error> { + let orb_core_unit = proxy + .get_unit(WORLDCOIN_CORE_UNIT_NAME.to_string()) + .and_then(|object_path| async { + zbus_systemd::systemd1::UnitProxy::builder(proxy.connection()) + .destination("org.freedesktop.systemd1")? + .path(object_path)? + .build() + .await + }) + .await?; + debug!("awaiting active state changed"); + let mut active_state_stream = orb_core_unit.receive_active_state_changed().await; + + // This makes use of the fact that the first iteration always returns the current state. + // So if the service is already inactive or failed, then this loop will break and we + // doesn't spin indefinitely. + debug!("spinning"); + while let Some(event) = active_state_stream.next().await { + match &*event.get().await? { + "inactive" | "failed" => break, + other => { + info!(event = other, "received event"); + } + } + } + Ok(()) +} + +#[instrument( + skip(proxy), + fields(dbus_method = "org.freedesktop.systemd1.Manager.StopUnit",) +)] +async fn stop_worldcoin_core( + proxy: systemd1::ManagerProxy<'static>, + unit_name: &'static str, + stop_mode: &'static str, +) -> zbus::Result<()> { + match proxy + .stop_unit(unit_name.to_string(), stop_mode.to_string()) + .await + { + Ok(unit_path) => { + debug!( + job_object = unit_path.as_str(), + "dbus method call successful" + ); + } + + Err(zbus::Error::MethodError(name, detail, reply)) + if name == "org.freedesktop.systemd1.NoSuchUnit" => + { + // We need to reconstruct the error here because the destructuring, guards and bindings + // don't work in match statements + let method_error = zbus::Error::MethodError(name, detail, reply); + debug!(error = %method_error, "systemd mostl likely reported that worldcoin core is stopped"); + } + + Err(zbus::Error::FDO(e)) => { + warn!( + err = ?e, + dbus_method = "org.freedesktop.systemd1.Manager.StopUnit", + "encountered a D-Bus error when dbus method; 
permitting update", + ); + } + Err(e) => { + return Err(e); + } + }; + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::time::Instant; + + use super::{calculate_stop_deadline, DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP}; + + #[test] + fn deadline_of_old_signup_event_is_in_the_past() { + let an_hour_ago = Instant::now() + .checked_sub(Duration::from_secs(60 * 60)) + .expect("`Instant` should always be able to represent current time minus 60 minutes"); + let stop_deadline = calculate_stop_deadline(an_hour_ago); + assert!(stop_deadline < Instant::now()); + } + + #[test] + fn deadline_of_now_is_wait_time() { + let now = Instant::now(); + let calculated_stop_deadline = calculate_stop_deadline(now); + let expected_stop_deadline = now + .checked_add(DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP) + .expect("`Instant` should always be able to represent current time + some minutes"); + assert_eq!(expected_stop_deadline, calculated_stop_deadline); + } +} diff --git a/orb-supervisor/src/telemetry.rs b/orb-supervisor/src/telemetry.rs new file mode 100644 index 00000000..2eab1d61 --- /dev/null +++ b/orb-supervisor/src/telemetry.rs @@ -0,0 +1,77 @@ +use tap::prelude::*; +use tracing::metadata::LevelFilter; +use tracing_subscriber::{filter, fmt::MakeWriter, prelude::*, util::TryInitError}; + +const SYSLOG_IDENTIFIER: &str = "worldcoin-supervisor"; + +fn is_tty_interactive() -> bool { + unsafe { libc::isatty(libc::STDIN_FILENO) == 1 } +} + +pub struct ExecContext; +pub struct TestContext; + +pub trait Context: private::Sealed { + const ENABLE_TELEMETRY: bool = false; +} + +impl Context for ExecContext { + const ENABLE_TELEMETRY: bool = true; +} +impl Context for TestContext {} + +mod private { + use super::{ExecContext, TestContext}; + pub trait Sealed {} + + impl Sealed for ExecContext {} + impl Sealed for TestContext {} +} + +/// Start telemetry (tracing, logging) of orb supervisor. 
+/// +/// If supervisor is run form an interactive CLI all trace events will be written to stdout. +/// If this function detects that orb-supervisor is not attached to an interactive session, +/// events will be written to journald (this is the case if supervisor is started as a systemd +/// service). +/// +/// The `C: Context` generic type parameter is used for disabling telemetry inside +/// a test environment. +/// +/// # Errors +/// +/// Returns [`tracing_subscriber.util.TryInitError`] +pub fn start( + env_filter: LevelFilter, + sink: W, +) -> Result<(), TryInitError> +where + W: for<'a> MakeWriter<'a> + Send + Sync + 'static, +{ + let env_filter = filter::EnvFilter::builder() + .with_default_directive(env_filter.into()) + .from_env_lossy(); + + let mut fmt = None; + let mut journald = None; + + if C::ENABLE_TELEMETRY && !is_tty_interactive() { + journald = tracing_journald::layer() + .tap_err(|err| { + eprintln!( + "failed connecting to journald socket; will write to stdout: {err}" + ); + }) + .map(|layer| layer.with_syslog_identifier(SYSLOG_IDENTIFIER.into())) + .ok(); + } + if journald.is_none() { + fmt = Some(tracing_subscriber::fmt::layer().with_writer(sink)); + } + + tracing_subscriber::registry() + .with(fmt) + .with(journald) + .with(env_filter) + .try_init() +} diff --git a/orb-supervisor/tests/it/helpers.rs b/orb-supervisor/tests/it/helpers.rs new file mode 100644 index 00000000..957fda88 --- /dev/null +++ b/orb-supervisor/tests/it/helpers.rs @@ -0,0 +1,205 @@ +use std::io; + +use dbus_launch::{BusType, Daemon}; +use once_cell::sync::Lazy; +use orb_supervisor::{ + startup::{Application, Settings}, + telemetry::{self, TestContext}, +}; +use tokio::task::JoinHandle; +use tracing_subscriber::filter::LevelFilter; +use zbus::{ + dbus_interface, dbus_proxy, fdo, zvariant::OwnedObjectPath, ProxyDefault, + SignalContext, +}; + +pub const WORLDCOIN_CORE_SERVICE_OBJECT_PATH: &str = + "/org/freedesktop/systemd1/unit/worldcoin_2dcore_2eservice"; +static TRACING: 
Lazy<()> = Lazy::new(|| { + let filter = LevelFilter::DEBUG; + if std::env::var("TEST_LOG").is_ok() { + telemetry::start::(filter, std::io::stdout).unwrap(); + } else { + telemetry::start::(filter, std::io::sink).unwrap(); + } +}); + +#[derive(Debug)] +pub struct DbusInstances { + pub session: Daemon, + pub system: Daemon, +} + +pub fn launch_dbuses() -> JoinHandle> { + tokio::task::spawn_blocking(|| { + let session = launch_session_dbus()?; + let system = launch_system_dbus()?; + Ok(DbusInstances { session, system }) + }) +} + +pub fn launch_session_dbus() -> io::Result { + dbus_launch::Launcher::daemon() + .bus_type(BusType::Session) + .launch() +} + +pub fn launch_system_dbus() -> io::Result { + dbus_launch::Launcher::daemon() + .bus_type(BusType::System) + .launch() +} + +pub fn make_settings(dbus_instances: &DbusInstances) -> Settings { + Settings { + session_dbus_path: dbus_instances.session.address().to_string().into(), + system_dbus_path: dbus_instances.system.address().to_string().into(), + ..Default::default() + } +} + +pub async fn spawn_supervisor_service( + settings: Settings, +) -> color_eyre::Result { + Lazy::force(&TRACING); + let application = Application::build(settings.clone()).await?; + Ok(application) +} + +#[dbus_proxy( + interface = "org.worldcoin.OrbSupervisor1.Manager", + gen_async = true, + gen_blocking = false, + default_service = "org.worldcoin.OrbSupervisor1", + default_path = "/org/worldcoin/OrbSupervisor1/Manager" +)] +pub trait Signup { + #[dbus_proxy(property)] + fn background_downloads_allowed(&self) -> zbus::Result; + + #[dbus_proxy(name = "RequestUpdatePermission")] + fn request_update_permission(&self) -> zbus::fdo::Result<()>; +} + +pub async fn make_update_agent_proxy<'a>( + settings: &'a Settings, + dbus_instances: &DbusInstances, +) -> zbus::Result> { + let connection = + zbus::ConnectionBuilder::address(dbus_instances.session.address())? 
+ .build() + .await?; + SignupProxy::builder(&connection) + .cache_properties(zbus::CacheProperties::No) + .destination(settings.well_known_name.clone())? + .path(settings.manager_object_path.clone())? + .build() + .await +} + +struct Signup; + +#[dbus_interface(name = "org.worldcoin.OrbCore1.Signup")] +impl Signup { + #[dbus_interface(signal)] + pub(crate) async fn signup_started(ctxt: &SignalContext<'_>) -> zbus::Result<()>; +} + +pub async fn start_signup_service_and_send_signal( + settings: &Settings, + dbus_instances: &DbusInstances, +) -> zbus::Result<()> { + let conn = zbus::ConnectionBuilder::address(dbus_instances.session.address())? + .name(settings.signup_proxy_well_known_name.clone())? + .serve_at(settings.signup_proxy_object_path.clone(), Signup)? + .build() + .await?; + + let signup_proxy_object_path = settings.signup_proxy_object_path.clone(); + Signup::signup_started(&zbus::SignalContext::new( + &conn, + signup_proxy_object_path.clone(), + )?) + .await?; + Ok(()) +} + +struct Manager; + +#[dbus_interface(name = "org.freedesktop.systemd1.Manager")] +impl Manager { + #[dbus_interface(name = "GetUnit")] + async fn get_unit(&self, name: String) -> fdo::Result { + tracing::debug!(name, "GetUnit called"); + match &*name { + "worldcoin-core.service" => { + OwnedObjectPath::try_from(WORLDCOIN_CORE_SERVICE_OBJECT_PATH) + } + _other => OwnedObjectPath::try_from( + format!("/org/freedesktop/systemd1/unit/{name}") + .replace('-', "_2d") + .replace('.', "_2e"), + ), + } + .map_err(move |_| fdo::Error::UnknownObject(name)) + } + + #[dbus_interface(name = "StopUnit")] + async fn stop_unit( + &self, + name: String, + _mode: String, + ) -> fdo::Result { + tracing::debug!(name, _mode, "StopUnit called"); + OwnedObjectPath::try_from("/org/freedesktop/systemd1/job/1234") + .map_err(move |_| fdo::Error::UnknownObject(name)) + } +} + +pub struct CoreUnit { + active_state: String, +} + +#[dbus_interface(name = "org.freedesktop.systemd1.Unit")] +impl CoreUnit { + 
#[dbus_interface(property)] + pub async fn active_state(&self) -> String { + tracing::debug!("ActiveState property requested"); + self.active_state.clone() + } + + #[dbus_interface(property)] + pub async fn set_active_state(&mut self, active_state: String) { + tracing::debug!(active_state, "SetActiveState property called"); + self.active_state = active_state; + } +} + +pub struct CoreService; + +#[dbus_interface(name = "org.freedesktop.systemd1.Service")] +impl CoreService { + #[dbus_interface(property, name = "TimeoutStopUSec")] + async fn timeout_stop_u_sec(&self) -> u64 { + tracing::debug!("TimeoutStopUSec property requested"); + 20_000_000 + } +} + +pub async fn start_interfaces( + dbus_instances: &DbusInstances, +) -> zbus::Result { + let conn = zbus::ConnectionBuilder::address(dbus_instances.system.address())? + .name(zbus_systemd::systemd1::ManagerProxy::DESTINATION)? + .serve_at(zbus_systemd::systemd1::ManagerProxy::PATH, Manager)? + .serve_at(WORLDCOIN_CORE_SERVICE_OBJECT_PATH, CoreService)? + .serve_at( + WORLDCOIN_CORE_SERVICE_OBJECT_PATH, + CoreUnit { + active_state: "active".into(), + }, + )? 
+ .build() + .await?; + Ok(conn) +} diff --git a/orb-supervisor/tests/it/main.rs b/orb-supervisor/tests/it/main.rs new file mode 100644 index 00000000..d83b70a5 --- /dev/null +++ b/orb-supervisor/tests/it/main.rs @@ -0,0 +1,112 @@ +use std::time::Duration; + +use tap::TapFallible; +use tokio::sync::oneshot; +use tracing::error; + +pub mod helpers; + +#[tokio::test(start_paused = true)] +async fn supervisor_disallows_downloads_if_signup_started_received( +) -> color_eyre::Result<()> { + let dbus_instances = helpers::launch_dbuses().await??; + + let settings = helpers::make_settings(&dbus_instances); + + let application = helpers::spawn_supervisor_service(settings.clone()).await?; + let _application_handle = tokio::spawn(application.run()); + + let update_agent_proxy = + helpers::make_update_agent_proxy(&settings, &dbus_instances).await?; + + // We want to ensure that downloads are allowed when the manager begins + let downloads_allowed_initially = + update_agent_proxy.background_downloads_allowed().await?; + assert!(downloads_allowed_initially); + + // Now we check thaht after a signup, downloads are not allowed + helpers::start_signup_service_and_send_signal(&settings, &dbus_instances).await?; + let downloads_allowed_after_signal = + update_agent_proxy.background_downloads_allowed().await?; + assert!(!downloads_allowed_after_signal); + + // Then wait for the timeout duration to pass and ensure that the downloads + // are once again allowed + tokio::time::advance( + orb_supervisor::interfaces::manager::DEFAULT_DURATION_TO_ALLOW_DOWNLOADS, + ) + .await; + + let downloads_allowed_after_period = + update_agent_proxy.background_downloads_allowed().await?; + assert!(downloads_allowed_after_period); + + Ok(()) +} + +#[tokio::test(start_paused = true)] +async fn supervisor_stops_orb_core_when_update_permission_is_requested( +) -> color_eyre::Result<()> { + // FIXME: This is a hack to inhibit tokio auto-advance functionality in tests; + // See 
https://github.com/tokio-rs/tokio/pull/5200 for more info and rework this + // once the necessary functionality is exposed in an API. + let (inhibit_tx, inhibit_rx) = tokio::sync::oneshot::channel(); + tokio::task::spawn_blocking(move || inhibit_rx.blocking_recv()); + + let dbus_instances = helpers::launch_dbuses().await??; + + let settings = helpers::make_settings(&dbus_instances); + let application = helpers::spawn_supervisor_service(settings.clone()).await?; + + let _application_handle = tokio::spawn(application.run()); + + tokio::time::advance( + orb_supervisor::consts::DURATION_TO_STOP_CORE_AFTER_LAST_SIGNUP, + ) + .await; + + let update_agent_proxy = + helpers::make_update_agent_proxy(&settings, &dbus_instances).await?; + let system_conn = helpers::start_interfaces(&dbus_instances).await?; + + let request_update_permission_task = tokio::task::spawn(async move { + update_agent_proxy.request_update_permission().await + }); + // Switch active state to "inactive" after 300ms with forced synchronizatoin + // through the oneshot channel because tasks are not scheduled immediately. + let (active_task_tx, active_task_rx) = oneshot::channel(); + let system_conn_clone = system_conn.clone(); + let set_active_state_task = tokio::task::spawn(async move { + let core_unit = system_conn_clone + .object_server() + .interface::<_, helpers::CoreUnit>(helpers::WORLDCOIN_CORE_SERVICE_OBJECT_PATH) + .await + .tap_err(|e| error!(error = ?e, "failed getting CoreUnit interface from object server")) + .unwrap(); + active_task_tx + .send(()) + .expect("oneshot channel should be open"); + tokio::time::sleep(Duration::from_millis(300)).await; + core_unit + .get_mut() + .await + .set_active_state("inactive".into()) + .await; + }); + // Using the rx channel as a sync point to make sure time isn't advancing too quickly. 
+ active_task_rx + .await + .expect("oneshot channel should be open"); + tokio::time::advance(Duration::from_millis(500)).await; + let (update_permission, active_state) = + tokio::join!(request_update_permission_task, set_active_state_task); + let update_permission = update_permission.expect( + "the request update permissions task should not have panicked because we don't explicitly \ + panick in it", + ); + assert!(matches!(update_permission, Ok(()))); + assert!(matches!(active_state, Ok(()))); + inhibit_tx.send(()).unwrap(); + + Ok(()) +}