Skip to content

Commit

Permalink
migrate realtime topic format
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 4, 2024
1 parent ed917fd commit 276c405
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions src/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,25 @@ struct IndexerFeesHourlyProtobuf {
aggregations: Vec<IndexerFeesProtobuf>,
}

#[derive(prost::Message)]
pub struct ClientQueryProtobuf {
// 20 bytes (address)
#[prost(bytes, tag = "2")]
pub receipt_signer: Vec<u8>,
#[prost(message, repeated, tag = "10")]
pub indexer_queries: Vec<IndexerQueryProtobuf>,
}
#[derive(prost::Message)]
pub struct IndexerQueryProtobuf {
/// 20 bytes (address)
#[prost(bytes, tag = "1")]
pub indexer: Vec<u8>,
#[prost(double, tag = "6")]
pub fee_grt: f64,
#[prost(bool, tag = "12")]
pub legacy_scalar: bool,
}

async fn process_messages(
consumer: &mut StreamConsumer,
db: mpsc::Sender<Update>,
Expand All @@ -134,22 +153,24 @@ async fn process_messages(
.to_millis()
.and_then(|t| DateTime::from_timestamp(t / 1_000, (t % 1_000) as u32 * 1_000))
.unwrap_or_else(Utc::now);
let payload = match IndexerFeesProtobuf::decode(payload) {
let payload = match ClientQueryProtobuf::decode(payload) {
Ok(payload) => payload,
Err(payload_parse_err) => {
tracing::error!(%payload_parse_err, input = payload.encode_hex());
return;
}
};
if !signers.contains(&Address::from_slice(&payload.signer)) {
if !signers.contains(&Address::from_slice(&payload.receipt_signer)) {
return;
}
let update = Update {
timestamp,
indexer: Address::from_slice(&payload.receiver),
fee: (payload.fee_grt * 1e18) as u128,
};
let _ = db.send(update).await;
for indexer_query in payload.indexer_queries {
let update = Update {
timestamp,
indexer: Address::from_slice(&indexer_query.indexer),
fee: (indexer_query.fee_grt * 1e18) as u128,
};
let _ = db.send(update).await;
}
})
.await;
Ok(())
Expand Down

0 comments on commit 276c405

Please sign in to comment.