diff --git a/Cargo.toml b/Cargo.toml index f2268ba7..e83bcee9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,13 +30,17 @@ regex = "^1.9.1" bit-vec = "^0.6.3" futures = "^0.3.28" futures-io = "^0.3.28" -native-tls = "^0.2.11" +native-tls = { version = "^0.2.11", optional = true } +rustls = { version = "^0.21.5", optional = true } +webpki-roots = { version = "^0.24.0", optional = true } pem = "^3.0.0" tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true } tokio-util = { version = "^0.7.8", features = ["codec"], optional = true } +tokio-rustls = { version = "^0.24.1", optional = true } tokio-native-tls = { version = "^0.3.1", optional = true } -async-std = {version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true } +async-std = { version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true } asynchronous-codec = { version = "^0.6.2", optional = true } +async-rustls = { version = "^0.4.0", optional = true } async-native-tls = { version = "^0.5.0", optional = true } lz4 = { version = "^1.24.0", optional = true } flate2 = { version = "^1.0.26", optional = true } @@ -49,7 +53,7 @@ serde_json = { version = "^1.0.103", optional = true } tracing = { version = "^0.1.37", optional = true } async-trait = "^0.1.72" data-url = { version = "^0.3.0", optional = true } -uuid = {version = "^1.4.1", features = ["v4", "fast-rng"] } +uuid = { version = "^1.4.1", features = ["v4", "fast-rng"] } [dev-dependencies] serde = { version = "^1.0.175", features = ["derive"] } @@ -64,8 +68,10 @@ protobuf-src = { version = "1.1.0", optional = true } [features] default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"] compression = [ "lz4", "flate2", "zstd", "snap" ] -tokio-runtime = [ "tokio", "tokio-util", "tokio-native-tls" ] -async-std-runtime = [ "async-std", "asynchronous-codec", "async-native-tls" ] +tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ] +tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots" ] +async-std-runtime = [ "async-std", "asynchronous-codec", "native-tls", "async-native-tls" ] +async-std-rustls-runtime = ["async-std", "asynchronous-codec", "async-rustls", "rustls", "webpki-roots" ] auth-oauth2 = [ "openidconnect", "oauth2", "serde", "serde_json", "data-url" ] telemetry = ["tracing"] protobuf-src = ["dep:protobuf-src"] diff --git a/src/connection.rs b/src/connection.rs index feda49e9..06a24676 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -20,9 +20,18 @@ use futures::{ task::{Context, Poll}, Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] use native_tls::Certificate; use proto::MessageIdData; use rand::{seq::SliceRandom, thread_rng}; +#[cfg(all( + any( + feature = "tokio-rustls-runtime", + feature = "async-std-rustls-runtime" + ), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) +))] +use rustls::Certificate; use url::Url; use uuid::Uuid; @@ -934,7 +943,69 @@ impl Connection { .await } } - #[cfg(not(feature = "tokio-runtime"))] + #[cfg(all(feature = "tokio-rustls-runtime", not(feature = "tokio-runtime")))] + ExecutorKind::Tokio => { + if tls { + let stream = tokio::net::TcpStream::connect(&address).await?; + let mut root_store = rustls::RootCertStore::empty(); + for certificate in certificate_chain { + root_store.add(certificate)?; + } + + let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.0.iter().fold( + vec![], + |mut acc, trust_anchor| { + acc.push( + rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + trust_anchor.subject, + trust_anchor.spki, + trust_anchor.name_constraints, + ), + ); + acc + }, + ); + + root_store.add_server_trust_anchors(trust_anchors.into_iter()); + let config = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions()? + .with_root_certificates(root_store) + .with_no_client_auth(); + + let cx = tokio_rustls::TlsConnector::from(Arc::new(config)); + let stream = cx + .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .await + .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } else { + let stream = tokio::net::TcpStream::connect(&address) + .await + .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } + } + #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime")))] ExecutorKind::Tokio => { unimplemented!("the tokio-runtime cargo feature is not active"); } @@ -980,7 +1051,75 @@ impl Connection { .await } } - #[cfg(not(feature = "async-std-runtime"))] + #[cfg(all( + feature = "async-std-rustls-runtime", + not(feature = "async-std-runtime") + ))] + ExecutorKind::AsyncStd => { + if tls { + let stream = async_std::net::TcpStream::connect(&address).await?; + let mut root_store = rustls::RootCertStore::empty(); + for certificate in certificate_chain { + root_store.add(certificate)?; + } + + let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.0.iter().fold( + vec![], + |mut acc, trust_anchor| { + acc.push( + rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + trust_anchor.subject, + trust_anchor.spki, + trust_anchor.name_constraints, + ), + ); + acc + }, + ); + + root_store.add_server_trust_anchors(trust_anchors.into_iter()); + let config = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions()? + .with_root_certificates(root_store) + .with_no_client_auth(); + + let connector = async_rustls::TlsConnector::from(Arc::new(config)); + let stream = connector + .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .await + .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } else { + let stream = async_std::net::TcpStream::connect(&address) + .await + .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } + } + #[cfg(all( + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] ExecutorKind::AsyncStd => { unimplemented!("the async-std-runtime cargo feature is not active"); } @@ -1623,16 +1762,17 @@ mod tests { use uuid::Uuid; use super::{Connection, Receiver}; + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] + use crate::TokioExecutor; use crate::{ authentication::Authentication, error::{AuthenticationError, SharedError}, message::{BaseCommand, Codec, Message}, proto::{AuthData, CommandAuthChallenge, CommandAuthResponse, CommandConnected}, - TokioExecutor, }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn receiver_auth_challenge_test() { let (message_tx, message_rx) = mpsc::unbounded(); let (tx, _) = mpsc::unbounded(); @@ -1690,7 +1830,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn connection_auth_challenge_test() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 002b89b9..4b9c74fe 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -1,8 +1,17 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] use native_tls::Certificate; use rand::Rng; +#[cfg(all( + any( + feature = "tokio-rustls-runtime", + feature = "async-std-rustls-runtime" + ), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) +))] +use rustls::Certificate; use url::Url; use crate::{connection::Connection, error::ConnectionError, executor::Executor}; @@ -153,10 +162,20 @@ impl ConnectionManager { .iter() .rev() { + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] v.push( - Certificate::from_der(&cert.contents()) + Certificate::from_der(cert.contents()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?, ); + + #[cfg(all( + any( + feature = "tokio-rustls-runtime", + feature = "async-std-rustls-runtime" + ), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) + ))] + v.push(Certificate(cert.contents().to_vec())); } v } diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 37bc4176..849071d5 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -437,11 +437,11 @@ mod tests { }; use log::LevelFilter; use regex::Regex; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use tokio::time::timeout; use super::*; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use crate::executor::TokioExecutor; use crate::{ consumer::initial_position::InitialPosition, producer, proto, tests::TEST_LOGGER, @@ -476,7 +476,7 @@ mod tests { tag: "multi_consumer", }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn multi_consumer() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -567,7 +567,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn consumer_dropped_with_lingering_acks() { use rand::{distributions::Alphanumeric, Rng}; let _result = log::set_logger(&TEST_LOGGER); @@ -664,7 +664,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn dead_letter_queue() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -738,7 +738,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn failover() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -798,7 +798,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn seek_single_consumer() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -917,7 +917,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn schema_test() { #[derive(Serialize, Deserialize)] struct TestData { diff --git a/src/error.rs b/src/error.rs index bd1dc643..3fce5326 100644 --- a/src/error.rs +++ b/src/error.rs @@ -88,7 +88,18 @@ pub enum ConnectionError { Encoding(String), SocketAddr(String), UnexpectedResponse(String), + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] Tls(native_tls::Error), + #[cfg(all( + any( + feature = "tokio-rustls-runtime", + feature = "async-std-rustls-runtime" + ), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) + ))] + Tls(rustls::Error), + #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] + DnsName(rustls::client::InvalidDnsNameError), Authentication(AuthenticationError), NotFound, Canceled, @@ -113,6 +124,7 @@ impl From for ConnectionError { } } +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: native_tls::Error) -> Self { @@ -120,6 +132,28 @@ impl From for ConnectionError { } } +#[cfg(all( + any( + feature = "tokio-rustls-runtime", + feature = "async-std-rustls-runtime" + ), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) +))] +impl From for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(err: rustls::Error) -> Self { + ConnectionError::Tls(err) + } +} + +#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] +impl From for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(err: rustls::client::InvalidDnsNameError) -> Self { + ConnectionError::DnsName(err) + } +} + impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: AuthenticationError) -> Self { @@ -141,6 +175,8 @@ impl fmt::Display for ConnectionError { ConnectionError::Encoding(e) => write!(f, "Error encoding message: {e}"), ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {e}"), ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {e}"), + #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] + ConnectionError::DnsName(e) => write!(f, "Error resolving hostname: {e}"), ConnectionError::Authentication(e) => write!(f, "Error authentication: {e}"), ConnectionError::UnexpectedResponse(e) => { write!(f, "Unexpected response from pulsar: {e}") diff --git a/src/executor.rs b/src/executor.rs index 3273f307..b980986d 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -40,11 +40,11 @@ pub trait Executor: Clone + Send + Sync + 'static { } /// Wrapper for the Tokio executor -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] #[derive(Clone, Debug)] pub struct TokioExecutor; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl Executor for TokioExecutor { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn spawn(&self, f: Pin + Send>>) -> Result<(), ()> { @@ -78,11 +78,11 @@ impl Executor for TokioExecutor { } /// Wrapper for the async-std executor -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] #[derive(Clone, Debug)] pub struct AsyncStdExecutor; -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] impl Executor for AsyncStdExecutor { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn spawn(&self, f: Pin + Send>>) -> Result<(), ()> { @@ -150,13 +150,18 @@ impl Executor for Arc { /// future returned by [Executor::spawn_blocking] to await on the task's result pub enum JoinHandle { /// wrapper for tokio's `JoinHandle` - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::task::JoinHandle), /// wrapper for async-std's `JoinHandle` - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(async_std::task::JoinHandle), // here to avoid a compilation error since T is not used - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] PlaceHolder(T), } @@ -166,17 +171,22 @@ impl Future for JoinHandle { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { match self.get_mut() { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] JoinHandle::Tokio(j) => match Pin::new(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v.ok()), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] JoinHandle::AsyncStd(j) => match Pin::new(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(Some(v)), }, - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] JoinHandle::PlaceHolder(t) => { unimplemented!("please activate one of the following cargo features: tokio-runtime, async-std-runtime") } @@ -187,12 +197,17 @@ impl Future for JoinHandle { /// a `Stream` producing a `()` at rgular time intervals pub enum Interval { /// wrapper for tokio's interval - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Interval), /// wrapper for async-std's interval - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(async_std::stream::Interval), - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] PlaceHolder, } @@ -206,17 +221,22 @@ impl Stream for Interval { ) -> std::task::Poll> { unsafe { match Pin::get_unchecked_mut(self) { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Interval::Tokio(j) => match Pin::new_unchecked(j).poll_tick(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(Some(())), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] Interval::AsyncStd(j) => match Pin::new_unchecked(j).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v), }, - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] Interval::PlaceHolder => { unimplemented!("please activate one of the following cargo features: tokio-runtime, async-std-runtime") } @@ -228,10 +248,10 @@ impl Stream for Interval { /// a future producing a `()` after some time pub enum Delay { /// wrapper around tokio's `Sleep` - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Sleep), /// wrapper around async-std's `Delay` - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(Pin + Send>>), } @@ -242,12 +262,12 @@ impl Future for Delay { fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { unsafe { match Pin::get_unchecked_mut(self) { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Delay::Tokio(d) => match Pin::new_unchecked(d).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] Delay::AsyncStd(j) => match Pin::new_unchecked(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), diff --git a/src/lib.rs b/src/lib.rs index 58415189..514973c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -160,6 +160,11 @@ extern crate prost_derive; #[macro_use] extern crate serde; +#[cfg(all(features = "tokio-rustls-runtime", features = "tokio-runtime"))] +compile_error!("You have selected both features \"tokio-rustls-runtime\" and \"tokio-runtime\" which are exclusive, please choose one of them"); +#[cfg(all(features = "async-std-rustls-runtime", features = "async-std-runtime"))] +compile_error!("You have selected both features \"async-std-rustls-runtime\" and \"async-std-runtime\" which are exclusive, please choose one of them"); + pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage}; pub use connection::Authentication; pub use connection_manager::{ @@ -167,10 +172,10 @@ pub use connection_manager::{ }; pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions}; pub use error::Error; -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] pub use executor::AsyncStdExecutor; pub use executor::Executor; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] pub use executor::TokioExecutor; pub use message::{ proto::{self, command_subscribe::SubType, CommandSendReceipt}, @@ -201,11 +206,11 @@ mod tests { use futures::{future::try_join_all, StreamExt}; use log::{LevelFilter, Metadata, Record}; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use tokio::time::timeout; use super::*; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use crate::executor::TokioExecutor; use crate::{ client::SerializeMessage, @@ -309,7 +314,7 @@ mod tests { pub static TEST_LOGGER: SimpleLogger = SimpleLogger { tag: "" }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn round_trip() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -374,7 +379,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn unsized_data() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -460,7 +465,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn redelivery() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -506,7 +511,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn batching() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); diff --git a/src/message.rs b/src/message.rs index fa4a4842..0ab3a01c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -220,7 +220,7 @@ impl Message { /// tokio and async-std codec for Pulsar messages pub struct Codec; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl tokio_util::codec::Encoder for Codec { type Error = ConnectionError; @@ -269,7 +269,7 @@ impl tokio_util::codec::Encoder for Codec { } } -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl tokio_util::codec::Decoder for Codec { type Item = Message; type Error = ConnectionError; @@ -324,7 +324,7 @@ impl tokio_util::codec::Decoder for Codec { } } -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl asynchronous_codec::Encoder for Codec { type Item = Message; type Error = ConnectionError; @@ -374,7 +374,7 @@ impl asynchronous_codec::Encoder for Codec { } } -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl asynchronous_codec::Decoder for Codec { type Item = Message; type Error = ConnectionError;