Skip to content

Commit

Permalink
fix(core): Adjust historical streaming to stream block by block
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Jan 19, 2025
1 parent 7f32333 commit bdeddd7
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 114 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fuel-streams-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
pretty_assertions = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/fuel-streams-core/src/stream/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_nats::SubscribeError;
use fuel_message_broker::MessageBrokerError;
use fuel_streams_store::{
db::DbError,
db::{DbError, SqlxError},
record::RecordPacketError,
store::StoreError,
};
Expand All @@ -22,4 +22,6 @@ pub enum StreamError {
MessageBrokerClient(#[from] MessageBrokerError),
#[error(transparent)]
RecordPacket(#[from] RecordPacketError),
#[error(transparent)]
Sqlx(#[from] SqlxError),
}
64 changes: 55 additions & 9 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::{sync::Arc, time::Duration};

pub use async_nats::Subscriber as StreamLiveSubscriber;
use fuel_message_broker::MessageBroker;
use fuel_streams_domains::blocks::Block;
use fuel_streams_macros::subject::IntoSubject;
use fuel_streams_store::{
db::{Db, StoreItem},
record::Record,
record::{QueryOptions, Record},
store::Store,
};
use futures::{
Expand Down Expand Up @@ -75,18 +76,14 @@ impl<R: Record> Stream<R> {
subject: Arc<dyn IntoSubject>,
deliver_policy: DeliverPolicy,
) -> BoxStream<'static, Result<(String, Vec<u8>), StreamError>> {
let store = self.store.clone();
let db = self.store.db.clone();
let broker = self.broker.clone();
let subject_clone = subject.clone();
let subject = subject.clone();
let stream = async_stream::try_stream! {
if let DeliverPolicy::FromBlock { block_height } = deliver_policy {
let height = Some(block_height);
let mut historical = store.stream_by_subject(&subject_clone, height);
let mut historical = Self::historical_streaming(&db, subject.to_owned(), Some(block_height)).await;
while let Some(result) = historical.next().await {
let item = result.map_err(StreamError::Store)?;
let subject = item.subject_str();
let value = item.encoded_value().to_vec();
yield (subject, value);
yield result?;
let throttle_time = *config::STREAM_THROTTLE_HISTORICAL;
sleep(Duration::from_millis(throttle_time as u64)).await;
}
Expand All @@ -109,4 +106,53 @@ impl<R: Record> Stream<R> {
let subject = Arc::new(subject);
self.subscribe_dynamic(subject, deliver_policy).await
}

/// Streams historical records block by block, starting at `from_block`
/// (or block 0 when `None`), yielding `(subject, encoded_value)` pairs.
///
/// After the last known block has been streamed, the database is polled once
/// more for blocks produced in the meantime; streaming continues until no
/// newer blocks are found.
async fn historical_streaming(
    db: &Arc<Db>,
    subject: Arc<dyn IntoSubject>,
    from_block: Option<u64>,
) -> BoxStream<'static, Result<(String, Vec<u8>), StreamError>> {
    let db = db.clone();
    let stream = async_stream::try_stream! {
        let mut current_height = from_block.unwrap_or(0);
        let mut last_height = Self::find_last_block_height(&db).await?;
        while current_height <= last_height {
            // Fetch and yield every record belonging to the current block.
            // `with_from_block` restricts the query to exactly this height.
            let opts = QueryOptions::default().with_from_block(Some(current_height));
            let mut query = R::build_find_many_query(subject.to_owned(), opts.clone());
            let mut stream = query
                .build_query_as::<R::StoreItem>()
                .fetch(&db.pool);
            while let Some(result) = stream.next().await {
                let result = result?;
                let subject = result.subject_str();
                let value = result.encoded_value().to_vec();
                yield (subject, value);
            }
            current_height += 1;
            // Only after *every* known block has been streamed do we check
            // whether new blocks were produced while we were processing.
            // (Checking at `current_height == last_height` would `break`
            // before the block at `last_height` itself was fetched.)
            if current_height > last_height {
                let new_last_height = Self::find_last_block_height(&db).await?;
                if new_last_height > last_height {
                    // Newer blocks exist; extend the range and keep going.
                    last_height = new_last_height;
                } else {
                    tracing::debug!("No new blocks found, stopping historical streaming on block {}", current_height - 1);
                    break
                }
            }
        }
    };
    Box::pin(stream)
}

/// Returns the height of the most recent block record in the database,
/// or 0 when no block has been stored yet.
async fn find_last_block_height(db: &Arc<Db>) -> Result<u64, StreamError> {
    let opts = QueryOptions::default();
    Ok(Block::find_last_record(db, opts)
        .await?
        .map(|record| record.block_height as u64)
        .unwrap_or(0))
}
}
2 changes: 0 additions & 2 deletions crates/fuel-streams-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ version = { workspace = true }
rust-version = { workspace = true }

[dependencies]
async-stream = { workspace = true }
async-trait = { workspace = true }
dotenvy = { workspace = true }
fuel-data-parser = { workspace = true }
fuel-streams-macros = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
sqlx = { workspace = true, default-features = false, features = [
"runtime-tokio",
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-store/src/db/db_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum DbError {
}

pub type DbResult<T> = Result<T, DbError>;
pub type SqlxError = sqlx::Error;

pub static DB_POOL_SIZE: LazyLock<usize> = LazyLock::new(|| {
dotenvy::var("DB_POOL_SIZE")
Expand Down
6 changes: 0 additions & 6 deletions crates/fuel-streams-store/src/record/query_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ pub struct QueryOptions {
pub offset: i64,
pub limit: i64,
pub from_block: Option<u64>,
pub to_block: Option<u64>,
pub namespace: Option<String>,
}
impl Default for QueryOptions {
Expand All @@ -12,7 +11,6 @@ impl Default for QueryOptions {
offset: 0,
limit: 100,
from_block: None,
to_block: None,
namespace: None,
}
}
Expand All @@ -38,8 +36,4 @@ impl QueryOptions {
pub fn increment_offset(&mut self) {
self.offset += self.limit;
}
pub fn with_to_block(mut self, to_block: Option<u64>) -> Self {
self.to_block = to_block;
self
}
}
16 changes: 7 additions & 9 deletions crates/fuel-streams-store/src/record/record_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ pub trait Record: RecordEncoder + 'static {

// Add block conditions
if let Some(block) = options.from_block {
conditions.push(format!("block_height >= {}", block));
}

if let Some(block) = options.to_block {
conditions.push(format!("block_height < {}", block));
conditions.push(format!("block_height = {}", block));
}

// Add namespace condition for tests
Expand All @@ -94,10 +90,12 @@ pub trait Record: RecordEncoder + 'static {

query_builder.push(" ORDER BY ");
query_builder.push(Self::ORDER_PROPS.join(", "));
query_builder.push(" ASC LIMIT ");
query_builder.push_bind(options.limit);
query_builder.push(" OFFSET ");
query_builder.push_bind(options.offset);
query_builder.push(" ASC");
// query_builder.push(" LIMIT ");
// query_builder.push_bind(options.limit);
// query_builder.push(" OFFSET ");
// query_builder.push_bind(options.offset);
dbg!(&query_builder.sql());
query_builder
}

Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-streams-store/src/store/config.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/fuel-streams-store/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub(super) mod config;
mod errors;
mod store_impl;

Expand Down
30 changes: 2 additions & 28 deletions crates/fuel-streams-store/src/store/store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;

use fuel_data_parser::DataEncoder;
use fuel_streams_macros::subject::IntoSubject;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};

use super::{config, StoreError};
use super::StoreError;
use crate::{
db::Db,
record::{DbTransaction, QueryOptions, Record, RecordPacket},
Expand Down Expand Up @@ -58,7 +56,7 @@ impl<R: Record + DataEncoder> Store<R> {
#[cfg(any(test, feature = "test-helpers"))]
pub async fn find_many_by_subject(
&self,
subject: &Arc<dyn IntoSubject>,
subject: &Arc<dyn fuel_streams_macros::subject::IntoSubject>,
mut options: QueryOptions,
) -> StoreResult<Vec<R::StoreItem>> {
options = options.with_namespace(self.namespace.clone());
Expand All @@ -71,30 +69,6 @@ impl<R: Record + DataEncoder> Store<R> {
.map_err(StoreError::from)
}

pub fn stream_by_subject(
&self,
subject: &Arc<dyn IntoSubject>,
from_block: Option<u64>,
) -> BoxStream<'static, Result<R::StoreItem, StoreError>> {
let db = Arc::clone(&self.db);
let namespace = self.namespace.clone();
let subject = subject.clone();
async_stream::stream! {
let options = QueryOptions::default()
.with_namespace(namespace)
.with_from_block(from_block)
.with_limit(*config::STORE_PAGINATION_LIMIT);
let mut query = R::build_find_many_query(subject, options.clone());
let mut stream = query
.build_query_as::<R::StoreItem>()
.fetch(&db.pool);
while let Some(result) = stream.try_next().await? {
yield Ok(result);
}
}
.boxed()
}

pub async fn find_last_record(&self) -> StoreResult<Option<R::DbItem>> {
let options =
QueryOptions::default().with_namespace(self.namespace.clone());
Expand Down
20 changes: 8 additions & 12 deletions tests/tests/store/query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ fn test_query_builder() {
let options = QueryOptions {
offset: 0,
limit: 10,
from_block: Some(100),
to_block: Some(200),
from_block: None,
namespace: Some("test_ns".to_string()),
};

Expand All @@ -46,7 +45,7 @@ fn test_query_builder() {
assert_eq!(
sql,
format!(
r#"SELECT _id, subject, value, block_height FROM blocks WHERE producer_address = '{}' AND block_height = '50' AND block_height >= 100 AND block_height < 200 AND subject LIKE 'test_ns%' ORDER BY block_height ASC LIMIT $1 OFFSET $2"#,
r#"SELECT _id, subject, value, block_height FROM blocks WHERE producer_address = '{}' AND block_height = '50' AND subject LIKE 'test_ns%' ORDER BY block_height ASC"#,
Address::default()
)
);
Expand All @@ -55,14 +54,13 @@ fn test_query_builder() {
#[test]
fn test_query_builder_with_no_subject_fields() {
let subject = Arc::new(BlocksSubject::new());

let options = QueryOptions::default();
let query = Block::build_find_many_query(subject, options);
let sql = query.sql();

assert_eq!(
sql,
r#"SELECT _id, subject, value, block_height FROM blocks ORDER BY block_height ASC LIMIT $1 OFFSET $2"#
r#"SELECT _id, subject, value, block_height FROM blocks ORDER BY block_height ASC"#
);
}

Expand All @@ -82,7 +80,6 @@ fn test_query_builder_coin_input() {
offset: 0,
limit: 20,
from_block: Some(50),
to_block: Some(150),
namespace: Some("test_ns".to_string()),
};

Expand All @@ -92,7 +89,7 @@ fn test_query_builder_coin_input() {
assert_eq!(
sql,
format!(
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND tx_id = '{}' AND tx_index = '1' AND input_index = '2' AND owner_id = '{}' AND asset_id = '{}' AND block_height >= 50 AND block_height < 150 AND subject LIKE 'test_ns%' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#,
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND tx_id = '{}' AND tx_index = '1' AND input_index = '2' AND owner_id = '{}' AND asset_id = '{}' AND block_height = 50 AND subject LIKE 'test_ns%' ORDER BY block_height, tx_index, input_index ASC"#,
TxId::default(),
Address::default(),
AssetId::default(),
Expand All @@ -118,7 +115,7 @@ fn test_query_builder_contract_input() {
assert_eq!(
sql,
format!(
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND contract_id = '{}' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#,
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND contract_id = '{}' ORDER BY block_height, tx_index, input_index ASC"#,
contract_id,
)
);
Expand All @@ -143,7 +140,7 @@ fn test_query_builder_message_input() {
assert_eq!(
sql,
format!(
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE sender_address = '{}' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#,
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE sender_address = '{}' ORDER BY block_height, tx_index, input_index ASC"#,
sender,
)
);
Expand All @@ -159,7 +156,7 @@ fn test_query_builder_empty_subject() {

assert_eq!(
sql,
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs ORDER BY block_height, tx_index, input_index ASC"#
);
}

Expand All @@ -170,7 +167,6 @@ fn test_query_builder_only_block_range() {
offset: 0,
limit: 50,
from_block: Some(100),
to_block: Some(200),
namespace: None,
};

Expand All @@ -179,6 +175,6 @@ fn test_query_builder_only_block_range() {

assert_eq!(
sql,
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height >= 100 AND block_height < 200 ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#
r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = 100 ORDER BY block_height, tx_index, input_index ASC"#
);
}
1 change: 0 additions & 1 deletion tests/tests/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
mod live_data;
mod store_stream;
Loading

0 comments on commit bdeddd7

Please sign in to comment.