diff --git a/Cargo.lock b/Cargo.lock index 09db1524..16605f9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3786,6 +3786,7 @@ dependencies = [ "serde_json", "thiserror 2.0.11", "tokio", + "tracing", ] [[package]] @@ -3833,12 +3834,10 @@ dependencies = [ name = "fuel-streams-store" version = "0.0.16" dependencies = [ - "async-stream", "async-trait", "dotenvy", "fuel-data-parser", "fuel-streams-macros", - "futures", "serde", "sqlx", "test-case", diff --git a/crates/fuel-streams-core/Cargo.toml b/crates/fuel-streams-core/Cargo.toml index 5f9161a2..66ed23a1 100644 --- a/crates/fuel-streams-core/Cargo.toml +++ b/crates/fuel-streams-core/Cargo.toml @@ -28,6 +28,7 @@ serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/crates/fuel-streams-core/src/stream/error.rs b/crates/fuel-streams-core/src/stream/error.rs index fcb0bf12..4487bb80 100644 --- a/crates/fuel-streams-core/src/stream/error.rs +++ b/crates/fuel-streams-core/src/stream/error.rs @@ -1,7 +1,7 @@ use async_nats::SubscribeError; use fuel_message_broker::MessageBrokerError; use fuel_streams_store::{ - db::DbError, + db::{DbError, SqlxError}, record::RecordPacketError, store::StoreError, }; @@ -22,4 +22,6 @@ pub enum StreamError { MessageBrokerClient(#[from] MessageBrokerError), #[error(transparent)] RecordPacket(#[from] RecordPacketError), + #[error(transparent)] + Sqlx(#[from] SqlxError), } diff --git a/crates/fuel-streams-core/src/stream/stream_impl.rs b/crates/fuel-streams-core/src/stream/stream_impl.rs index e83ec158..c88afa2f 100644 --- a/crates/fuel-streams-core/src/stream/stream_impl.rs +++ b/crates/fuel-streams-core/src/stream/stream_impl.rs @@ -2,10 +2,11 @@ use std::{sync::Arc, time::Duration}; pub use async_nats::Subscriber as StreamLiveSubscriber; use fuel_message_broker::MessageBroker; +use fuel_streams_domains::blocks::Block; 
use fuel_streams_macros::subject::IntoSubject; use fuel_streams_store::{ db::{Db, StoreItem}, - record::Record, + record::{QueryOptions, Record}, store::Store, }; use futures::{ @@ -75,18 +76,14 @@ impl Stream { subject: Arc, deliver_policy: DeliverPolicy, ) -> BoxStream<'static, Result<(String, Vec), StreamError>> { - let store = self.store.clone(); + let db = self.store.db.clone(); let broker = self.broker.clone(); - let subject_clone = subject.clone(); + let subject = subject.clone(); let stream = async_stream::try_stream! { if let DeliverPolicy::FromBlock { block_height } = deliver_policy { - let height = Some(block_height); - let mut historical = store.stream_by_subject(&subject_clone, height); + let mut historical = Self::historical_streaming(&db, subject.to_owned(), Some(block_height)).await; while let Some(result) = historical.next().await { - let item = result.map_err(StreamError::Store)?; - let subject = item.subject_str(); - let value = item.encoded_value().to_vec(); - yield (subject, value); + yield result?; let throttle_time = *config::STREAM_THROTTLE_HISTORICAL; sleep(Duration::from_millis(throttle_time as u64)).await; } @@ -109,4 +106,53 @@ impl Stream { let subject = Arc::new(subject); self.subscribe_dynamic(subject, deliver_policy).await } + + async fn historical_streaming( + db: &Arc, + subject: Arc, + from_block: Option, + ) -> BoxStream<'static, Result<(String, Vec), StreamError>> { + let db = db.clone(); + let stream = async_stream::try_stream! 
{ + let mut current_height = from_block.unwrap_or(0); + let mut last_height = Self::find_last_block_height(&db).await?; + while current_height <= last_height { + let opts = QueryOptions::default().with_from_block(Some(current_height)); + let mut query = R::build_find_many_query(subject.to_owned(), opts.clone()); + let mut stream = query + .build_query_as::() + .fetch(&db.pool); + while let Some(result) = stream.next().await { + let result = result?; + let subject = result.subject_str(); + let value = result.encoded_value().to_vec(); + yield (subject, value); + } + current_height += 1; + // Once we pass the last known height, check if any new blocks + // were produced while we were processing the previous ones + if current_height > last_height { + let new_last_height = Self::find_last_block_height(&db).await?; + if new_last_height > last_height { + // current_height already points at the first block we have + // not seen yet, so only the upper bound needs to move + last_height = new_last_height; + } else { + tracing::debug!("No new blocks found, stopping historical streaming on block {}", current_height); + break + } + } + } + }; + Box::pin(stream) + } + + async fn find_last_block_height(db: &Arc) -> Result { + let opts = QueryOptions::default(); + let record = Block::find_last_record(db, opts).await?; + match record { + Some(record) => Ok(record.block_height as u64), + None => Ok(0), + } + } } diff --git a/crates/fuel-streams-store/Cargo.toml b/crates/fuel-streams-store/Cargo.toml index d325c4cc..d4d1ac55 100644 --- a/crates/fuel-streams-store/Cargo.toml +++ b/crates/fuel-streams-store/Cargo.toml @@ -10,12 +10,10 @@ version = { workspace = true } rust-version = { workspace = true } [dependencies] -async-stream = { workspace = true } async-trait = { workspace = true } dotenvy = { workspace = true } fuel-data-parser = { workspace = true } fuel-streams-macros = { workspace = true } -futures = { workspace = true } serde = { workspace = true } sqlx = { workspace = true,
default-features = false, features = [ "runtime-tokio", diff --git a/crates/fuel-streams-store/src/db/db_impl.rs b/crates/fuel-streams-store/src/db/db_impl.rs index e1523d26..4e719a21 100644 --- a/crates/fuel-streams-store/src/db/db_impl.rs +++ b/crates/fuel-streams-store/src/db/db_impl.rs @@ -31,6 +31,7 @@ pub enum DbError { } pub type DbResult = Result; +pub type SqlxError = sqlx::Error; pub static DB_POOL_SIZE: LazyLock = LazyLock::new(|| { dotenvy::var("DB_POOL_SIZE") diff --git a/crates/fuel-streams-store/src/record/query_options.rs b/crates/fuel-streams-store/src/record/query_options.rs index be676e23..89182727 100644 --- a/crates/fuel-streams-store/src/record/query_options.rs +++ b/crates/fuel-streams-store/src/record/query_options.rs @@ -3,7 +3,6 @@ pub struct QueryOptions { pub offset: i64, pub limit: i64, pub from_block: Option, - pub to_block: Option, pub namespace: Option, } impl Default for QueryOptions { @@ -12,7 +11,6 @@ offset: 0, limit: 100, from_block: None, - to_block: None, namespace: None, } } @@ -38,8 +36,4 @@ impl QueryOptions { pub fn increment_offset(&mut self) { self.offset += self.limit; } - pub fn with_to_block(mut self, to_block: Option) -> Self { - self.to_block = to_block; - self - } } diff --git a/crates/fuel-streams-store/src/record/record_impl.rs b/crates/fuel-streams-store/src/record/record_impl.rs index 2e2978a8..3fec91fd 100644 --- a/crates/fuel-streams-store/src/record/record_impl.rs +++ b/crates/fuel-streams-store/src/record/record_impl.rs @@ -72,11 +72,7 @@ pub trait Record: RecordEncoder + 'static { // Add block conditions if let Some(block) = options.from_block { - conditions.push(format!("block_height >= {}", block)); - } - - if let Some(block) = options.to_block { - conditions.push(format!("block_height < {}", block)); + conditions.push(format!("block_height = {}", block)); } // Add namespace condition for tests @@ -94,10 +90,7 @@
query_builder.push(" ORDER BY "); query_builder.push(Self::ORDER_PROPS.join(", ")); - query_builder.push(" ASC LIMIT "); - query_builder.push_bind(options.limit); - query_builder.push(" OFFSET "); - query_builder.push_bind(options.offset); + query_builder.push(" ASC"); query_builder } diff --git a/crates/fuel-streams-store/src/store/config.rs b/crates/fuel-streams-store/src/store/config.rs deleted file mode 100644 index 9b1a01ce..00000000 --- a/crates/fuel-streams-store/src/store/config.rs +++ /dev/null @@ -1,8 +0,0 @@ -use std::sync::LazyLock; - -pub static STORE_PAGINATION_LIMIT: LazyLock = LazyLock::new(|| { - dotenvy::var("STORE_PAGINATION_LIMIT") - .ok() - .and_then(|val| val.parse().ok()) - .unwrap_or(100) -}); diff --git a/crates/fuel-streams-store/src/store/mod.rs b/crates/fuel-streams-store/src/store/mod.rs index 581ab5af..1eefff3a 100644 --- a/crates/fuel-streams-store/src/store/mod.rs +++ b/crates/fuel-streams-store/src/store/mod.rs @@ -1,4 +1,3 @@ -pub(super) mod config; mod errors; mod store_impl; diff --git a/crates/fuel-streams-store/src/store/store_impl.rs b/crates/fuel-streams-store/src/store/store_impl.rs index aaeb9b2d..b23da1de 100644 --- a/crates/fuel-streams-store/src/store/store_impl.rs +++ b/crates/fuel-streams-store/src/store/store_impl.rs @@ -1,10 +1,8 @@ use std::sync::Arc; use fuel_data_parser::DataEncoder; -use fuel_streams_macros::subject::IntoSubject; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use super::{config, StoreError}; +use super::StoreError; use crate::{ db::Db, record::{DbTransaction, QueryOptions, Record, RecordPacket}, @@ -58,7 +56,7 @@ impl Store { #[cfg(any(test, feature = "test-helpers"))] pub async fn find_many_by_subject( &self, - subject: &Arc, + subject: &Arc, mut options: QueryOptions, ) -> StoreResult> {
options = options.with_namespace(self.namespace.clone()); @@ -71,30 +69,6 @@ impl Store { .map_err(StoreError::from) } - pub fn stream_by_subject( - &self, - subject: &Arc, - from_block: Option, - ) -> BoxStream<'static, Result> { - let db = Arc::clone(&self.db); - let namespace = self.namespace.clone(); - let subject = subject.clone(); - async_stream::stream! { - let options = QueryOptions::default() - .with_namespace(namespace) - .with_from_block(from_block) - .with_limit(*config::STORE_PAGINATION_LIMIT); - let mut query = R::build_find_many_query(subject, options.clone()); - let mut stream = query - .build_query_as::() - .fetch(&db.pool); - while let Some(result) = stream.try_next().await? { - yield Ok(result); - } - } - .boxed() - } - pub async fn find_last_record(&self) -> StoreResult> { let options = QueryOptions::default().with_namespace(self.namespace.clone()); diff --git a/tests/tests/store/query_builder.rs b/tests/tests/store/query_builder.rs index 574237d7..91fcd31a 100644 --- a/tests/tests/store/query_builder.rs +++ b/tests/tests/store/query_builder.rs @@ -21,8 +21,7 @@ fn test_query_builder() { let options = QueryOptions { offset: 0, limit: 10, - from_block: Some(100), - to_block: Some(200), + from_block: None, namespace: Some("test_ns".to_string()), }; @@ -46,7 +45,7 @@ fn test_query_builder() { assert_eq!( sql, format!( - r#"SELECT _id, subject, value, block_height FROM blocks WHERE producer_address = '{}' AND block_height = '50' AND block_height >= 100 AND block_height < 200 AND subject LIKE 'test_ns%' ORDER BY block_height ASC LIMIT $1 OFFSET $2"#, + r#"SELECT _id, subject, value, block_height FROM blocks WHERE producer_address = '{}' AND block_height = '50' AND subject LIKE 'test_ns%' ORDER BY block_height ASC"#, Address::default() ) ); @@ -55,14 +54,13 @@ fn test_query_builder() { #[test] fn test_query_builder_with_no_subject_fields() { let subject = Arc::new(BlocksSubject::new()); - let options = QueryOptions::default(); let query = 
Block::build_find_many_query(subject, options); let sql = query.sql(); assert_eq!( sql, - r#"SELECT _id, subject, value, block_height FROM blocks ORDER BY block_height ASC LIMIT $1 OFFSET $2"# + r#"SELECT _id, subject, value, block_height FROM blocks ORDER BY block_height ASC"# ); } @@ -82,7 +80,6 @@ fn test_query_builder_coin_input() { offset: 0, limit: 20, from_block: Some(50), - to_block: Some(150), namespace: Some("test_ns".to_string()), }; @@ -92,7 +89,7 @@ fn test_query_builder_coin_input() { assert_eq!( sql, format!( - r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND tx_id = '{}' AND tx_index = '1' AND input_index = '2' AND owner_id = '{}' AND asset_id = '{}' AND block_height >= 50 AND block_height < 150 AND subject LIKE 'test_ns%' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#, + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND tx_id = '{}' AND tx_index = '1' AND input_index = '2' AND owner_id = '{}' AND asset_id = '{}' AND block_height = 50 AND subject LIKE 'test_ns%' ORDER BY block_height, tx_index, input_index ASC"#, TxId::default(), Address::default(), AssetId::default(), @@ -118,7 +115,7 @@ fn test_query_builder_contract_input() { assert_eq!( sql, format!( - r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND contract_id = '{}' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"#, + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = '100' AND contract_id = '{}' ORDER BY block_height, tx_index, input_index ASC"#, contract_id, ) ); @@ -143,7 +140,7 @@ fn test_query_builder_message_input() { assert_eq!( sql, format!( - r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE sender_address = '{}' ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET 
$2"#, + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE sender_address = '{}' ORDER BY block_height, tx_index, input_index ASC"#, sender, ) ); @@ -159,7 +156,7 @@ fn test_query_builder_empty_subject() { assert_eq!( sql, - r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"# + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs ORDER BY block_height, tx_index, input_index ASC"# ); } @@ -170,7 +167,6 @@ fn test_query_builder_only_block_range() { offset: 0, limit: 50, from_block: Some(100), - to_block: Some(200), namespace: None, }; @@ -179,6 +175,6 @@ fn test_query_builder_only_block_range() { assert_eq!( sql, - r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height >= 100 AND block_height < 200 ORDER BY block_height, tx_index, input_index ASC LIMIT $1 OFFSET $2"# + r#"SELECT _id, subject, value, block_height, tx_index, input_index FROM inputs WHERE block_height = 100 ORDER BY block_height, tx_index, input_index ASC"# ); } diff --git a/tests/tests/stream/mod.rs b/tests/tests/stream/mod.rs index b025511b..682b2d5b 100644 --- a/tests/tests/stream/mod.rs +++ b/tests/tests/stream/mod.rs @@ -1,2 +1 @@ mod live_data; -mod store_stream; diff --git a/tests/tests/stream/store_stream.rs b/tests/tests/stream/store_stream.rs deleted file mode 100644 index 8582dfb5..00000000 --- a/tests/tests/stream/store_stream.rs +++ /dev/null @@ -1,35 +0,0 @@ -use fuel_streams_core::types::Block; -use fuel_streams_test::{ - create_multiple_records, - create_random_db_name, - insert_records, - setup_store, -}; -use futures::StreamExt; - -#[tokio::test] -async fn test_stream_by_subject() -> anyhow::Result<()> { - // Setup store and test data - let prefix = create_random_db_name(); - let mut store = setup_store::().await?; - store.with_namespace(&prefix); - - let data = create_multiple_records(10, 
0); - let _ = insert_records(&store, &prefix, &data).await?; - - // Test streaming with the first subject - let subject = data[0].0.clone(); - let mut stream = store.stream_by_subject(&subject, Some(0)); - let mut count = 0; - while let Some(result) = stream.next().await { - let record = result?; - let height: u32 = data[count].1.height.clone().into(); - assert_eq!(record.block_height as u32, height); - count += 1; - } - - // Verify we got all records for this subject - assert_eq!(count, 1); // We should only get one record since we're querying by specific subject - - Ok(()) -}