From 276c405e0d2ba31c19f4bca83cad3c10e56dd37e Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 4 Nov 2024 13:11:53 -0500 Subject: [PATCH] migrate realtime topic format --- src/receipts.rs | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/src/receipts.rs b/src/receipts.rs index 95f9ca1..d48a97f 100644 --- a/src/receipts.rs +++ b/src/receipts.rs @@ -110,6 +110,25 @@ struct IndexerFeesHourlyProtobuf { aggregations: Vec, } +#[derive(prost::Message)] +pub struct ClientQueryProtobuf { + // 20 bytes (address) + #[prost(bytes, tag = "2")] + pub receipt_signer: Vec, + #[prost(message, repeated, tag = "10")] + pub indexer_queries: Vec, +} +#[derive(prost::Message)] +pub struct IndexerQueryProtobuf { + /// 20 bytes (address) + #[prost(bytes, tag = "1")] + pub indexer: Vec, + #[prost(double, tag = "6")] + pub fee_grt: f64, + #[prost(bool, tag = "12")] + pub legacy_scalar: bool, +} + async fn process_messages( consumer: &mut StreamConsumer, db: mpsc::Sender, @@ -134,22 +153,24 @@ async fn process_messages( .to_millis() .and_then(|t| DateTime::from_timestamp(t / 1_000, (t % 1_000) as u32 * 1_000)) .unwrap_or_else(Utc::now); - let payload = match IndexerFeesProtobuf::decode(payload) { + let payload = match ClientQueryProtobuf::decode(payload) { Ok(payload) => payload, Err(payload_parse_err) => { tracing::error!(%payload_parse_err, input = payload.encode_hex()); return; } }; - if !signers.contains(&Address::from_slice(&payload.signer)) { + if !signers.contains(&Address::from_slice(&payload.receipt_signer)) { return; } - let update = Update { - timestamp, - indexer: Address::from_slice(&payload.receiver), - fee: (payload.fee_grt * 1e18) as u128, - }; - let _ = db.send(update).await; + for indexer_query in payload.indexer_queries { + let update = Update { + timestamp, + indexer: Address::from_slice(&indexer_query.indexer), + fee: (indexer_query.fee_grt * 1e18) as u128, + }; + let _ = db.send(update).await; + } }) .await; Ok(())