Skip to content

Commit

Permalink
Send trace info back to results sns (#445)
Browse files Browse the repository at this point in the history
* send trace info back to results sns

* bump version
  • Loading branch information
eaypek-tfh authored Sep 24, 2024
1 parent 8713988 commit 9fec85b
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 10 deletions.
2 changes: 1 addition & 1 deletion deploy/stage/common-values-iris-mpc.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
image: "ghcr.io/worldcoin/iris-mpc:v0.8.9"
image: "ghcr.io/worldcoin/iris-mpc:v0.8.10"

environment: stage
replicaCount: 1
Expand Down
11 changes: 6 additions & 5 deletions iris-mpc-common/src/helpers/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ pub const TRACE_ID_MESSAGE_ATTRIBUTE_NAME: &str = "TraceID";
pub const SPAN_ID_MESSAGE_ATTRIBUTE_NAME: &str = "SpanID";
pub const NODE_ID_MESSAGE_ATTRIBUTE_NAME: &str = "NodeID";

pub fn construct_message_attributes() -> eyre::Result<HashMap<String, MessageAttributeValue>> {
let (trace_id, span_id) = telemetry_batteries::tracing::extract_span_ids();

pub fn construct_message_attributes(
trace_id: &String,
span_id: &String,
) -> eyre::Result<HashMap<String, MessageAttributeValue>> {
let mut message_attributes = HashMap::new();

let trace_id_message_attribute = MessageAttributeValue::builder()
.data_type("String")
.string_value(trace_id.to_string())
.string_value(trace_id)
.build()?;

message_attributes.insert(
Expand All @@ -23,7 +24,7 @@ pub fn construct_message_attributes() -> eyre::Result<HashMap<String, MessageAtt

let span_id_message_attribute = MessageAttributeValue::builder()
.data_type("String")
.string_value(span_id.to_string())
.string_value(span_id)
.build()?;

message_attributes.insert(
Expand Down
1 change: 1 addition & 0 deletions iris-mpc-gpu/src/server/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ impl ServerActor {
.send(ServerJobResult {
merged_results,
request_ids: batch.request_ids,
metadata: batch.metadata,
matches,
match_ids,
store_left: query_store_left,
Expand Down
1 change: 1 addition & 0 deletions iris-mpc-gpu/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct ServerJob {
pub struct ServerJobResult {
pub merged_results: Vec<u32>,
pub request_ids: Vec<String>,
pub metadata: Vec<BatchMetadata>,
pub matches: Vec<bool>,
pub match_ids: Vec<Vec<u32>>,
pub store_left: BatchQueryEntries,
Expand Down
20 changes: 16 additions & 4 deletions iris-mpc/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use iris_mpc_common::{
config::{json_wrapper::JsonStrWrapper, Config, Opt},
galois_engine::degree4::{GaloisRingIrisCodeShare, GaloisRingTrimmedMaskCodeShare},
helpers::{
aws::{SPAN_ID_MESSAGE_ATTRIBUTE_NAME, TRACE_ID_MESSAGE_ATTRIBUTE_NAME},
aws::{
construct_message_attributes, SPAN_ID_MESSAGE_ATTRIBUTE_NAME,
TRACE_ID_MESSAGE_ATTRIBUTE_NAME,
},
key_pair::SharesEncryptionKeyPairs,
kms_dh::derive_shared_secret,
smpc_request::{
Expand Down Expand Up @@ -448,17 +451,22 @@ async fn initialize_chacha_seeds(

async fn send_results_to_sns(
result_events: Vec<String>,
metadata: &[BatchMetadata],
sns_client: &SNSClient,
config: &Config,
message_attributes: &HashMap<String, MessageAttributeValue>,
base_message_attributes: &HashMap<String, MessageAttributeValue>,
) -> eyre::Result<()> {
for result_event in result_events {
for (i, result_event) in result_events.iter().enumerate() {
let trace_attributes =
construct_message_attributes(&metadata[i].trace_id, &metadata[i].span_id)?;
let mut message_attributes = base_message_attributes.clone();
message_attributes.extend(trace_attributes);
sns_client
.publish()
.topic_arn(&config.results_topic_arn)
.message(result_event)
.message_group_id(format!("party-id-{}", config.party_id))
.set_message_attributes(Some(message_attributes.clone()))
.set_message_attributes(Some(message_attributes))
.send()
.await?;
}
Expand Down Expand Up @@ -531,6 +539,7 @@ async fn server_main(config: Config) -> eyre::Result<()> {
tracing::info!("Replaying results");
send_results_to_sns(
store.last_results(max_sync_lookback).await?,
&Vec::new(),
&sns_client,
&config,
&uniqueness_result_attributes,
Expand Down Expand Up @@ -707,6 +716,7 @@ async fn server_main(config: Config) -> eyre::Result<()> {
while let Some(ServerJobResult {
merged_results,
request_ids,
metadata,
matches,
match_ids,
store_left,
Expand Down Expand Up @@ -774,6 +784,7 @@ async fn server_main(config: Config) -> eyre::Result<()> {
tracing::info!("Sending {} uniqueness results", uniqueness_results.len());
send_results_to_sns(
uniqueness_results,
&metadata,
&sns_client_bg,
&config_bg,
&uniqueness_result_attributes,
Expand All @@ -796,6 +807,7 @@ async fn server_main(config: Config) -> eyre::Result<()> {
);
send_results_to_sns(
identity_deletion_results,
&metadata,
&sns_client_bg,
&config_bg,
&identity_deletion_result_attributes,
Expand Down

0 comments on commit 9fec85b

Please sign in to comment.