Adding cursor for histoflux
leboiko committed Jan 20, 2025
1 parent 2bd0864 commit e78fdbf
Showing 9 changed files with 172 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions histoflux/Cargo.toml
@@ -12,6 +12,7 @@ dotenvy.workspace = true
env_logger.workspace = true
envy.workspace = true
log.workspace = true
macon.workspace = true
models = { path = "../models" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Binary file not shown.
56 changes: 41 additions & 15 deletions histoflux/src/app_context.rs
@@ -1,4 +1,5 @@
use crate::error::HistoFluxError;
use crate::models::cursor::HistoFluxCursor;
use aws_sdk_sqs::Client as AWSClient;
use log::info;
use models::raw_logs::RawLog;
@@ -125,7 +126,14 @@ impl SqsProducer {
/// This function processes all existing records in the database and sends
/// them to the SQS queue.
pub async fn process_historical_records(&self) -> Result<(), HistoFluxError> {
let mut last_processed_id = 0;
// Get the last processed id from the database; if it doesn't exist,
// the default value (0) is used.
info!("Getting last processed id from the DB");
let mut last_processed_id = HistoFluxCursor::find(&self.pg_pool, &self.env.indexer_schema)
.await?
.unwrap_or_default()
.last_processed_id;
info!("Last processed id: {}", last_processed_id);
let amount_of_logs =
RawLog::get_total_count(&self.pg_pool, &self.env.indexer_schema).await?;
// If there are no logs, we don't need to process anything
@@ -139,7 +147,7 @@ impl SqsProducer {
'outer_loop: for _page in 0..pages {
let logs = RawLog::get_paginated_after_id(
&self.pg_pool,
last_processed_id,
last_processed_id as i32,
page_size,
&self.env.indexer_schema,
)
@@ -148,7 +156,8 @@ impl SqsProducer {
info!("Processing {} logs", logs.len());
for log in logs {
info!("Processing log: {:?}", log);
last_processed_id = log.id;
last_processed_id = log.id as i64;
self.update_last_processed_id(last_processed_id).await?;
// This check is added because we don't want to process more logs than
// the total amount we initially counted. A listener will send us any
// new logs, so we don't need to process them all here.
@@ -162,6 +171,22 @@

Ok(())
}

/// This function updates the last processed id in the database.
pub async fn update_last_processed_id(
&self,
last_processed_id: i64,
) -> Result<(), HistoFluxError> {
let cursor = HistoFluxCursor::builder()
.id(1)
.last_processed_id(last_processed_id)
.updated_at(chrono::Utc::now())
.build();
cursor
.upsert(&self.pg_pool, &self.env.indexer_schema)
.await?;
Ok(())
}
/// This function starts polling the database for raw logs and sends them to
/// the SQS queue.
pub async fn start_pooling_events(&self) -> Result<(), HistoFluxError> {
@@ -179,19 +204,20 @@ impl SqsProducer {

info!("Processed historical records");

// Process any notifications that arrived during historical processing
while let Ok(notification) = listener.recv().await {
self.process_notification(notification, start_time).await?;
}

// Continue with normal listening
// Process notifications continuously
loop {
let notification = listener.recv().await?;
info!("Processing notification: {:?}", notification);
let payload: RawLog = serde_json::from_str(notification.payload())?;
let message = serde_json::to_string(&payload)?;
self.send_message(message).await?;
info!("Sent message to SQS");
info!("Waiting for notifications");
match listener.recv().await {
Ok(notification) => {
self.process_notification(notification, start_time).await?;
}
Err(e) => {
// Log the error but continue the loop
log::error!("Error receiving notification: {:?}", e);
// Optional: Add delay to prevent tight loop on persistent errors
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}
}

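To make the new control flow easier to follow outside the diff, here is a minimal, self-contained sketch of the resume logic: load the cursor (falling back to 0 on first run), page through logs strictly after that id, and persist the cursor as each log is handled. Log, fetch_page, and the in-memory Vec are illustrative stand-ins, not items from this crate:

// Illustrative stand-ins for the DB-backed types used above; not items from this crate.
struct Log { id: i64 }

fn fetch_page(logs: &[Log], after_id: i64, page_size: usize) -> Vec<i64> {
    logs.iter()
        .map(|l| l.id)
        .filter(|id| *id > after_id)
        .take(page_size)
        .collect()
}

fn main() {
    let logs: Vec<Log> = (1..=7).map(|id| Log { id }).collect();
    // Value loaded from histoflux_cursor on startup; 0 (the Default) if the row is missing.
    let mut cursor = 3_i64;

    loop {
        let page = fetch_page(&logs, cursor, 2);
        if page.is_empty() {
            break; // caught up; the live NOTIFY listener takes over from here
        }
        for id in page {
            // sending the log to SQS would happen here
            cursor = id; // advance and persist the cursor so a restart resumes from this id
        }
    }
    assert_eq!(cursor, 7);
    println!("historical processing resumed up to id {cursor}");
}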
1 change: 1 addition & 0 deletions histoflux/src/main.rs
@@ -2,6 +2,7 @@ use app_context::SqsProducer

mod app_context;
mod error;
mod models;
mod types;

#[tokio::main]
116 changes: 116 additions & 0 deletions histoflux/src/models/cursor.rs
@@ -0,0 +1,116 @@
use chrono::{DateTime, Utc};
use macon::Builder;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

use crate::error::HistoFluxError;

#[derive(sqlx::FromRow, Debug, PartialEq, Clone, Builder, Serialize, Deserialize)]
#[sqlx(type_name = "histoflux_cursor")]
pub struct HistoFluxCursor {
pub id: i64,
pub last_processed_id: i64,
pub updated_at: DateTime<Utc>,
}

impl Default for HistoFluxCursor {
fn default() -> Self {
Self {
id: 1,
last_processed_id: 0,
updated_at: Utc::now(),
}
}
}

impl HistoFluxCursor {
/// Upsert the cursor into the DB.
pub async fn upsert(&self, db: &PgPool, schema: &str) -> Result<Self, HistoFluxError> {
let query = format!(
r#"
INSERT INTO {schema}.histoflux_cursor (id, last_processed_id)
VALUES ($1::numeric, $2)
ON CONFLICT (id) DO UPDATE SET
last_processed_id = $2,
updated_at = CURRENT_TIMESTAMP
RETURNING id, last_processed_id, updated_at::timestamptz as updated_at
"#,
schema = schema,
);

sqlx::query_as::<_, HistoFluxCursor>(&query)
.bind(self.id)
.bind(self.last_processed_id)
.fetch_one(db)
.await
.map_err(HistoFluxError::SQLXError)
}

/// Find the cursor in the DB.
pub async fn find(db: &PgPool, schema: &str) -> Result<Option<Self>, HistoFluxError> {
let query = format!(
r#"
SELECT id, last_processed_id, updated_at::timestamptz as updated_at
FROM {}.histoflux_cursor
WHERE id = 1
"#,
schema,
);

sqlx::query_as::<_, HistoFluxCursor>(&query)
.fetch_optional(db)
.await
.map_err(HistoFluxError::SQLXError)
}
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
use models::test_helpers::{setup_test_db, TEST_PROXY_SCHEMA};

#[sqlx::test]
async fn test_cursor_upsert_and_find() {
let pool = setup_test_db().await;

let cursor = HistoFluxCursor {
id: 1,
last_processed_id: 100,
updated_at: Utc::now(),
};

// First upsert
let saved = cursor.upsert(&pool, TEST_PROXY_SCHEMA).await.unwrap();

// Find and verify first insert
let found = HistoFluxCursor::find(&pool, TEST_PROXY_SCHEMA)
.await
.unwrap()
.unwrap();
assert_eq!(found.last_processed_id, 100);
assert_eq!(found.updated_at, saved.updated_at);

// Wait a bit to ensure timestamp changes
tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await;

// Second upsert with a new last_processed_id
let cursor2 = HistoFluxCursor {
id: 1,
last_processed_id: 200,
updated_at: found.updated_at,
};
let saved2 = cursor2.upsert(&pool, TEST_PROXY_SCHEMA).await.unwrap();

// Find and verify update
let found2 = HistoFluxCursor::find(&pool, TEST_PROXY_SCHEMA)
.await
.unwrap()
.unwrap();
assert_eq!(found2.last_processed_id, 200);
assert_eq!(found2.updated_at, saved2.updated_at);

// Verify updated_at changed
assert!(found2.updated_at > found.updated_at);
}
}
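A hedged usage sketch of the new model from elsewhere in the crate, mirroring the builder, upsert, and find calls shown above (assumes a PgPool connected to a database where the histoflux_cursor table exists; the schema string and function name are illustrative):

use sqlx::PgPool;

use crate::error::HistoFluxError;
use crate::models::cursor::HistoFluxCursor;

async fn advance_cursor(pool: &PgPool, schema: &str, new_id: i64) -> Result<(), HistoFluxError> {
    // Build the singleton row (id = 1) and upsert it; ON CONFLICT keeps it a single row.
    let cursor = HistoFluxCursor::builder()
        .id(1)
        .last_processed_id(new_id)
        .updated_at(chrono::Utc::now())
        .build();
    cursor.upsert(pool, schema).await?;

    // find returns Some(cursor) once the row exists, None before the first insert.
    let stored = HistoFluxCursor::find(pool, schema).await?;
    assert_eq!(stored.map(|c| c.last_processed_id), Some(new_id));
    Ok(())
}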
1 change: 1 addition & 0 deletions histoflux/src/models/mod.rs
@@ -0,0 +1 @@
pub mod cursor;
@@ -0,0 +1 @@
DROP TABLE base_sepolia_indexer.histoflux_cursor;
@@ -0,0 +1,10 @@
-- Add migration script here
CREATE TABLE base_sepolia_indexer.histoflux_cursor(
id BIGINT PRIMARY KEY,
last_processed_id BIGINT NOT NULL,
updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_histoflux_cursor_id ON base_sepolia_indexer.histoflux_cursor(id);

INSERT INTO base_sepolia_indexer.histoflux_cursor (id, last_processed_id) VALUES (1, 50232);
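The migration seeds the singleton row (id = 1) with last_processed_id 50232, presumably so the first deployment resumes from that point rather than replaying every historical log. A minimal sketch (assuming DATABASE_URL points at a database where this migration has already run) that reads the seeded value back with sqlx:

use sqlx::PgPool;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let pool = PgPool::connect(&url).await?;

    // The table holds a single row keyed by id = 1; fetch its cursor position.
    let (last_processed_id,): (i64,) = sqlx::query_as(
        "SELECT last_processed_id FROM base_sepolia_indexer.histoflux_cursor WHERE id = 1",
    )
    .fetch_one(&pool)
    .await?;

    println!("histoflux will resume after log id {last_processed_id}");
    Ok(())
}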
