Skip to content

Commit

Permalink
Add Client#list_stream_connections_in, Client#get_stream_connection_info
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Dec 31, 2024
1 parent 114331c commit 977226b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 0 deletions.
28 changes: 28 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,22 @@ where
Ok(response)
}

pub async fn get_stream_connection_info(
&self,
virtual_host: &str,
name: &str,
) -> Result<responses::Connection> {
let response = self
.http_get(
path!("stream", "connections", virtual_host, name),
None,
None,
)
.await?;
let response = response.json().await?;
Ok(response)
}

pub async fn close_connection(&self, name: &str, reason: Option<&str>) -> Result<()> {
match reason {
None => {
Expand Down Expand Up @@ -321,6 +337,18 @@ where
Ok(response)
}

/// Lists RabbitMQ Stream Protocol client connections in the given virtual host.
pub async fn list_stream_connections_in(
&self,
virtual_host: &str,
) -> Result<Vec<responses::Connection>> {
let response = self
.http_get(path!("stream", "connections", virtual_host), None, None)
.await?;
let response = response.json().await?;
Ok(response)
}

/// Lists all channels across the cluster.
pub async fn list_channels(&self) -> Result<Vec<responses::Channel>> {
let response = self.http_get("channels", None, None).await?;
Expand Down
24 changes: 24 additions & 0 deletions src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,20 @@ where
Ok(response)
}

pub fn get_stream_connection_info(
&self,
virtual_host: &str,
name: &str,
) -> Result<responses::Connection> {
let response = self.http_get(
path!("stream", "connections", virtual_host, name),
None,
None,
)?;
let response = response.json()?;
Ok(response)
}

pub fn close_connection(&self, name: &str, reason: Option<&str>) -> Result<()> {
match reason {
None => self.http_delete(
Expand Down Expand Up @@ -303,6 +317,16 @@ where
Ok(response)
}

/// Lists RabbitMQ Stream Protocol client connections in the given virtual host.
pub fn list_stream_connections_in(
&self,
virtual_host: &str,
) -> Result<Vec<responses::Connection>> {
let response = self.http_get(path!("stream", "connections", virtual_host), None, None)?;
let response = response.json()?;
Ok(response)
}

/// Lists all channels across the cluster.
pub fn list_channels(&self) -> Result<Vec<responses::Channel>> {
let response = self.http_get("channels", None, None)?;
Expand Down
14 changes: 14 additions & 0 deletions tests/connection_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,17 @@ fn test_list_stream_connections() {
result1
);
}

#[test]
fn test_list_virtual_host_stream_connections() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let vh_name = "/";
let result1 = rc.list_stream_connections_in(vh_name);
assert!(
result1.is_ok(),
"list_stream_connections returned {:?}",
result1
);
}

0 comments on commit 977226b

Please sign in to comment.