Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set Latest Recorded Height on startup #2603

Merged
merged 22 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
83bf21a
Use Url apis to build the full url path
MitchTurner Jan 18, 2025
b9977d4
Remove unused import
MitchTurner Jan 18, 2025
e8d72c8
Update CHANGELOG
MitchTurner Jan 18, 2025
968fd69
Make comment more inclusive
MitchTurner Jan 18, 2025
b331fe2
Add test to show the latest recorded height is set on init
MitchTurner Jan 20, 2025
b3da4da
Update CHANGELOG
MitchTurner Jan 20, 2025
cdfec32
Apply fix to shared sequencer service
MitchTurner Jan 20, 2025
b3540ce
Avoid creating extra storage tx
MitchTurner Jan 20, 2025
2f021af
Add missed commit
MitchTurner Jan 20, 2025
c57b645
Add missing api endpoints
MitchTurner Jan 20, 2025
575c28f
Appease Clippy-sama
MitchTurner Jan 20, 2025
6be00a7
Merge branch 'fix-url-scheme' into set-latest-recorded-height-on-startup
MitchTurner Jan 20, 2025
449bd41
Merge branch 'master' into set-latest-recorded-height-on-startup
MitchTurner Jan 21, 2025
25bca69
Merge branch 'master' into set-latest-recorded-height-on-startup
MitchTurner Jan 21, 2025
dc13c4e
Fix failing test
MitchTurner Jan 21, 2025
76f5b43
Include `--starting-recorded-height` in `cli` flags
MitchTurner Jan 21, 2025
6a44c23
Remove unused imports
MitchTurner Jan 21, 2025
fd0b38e
Remove unused imports
MitchTurner Jan 21, 2025
21edb4f
Merge branch 'master' into set-latest-recorded-height-on-startup
MitchTurner Jan 21, 2025
ab55e3f
Rename cli arg
MitchTurner Jan 22, 2025
3316f76
Merge branch 'master' into set-latest-recorded-height-on-startup
MitchTurner Jan 22, 2025
e18acd0
Merge branch 'master' into set-latest-recorded-height-on-startup
MitchTurner Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Added
- [2551](https://github.com/FuelLabs/fuel-core/pull/2551): Enhanced the DA compressed block header to include block id.

### Changed
- [2603](https://github.com/FuelLabs/fuel-core/pull/2603): Sets the latest recorded height on initialization, not just when DA costs are received

### Fixed
- [2599](https://github.com/FuelLabs/fuel-core/pull/2599): Use the proper `url` apis to construct full url path in `BlockCommitterHttpApi` client

## [Version 0.41.0]

### Added
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ impl BlockCommitterApi for BlockCommitterHttpApi {
&self,
l2_block_number: u32,
) -> DaBlockCostsResult<Vec<RawDaBlockCosts>> {
// Specific: http://localhost:8080/v1/costs?variant=specific&value=19098935&limit=5
// Specific: http://committer.url/v1/costs?variant=specific&value=19098935&limit=5
if let Some(url) = &self.url {
tracing::debug!("getting da costs by l2 block number: {l2_block_number}");
let formatted_url = format!("{url}/v1/costs?variant=specific&value={l2_block_number}&limit={NUMBER_OF_BUNDLES}");
let response = self.client.get(formatted_url).send().await?;
let path = format!("/v1/costs?variant=specific&value={l2_block_number}&limit={NUMBER_OF_BUNDLES}");
let full_path = url.join(&path)?;
let response = self.client.get(full_path).send().await?;
let parsed = response.json::<Vec<RawDaBlockCosts>>().await?;
Ok(parsed)
} else {
Expand Down
213 changes: 205 additions & 8 deletions crates/services/gas_price_service/src/v1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ where
storage_tx_provider: AtomicStorage,
/// communicates to the Da source service what the latest L2 block was
latest_l2_block: Arc<AtomicU32>,
/// Initial Recorded Height
initial_recorded_height: Option<BlockHeight>,
}

impl<L2, DA, StorageTxProvider> GasPriceServiceV1<L2, DA, StorageTxProvider>
Expand All @@ -140,6 +142,11 @@ where
pub fn latest_l2_block(&self) -> &AtomicU32 {
&self.latest_l2_block
}

#[cfg(test)]
pub fn initial_recorded_height(&self) -> Option<BlockHeight> {
self.initial_recorded_height
}
}

impl<L2, DA, AtomicStorage> GasPriceServiceV1<L2, DA, AtomicStorage>
Expand Down Expand Up @@ -180,6 +187,7 @@ where
DA: DaBlockCostsSource,
AtomicStorage: GasPriceServiceAtomicStorage,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
l2_block_source: L2,
shared_algo: SharedV1Algorithm,
Expand All @@ -188,6 +196,7 @@ where
da_source_adapter_handle: ServiceRunner<DaSourceService<DA>>,
storage_tx_provider: AtomicStorage,
latest_l2_block: Arc<AtomicU32>,
initial_recorded_height: Option<BlockHeight>,
) -> Self {
let da_source_channel = da_source_adapter_handle.shared.clone().subscribe();
Self {
Expand All @@ -200,6 +209,7 @@ where
da_block_costs_buffer: Vec::new(),
storage_tx_provider,
latest_l2_block,
initial_recorded_height,
}
}

Expand Down Expand Up @@ -237,9 +247,16 @@ where
) -> anyhow::Result<()> {
let capacity = Self::validate_block_gas_capacity(block_gas_capacity)?;
let mut storage_tx = self.storage_tx_provider.begin_transaction()?;
let mut latest_recorded_height = storage_tx
let mut new_recorded_height = match storage_tx
.get_recorded_height()
.map_err(|err| anyhow!(err))?;
.map_err(|err| anyhow!(err))?
{
Some(_) => None,
None => {
// Sets it on first run
self.initial_recorded_height.take()
}
};

for da_block_costs in &self.da_block_costs_buffer {
tracing::debug!("Updating DA block costs: {:?}", da_block_costs);
Expand All @@ -251,10 +268,10 @@ where
da_block_costs.blob_cost_wei,
&mut storage_tx.as_unrecorded_blocks(),
)?;
latest_recorded_height = Some(BlockHeight::from(end));
new_recorded_height = Some(BlockHeight::from(end));
}

if let Some(recorded_height) = latest_recorded_height {
if let Some(recorded_height) = new_recorded_height {
storage_tx
.set_recorded_height(recorded_height)
.map_err(|err| anyhow!(err))?;
Expand Down Expand Up @@ -417,6 +434,7 @@ where
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
use super::*;
use std::{
num::NonZeroU64,
sync::{
Expand All @@ -436,7 +454,6 @@ mod tests {
};
use fuel_core_storage::{
structured_storage::test::InMemoryStorage,
tables::merkle::DenseMetadataKey::Latest,
transactional::{
IntoTransaction,
StorageTransaction,
Expand All @@ -450,9 +467,7 @@ mod tests {
common::{
fuel_core_storage_adapter::storage::{
GasPriceColumn,
GasPriceColumn::UnrecordedBlocks,
GasPriceMetadata,
RecordedHeights,
UnrecordedBlocksTable,
},
gas_price_algorithm::SharedGasPriceAlgo,
Expand All @@ -464,8 +479,10 @@ mod tests {
},
},
ports::{
GasPriceServiceAtomicStorage,
GetLatestRecordedHeight,
GetMetadataStorage,
SetLatestRecordedHeight,
SetMetadataStorage,
},
v1::{
Expand All @@ -484,9 +501,13 @@ mod tests {
GasPriceServiceV1,
LatestGasPrice,
},
uninitialized_task::fuel_storage_unrecorded_blocks::FuelStorageUnrecordedBlocks,
uninitialized_task::fuel_storage_unrecorded_blocks::AsUnrecordedBlocks,
},
};
use fuel_gas_price_algorithm::v1::{
Bytes,
Height,
};

struct FakeL2BlockSource {
l2_block: mpsc::Receiver<BlockInfo>,
Expand Down Expand Up @@ -604,6 +625,7 @@ mod tests {
da_service_runner,
inner,
latest_l2_height,
None,
);
let read_algo = service.next_block_algorithm();
let mut watcher = StateWatcher::default();
Expand Down Expand Up @@ -699,6 +721,7 @@ mod tests {
da_service_runner,
inner,
latest_l2_block,
None,
);
let read_algo = service.next_block_algorithm();
let initial_price = read_algo.next_gas_price();
Expand Down Expand Up @@ -802,6 +825,7 @@ mod tests {
da_service_runner,
inner,
latest_l2_height,
None,
);
let read_algo = service.next_block_algorithm();
let initial_price = read_algo.next_gas_price();
Expand Down Expand Up @@ -894,6 +918,7 @@ mod tests {
da_service_runner,
inner,
latest_l2_height,
None,
);
let read_algo = service.next_block_algorithm();
let initial_price = read_algo.next_gas_price();
Expand All @@ -918,4 +943,176 @@ mod tests {

service.shutdown().await.unwrap();
}

fn arbitrary_config() -> V1AlgorithmConfig {
V1AlgorithmConfig {
new_exec_gas_price: 100,
min_exec_gas_price: 50,
exec_gas_price_change_percent: 20,
l2_block_fullness_threshold_percent: 20,
gas_price_factor: NonZeroU64::new(10).unwrap(),
min_da_gas_price: 10,
max_da_gas_price: 11,
max_da_gas_price_change_percent: 20,
da_p_component: 4,
da_d_component: 2,
normal_range_size: 10,
capped_range_size: 100,
decrease_range_size: 4,
block_activity_threshold: 20,
da_poll_interval: None,
}
}

#[derive(Clone)]
struct FakeAtomicStorage {
inner: Arc<Mutex<Option<BlockHeight>>>,
}

impl FakeAtomicStorage {
fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(None)),
}
}
}

impl GasPriceServiceAtomicStorage for FakeAtomicStorage {
type Transaction<'a>
= Self
where
Self: 'a;

fn begin_transaction(&mut self) -> GasPriceResult<Self::Transaction<'_>> {
Ok(self.clone())
}

fn commit_transaction(transaction: Self::Transaction<'_>) -> GasPriceResult<()> {
Ok(())
}
}

impl GetLatestRecordedHeight for FakeAtomicStorage {
fn get_recorded_height(&self) -> GasPriceResult<Option<BlockHeight>> {
let height = self.inner.lock().unwrap();
Ok(*height)
}
}

impl SetLatestRecordedHeight for FakeAtomicStorage {
fn set_recorded_height(&mut self, height: BlockHeight) -> GasPriceResult<()> {
*self.inner.lock().unwrap() = Some(height);
Ok(())
}
}

impl AsUnrecordedBlocks for FakeAtomicStorage {
type Wrapper<'a> = OkUnrecordedBlocks;

fn as_unrecorded_blocks(&mut self) -> Self::Wrapper<'_> {
OkUnrecordedBlocks
}
}

struct OkUnrecordedBlocks;

impl UnrecordedBlocks for OkUnrecordedBlocks {
fn insert(&mut self, height: Height, bytes: Bytes) -> Result<(), String> {
Ok(())
}

fn remove(&mut self, height: &Height) -> Result<Option<Bytes>, String> {
Ok(None)
}
}

impl GetMetadataStorage for FakeAtomicStorage {
fn get_metadata(
&self,
_: &BlockHeight,
) -> GasPriceResult<Option<UpdaterMetadata>> {
Ok(None)
}
}

impl SetMetadataStorage for FakeAtomicStorage {
fn set_metadata(&mut self, _: &UpdaterMetadata) -> GasPriceResult<()> {
Ok(())
}
}

#[tokio::test]
async fn run__sets_the_latest_recorded_block_if_not_set() {
// given
let expected_recorded_height = BlockHeight::new(9999999);

let block_height = 1;
let l2_block = BlockInfo::Block {
height: block_height,
gas_used: 60,
block_gas_capacity: 100,
block_bytes: 100,
block_fees: 100,
gas_price: 100,
};

let (l2_block_sender, l2_block_receiver) = mpsc::channel(1);
let l2_block_source = FakeL2BlockSource {
l2_block: l2_block_receiver,
};

let metadata_storage = FakeMetadata::empty();
let l2_block_height = 0;
let config = arbitrary_config();
let atomic_storage = FakeAtomicStorage::new();
let handle = atomic_storage.clone();
let (algo_updater, shared_algo) = initialize_algorithm(
&config,
l2_block_height,
l2_block_height,
&metadata_storage,
)
.unwrap();

let notifier = Arc::new(tokio::sync::Notify::new());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let recorded_height = BlockHeight::new(0);

let dummy_da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Err(anyhow::anyhow!("unused at the moment")),
notifier.clone(),
),
None,
Arc::clone(&latest_l2_height),
recorded_height,
);
let da_service_runner = ServiceRunner::new(dummy_da_source);
da_service_runner.start_and_await().await.unwrap();
let latest_gas_price = LatestGasPrice::new(0, 0);

let mut service = GasPriceServiceV1::new(
l2_block_source,
shared_algo,
latest_gas_price,
algo_updater,
da_service_runner,
atomic_storage,
latest_l2_height,
Some(expected_recorded_height),
);
let read_algo = service.next_block_algorithm();
let initial_price = read_algo.next_gas_price();
let mut watcher = StateWatcher::default();

// when
service.run(&mut watcher).await;
l2_block_sender.send(l2_block).await.unwrap();
service.shutdown().await.unwrap();

// then
let actual = handle.get_recorded_height().unwrap().unwrap();

assert_eq!(expected_recorded_height, actual);
}
}
Loading
Loading