From ba163f98dbb5071c2cb21ccb2404846e01e2286a Mon Sep 17 00:00:00 2001 From: Florentin Dubois Date: Wed, 26 Jul 2023 12:11:42 +0200 Subject: [PATCH] build: allow to use rustls instead of native-tls * This is used in an effort to remove all dependencies to openssl. Which could be interesting in embedded system or on environment which is difficult to know on which openssl version the software will run it and breaks deployments. * It introduces two compiler feature flags which are `tokio-rustls-runtime` and `async-std-rustls-runtime` that have the same meaning as `tokio-runtime` and `async-std-runtime` except that they use rustls. * There is a safe guard, if we enable both runtimes, this is the native-tls ones that are used to keep consistent with the current behaviour. Signed-off-by: Florentin Dubois --- Cargo.toml | 15 ++++-- src/connection.rs | 110 +++++++++++++++++++++++++++++++++++--- src/connection_manager.rs | 9 ++++ src/consumer/mod.rs | 16 +++--- src/error.rs | 24 +++++++++ src/executor.rs | 40 +++++++------- src/lib.rs | 16 +++--- src/message.rs | 8 +-- 8 files changed, 187 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2268ba7..c3579e6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,13 +30,16 @@ regex = "^1.9.1" bit-vec = "^0.6.3" futures = "^0.3.28" futures-io = "^0.3.28" -native-tls = "^0.2.11" +native-tls = { version = "^0.2.11", optional = true } +rustls = { version = "^0.21.5", optional = true } pem = "^3.0.0" tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true } tokio-util = { version = "^0.7.8", features = ["codec"], optional = true } +tokio-rustls = { version = "^0.24.1", optional = true } tokio-native-tls = { version = "^0.3.1", optional = true } -async-std = {version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true } +async-std = { version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true } asynchronous-codec = { version = "^0.6.2", optional = true } +async-rustls = { version = "^0.4.0", optional = true } async-native-tls = { version = "^0.5.0", optional = true } lz4 = { version = "^1.24.0", optional = true } flate2 = { version = "^1.0.26", optional = true } @@ -49,7 +52,7 @@ serde_json = { version = "^1.0.103", optional = true } tracing = { version = "^0.1.37", optional = true } async-trait = "^0.1.72" data-url = { version = "^0.3.0", optional = true } -uuid = {version = "^1.4.1", features = ["v4", "fast-rng"] } +uuid = { version = "^1.4.1", features = ["v4", "fast-rng"] } [dev-dependencies] serde = { version = "^1.0.175", features = ["derive"] } @@ -64,8 +67,10 @@ protobuf-src = { version = "1.1.0", optional = true } [features] default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"] compression = [ "lz4", "flate2", "zstd", "snap" ] -tokio-runtime = [ "tokio", "tokio-util", "tokio-native-tls" ] -async-std-runtime = [ "async-std", "asynchronous-codec", "async-native-tls" ] +tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ] +tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls" ] +async-std-runtime = [ "async-std", "asynchronous-codec", "native-tls", "async-native-tls" ] +async-std-rustls-runtime = ["async-std", "asynchronous-codec", "async-rustls", "rustls" ] auth-oauth2 = [ "openidconnect", "oauth2", "serde", "serde_json", "data-url" ] telemetry = ["tracing"] protobuf-src = ["dep:protobuf-src"] diff --git a/src/connection.rs b/src/connection.rs index feda49e9..ff105858 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -20,7 +20,10 @@ use futures::{ task::{Context, Poll}, Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] use native_tls::Certificate; +#[cfg(all(any(feature = "tokio-rustls-runtime", feature = "async-std--rustls-runtime"), not(any(feature = "tokio-runtime", feature = "async-std-runtime"))))] +use rustls::Certificate; use proto::MessageIdData; use rand::{seq::SliceRandom, thread_rng}; use url::Url; @@ -934,11 +937,58 @@ impl Connection { .await } } - #[cfg(not(feature = "tokio-runtime"))] + #[cfg(all(feature = "tokio-rustls-runtime", not(feature = "tokio-runtime")))] + ExecutorKind::Tokio => { + if tls { + let stream = tokio::net::TcpStream::connect(&address).await?; + let mut root_store = rustls::RootCertStore::empty(); + for certificate in certificate_chain { + root_store.add(certificate)?; + } + + let config = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions()? + .with_root_certificates(root_store) + .with_no_client_auth(); + + let cx = tokio_rustls::TlsConnector::from(Arc::new(config)); + let stream = cx + .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .await + .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } else { + let stream = tokio::net::TcpStream::connect(&address) + .await + .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } + } + #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime")))] ExecutorKind::Tokio => { unimplemented!("the tokio-runtime cargo feature is not active"); } - #[cfg(feature = "async-std-runtime")] + #[cfg(feature = "async-std-runtime")] ExecutorKind::AsyncStd => { if tls { let stream = async_std::net::TcpStream::connect(&address).await?; @@ -980,7 +1030,54 @@ impl Connection { .await } } - #[cfg(not(feature = "async-std-runtime"))] + #[cfg(all(feature = "async-std-rustls-runtime", not(feature = "async-std-runtime")))] + ExecutorKind::AsyncStd => { + if tls { + let stream = async_std::net::TcpStream::connect(&address).await?; + let mut root_store = rustls::RootCertStore::empty(); + for certificate in certificate_chain { + root_store.add(certificate)?; + } + + let config = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions()? + .with_root_certificates(root_store) + .with_no_client_auth(); + + let connector = async_rustls::TlsConnector::from(Arc::new(config)); + let stream = connector + .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .await + .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } else { + let stream = async_std::net::TcpStream::connect(&address) + .await + .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } + } + #[cfg(all(not(feature = "async-std-runtime"), not(feature = "async-std-rustls-runtime")))] ExecutorKind::AsyncStd => { unimplemented!("the async-std-runtime cargo feature is not active"); } @@ -1628,11 +1725,12 @@ mod tests { error::{AuthenticationError, SharedError}, message::{BaseCommand, Codec, Message}, proto::{AuthData, CommandAuthChallenge, CommandAuthResponse, CommandConnected}, - TokioExecutor, }; + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] + use crate::TokioExecutor; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn receiver_auth_challenge_test() { let (message_tx, message_rx) = mpsc::unbounded(); let (tx, _) = mpsc::unbounded(); @@ -1690,7 +1788,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn connection_auth_challenge_test() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 002b89b9..7df45392 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -1,7 +1,10 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] use native_tls::Certificate; +#[cfg(all(any(feature = "tokio-rustls-runtime", feature = "async-std--rustls-runtime"), not(any(feature = "tokio-runtime", feature = "async-std-runtime"))))] +use rustls::Certificate; use rand::Rng; use url::Url; @@ -153,10 +156,16 @@ impl ConnectionManager { .iter() .rev() { + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] v.push( Certificate::from_der(&cert.contents()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?, ); + + #[cfg(all(any(feature = "tokio-rustls-runtime", feature = "async-std--rustls-runtime"), not(any(feature = "tokio-runtime", feature = "async-std-runtime"))))] + v.push( + Certificate(cert.contents().to_vec()) + ); } v } diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 37bc4176..849071d5 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -437,11 +437,11 @@ mod tests { }; use log::LevelFilter; use regex::Regex; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use tokio::time::timeout; use super::*; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use crate::executor::TokioExecutor; use crate::{ consumer::initial_position::InitialPosition, producer, proto, tests::TEST_LOGGER, @@ -476,7 +476,7 @@ mod tests { tag: "multi_consumer", }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn multi_consumer() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -567,7 +567,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn consumer_dropped_with_lingering_acks() { use rand::{distributions::Alphanumeric, Rng}; let _result = log::set_logger(&TEST_LOGGER); @@ -664,7 +664,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn dead_letter_queue() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -738,7 +738,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn failover() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -798,7 +798,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn seek_single_consumer() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -917,7 +917,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn schema_test() { #[derive(Serialize, Deserialize)] struct TestData { diff --git a/src/error.rs b/src/error.rs index bd1dc643..9fd8af59 100644 --- a/src/error.rs +++ b/src/error.rs @@ -88,7 +88,12 @@ pub enum ConnectionError { Encoding(String), SocketAddr(String), UnexpectedResponse(String), + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] Tls(native_tls::Error), + #[cfg(all(any(feature = "tokio-rustls-runtime", feature = "async-std--rustls-runtime"), not(any(feature = "tokio-runtime", feature = "async-std-runtime"))))] + Tls(rustls::Error), + #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] + DnsName(rustls::client::InvalidDnsNameError), Authentication(AuthenticationError), NotFound, Canceled, @@ -113,6 +118,7 @@ impl From for ConnectionError { } } +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: native_tls::Error) -> Self { @@ -120,6 +126,22 @@ impl From for ConnectionError { } } +#[cfg(all(any(feature = "tokio-rustls-runtime", feature = "async-std--rustls-runtime"), not(any(feature = "tokio-runtime", feature = "async-std-runtime"))))] +impl From for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(err: rustls::Error) -> Self { + ConnectionError::Tls(err) + } +} + +#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] +impl From for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(err: rustls::client::InvalidDnsNameError) -> Self { + ConnectionError::DnsName(err) + } +} + impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: AuthenticationError) -> Self { @@ -141,6 +163,8 @@ impl fmt::Display for ConnectionError { ConnectionError::Encoding(e) => write!(f, "Error encoding message: {e}"), ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {e}"), ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {e}"), + #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] + ConnectionError::DnsName(e) => write!(f, "Error resolving hostname: {e}"), ConnectionError::Authentication(e) => write!(f, "Error authentication: {e}"), ConnectionError::UnexpectedResponse(e) => { write!(f, "Unexpected response from pulsar: {e}") diff --git a/src/executor.rs b/src/executor.rs index 3273f307..31d18912 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -40,11 +40,11 @@ pub trait Executor: Clone + Send + Sync + 'static { } /// Wrapper for the Tokio executor -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] #[derive(Clone, Debug)] pub struct TokioExecutor; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl Executor for TokioExecutor { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn spawn(&self, f: Pin + Send>>) -> Result<(), ()> { @@ -78,11 +78,11 @@ impl Executor for TokioExecutor { } /// Wrapper for the async-std executor -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] #[derive(Clone, Debug)] pub struct AsyncStdExecutor; -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] impl Executor for AsyncStdExecutor { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn spawn(&self, f: Pin + Send>>) -> Result<(), ()> { @@ -150,13 +150,13 @@ impl Executor for Arc { /// future returned by [Executor::spawn_blocking] to await on the task's result pub enum JoinHandle { /// wrapper for tokio's `JoinHandle` - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::task::JoinHandle), /// wrapper for async-std's `JoinHandle` - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(async_std::task::JoinHandle), // here to avoid a compilation error since T is not used - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime"), not(feature = "async-std-runtime"), not(feature = "async-std-rustls-runtime")))] PlaceHolder(T), } @@ -166,17 +166,17 @@ impl Future for JoinHandle { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { match self.get_mut() { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] JoinHandle::Tokio(j) => match Pin::new(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v.ok()), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] JoinHandle::AsyncStd(j) => match Pin::new(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(Some(v)), }, - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime"), not(feature = "async-std-runtime"), not(feature = "async-std-rustls-runtime")))] JoinHandle::PlaceHolder(t) => { unimplemented!("please activate one of the following cargo features: tokio-runtime, async-std-runtime") } @@ -187,12 +187,12 @@ impl Future for JoinHandle { /// a `Stream` producing a `()` at rgular time intervals pub enum Interval { /// wrapper for tokio's interval - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Interval), /// wrapper for async-std's interval - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(async_std::stream::Interval), - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime"), not(feature = "async-std-runtime"), not(feature = "async-std-rustls-runtime")))] PlaceHolder, } @@ -206,17 +206,17 @@ impl Stream for Interval { ) -> std::task::Poll> { unsafe { match Pin::get_unchecked_mut(self) { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Interval::Tokio(j) => match Pin::new_unchecked(j).poll_tick(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(Some(())), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] Interval::AsyncStd(j) => match Pin::new_unchecked(j).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v), }, - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime"), not(feature = "async-std-runtime"), not(feature = "async-std-rustls-runtime")))] Interval::PlaceHolder => { unimplemented!("please activate one of the following cargo features: tokio-runtime, async-std-runtime") } @@ -228,10 +228,10 @@ impl Stream for Interval { /// a future producing a `()` after some time pub enum Delay { /// wrapper around tokio's `Sleep` - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Sleep), /// wrapper around async-std's `Delay` - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(Pin + Send>>), } @@ -242,12 +242,12 @@ impl Future for Delay { fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { unsafe { match Pin::get_unchecked_mut(self) { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Delay::Tokio(d) => match Pin::new_unchecked(d).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] Delay::AsyncStd(j) => match Pin::new_unchecked(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), diff --git a/src/lib.rs b/src/lib.rs index 58415189..bfb59c5d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -167,10 +167,10 @@ pub use connection_manager::{ }; pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions}; pub use error::Error; -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] pub use executor::AsyncStdExecutor; pub use executor::Executor; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] pub use executor::TokioExecutor; pub use message::{ proto::{self, command_subscribe::SubType, CommandSendReceipt}, @@ -201,11 +201,11 @@ mod tests { use futures::{future::try_join_all, StreamExt}; use log::{LevelFilter, Metadata, Record}; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use tokio::time::timeout; use super::*; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use crate::executor::TokioExecutor; use crate::{ client::SerializeMessage, @@ -309,7 +309,7 @@ mod tests { pub static TEST_LOGGER: SimpleLogger = SimpleLogger { tag: "" }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn round_trip() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -374,7 +374,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn unsized_data() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -460,7 +460,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn redelivery() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -506,7 +506,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn batching() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); diff --git a/src/message.rs b/src/message.rs index fa4a4842..0ab3a01c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -220,7 +220,7 @@ impl Message { /// tokio and async-std codec for Pulsar messages pub struct Codec; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl tokio_util::codec::Encoder for Codec { type Error = ConnectionError; @@ -269,7 +269,7 @@ impl tokio_util::codec::Encoder for Codec { } } -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl tokio_util::codec::Decoder for Codec { type Item = Message; type Error = ConnectionError; @@ -324,7 +324,7 @@ impl tokio_util::codec::Decoder for Codec { } } -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl asynchronous_codec::Encoder for Codec { type Item = Message; type Error = ConnectionError; @@ -374,7 +374,7 @@ impl asynchronous_codec::Encoder for Codec { } } -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl asynchronous_codec::Decoder for Codec { type Item = Message; type Error = ConnectionError;