From 8e78d4972d5bccc42d37713c5fa38d67a4996c7d Mon Sep 17 00:00:00 2001 From: Ertugrul Aypek Date: Mon, 21 Oct 2024 05:25:40 +0200 Subject: [PATCH] test --- .github/workflows/temp-build-and-push.yaml | 45 ++++++++++++++++++++++ Cargo.lock | 5 +++ Cargo.toml | 2 + deploy/stage/common-values-iris-mpc.yaml | 2 +- iris-mpc-common/Cargo.toml | 2 + iris-mpc-common/src/helpers/aws.rs | 41 -------------------- iris-mpc-common/src/helpers/mod.rs | 1 + iris-mpc-common/src/helpers/tracing.rs | 38 ++++++++++++++++++ iris-mpc-gpu/Cargo.toml | 1 + iris-mpc-gpu/src/server/actor.rs | 17 +++----- iris-mpc-gpu/src/server/mod.rs | 2 + iris-mpc/Cargo.toml | 3 +- iris-mpc/src/bin/server.rs | 44 ++++++++++++--------- 13 files changed, 130 insertions(+), 73 deletions(-) create mode 100644 .github/workflows/temp-build-and-push.yaml create mode 100644 iris-mpc-common/src/helpers/tracing.rs diff --git a/.github/workflows/temp-build-and-push.yaml b/.github/workflows/temp-build-and-push.yaml new file mode 100644 index 000000000..f2dd284f9 --- /dev/null +++ b/.github/workflows/temp-build-and-push.yaml @@ -0,0 +1,45 @@ +name: Branch - Build and push docker image + +on: + push: + +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}' + cancel-in-progress: true + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + docker: + runs-on: + labels: ubuntu-22.04-64core + permissions: + packages: write + contents: read + attestations: write + id-token: write + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Log in to the Container registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Build and Push + uses: docker/build-push-action@v6 + with: + context: . + push: true + tags: | + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} + platforms: linux/amd64 + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/Cargo.lock b/Cargo.lock index 639731018..16182649a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2525,6 +2525,7 @@ dependencies = [ "metrics 0.22.3", "metrics-exporter-statsd 0.7.0", "ndarray", + "opentelemetry", "rand", "serde_json", "sha2", @@ -2533,6 +2534,7 @@ dependencies = [ "telemetry-batteries", "tokio", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "uuid", ] @@ -2560,6 +2562,7 @@ dependencies = [ "hmac", "http 1.1.0", "itertools 0.13.0", + "opentelemetry", "percent-encoding", "rand", "rayon", @@ -2576,6 +2579,7 @@ dependencies = [ "tokio", "tokio-retry", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "url", "wiremock", @@ -2629,6 +2633,7 @@ dependencies = [ "metrics-exporter-statsd 0.7.0", "ndarray", "num-traits", + "opentelemetry", "rand", "rayon", "reqwest 0.12.8", diff --git a/Cargo.toml b/Cargo.toml index 4047b4be1..2a60254a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,8 @@ telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries. thiserror = "1" tokio = { version = "1.40", features = ["full", "rt-multi-thread"] } uuid = { version = "1", features = ["v4"] } +opentelemetry = { version = "0.24.0" } +tracing-opentelemetry = { version = "0.25.0"} # Abort on panics rather than unwinding. # This improves performance and makes panic propagation more reliable. diff --git a/deploy/stage/common-values-iris-mpc.yaml b/deploy/stage/common-values-iris-mpc.yaml index f3d5da972..eb673f5ae 100644 --- a/deploy/stage/common-values-iris-mpc.yaml +++ b/deploy/stage/common-values-iris-mpc.yaml @@ -1,4 +1,4 @@ -image: "ghcr.io/worldcoin/iris-mpc:v0.8.26" +image: "ghcr.io/worldcoin/iris-mpc:ecfcc4be02cbed9260c70e6baeab64948df7d59a" environment: stage replicaCount: 1 diff --git a/iris-mpc-common/Cargo.toml b/iris-mpc-common/Cargo.toml index a658dba54..78f184829 100644 --- a/iris-mpc-common/Cargo.toml +++ b/iris-mpc-common/Cargo.toml @@ -28,6 +28,8 @@ config = "0.14.0" tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +opentelemetry.workspace = true +tracing-opentelemetry.workspace = true reqwest = { workspace = true, features = ["blocking", "json"] } sodiumoxide = "0.2.7" diff --git a/iris-mpc-common/src/helpers/aws.rs b/iris-mpc-common/src/helpers/aws.rs index 7135a4ae1..09e83d3bb 100644 --- a/iris-mpc-common/src/helpers/aws.rs +++ b/iris-mpc-common/src/helpers/aws.rs @@ -1,8 +1,5 @@ use aws_sdk_sns::types::MessageAttributeValue; use std::collections::HashMap; -use telemetry_batteries::reexports::opentelemetry::trace::{ - SpanContext, SpanId, TraceFlags, TraceId, TraceState, -}; pub const TRACE_ID_MESSAGE_ATTRIBUTE_NAME: &str = "TraceID"; pub const SPAN_ID_MESSAGE_ATTRIBUTE_NAME: &str = "SpanID"; @@ -36,41 +33,3 @@ pub fn construct_message_attributes( Ok(message_attributes) } - -// This would only ever be leveraged if the code had isolated flows for every -// message, leaving for now, maybe it will happen in the future -pub fn trace_from_message_attributes( - message_attributes: &HashMap, - receipt_handle: &str, -) -> eyre::Result<()> { - if let Some(trace_id) = message_attributes.get(TRACE_ID_MESSAGE_ATTRIBUTE_NAME) { - if let Some(span_id) = message_attributes.get(SPAN_ID_MESSAGE_ATTRIBUTE_NAME) { - let trace_id = trace_id - .string_value() - .expect("Could not parse TraceID") - .parse::()?; - - let span_id = span_id - .string_value() - .expect("Could not parse SpanID") - .parse::()?; - - // Create and set the span parent context - let parent_ctx = SpanContext::new( - TraceId::from(trace_id), - SpanId::from(span_id), - TraceFlags::default(), - true, - TraceState::default(), - ); - - telemetry_batteries::tracing::trace_from_ctx(parent_ctx); - } else { - tracing::warn!(?receipt_handle, "SQS message missing SpanID"); - } - } else { - tracing::warn!(?receipt_handle, "SQS message missing TraceID"); - } - - Ok(()) -} diff --git a/iris-mpc-common/src/helpers/mod.rs b/iris-mpc-common/src/helpers/mod.rs index 75f276cc3..0a94b16e4 100644 --- a/iris-mpc-common/src/helpers/mod.rs +++ b/iris-mpc-common/src/helpers/mod.rs @@ -7,3 +7,4 @@ pub mod smpc_request; pub mod sqs_s3_helper; pub mod sync; pub mod task_monitor; +pub mod tracing; diff --git a/iris-mpc-common/src/helpers/tracing.rs b/iris-mpc-common/src/helpers/tracing.rs new file mode 100644 index 000000000..e536a76e6 --- /dev/null +++ b/iris-mpc-common/src/helpers/tracing.rs @@ -0,0 +1,38 @@ +use opentelemetry::{ + trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState}, + Context, +}; +use tracing::info_span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +pub fn trace_from_message_attributes(trace_id: &str, span_id: &str) -> eyre::Result { + tracing::info!( + "Creating span context from message attributes. trace id: {}, span id: {}", + trace_id, + span_id + ); + + // Create and set the span parent context + let parent_span_ctx = SpanContext::new( + TraceId::from(trace_id.parse::()?), + SpanId::from(span_id.parse::()?), + TraceFlags::default(), + true, + TraceState::default(), + ); + // let parent_ctx = Context::new().with_remote_span_context(parent_span_ctx); + // let item_span = info_span!("item", trace_id = trace_id, span_id = span_id); + // item_span.set_parent(parent_ctx.clone()); + Ok(parent_span_ctx) +} + +pub fn link_batch_spans(span_contexts: Vec, batch_span: &tracing::Span) { + tracing::info!("Linking batch spans to item spans"); + for span_ctx in span_contexts { + let parent_ctx = Context::new().with_remote_span_context(span_ctx); + let item_span = info_span!("item"); + item_span.set_parent(parent_ctx.clone()); + batch_span.follows_from(&item_span); + item_span.follows_from(batch_span); + } +} diff --git a/iris-mpc-gpu/Cargo.toml b/iris-mpc-gpu/Cargo.toml index 9ee89c19e..acf6a7ebe 100644 --- a/iris-mpc-gpu/Cargo.toml +++ b/iris-mpc-gpu/Cargo.toml @@ -31,6 +31,7 @@ iris-mpc-common = { path = "../iris-mpc-common" } base64 = "0.22.1" metrics = "0.22.1" metrics-exporter-statsd = "0.7" +opentelemetry.workspace = true [dev-dependencies] criterion = "0.5" diff --git a/iris-mpc-gpu/src/server/actor.rs b/iris-mpc-gpu/src/server/actor.rs index ba2e59300..b8b23d513 100644 --- a/iris-mpc-gpu/src/server/actor.rs +++ b/iris-mpc-gpu/src/server/actor.rs @@ -55,6 +55,7 @@ impl ServerActorHandle { &mut self, batch: BatchQuery, ) -> impl Future { + tracing::info!("Submitting batch query to the Actor job queue"); let (tx, rx) = oneshot::channel(); let job = ServerJob { batch, @@ -464,6 +465,7 @@ impl ServerActor { .preprocess_db(&mut self.right_mask_db_slices, &self.current_db_sizes); } + #[tracing::instrument(skip(self, batch, return_channel))] fn process_batch_query( &mut self, batch: BatchQuery, @@ -712,17 +714,7 @@ impl ServerActor { self.device_manager.await_streams(&self.streams[0]); - // Iterate over a list of tracing payloads, and create logs with mappings to - // payloads Log at least a "start" event using a log with trace.id - // and parent.trace.id - for tracing_payload in batch.metadata.iter() { - tracing::info!( - node_id = tracing_payload.node_id, - dd.trace_id = tracing_payload.trace_id, - dd.span_id = tracing_payload.span_id, - "Protocol finished", - ); - } + tracing::info!("Protocol finished"); // Fetch the final results (blocking) let mut host_results = self @@ -907,6 +899,7 @@ impl ServerActor { Ok(()) } + #[tracing::instrument(skip(self, compact_device_queries, compact_device_sums, events, eye_db))] fn compare_query_against_db_and_self( &mut self, compact_device_queries: &DeviceCompactQuery, @@ -1190,6 +1183,7 @@ impl ServerActor { } } + #[tracing::instrument(skip(self, valid_entries))] fn sync_batch_entries(&mut self, valid_entries: &[bool]) -> eyre::Result> { let mut buffer = self .device_manager @@ -1223,6 +1217,7 @@ impl ServerActor { Ok(valid_merged) } + #[tracing::instrument(skip(self))] fn prepare_deletion_shares(&self) -> eyre::Result<(DeviceCompactQuery, DeviceCompactSums)> { let (dummy_code_share, dummy_mask_share) = get_dummy_shares_for_deletion(self.party_id); let compact_query = { diff --git a/iris-mpc-gpu/src/server/mod.rs b/iris-mpc-gpu/src/server/mod.rs index 51ed3beb7..f63472171 100644 --- a/iris-mpc-gpu/src/server/mod.rs +++ b/iris-mpc-gpu/src/server/mod.rs @@ -7,6 +7,7 @@ pub use actor::{get_dummy_shares_for_deletion, ServerActor, ServerActorHandle}; use iris_mpc_common::galois_engine::degree4::{ GaloisRingIrisCodeShare, GaloisRingTrimmedMaskCodeShare, }; +use opentelemetry::trace::SpanContext; use std::collections::HashSet; use tokio::sync::oneshot; @@ -27,6 +28,7 @@ pub struct BatchMetadata { pub struct BatchQuery { pub request_ids: Vec, pub metadata: Vec, + pub span_contexts: Vec, pub query_left: BatchQueryEntries, pub db_left: BatchQueryEntries, pub store_left: BatchQueryEntries, diff --git a/iris-mpc/Cargo.toml b/iris-mpc/Cargo.toml index f91407733..e3cfd1255 100644 --- a/iris-mpc/Cargo.toml +++ b/iris-mpc/Cargo.toml @@ -26,7 +26,8 @@ dotenvy.workspace = true rand.workspace = true base64.workspace = true uuid.workspace = true - +opentelemetry = { version = "0.24.0", features = ["trace"] } +tracing-opentelemetry.workspace = true sodiumoxide = "0.2.7" iris-mpc-gpu = { path = "../iris-mpc-gpu" } iris-mpc-common = { path = "../iris-mpc-common" } diff --git a/iris-mpc/src/bin/server.rs b/iris-mpc/src/bin/server.rs index 1b4dd66b4..4652a2986 100644 --- a/iris-mpc/src/bin/server.rs +++ b/iris-mpc/src/bin/server.rs @@ -24,6 +24,7 @@ use iris_mpc_common::{ }, sync::SyncState, task_monitor::TaskMonitor, + tracing::{link_batch_spans, trace_from_message_attributes}, }, }; use iris_mpc_gpu::{ @@ -49,6 +50,7 @@ use tokio::{ task::spawn_blocking, time::timeout, }; +use tracing::{info_span, Instrument}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const REGION: &str = "eu-north-1"; @@ -153,6 +155,12 @@ async fn receive_batch( batch_metadata.span_id = span_id.to_string(); } + let span_context = trace_from_message_attributes( + &batch_metadata.trace_id, + &batch_metadata.span_id, + )?; + batch_query.span_contexts.push(span_context); + let request_type = message_attributes .get(SMPC_MESSAGE_TYPE_ATTRIBUTE) .ok_or(ReceiveRequestError::NoMessageTypeAttribute)? @@ -942,6 +950,17 @@ async fn server_main(config: Config) -> eyre::Result<()> { let batch = next_batch.await?; + let batch_span = info_span!(parent: None, "batch_processing_span"); + + link_batch_spans(batch.span_contexts.clone(), &batch_span); + + let _guard = batch_span.enter(); + batch_span.record("main_batch_size", batch.request_ids.len()); + tracing::info!("Main span details: {:?}", batch_span.id()); + tracing::info!("Current span details: {:?}", tracing::Span::current().id()); + + tracing::info!("Finished batch trace setup"); + process_identity_deletions( &batch, &store, @@ -950,23 +969,13 @@ async fn server_main(config: Config) -> eyre::Result<()> { ) .await?; - // Iterate over a list of tracing payloads, and create logs with mappings to - // payloads Log at least a "start" event using a log with trace.id and - // parent.trace.id - for tracing_payload in batch.metadata.iter() { - tracing::info!( - node_id = tracing_payload.node_id, - dd.trace_id = tracing_payload.trace_id, - dd.span_id = tracing_payload.span_id, - "Started processing share", - ); - } + tracing::info!("Started processing share"); // start trace span - with single TraceId and single ParentTraceID tracing::info!("Received batch in {:?}", now.elapsed()); background_tasks.check_tasks(); - let result_future = handle.submit_batch_query(batch); + let result_future = handle.submit_batch_query(batch).instrument(batch_span.clone()); next_batch = receive_batch( party_id, @@ -1009,12 +1018,15 @@ async fn server_main(config: Config) -> eyre::Result<()> { Ok(()) } +#[tracing::instrument(skip(batch, store, dummy_iris_share, dummy_mask_share))] async fn process_identity_deletions( batch: &BatchQuery, store: &Store, dummy_iris_share: &GaloisRingIrisCodeShare, dummy_mask_share: &GaloisRingTrimmedMaskCodeShare, ) -> eyre::Result<()> { + tracing::info!("Processing identity deletions..."); + if batch.deletion_requests_indices.is_empty() { return Ok(()); } @@ -1044,13 +1056,7 @@ async fn process_identity_deletions( ) .await?; - tracing::info!( - node_id = tracing_payload.node_id, - dd.trace_id = tracing_payload.trace_id, - dd.span_id = tracing_payload.span_id, - "Deleted identity with serial id {}", - serial_id, - ); + tracing::info!("Deleted identity with serial id {}", serial_id,); } Ok(())