Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC Consistency Proposal #2473

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
a8f5f41
Start structuring middleware
acerone85 Dec 5, 2024
778dd7c
Use a static atomic to keep track of the current block height
acerone85 Dec 5, 2024
d1b1c33
Add changelog entry
acerone85 Dec 5, 2024
b77506c
Add tests and fix middleware
acerone85 Dec 5, 2024
b764cd4
Update crates/fuel-core/src/graphql_api/api_service.rs
acerone85 Dec 6, 2024
d9091e6
Set headers in fuel client
acerone85 Dec 6, 2024
90c411f
AxumRequest/AxumResponse -> http::Request/http::Response
acerone85 Dec 6, 2024
625bf8f
Fix compilation
acerone85 Dec 6, 2024
085d173
Extension instead of Axum middleware
acerone85 Dec 7, 2024
a98ef9b
Merge branch 'master' into 1897-rpc-consistency-proposal
xgreenx Dec 12, 2024
c95f09a
Add response header
acerone85 Dec 12, 2024
6099651
Better handling of response
acerone85 Dec 14, 2024
3b81537
Use BlockHeight type when possible
acerone85 Dec 14, 2024
11e6876
Fix compilation error
acerone85 Dec 14, 2024
88b9c52
Use Request data to retrieve required block height
acerone85 Dec 14, 2024
c5f82ff
Remove useless .into
acerone85 Dec 15, 2024
844e4e6
Move up const definitions
acerone85 Dec 15, 2024
71f8087
Remove useless .into()
acerone85 Dec 16, 2024
cbea694
Inject request data in subscription handler
acerone85 Dec 16, 2024
da7f057
Improvements
acerone85 Dec 16, 2024
f5e63f6
Add documentation
acerone85 Dec 16, 2024
5f4500b
Do not fetch the read view twice
acerone85 Dec 16, 2024
37dd63e
Remove commented code
acerone85 Dec 16, 2024
9913620
Typo
acerone85 Dec 16, 2024
f849eb5
Update crates/fuel-core/src/graphql_api/api_service.rs
acerone85 Dec 16, 2024
1fddd80
Improve setting headers in client
acerone85 Dec 16, 2024
e1f71d4
Revert to setting header in graphql extension when possible
acerone85 Dec 16, 2024
be7f927
Update comment
acerone85 Dec 16, 2024
0b1e31f
Remove use of Arc<Mutex<_>>
acerone85 Dec 16, 2024
8e1b0a5
Remove stray comment
acerone85 Dec 16, 2024
abe13c2
WIP
acerone85 Dec 16, 2024
eae1e22
Example of how it can be implemented via `extensions`
xgreenx Dec 16, 2024
0a3da1f
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Dec 18, 2024
e7e1db7
Add current height header on failed requests
acerone85 Dec 18, 2024
6b76769
Return current level after request has been executed
acerone85 Dec 19, 2024
b6f206d
WIP: Use only graphql extensions: tests still to be adjusted
acerone85 Dec 31, 2024
2f52aaa
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 20, 2025
cd3be50
Add capability to the client to set required_fuel_block_height + fix …
acerone85 Jan 20, 2025
dbc5cdc
Fix other tests
acerone85 Jan 20, 2025
c5db12f
Rename test file
acerone85 Jan 20, 2025
24b8d69
Fix BlockHeight base format in extension error
acerone85 Jan 20, 2025
4a8d839
Fix typo
acerone85 Jan 21, 2025
51ec4a9
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 21, 2025
37b03f1
Fix compilation error after rename
acerone85 Jan 21, 2025
396a953
Reference follow-up issue
acerone85 Jan 21, 2025
af0c6d6
Fix rustfmt
acerone85 Jan 21, 2025
b1e89d6
Fix subscriptions feature
acerone85 Jan 21, 2025
6e50f3b
Use HashMap::new instead of Default::default
acerone85 Jan 21, 2025
dcec472
Downgrade netlink-proto
acerone85 Jan 21, 2025
8c6af1f
Revert "Downgrade netlink-proto"
acerone85 Jan 21, 2025
818a717
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks
- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.
- [2233](https://github.com/FuelLabs/fuel-core/pull/2233): Introduce a new column `modification_history_v2` for storing the modification history in the historical rocksDB. Keys in this column are stored in big endian order. Changed the behaviour of the historical rocksDB to write changes for new block heights to the new column, and to perform lookup of values from the `modification_history_v2` table first, and then from the `modification_history` table, performing a migration upon access if necessary.
- [2473](https://github.com/FuelLabs/fuel-core/pull/2473): Graphql requests look for a `REQUIRED_FUEL_BLOCK_HEIGHT` header. If the header is specified, the request will not be served unless the node's current fuel block height is at least the value specified in the header. All graphql responses now contain a `CURRENT_FUEL_BLOCK_HEIGHT` header which contains the block height of the last block processed by the node.

#### Breaking
- [2389](https://github.com/FuelLabs/fuel-core/pull/2258): Updated the `messageProof` GraphQL schema to return a non-nullable `MessageProof`.
Expand Down
51 changes: 47 additions & 4 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ use pagination::{
PaginatedResult,
PaginationRequest,
};
use reqwest::header::{
AsHeaderName,
HeaderMap,
HeaderValue,
IntoHeaderName,
};
use schema::{
balance::BalanceArgs,
blob::BlobByIdArgs,
Expand Down Expand Up @@ -149,12 +155,24 @@ pub mod types;

type RegisterId = u32;

#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
/// Error occurring during interaction with the FuelClient
// anyhow::Error is wrapped inside a custom Error type,
// so that we can specific error variants in the future.
pub enum Error {
/// Unknown or not expected(by architecture) error.
#[from]
Other(anyhow::Error),
}

#[derive(Debug, Clone)]
pub struct FuelClient {
client: reqwest::Client,
#[cfg(feature = "subscriptions")]
cookie: std::sync::Arc<reqwest::cookie::Jar>,
url: reqwest::Url,
headers: HeaderMap,
}

impl FromStr for FuelClient {
Expand Down Expand Up @@ -182,13 +200,18 @@ impl FromStr for FuelClient {
client,
cookie,
url,
headers: HeaderMap::new(),
})
}

#[cfg(not(feature = "subscriptions"))]
{
let client = reqwest::Client::new();
Ok(Self { client, url })
Ok(Self {
client,
url,
headers: HeaderMap::new(),
})
}
}
}
Expand Down Expand Up @@ -221,6 +244,23 @@ impl FuelClient {
Self::from_str(url.as_ref())
}

pub fn set_header(
&mut self,
key: impl IntoHeaderName,
value: impl TryInto<HeaderValue>,
) -> Result<&mut Self, Error> {
let header_value: HeaderValue = value
.try_into()
.map_err(|_err| anyhow::anyhow!("Cannot parse value for header"))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we now have an proper Error, maybe we should introduce specific variant instead of anyhow?

self.headers.insert(key, header_value);
Ok(self)
}

pub fn remove_header(&mut self, key: impl AsHeaderName) -> &mut Self {
self.headers.remove(key);
self
}

/// Send the GraphQL query to the client.
pub async fn query<ResponseData, Vars>(
&self,
Expand All @@ -230,9 +270,12 @@ impl FuelClient {
Vars: serde::Serialize,
ResponseData: serde::de::DeserializeOwned + 'static,
{
let response = self
.client
.post(self.url.clone())
let mut request_builder = self.client.post(self.url.clone());
for (header_name, header_value) in self.headers.iter() {
request_builder = request_builder.header(header_name, header_value);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use headers() to avoid manual loop, like:

    request_builder = request_builder.headers(&self.headers);


let response = request_builder
.run_graphql(q)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod database;
pub(crate) mod indexation;
pub(crate) mod metrics_extension;
pub mod ports;
pub(crate) mod required_fuel_block_height_extension;
pub mod storage;
pub(crate) mod validation_extension;
pub(crate) mod view_extension;
Expand Down
122 changes: 117 additions & 5 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use crate::{
view_extension::ViewExtension,
Config,
},
graphql_api,
graphql_api::{
self,
required_fuel_block_height_extension::RequiredFuelBlockHeightExtension,
},
schema::{
CoreSchema,
CoreSchemaBuilder,
Expand All @@ -34,6 +37,7 @@ use axum::{
extract::{
DefaultBodyLimit,
Extension,
FromRequest,
},
http::{
header::{
Expand All @@ -42,11 +46,13 @@ use axum::{
ACCESS_CONTROL_ALLOW_ORIGIN,
},
HeaderValue,
StatusCode,
},
response::{
sse::Event,
Html,
IntoResponse,
IntoResponseParts,
Sse,
},
routing::{
Expand All @@ -69,6 +75,7 @@ use futures::Stream;
use hyper::rt::Executor;
use serde_json::json;
use std::{
convert::Infallible,
future::Future,
net::{
SocketAddr,
Expand All @@ -77,6 +84,7 @@ use std::{
pin::Pin,
sync::Arc,
};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use tower::limit::ConcurrencyLimitLayer;
use tower_http::{
Expand All @@ -85,10 +93,16 @@ use tower_http::{
trace::TraceLayer,
};

pub(crate) const REQUIRED_FUEL_BLOCK_HEIGHT_HEADER: &str = "REQUIRED_FUEL_BLOCK_HEIGHT";
pub(crate) const CURRENT_FUEL_BLOCK_HEIGHT_HEADER: &str = "CURRENT_FUEL_BLOCK_HEIGHT";

pub type Service = fuel_core_services::ServiceRunner<GraphqlService>;

pub use super::database::ReadDatabase;
use super::ports::worker;
use super::{
ports::worker,
required_fuel_block_height_extension::RequiredFuelBlockHeightTooFarInTheFuture,
};

pub type BlockProducer = Box<dyn BlockProducerPort>;
// In the future GraphQL should not be aware of `TxPool`. It should
Expand Down Expand Up @@ -274,6 +288,9 @@ where
))
.extension(async_graphql::extensions::Tracing)
.extension(ViewExtension::new())
// `RequiredFuelBlockHeightExtension`` uses the view set by the ViewExtension.
acerone85 marked this conversation as resolved.
Show resolved Hide resolved
// Do not reorder this line before adding the `ViewExtension`.
.extension(RequiredFuelBlockHeightExtension::new())
.finish();

let graphql_endpoint = "/v1/graphql";
Expand Down Expand Up @@ -346,19 +363,114 @@ async fn health() -> Json<serde_json::Value> {
Json(json!({ "up": true }))
}

/// Optional value which is set via the REQUIRED_FUEL_BLOCK_HEIGHT_HEADER header.
/// When present, it is used to check whether the current block height is lower
/// than the current fuel block height. Requests that do not meet this
/// condition fail with a 412 `Precondition Failed` status code.
#[derive(Clone)]
pub(crate) struct RequiredHeight(pub(crate) Option<BlockHeight>);

#[async_trait::async_trait]
impl<Body> FromRequest<Body> for RequiredHeight
where
Body: Send,
{
type Rejection = (StatusCode, String);

async fn from_request(
req: &mut axum::extract::RequestParts<Body>,
) -> Result<Self, Self::Rejection> {
let required_fuel_block_height = req
.headers()
.get(REQUIRED_FUEL_BLOCK_HEIGHT_HEADER)
.map(|value| value.to_str())
.transpose()
.map_err(|_| (StatusCode::BAD_REQUEST, "Header Malformed".to_string()))?
.map(|value| value.parse::<u32>())
.transpose()
.map_err(|_| (StatusCode::BAD_REQUEST, "Header Malformed".to_string()))?;

Ok(RequiredHeight(
required_fuel_block_height.map(BlockHeight::new),
))
}
}

/// Structure to be used to store the current fuel block height
/// in the graphql `RequiredFuelBlockHeightExtension`.
/// Instances of this type returned by the [RequiredFuelBlockHeightExtension]
/// are used to se the `CURRENT_FUEL_BLOCK_HEIGHT` header in the response.

struct CurrentHeight(BlockHeight);

impl IntoResponseParts for CurrentHeight {
type Error = Infallible;

fn into_response_parts(
self,
mut res: axum::response::ResponseParts,
) -> Result<axum::response::ResponseParts, Self::Error> {
let current_block_height: u32 = self.0.into();
res.headers_mut().insert(
CURRENT_FUEL_BLOCK_HEIGHT_HEADER,
current_block_height.into(),
);
Ok(res)
}
}

impl IntoResponse for CurrentHeight {
fn into_response(self) -> axum::response::Response {
(self, ()).into_response()
}
}

async fn graphql_handler(
required_fuel_block_height: RequiredHeight,
schema: Extension<CoreSchema>,
req: Json<Request>,
) -> Json<Response> {
schema.execute(req.0).await.into()
) -> Result<(CurrentHeight, Json<Response>), (StatusCode, CurrentHeight)> {
let current_fuel_block_height_data: Arc<Mutex<Option<BlockHeight>>> =
Arc::new(Mutex::new(None));

let request = req
.0
.data(current_fuel_block_height_data.clone())
.data(required_fuel_block_height);

let graphql_response: Response = schema.execute(request).await;

let current_block_height = CurrentHeight(
current_fuel_block_height_data
.lock()
.await
.expect("Block height is set"),
);

if graphql_response
.errors
.first()
.and_then(|err| err.source::<RequiredFuelBlockHeightTooFarInTheFuture>())
.is_some()
{
Err((StatusCode::PRECONDITION_FAILED, current_block_height))
} else {
Ok((current_block_height, graphql_response.into()))
}
}

async fn graphql_subscription_handler(
schema: Extension<CoreSchema>,
req: Json<Request>,
) -> Sse<impl Stream<Item = anyhow::Result<Event, serde_json::Error>>> {
let current_fuel_block_height_data: Arc<Mutex<Option<BlockHeight>>> =
Arc::new(Mutex::new(None));
let request = req
.0
.data(RequiredHeight(None))
.data(current_fuel_block_height_data);
let stream = schema
.execute_stream(req.0)
.execute_stream(request)
.map(|r| Event::default().json_data(r));
Sse::new(stream)
.keep_alive(axum::response::sse::KeepAlive::new().text("keep-alive-text"))
Expand Down
Loading
Loading