Skip to content

Commit

Permalink
Update async client
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Dec 9, 2024
1 parent f469be0 commit 00008fa
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 24 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ edition = "2021"
description = "RabbitMQ HTTP API client"
license = "MIT OR Apache-2.0"

[lib]
# doctests for async client must be wrapped
# with a Tokio block_on block or marked with no_run,
# or skipped entirely, which is what we want
doctest = false

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde-aux = "4.5"
Expand Down
101 changes: 80 additions & 21 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;

use backtrace::Backtrace;
use reqwest::{
header::{HeaderMap, HeaderValue},
Client as HttpClient, StatusCode,
};
use serde::Serialize;
use serde_json::{json, Map, Value};
use std::fmt;

use crate::error::Error;
use crate::error::Error::{
ClientErrorResponse, InvalidHeaderValue, RequestError, ServerErrorResponse,
};
use crate::{
commons::{BindingDestinationType, UserLimitTarget, VirtualHostLimitTarget},
path,
Expand All @@ -32,8 +35,52 @@ use crate::{
};

type HttpClientResponse = reqwest::Response;
type HttpClientError = Error<HttpClientResponse, StatusCode, reqwest::Error, Backtrace>;

pub type Result<T> = std::result::Result<T, HttpClientError>;

impl From<reqwest::Error> for HttpClientError {
fn from(req_err: reqwest::Error) -> Self {
match req_err.status() {
None => RequestError {
error: req_err,
backtrace: Backtrace::new(),
},
Some(status_code) => {
if status_code.is_client_error() {
return ClientErrorResponse {
status_code,
// reqwest::Error does not provide access to the associated
// response, if any
response: None,
backtrace: Backtrace::new(),
};
};

pub type Result<T> = std::result::Result<T, Error<HttpClientResponse>>;
if status_code.is_server_error() {
return ServerErrorResponse {
status_code,
// reqwest::Error does not provide access to the associated
// response, if any
response: None,
backtrace: Backtrace::new(),
};
};

RequestError {
error: req_err,
backtrace: Backtrace::new(),
}
}
}
}
}

impl From<reqwest::header::InvalidHeaderValue> for HttpClientError {
fn from(err: reqwest::header::InvalidHeaderValue) -> Self {
InvalidHeaderValue { error: err }
}
}

/// A `ClientBuilder` can be used to create a `Client` with custom configuration.
///
Expand Down Expand Up @@ -144,11 +191,11 @@ where
/// let password = "password";
/// let rc = Client::new(&endpoint, &username, &password);
/// // list cluster nodes
/// rc.list_nodes().await;
/// let _ = rc.list_nodes().await?;
/// // list user connections
/// rc.list_connections().await;
/// let _ = rc.list_connections().await?;
/// // fetch information and metrics of a specific queue
/// rc.get_queue_info("/", "qq.1").await;
/// let _ = rc.get_queue_info("/", "qq.1").await;
/// ```
pub struct Client<E, U, P> {
endpoint: E,
Expand Down Expand Up @@ -684,7 +731,7 @@ where
let response = self.http_delete(&path, None, None).await?;
Ok(response)
}
_ => Err(Error::MultipleMatchingBindings()),
_ => Err(Error::MultipleMatchingBindings),
}
}

Expand Down Expand Up @@ -1043,14 +1090,16 @@ where
)
.await?;

if response.status().is_success() {
let status_code = response.status();
if status_code.is_success() {
return Ok(());
}

let failure_details = response.json().await?;
Err(Error::HealthCheckFailed(
responses::HealthCheckFailureDetails::NodeIsQuorumCritical(failure_details),
))
Err(Error::HealthCheckFailed {
status_code,
details: failure_details,
})
}

//
Expand Down Expand Up @@ -1117,14 +1166,17 @@ where
let response = self
.http_get(path, None, Some(StatusCode::SERVICE_UNAVAILABLE))
.await?;
if response.status().is_success() {
let status_code = response.status();
if status_code.is_success() {
return Ok(());
}

let failure_details = response.json().await?;
Err(Error::HealthCheckFailed(
responses::HealthCheckFailureDetails::AlarmCheck(failure_details),
))
let body = response.json().await?;
let failure_details = responses::HealthCheckFailureDetails::AlarmCheck(body);
Err(Error::HealthCheckFailed {
details: failure_details,
status_code,
})
}

async fn list_exchange_bindings_with_source_or_destination(
Expand Down Expand Up @@ -1278,18 +1330,25 @@ where
match client_expect_code_error {
Some(expect) if status == expect => {}
_ => {
return Err(Error::ClientErrorResponse(
status,
response.error_for_status()?,
))
return Err(ClientErrorResponse {
response: Some(response),
status_code: status,
backtrace: Backtrace::new(),
})
}
}
}

if status.is_server_error() {
match server_expect_code_error {
Some(expect) if status == expect => {}
_ => return Err(Error::ServerErrorResponse(status, response)),
_ => {
return Err(ServerErrorResponse {
response: Some(response),
status_code: status,
backtrace: Backtrace::new(),
})
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ impl From<reqwest::header::InvalidHeaderValue> for HttpClientError {
/// let password = "password";
/// let rc = ClientBuilder::new().with_endpoint(&endpoint).with_basic_auth_credentials(&username, &password).build();
/// // list cluster nodes
/// rc.list_nodes();
/// let _ = rc.list_nodes();
/// // list user connections
/// rc.list_connections();
/// let _ = rc.list_connections();
/// // fetch information and metrics of a specific queue
/// rc.get_queue_info("/", "qq.1");
/// let _ = rc.get_queue_info("/", "qq.1");
/// ```
pub struct ClientBuilder<E, U, P> {
endpoint: E,
Expand Down

0 comments on commit 00008fa

Please sign in to comment.