Skip to content

Commit

Permalink
Add Client#health_check_protocol_listener
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Dec 31, 2024
1 parent c235eb0 commit 6878c35
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 11 deletions.
22 changes: 16 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
# Rust Client for the RabbitMQ HTTP API Change Log

# v0.14.0 (in development)
## v0.15.0 (in development)

No (documented) changes yet.


# v0.13.0 (Dec 31, 2024)
## v0.14.0 (Dec 31, 2024)

### Enhancements

* New `responses::HealthCheckFailureDetails` variants to accommodate active port and protocol
listener [health checks](https://www.rabbitmq.com/docs/monitoring#health-checks)

* New [health check](https://www.rabbitmq.com/docs/monitoring#health-checks) function: `Client#health_check_protocol_listener`


## v0.13.0 (Dec 31, 2024)

### Enhancements

* New functions for listing [stream](https://www.rabbitmq.com/docs/streams) connections, publishers and consumers: `Client#list_stream_publishers`, `Client#list_stream_publishers_in`, `Client#list_stream_publishers_of`, `Client#list_stream_publishers_on_connection`, `Client#list_stream_consumers`, `Client#list_stream_consumers_in`, `Client#list_stream_consumers_on_connection`, `Client#list_stream_connections`, `Client#list_stream_connections_in`

* New [health check](https://www.rabbitmq.com/docs/monitoring#health-checks) function: `Client#health_check_port_listene`
* New [health check](https://www.rabbitmq.com/docs/monitoring#health-checks) function: `Client#health_check_port_listener`


# v0.12.0 (Dec 28, 2024)
## v0.12.0 (Dec 28, 2024)

### Enhancements

* `Client#list_feature_flags`, `Client#enable_feature_flag`, `Client#enable_all_stable_feature_flags` are three
new functions for working with [feature flags](https://www.rabbitmq.com/docs/feature-flags)


# v0.11.0 (Dec 28, 2024)
## v0.11.0 (Dec 28, 2024)

### Enhancements

Expand All @@ -34,7 +44,7 @@ No (documented) changes yet.
in the cluster, including their state and stability.


# v0.10.0 (Dec 27, 2024)
## v0.10.0 (Dec 27, 2024)

### Dependencies

Expand Down
14 changes: 13 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::responses::{
MessageList,
};
use crate::{
commons::{BindingDestinationType, UserLimitTarget, VirtualHostLimitTarget},
commons::{BindingDestinationType, UserLimitTarget, VirtualHostLimitTarget, SupportedProtocol},
path,
requests::{
self, BulkUserDelete, EnforcedLimitParams, ExchangeParams, Permissions, PolicyParams,
Expand Down Expand Up @@ -1233,6 +1233,18 @@ where
self.boolean_health_check(path).await
}

pub async fn health_check_port_listener(&self, port: u16) -> Result<()> {
let port_s = port.to_string();
let path = path!("health", "checks", "port-listener", port_s);
self.boolean_health_check(&path).await
}

pub async fn health_check_protocol_listener(&self, protocol: SupportedProtocol) -> Result<()> {
let proto: String = String::from(protocol);
let path = path!("health", "checks", "protocol-listener", proto);
self.boolean_health_check(&path).await
}

async fn boolean_health_check(&self, path: &str) -> std::result::Result<(), HttpClientError> {
// we expect that StatusCode::SERVICE_UNAVAILABLE may be return and ignore
// it here to provide a custom error type later
Expand Down
11 changes: 9 additions & 2 deletions src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::responses::{
OAuthConfiguration,
};
use crate::{
commons::{BindingDestinationType, UserLimitTarget, VirtualHostLimitTarget},
commons::{BindingDestinationType, UserLimitTarget, VirtualHostLimitTarget, SupportedProtocol},
path,
requests::{
self, BulkUserDelete, EnforcedLimitParams, ExchangeParams, Permissions, PolicyParams,
Expand Down Expand Up @@ -1084,7 +1084,14 @@ where
}

pub fn health_check_port_listener(&self, port: u16) -> Result<()> {
let path = format!("health/checks/port-listener/{}", port);
let port_s = port.to_string();
let path = path!("health", "checks", "port-listener", port_s);
self.boolean_health_check(&path)
}

pub fn health_check_protocol_listener(&self, protocol: SupportedProtocol) -> Result<()> {
let proto: String = String::from(protocol);
let path = path!("health", "checks", "protocol-listener", proto);
self.boolean_health_check(&path)
}

Expand Down
179 changes: 179 additions & 0 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,185 @@ use std::fmt;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(rename_all(serialize = "lowercase", deserialize = "PascalCase"))]
pub enum SupportedProtocol {
Clustering,
/// Represents both AMQP 1.0 and AMQP 0-9-1 because they share a listener
#[serde(rename = "amqp")]
AMQP,
/// Represents both AMQP 1.0 with TLS enabled and AMQP 0-9-1 with TLS enabled
#[serde(rename = "amqp/ssl")]
AMQPWithTLS,
/// Represents the RabbitMQ Stream protocol
#[serde(rename = "stream")]
Stream,
/// Represents the RabbitMQ Stream protocol with TLS enabled
#[serde(rename = "stream/ssl")]
StreamWithTLS,
#[serde(rename = "mqtt")]
MQTT,
#[serde(rename = "mqtt/ssl")]
MQTTWithTLS,
#[serde(rename = "stomp")]
STOMP,
#[serde(rename = "stomp/ssl")]
STOMPWithTLS,
#[serde(rename = "http/web-mqtt")]
MQTTOverWebSockets,
#[serde(rename = "https/web-mqtt")]
MQTTOverWebSocketsWithTLS,
#[serde(rename = "http/web-stomp")]
STOMPOverWebsockets,
#[serde(rename = "https/web-stomp")]
STOMPOverWebsocketsWithTLS,
#[serde(rename = "http/prometheus")]
Prometheus,
#[serde(rename = "https/prometheus")]
PrometheusWithTLS,
#[serde(rename = "http")]
HTTP,
#[serde(rename = "https")]
HTTPWithTLS,
Other(String),
}

const SUPPORTED_PROTOCOL_CLUSTERING: &str = "clustering";

const SUPPORTED_PROTOCOL_AMQP: &str = "amqp";
const SUPPORTED_PROTOCOL_AMQP_WITH_TLS: &str = "amqps";

const SUPPORTED_PROTOCOL_STREAM: &str = "stream";
const SUPPORTED_PROTOCOL_STREAM_WITH_TLS: &str = "stream/ssl";

const SUPPORTED_PROTOCOL_MQTT: &str = "mqtt";
const SUPPORTED_PROTOCOL_MQTT_WITH_TLS: &str = "mqtt/ssl";
const SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS: &str = "http/web-mqtt";
const SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS_WITH_TLS: &str = "https/web-mqtt";

const SUPPORTED_PROTOCOL_STOMP: &str = "stomp";
const SUPPORTED_PROTOCOL_STOMP_WITH_TLS: &str = "stomp/ssl";
const SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS: &str = "http/stomp-mqtt";
const SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS_WITH_TLS: &str = "https/stomp-mqtt";

const SUPPORTED_PROTOCOL_PROMETHEUS: &str = "http/prometheus";
const SUPPORTED_PROTOCOL_PROMETHEUS_WITH_TLS: &str = "https/prometheus";

const SUPPORTED_PROTOCOL_HTTP: &str = "http";
const SUPPORTED_PROTOCOL_HTTP_WITH_TLS: &str = "https";

impl From<&str> for SupportedProtocol {
fn from(value: &str) -> Self {
match value {
SUPPORTED_PROTOCOL_CLUSTERING => SupportedProtocol::Clustering,
SUPPORTED_PROTOCOL_AMQP => SupportedProtocol::AMQP,
SUPPORTED_PROTOCOL_AMQP_WITH_TLS => SupportedProtocol::AMQPWithTLS,
SUPPORTED_PROTOCOL_STREAM => SupportedProtocol::Stream,
SUPPORTED_PROTOCOL_STREAM_WITH_TLS => SupportedProtocol::StreamWithTLS,
SUPPORTED_PROTOCOL_MQTT => SupportedProtocol::MQTT,
SUPPORTED_PROTOCOL_MQTT_WITH_TLS => SupportedProtocol::MQTTWithTLS,
SUPPORTED_PROTOCOL_STOMP => SupportedProtocol::STOMP,
SUPPORTED_PROTOCOL_STOMP_WITH_TLS => SupportedProtocol::STOMPWithTLS,
SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS => SupportedProtocol::MQTTOverWebSockets,
SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS_WITH_TLS => {
SupportedProtocol::MQTTOverWebSocketsWithTLS
}
SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS => SupportedProtocol::STOMPOverWebsockets,
SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS_WITH_TLS => {
SupportedProtocol::STOMPOverWebsocketsWithTLS
}
SUPPORTED_PROTOCOL_PROMETHEUS => SupportedProtocol::Prometheus,
SUPPORTED_PROTOCOL_PROMETHEUS_WITH_TLS => SupportedProtocol::PrometheusWithTLS,
SUPPORTED_PROTOCOL_HTTP => SupportedProtocol::HTTP,
SUPPORTED_PROTOCOL_HTTP_WITH_TLS => SupportedProtocol::HTTPWithTLS,
other => SupportedProtocol::Other(other.to_owned()),
}
}
}

impl From<String> for SupportedProtocol {
fn from(value: String) -> Self {
SupportedProtocol::from(value.as_str())
}
}

impl From<SupportedProtocol> for String {
fn from(value: SupportedProtocol) -> String {
match value {
SupportedProtocol::Clustering => SUPPORTED_PROTOCOL_CLUSTERING.to_owned(),
SupportedProtocol::AMQP => SUPPORTED_PROTOCOL_AMQP.to_owned(),
SupportedProtocol::AMQPWithTLS => SUPPORTED_PROTOCOL_AMQP_WITH_TLS.to_owned(),
SupportedProtocol::Stream => SUPPORTED_PROTOCOL_STREAM.to_owned(),
SupportedProtocol::StreamWithTLS => SUPPORTED_PROTOCOL_STREAM_WITH_TLS.to_owned(),
SupportedProtocol::MQTT => SUPPORTED_PROTOCOL_MQTT.to_owned(),
SupportedProtocol::MQTTWithTLS => SUPPORTED_PROTOCOL_MQTT_WITH_TLS.to_owned(),
SupportedProtocol::STOMP => SUPPORTED_PROTOCOL_STOMP.to_owned(),
SupportedProtocol::STOMPWithTLS => SUPPORTED_PROTOCOL_STOMP_WITH_TLS.to_owned(),
SupportedProtocol::MQTTOverWebSockets => {
SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS.to_owned()
}
SupportedProtocol::MQTTOverWebSocketsWithTLS => {
SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS_WITH_TLS.to_owned()
}
SupportedProtocol::STOMPOverWebsockets => {
SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS.to_owned()
}
SupportedProtocol::STOMPOverWebsocketsWithTLS => {
SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS_WITH_TLS.to_owned()
}
SupportedProtocol::Prometheus => SUPPORTED_PROTOCOL_PROMETHEUS.to_owned(),
SupportedProtocol::PrometheusWithTLS => {
SUPPORTED_PROTOCOL_PROMETHEUS_WITH_TLS.to_owned()
}
SupportedProtocol::HTTP => SUPPORTED_PROTOCOL_HTTP.to_owned(),
SupportedProtocol::HTTPWithTLS => SUPPORTED_PROTOCOL_HTTP_WITH_TLS.to_owned(),
SupportedProtocol::Other(s) => s,
}
}
}

impl From<&SupportedProtocol> for String {
fn from(value: &SupportedProtocol) -> Self {
match value {
SupportedProtocol::Clustering => SUPPORTED_PROTOCOL_CLUSTERING.to_owned(),
SupportedProtocol::AMQP => SUPPORTED_PROTOCOL_AMQP.to_owned(),
SupportedProtocol::AMQPWithTLS => SUPPORTED_PROTOCOL_AMQP_WITH_TLS.to_owned(),
SupportedProtocol::Stream => SUPPORTED_PROTOCOL_STREAM.to_owned(),
SupportedProtocol::StreamWithTLS => SUPPORTED_PROTOCOL_STREAM_WITH_TLS.to_owned(),
SupportedProtocol::MQTT => SUPPORTED_PROTOCOL_MQTT.to_owned(),
SupportedProtocol::MQTTWithTLS => SUPPORTED_PROTOCOL_MQTT_WITH_TLS.to_owned(),
SupportedProtocol::STOMP => SUPPORTED_PROTOCOL_STOMP.to_owned(),
SupportedProtocol::STOMPWithTLS => SUPPORTED_PROTOCOL_STOMP_WITH_TLS.to_owned(),
SupportedProtocol::MQTTOverWebSockets => {
SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS.to_owned()
}
SupportedProtocol::MQTTOverWebSocketsWithTLS => {
SUPPORTED_PROTOCOL_MQTT_OVER_WEBSOCKETS_WITH_TLS.to_owned()
}
SupportedProtocol::STOMPOverWebsockets => {
SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS.to_owned()
}
SupportedProtocol::STOMPOverWebsocketsWithTLS => {
SUPPORTED_PROTOCOL_STOMP_OVER_WEBSOCKETS_WITH_TLS.to_owned()
}
SupportedProtocol::Prometheus => SUPPORTED_PROTOCOL_PROMETHEUS.to_owned(),
SupportedProtocol::PrometheusWithTLS => {
SUPPORTED_PROTOCOL_PROMETHEUS_WITH_TLS.to_owned()
}
SupportedProtocol::HTTP => SUPPORTED_PROTOCOL_HTTP.to_owned(),
SupportedProtocol::HTTPWithTLS => SUPPORTED_PROTOCOL_HTTP_WITH_TLS.to_owned(),
SupportedProtocol::Other(s) => (*s).clone(),
}
}
}

impl fmt::Display for SupportedProtocol {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let proto: String = self.into();
write!(f, "{}", proto)
}
}

/// Exchange types. Most variants are for exchange types included with modern RabbitMQ distributions.
/// For custom types provided by 3rd party plugins, use the `Plugin(String)` variant.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
Expand Down
24 changes: 24 additions & 0 deletions src/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,13 +765,17 @@ pub struct DefinitionSet {
pub enum HealthCheckFailureDetails {
AlarmCheck(ClusterAlarmCheckDetails),
NodeIsQuorumCritical(QuorumCriticalityCheckDetails),
NoActivePortListener(NoActivePortListenerDetails),
NoActiveProtocolListener(NoActiveProtocolListenerDetails),
}

impl HealthCheckFailureDetails {
pub fn reason(&self) -> String {
match self {
HealthCheckFailureDetails::AlarmCheck(details) => details.reason.clone(),
HealthCheckFailureDetails::NodeIsQuorumCritical(details) => details.reason.clone(),
HealthCheckFailureDetails::NoActivePortListener(details) => details.reason.clone(),
HealthCheckFailureDetails::NoActiveProtocolListener(details) => details.reason.clone(),
}
}
}
Expand All @@ -794,6 +798,26 @@ pub struct QuorumCriticalityCheckDetails {
pub queues: Vec<QuorumEndangeredQueue>,
}

#[derive(Debug, Deserialize, Clone, Eq, PartialEq)]
pub struct NoActivePortListenerDetails {
pub status: String,
pub reason: String,
#[serde(rename(deserialize = "missing"))]
#[serde(default)]
pub inactive_port: u16,
}

#[derive(Debug, Deserialize, Clone, Eq, PartialEq)]
pub struct NoActiveProtocolListenerDetails {
pub status: String,
pub reason: String,
#[serde(rename(deserialize = "missing"))]
// Note: switching this to SupportedProtocol will break serde's
// detection of various HealthCheckFailureDetails variants since
// that enum is untagged
pub inactive_protocol: String,
}

#[derive(Debug, Deserialize, Clone, Eq, PartialEq)]
pub struct QuorumEndangeredQueue {
pub name: String,
Expand Down
41 changes: 39 additions & 2 deletions tests/health_checks_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rabbitmq_http_client::blocking_api::Client;
use rabbitmq_http_client::{blocking_api::Client, commons::SupportedProtocol};

mod test_helpers;
use crate::test_helpers::{endpoint, PASSWORD, USERNAME};
Expand Down Expand Up @@ -44,10 +44,47 @@ fn test_health_check_node_is_quorum_critical() {
}

#[test]
fn test_health_check_port_listener() {
fn test_health_check_port_listener_succeeds() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let result1 = rc.health_check_port_listener(15672);
assert!(result1.is_ok());
}

#[test]
fn test_health_check_port_listener_fails() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let result1 = rc.health_check_port_listener(15679);
assert!(result1.is_err());
}

#[test]
fn test_health_check_protocol_listener_succeeds() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let result1 = rc.health_check_protocol_listener(SupportedProtocol::HTTP);
assert!(result1.is_ok());

let result2 = rc.health_check_protocol_listener(SupportedProtocol::AMQP);
assert!(result2.is_ok());

let result3 = rc.health_check_protocol_listener(SupportedProtocol::Stream);
assert!(result3.is_ok());
}

#[test]
fn test_health_check_protocol_listener_fails() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let result1 = rc
.health_check_protocol_listener(SupportedProtocol::Other("https/non-existent".to_owned()));
assert!(result1.is_err());

let result2 = rc.health_check_protocol_listener(SupportedProtocol::STOMPOverWebsocketsWithTLS);
assert!(result2.is_err());
}

0 comments on commit 6878c35

Please sign in to comment.