Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): add opentelemetry support #346

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 318 additions & 12 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion attest/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
orb_telemetry::TelemetryConfig::new()

let _telemetry_guard = orb_telemetry::TelemetryConfig::new(
orb_attest::SYSLOG_IDENTIFIER,
"1.0.0",
"orb"
)
.with_journald(orb_attest::SYSLOG_IDENTIFIER)
.with_opentelemetry(orb_telemetry::OpenTelemetryConfig::default())
.init();

orb_attest::main().await
}
7 changes: 5 additions & 2 deletions attest/src/remote_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,11 @@ printf dmFsaWRzaWduYXR1cmU=
// A happy path
#[tokio::test]
async fn get_token() {
orb_telemetry::TelemetryConfig::new().init();

let _telemetry_guard = orb_telemetry::TelemetryConfig::new(
"test-orb-auth", // service name for test context
"test", // version for test context
"test" // environment for test context
).init();
let mock_server = MockServer::start().await;

let orb_id = "TEST_ORB";
Expand Down
9 changes: 7 additions & 2 deletions backend-state/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ struct Cli {}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
color_eyre::install()?;
orb_telemetry::TelemetryConfig::new()
let _telemetry_guard = orb_telemetry::TelemetryConfig::new(
SYSLOG_IDENTIFIER,
BUILD_INFO.version,
"orb"
)
.with_journald(SYSLOG_IDENTIFIER)
.with_opentelemetry(orb_telemetry::OpenTelemetryConfig::default())
.init();

let _args = Cli::parse();
Expand Down Expand Up @@ -186,7 +191,7 @@ async fn poll_backend(mut ctx: Context) -> ! {

/// Listens for changes to state, and signals that change to the dbus interface.
fn spawn_notify_state_task(
iface: zbus::InterfaceRef<crate::dbus_interface::Interface>,
iface: zbus::InterfaceRef<dbus_interface::Interface>,
mut ctx: Context,
) -> tokio::task::JoinHandle<Result<()>> {
tokio::task::spawn(async move {
Expand Down
19 changes: 16 additions & 3 deletions experiments/zenoh/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,24 @@ enum Args {

#[tokio::main]
async fn main() -> color_eyre::Result<()> {
orb_telemetry::TelemetryConfig::new().init();
tracing::debug!("debug logging is enabled");

let args = Args::parse();

// Configure telemetry with appropriate service name based on mode
let service_name = match args {
Args::Alice { .. } => "zenoh-bench-sender",
Args::Bob { .. } => "zenoh-bench-receiver",
};

let _telemetry_guard = orb_telemetry::TelemetryConfig::new(
service_name,
env!("CARGO_PKG_VERSION"),
"orb"
)
.with_opentelemetry(orb_telemetry::OpenTelemetryConfig::default())
.init();

tracing::debug!("debug logging is enabled");

match args {
Args::Alice { .. } => alice(args).await,
Args::Bob { .. } => bob(args).await,
Expand Down
9 changes: 8 additions & 1 deletion mcu-util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,14 @@ fn clap_v3_styles() -> Styles {
#[tokio::main]
async fn main() -> Result<()> {
color_eyre::install()?;
orb_telemetry::TelemetryConfig::new().init();

let _telemetry_guard = orb_telemetry::TelemetryConfig::new(
"orb-mcu-util",
BUILD_INFO.version,
"orb"
)
.with_opentelemetry(orb_telemetry::OpenTelemetryConfig::default())
.init();

let args = Args::parse();

Expand Down
9 changes: 8 additions & 1 deletion supervisor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,16 @@ fn clap_v3_styles() -> Styles {
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
orb_telemetry::TelemetryConfig::new()

let _telemetry_guard = orb_telemetry::TelemetryConfig::new(
SYSLOG_IDENTIFIER,
BUILD_INFO.version,
"orb"
)
.with_journald(SYSLOG_IDENTIFIER)
.with_opentelemetry(orb_telemetry::OpenTelemetryConfig::default())
.init();

debug!("initialized telemetry");

let _args = Cli::parse();
Expand Down
12 changes: 10 additions & 2 deletions supervisor/tests/it/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@ use zbus::{

pub const WORLDCOIN_CORE_SERVICE_OBJECT_PATH: &str =
"/org/freedesktop/systemd1/unit/worldcoin_2dcore_2eservice";
static TRACING: Lazy<()> = Lazy::new(|| {
orb_telemetry::TelemetryConfig::new().init();

// Store the shutdown handler in the Lazy static to keep it alive
static TRACING: Lazy<orb_telemetry::TelemetryShutdownHandler> = Lazy::new(|| {
orb_telemetry::TelemetryConfig::new(
"orb-dbus-manager",
env!("CARGO_PKG_VERSION"),
"orb"
)
.with_opentelemetry(orb_telemetry::OpenTelemetryConfig::default())
.init()
});

#[derive(Debug)]
Expand Down
10 changes: 8 additions & 2 deletions telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ rust-version.workspace = true

[dependencies]
tracing-journald.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
opentelemetry = { version = "0.21", features = ["trace"] }
opentelemetry-otlp = { version = "0.14", features = ["trace", "tonic"] }
opentelemetry_sdk = { version = "0.21", features = ["trace", "rt-tokio"] }
opentelemetry-datadog = "0.10"
tracing-opentelemetry = "0.22"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json", "time"] }
tracing = "0.1.40"


[target.'cfg(tokio_unstable)'.dependencies]
console-subscriber.workspace = true
Expand Down
184 changes: 161 additions & 23 deletions telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,104 @@ use std::io::IsTerminal as _;

use tracing::level_filters::LevelFilter;
use tracing_subscriber::{
layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter,
layer::SubscriberExt as _,
EnvFilter,
util::SubscriberInitExt,
};

use opentelemetry::{global, KeyValue};
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
trace::{self, Sampler},
runtime::Tokio,
Resource,
};
use opentelemetry_sdk::propagation::TraceContextPropagator;

/// Configuration for OpenTelemetry tracing.
#[derive(Debug, Clone)]
pub struct OpenTelemetryConfig {
/// The endpoint to send OTLP data to
pub endpoint: String,
}

impl Default for OpenTelemetryConfig {
fn default() -> Self {
Self {
endpoint: "http://localhost:4317".to_string(),
}
}
}

impl OpenTelemetryConfig {
/// Creates a new OpenTelemetry configuration with a custom endpoint.
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
}
}
}

/// A struct controlling how telemetry will be configured (logging + optional OpenTelemetry).
#[derive(Debug)]
pub struct TelemetryConfig {
syslog_identifier: Option<String>,
global_filter: EnvFilter,
service_name: String,
service_version: String,
environment: String,
Comment on lines +49 to +51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these three fields seem like they should be a part of the OpenTelemetryConfig struct instead of the main telemetry struct. They won't be used by anything other than opentelemetry.

otel: Option<OpenTelemetryConfig>,
}

/// Handles cleanup of telemetry resources on drop.
#[must_use]
pub struct TelemetryShutdownHandler;

impl Drop for TelemetryShutdownHandler {
fn drop(&mut self) {
global::shutdown_tracer_provider();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if we aren't using opentelemetry at all (for example, if we only did with_journald but did not do with_opentelemetry)? Will this panic?

}
}

impl TelemetryConfig {
/// Provides all required arguments for telemetry configuration.
/// - `log_identifier` will be used for journald, if appropriate.
#[expect(clippy::new_without_default, reason = "may add required args later")]
#[must_use]
pub fn new() -> Self {
/// Creates a new telemetry configuration with mandatory service identification.
///
/// # Arguments
/// * `service_name` - The name of the service (e.g., "user-service")
/// * `service_version` - The version of the service (e.g., "1.0.0")
/// * `environment` - The deployment environment (e.g., "production", but for orbs it's always "orb")
pub fn new(
service_name: impl Into<String>,
service_version: impl Into<String>,
environment: impl Into<String>,
) -> Self {
Self {
syslog_identifier: None,
global_filter: EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
// Spans from dependencies are emitted only at the error level
.parse_lossy("info,zbus=error,h2=error,hyper=error,tonic=error,tower_http=error"),
service_name: service_name.into(),
service_version: service_version.into(),
environment: environment.into(),
otel: None,
}
}

/// Enables journald, and uses the provided syslog identifier.
///
/// If you run the application in a tty, stderr will be used instead.
#[must_use]
pub fn with_journald(self, syslog_identifier: &str) -> Self {
pub fn with_journald(self, syslog_identifier: impl Into<String>) -> Self {
Self {
syslog_identifier: Some(syslog_identifier.to_owned()),
syslog_identifier: Some(syslog_identifier.into()),
..self
}
}

/// Override the global filter to a custom filter.
/// Only do this if actually necessary to deviate from the orb's defaults.
/// Only do this if you actually need to deviate from the defaults.
#[must_use]
pub fn with_global_filter(self, filter: EnvFilter) -> Self {
Self {
Expand All @@ -46,13 +108,79 @@ impl TelemetryConfig {
}
}

pub fn try_init(self) -> Result<(), tracing_subscriber::util::TryInitError> {
/// Enable OpenTelemetry/OTLP tracing with the specified configuration.
#[must_use]
pub fn with_opentelemetry(self, config: OpenTelemetryConfig) -> Self {
Self {
otel: Some(config),
..self
}
}

/// Initialize the OpenTelemetry TracerProvider and set it globally.
fn init_opentelemetry(&self)
-> Result<(trace::TracerProvider, trace::Tracer), Box<dyn std::error::Error>>
Copy link
Collaborator

@TheButlah TheButlah Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: always use eyre::Report instead of Box<dyn std::error::Error>, except in extremely niche scenarios (can't think of any off the top of my head).

But in reality, instead of using eyre::Report or Box<dyn std::error::Error>, lets use the thiserror crate since enum based errors are more suitable for library code than trait object errors.

{
// Build an OpenTelemetry Resource with service metadata
let resource = Resource::new(vec![
KeyValue::new("service.name", self.service_name.clone()),
KeyValue::new("service.version", self.service_version.clone()),
KeyValue::new("deployment.environment", self.environment.clone()),
]);

let config = self.otel.as_ref().expect("OpenTelemetry config must be present");
Copy link
Collaborator

@TheButlah TheButlah Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps this is a code smell that indicates we should actually have this as a method on OpenTelemetryConfig instead of TelemetryConfig. If we do that, otel will never be None, and its also more clear that this function only should be called when OpenTelemetryConfig is available. Alternatively, it could be a free function (not a method) that takes an OpenTelemetryConfig as an argument, since its just a helper function.


let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(&config.endpoint)
.build_span_exporter()?;

let trace_config = trace::config()
.with_resource(resource)
.with_sampler(Sampler::AlwaysOn);

// Create a new tracer provider builder
let tracer_provider = trace::TracerProvider::builder()
.with_config(trace_config)
.with_batch_exporter(exporter, Tokio)
.build();

// Create a concrete tracer from the provider:
let tracer = tracer_provider.tracer("telemetry");

// Now set the global tracer provider (if desired)
global::set_tracer_provider(tracer_provider.clone());
global::set_text_map_propagator(TraceContextPropagator::new());

Ok((tracer_provider, tracer))
}


/// Try to initialize telemetry (journald/stderr + optional OTLP).
/// Returns an error if something goes wrong setting up the subscriber stack.
pub fn try_init(self) -> Result<(TelemetryShutdownHandler, Result<(), tracing_subscriber::util::TryInitError>), Box<dyn std::error::Error>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ths type is quite strange. I think you can simplify it by using thiserror for your error type, and the type signature then becomes:

pub fn try_init(self) -> Result<TelemetryShutdownHandler, OrbTelemetryErr> {
    // ...
}

// Set up the tracer provider if OTLP was requested
let tracer = if let Some(_otel_config) = self.otel.as_ref() {
match self.init_opentelemetry() {
Ok((_provider, tracer)) => Some(tracer),
Err(err) => {
eprintln!("Failed to initialize OTLP exporter: {err}");
None
}
}
} else {
None
};

// Base journald/stderr logging setup
let registry = tracing_subscriber::registry();
// The type is only there to get it to compile.

// If tokio_unstable is enabled, we can gather runtime metrics
let tokio_console_layer: Option<tracing_subscriber::layer::Identity> = None;
#[cfg(tokio_unstable)]
let tokio_console_layer = console_subscriber::spawn();
// Checking for a terminal helps detect if we are running under systemd.

// If we're not attached to a terminal, assume journald is the intended output
let journald_layer = if !std::io::stderr().is_terminal() {
self.syslog_identifier.and_then(|syslog_identifier| {
tracing_journald::layer()
Expand All @@ -68,24 +196,34 @@ impl TelemetryConfig {
} else {
None
};

// If journald is not available or we're in a TTY, fallback to stderr
let stderr_layer = journald_layer
.is_none()
.then(|| tracing_subscriber::fmt::layer().with_writer(std::io::stderr));
assert!(stderr_layer.is_some() || journald_layer.is_some());
registry

// If OTLP tracing is available, attach a tracing-opentelemetry layer
let otlp_layer = tracer.map(|tracer| {
tracing_opentelemetry::layer().with_tracer(tracer)
});

// Build the final subscriber
let init_result = registry
.with(tokio_console_layer)
.with(stderr_layer)
.with(journald_layer)
.with(otlp_layer)
.with(self.global_filter)
.try_init()
.try_init();

Ok((TelemetryShutdownHandler, init_result))
}

/// Initializes the telemetry config. Call this only once, at the beginning of the
/// program.
///
/// Calling this more than once or when another tracing subscriber is registered
/// will cause a panic.
pub fn init(self) {
self.try_init().expect("failed to initialize orb-telemetry")
/// Initializes telemetry, panicking if something goes wrong.
/// Returns a shutdown handler that will clean up resources when dropped.
pub fn init(self) -> TelemetryShutdownHandler {
let (handler, result) = self.try_init().expect("failed to create shutdown handler");
result.expect("failed to initialize telemetry");
handler
}
}
Loading
Loading