Skip to content

Commit

Permalink
Merge branch 'master' into rafal/dust_factor_in_coins_to_spend
Browse files Browse the repository at this point in the history
  • Loading branch information
rafal-ch authored Jan 22, 2025
2 parents ae586fd + 2031bea commit 609b9ce
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2551](https://github.com/FuelLabs/fuel-core/pull/2551): Enhanced the DA compressed block header to include block id.

### Fixed
- [2609](https://github.com/FuelLabs/fuel-core/pull/2609): Check the block committer response before trying to deserialize it, and return a descriptive error (including the response body) instead of a bare deserialization failure
- [2599](https://github.com/FuelLabs/fuel-core/pull/2599): Use the proper `url` apis to construct full url path in `BlockCommitterHttpApi` client

## [Version 0.41.0]
Expand Down
4 changes: 2 additions & 2 deletions crates/services/gas_price_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ num_enum = { workspace = true }
parking_lot = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
serde_json = { workspace = true, optional = true }
serde_json = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
thiserror = { workspace = true }
Expand All @@ -39,4 +39,4 @@ mockito = { version = "1.6.1" }
serde_json = { workspace = true }

[features]
test-helpers = ["dep:mockito", "dep:serde_json"]
test-helpers = ["dep:mockito"]
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ impl BlockCommitterApi for BlockCommitterHttpApi {
let path = format!("/v1/costs?variant=specific&value={l2_block_number}&limit={NUMBER_OF_BUNDLES}");
let full_path = url.join(&path)?;
let response = self.client.get(full_path).send().await?;
let parsed = response.json::<Vec<RawDaBlockCosts>>().await?;
let text = response.text().await?;
let parsed: Vec<RawDaBlockCosts> = serde_json::from_str(&text).map_err(|e| { anyhow::anyhow!("Failed to get costs from block committer: {e} for the response {text}") })?;
Ok(parsed)
} else {
Ok(vec![])
Expand Down
105 changes: 62 additions & 43 deletions crates/services/src/async_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,46 +126,45 @@ mod tests {
};
use tokio::time::Instant;

#[test]
fn one_spawn_single_tasks_works() {
#[tokio::test]
async fn one_spawn_single_tasks_works() {
// Given
let number_of_pending_tasks = 1;
const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

// When
let (sender, mut receiver) = tokio::sync::oneshot::channel();
let (sender, receiver) = tokio::sync::oneshot::channel();
let result = heavy_task_processor.try_spawn(async move {
sender.send(()).unwrap();
});

// Then
result.expect("Expected Ok result");
sleep(Duration::from_secs(1));
receiver.try_recv().unwrap();
tokio::time::timeout(Duration::from_secs(5), receiver)
.await
.unwrap()
.unwrap();
}

#[tokio::test]
async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
// Given
let number_of_threads = 10;
let number_of_pending_tasks = 10000;
const MAX_NUMBER_OF_THREADS: usize = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10000;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();
let main_handler = tokio::spawn(async move { std::thread::current().id() });
let main_id = main_handler.await.unwrap();

// When
let futures = iter::repeat_with(|| {
heavy_task_processor
.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
std::thread::current().id()
})
.try_spawn(async move { std::thread::current().id() })
.unwrap()
})
.take(number_of_pending_tasks)
.take(NUMBER_OF_PENDING_TASKS)
.collect::<Vec<_>>();

// Then
Expand All @@ -175,16 +174,20 @@ mod tests {
.map(|r| r.unwrap())
.collect::<HashSet<_>>();

// Main thread was not used.
assert!(!unique_thread_ids.contains(&main_id));
assert_eq!(unique_thread_ids.len(), number_of_threads);
// At least one worker thread was used.
assert!(!unique_thread_ids.is_empty());
// There were no more worker threads above the threshold.
assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS);
}

#[test]
fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
// Given
let number_of_pending_tasks = 1;
const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
let first_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});
Expand All @@ -196,15 +199,15 @@ mod tests {
});

// Then
let err = second_spawn_result.expect_err("Expected Ok result");
let err = second_spawn_result.expect_err("Should error");
assert_eq!(err, OutOfCapacity);
}

#[test]
fn second_spawn_works_when_first_is_finished() {
let number_of_pending_tasks = 1;
const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

// Given
let (sender, receiver) = tokio::sync::oneshot::channel();
Expand All @@ -229,11 +232,11 @@ mod tests {
#[test]
fn can_spawn_10_tasks_when_limit_is_10() {
// Given
let number_of_pending_tasks = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
// When
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -245,19 +248,19 @@ mod tests {
}

#[tokio::test]
async fn executes_10_tasks_for_10_seconds_with_one_thread() {
async fn executes_5_tasks_for_5_seconds_with_one_thread() {
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 1;
const NUMBER_OF_PENDING_TASKS: usize = 5;
const NUMBER_OF_THREADS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
Expand All @@ -269,29 +272,36 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() >= Duration::from_secs(10));
// 5 tasks running on 1 thread, each task taking 1 second,
// should complete in approximately 5 seconds overall.
// Allowing some LEEWAY to account for runtime overhead.
const LEEWAY: Duration = Duration::from_millis(300);
assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY);
// Make sure that the tasks were not executed in parallel.
assert!(instant.elapsed() >= Duration::from_secs(5));
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
assert_eq!(duration.as_secs(), 10);
assert_eq!(duration.as_secs(), 5);
let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
assert_eq!(duration.as_secs(), 0);
}

#[tokio::test]
async fn executes_10_tasks_for_2_seconds_with_10_thread() {
async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
{
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10;
const NUMBER_OF_THREADS: usize = 10;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
Expand All @@ -303,7 +313,11 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() <= Duration::from_secs(2));
// 10 blocking tasks running on 10 threads, each task taking 1 second,
// should complete in approximately 1 second overall.
// Allowing some LEEWAY to account for runtime overhead.
const LEEWAY: Duration = Duration::from_millis(300);
assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
Expand All @@ -313,19 +327,20 @@ mod tests {
}

#[tokio::test]
async fn executes_10_tasks_for_2_seconds_with_1_thread() {
async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time(
) {
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10;
const NUMBER_OF_THREADS: usize = 10;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -337,7 +352,11 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() <= Duration::from_secs(2));
// 10 non-blocking tasks running on 10 threads, each task taking 1 second,
// should complete in approximately 1 second overall.
// Allowing some LEEWAY to account for runtime overhead.
const LEEWAY: Duration = Duration::from_millis(300);
assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
Expand Down
2 changes: 2 additions & 0 deletions version-compatibility/forkless-upgrade/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ hex = "0.4.3"
rand = "0.8"
tempfile = "3.4"
tokio = { version = "1.37.0", features = ["rt-multi-thread"] }
# Remove this version pin once we upgrade the Rust toolchain version.
netlink-proto = "=0.11.3"

# Neutral deps
fuel-core-trace = { path = "../../crates/trace" }
Expand Down
3 changes: 3 additions & 0 deletions version-compatibility/forkless-upgrade/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#![deny(unused_crate_dependencies)]
#![deny(warnings)]

#[cfg(test)]
use netlink_proto as _;

#[cfg(test)]
mod backward_compatibility;
#[cfg(test)]
Expand Down

0 comments on commit 609b9ce

Please sign in to comment.