From faa071d7d9e872ab30bb8403ac28bc7426425532 Mon Sep 17 00:00:00 2001 From: Scott Hutton Date: Mon, 29 Jul 2024 21:49:06 -0700 Subject: [PATCH] Support specifying non-default batch size Provide new interfaces to allow calling `sendmmsg` and `recvmmsg` with a non-default batch size, specified by API clients as a generic constant during compile time. The maximum batch size is presently 1024 on Linux and FreeBSD. The default batch size is increased to 128 on these platforms. Also update to minimally support FreeBSD. Not all Linux features are supported yet. --- src/lib.rs | 13 +++- src/unix.rs | 187 +++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 174 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4db566a..2534666 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,8 +16,17 @@ mod imp; pub use imp::{sync, UdpSocket}; pub mod framed; -/// Number of UDP packets to send/receive at a time -pub const BATCH_SIZE: usize = imp::BATCH_SIZE; +/// Maximum number of UDP packets that can be sent by the `sendmmsg`/`recvmmsg` +/// wrappers. Note that, for supported platforms, the OS caps the batch size at +/// this value, but will not return an error, so this is just a suggested +/// maximum. +/// +/// Presently, this is 1024 on Linux and FreeBSD, and 1 on platforms that don't +/// support `sendmmsg`/`recvmmsg` +pub const BATCH_SIZE_CAP: usize = imp::BATCH_SIZE_CAP; + +/// Default number of UDP packets to send/receive at a time. 
+pub const DEFAULT_BATCH_SIZE: usize = imp::DEFAULT_BATCH_SIZE; /// The capabilities a UDP socket suppports on a certain platform #[derive(Debug)] diff --git a/src/unix.rs b/src/unix.rs index fa6b43f..ea2667c 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -22,6 +22,28 @@ use tokio::{ use super::{cmsg, log_sendmsg_error, RecvMeta, UdpState, IO_ERROR_LOG_INTERVAL}; +pub(crate) const BATCH_SIZE_CAP: usize = SYS_BATCH_SIZE_CAP; + +// This is not set to the maximum as larger batch sizes require larger stack +// frames, which may be undesirable. On non-Linux/FreeBSD systems, this is +// reduced to 1, as they don't support batching UDP messages. +pub(crate) const DEFAULT_BATCH_SIZE: usize = SYS_DEFAULT_BATCH_SIZE; + +#[cfg(target_os = "linux")] +const SYS_BATCH_SIZE_CAP: usize = libc::UIO_MAXIOV as usize; + +#[cfg(target_os = "freebsd")] +const SYS_BATCH_SIZE_CAP: usize = 1024 as usize; + +#[cfg(not(any(target_os = "linux", target_os = "freebsd")))] +pub const SYS_BATCH_SIZE_CAP: usize = 1; + +#[cfg(any(target_os = "linux", target_os = "freebsd"))] +pub const SYS_DEFAULT_BATCH_SIZE: usize = 128; + +#[cfg(not(any(target_os = "linux", target_os = "freebsd")))] +pub const SYS_DEFAULT_BATCH_SIZE: usize = 1; + #[cfg(target_os = "freebsd")] type IpTosTy = libc::c_uchar; #[cfg(not(target_os = "freebsd"))] @@ -239,20 +261,44 @@ impl UdpSocket { } /// Calls syscall [`sendmmsg`]. With a given `state` configured GSO and - /// `transmits` with information on the data and metadata about outgoing packets. + /// `transmits` with information on the data and metadata about outgoing + /// packets. + /// + /// Utilizes the default batch size (`DEFAULT_BATCH_SIZE`), and will send no + /// more than that number of messages. The caller must call this function + /// again after modifying `transmits` to continue sending the entire set of + /// messages. 
/// /// [`sendmmsg`]: https://linux.die.net/man/2/sendmmsg pub async fn send_mmsg>( &self, state: &UdpState, transmits: &[Transmit], + ) -> Result { + self.send_mmsg_with_batch_size::<_, DEFAULT_BATCH_SIZE>(state, transmits) + .await + } + + /// Calls syscall [`sendmmsg`]. With a given `state` configured GSO and + /// `transmits` with information on the data and metadata about outgoing packets. + /// + /// Sends no more than `BATCH_SIZE` messages. The caller must call this + /// function again after modifying `transmits` to continue sending the entire + /// set of messages. `BATCH_SIZE_CAP` defines the maximum that will be + /// sent, regardless of the specified `BATCH_SIZE` + /// + /// [`sendmmsg`]: https://linux.die.net/man/2/sendmmsg + pub async fn send_mmsg_with_batch_size, const BATCH_SIZE: usize>( + &self, + state: &UdpState, + transmits: &[Transmit], ) -> Result { let n = loop { self.io.writable().await?; let last_send_error = self.last_send_error.clone(); let io = &self.io; match io.try_io(Interest::WRITABLE, || { - send(state, SockRef::from(io), last_send_error, transmits) + send::<_, BATCH_SIZE>(state, SockRef::from(io), last_send_error, transmits) }) { Ok(res) => break res, Err(_would_block) => continue, @@ -284,17 +330,28 @@ impl UdpSocket { Ok(n) } - /// async version of `recvmmsg` + /// async version of `recvmmsg` with compile-time configurable batch size pub async fn recv_mmsg( &self, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta], + ) -> io::Result { + self.recv_mmsg_with_batch_size::(bufs, meta) + .await + } + + pub async fn recv_mmsg_with_batch_size( + &self, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], ) -> io::Result { debug_assert!(!bufs.is_empty()); loop { self.io.readable().await?; let io = &self.io; - match io.try_io(Interest::READABLE, || recv(SockRef::from(io), bufs, meta)) { + match io.try_io(Interest::READABLE, || { + recv::(SockRef::from(io), bufs, meta) + }) { Ok(res) => return Ok(res), Err(_would_block) => continue, } @@ 
-324,12 +381,22 @@ impl UdpSocket { state: &UdpState, cx: &mut Context, transmits: &[Transmit], + ) -> Poll> { + self.poll_send_mmsg_with_batch_size::<_, DEFAULT_BATCH_SIZE>(state, cx, transmits) + } + + /// calls `sendmmsg` with compile-time configurable batch size + pub fn poll_send_mmsg_with_batch_size, const BATCH_SIZE: usize>( + &mut self, + state: &UdpState, + cx: &mut Context, + transmits: &[Transmit], ) -> Poll> { loop { ready!(self.io.poll_send_ready(cx))?; let io = &self.io; if let Ok(res) = io.try_io(Interest::WRITABLE, || { - send( + send::<_, BATCH_SIZE>( state, SockRef::from(io), self.last_send_error.clone(), @@ -340,7 +407,8 @@ impl UdpSocket { } } } - /// calls `sendmsg` + + /// calls `sendmsg` pub fn poll_send_msg>( &self, state: &UdpState, @@ -379,12 +447,24 @@ impl UdpSocket { cx: &mut Context, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta], + ) -> Poll> { + self.poll_recv_mmsg_with_batch_size::(cx, bufs, meta) + } + + /// calls `recvmmsg` with compile-time configurable batch size + pub fn poll_recv_mmsg_with_batch_size( + &self, + cx: &mut Context, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], ) -> Poll> { debug_assert!(!bufs.is_empty()); loop { ready!(self.io.poll_recv_ready(cx))?; let io = &self.io; - if let Ok(res) = io.try_io(Interest::READABLE, || recv(SockRef::from(io), bufs, meta)) { + if let Ok(res) = io.try_io(Interest::READABLE, || { + recv::(SockRef::from(io), bufs, meta) + }) { return Poll::Ready(Ok(res)); } } @@ -512,8 +592,15 @@ pub mod sync { pub fn recv(&self, buf: &mut [u8]) -> io::Result { self.io.recv(buf) } + /// Calls syscall [`sendmmsg`]. With a given `state` configured GSO and - /// `transmits` with information on the data and metadata about outgoing packets. + /// `transmits` with information on the data and metadata about outgoing + /// packets. + /// + /// Utilizes the default batch size (`DEFAULT_BATCH_SIZE`), and will + /// send no more than that number of messages. 
The caller must call this + /// function again after modifying `transmits` to continue sending the + /// entire set of messages. /// /// [`sendmmsg`]: https://linux.die.net/man/2/sendmmsg pub fn send_mmsg>( @@ -521,13 +608,31 @@ pub mod sync { state: &UdpState, transmits: &[Transmit], ) -> Result { - send( + self.send_mmsg_with_batch_size::<_, DEFAULT_BATCH_SIZE>(state, transmits) + } + + /// Calls syscall [`sendmmsg`]. With a given `state` configured GSO and + /// `transmits` with information on the data and metadata about outgoing packets. + /// + /// Sends no more than `BATCH_SIZE` messages. The caller must call this + /// function again after modifying `transmits` to continue sending the + /// entire set of messages. `BATCH_SIZE_CAP` defines the maximum that + /// will be sent, regardless of the specified `BATCH_SIZE` + /// + /// [`sendmmsg`]: https://linux.die.net/man/2/sendmmsg + pub fn send_mmsg_with_batch_size, const BATCH_SIZE: usize>( + &mut self, + state: &UdpState, + transmits: &[Transmit], + ) -> Result { + send::<_, BATCH_SIZE>( state, SockRef::from(&self.io), self.last_send_error.clone(), transmits, ) } + /// Calls syscall [`sendmsg`]. With a given `state` configured GSO and /// `transmit` with information on the data and metadata about outgoing packet. 
/// @@ -545,9 +650,18 @@ pub mod sync { &self, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta], + ) -> io::Result { + self.recv_mmsg_with_batch_size::(bufs, meta) + } + + /// synchronous version of `recvmmsg`, with compile-time configurable batch size + pub fn recv_mmsg_with_batch_size( + &self, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], ) -> io::Result { debug_assert!(!bufs.is_empty()); - recv(SockRef::from(&self.io), bufs, meta) + recv::(SockRef::from(&self.io), bufs, meta) } /// `recv_msg` is similar to `recv_from` but returns extra information @@ -666,7 +780,8 @@ fn init(io: SockRef<'_>) -> io::Result<()> { #[cfg(not(any(target_os = "macos", target_os = "ios")))] fn send_msg>( - state: &UdpState, + // `state` is not presently used on FreeBSD + #[allow(unused_variables)] state: &UdpState, io: SockRef<'_>, transmit: &Transmit, ) -> io::Result { @@ -725,8 +840,9 @@ fn send_msg>( } #[cfg(not(any(target_os = "macos", target_os = "ios")))] -fn send>( - state: &UdpState, +fn send, const BATCH_SIZE: usize>( + // `state` is not presently used on FreeBSD + #[allow(unused_variables)] state: &UdpState, io: SockRef<'_>, last_send_error: LastSendError, transmits: &[Transmit], @@ -760,8 +876,12 @@ fn send>( let num_transmits = transmits.len().min(BATCH_SIZE); loop { + #[cfg(target_os = "linux")] let n = unsafe { libc::sendmmsg(io.as_raw_fd(), msgs.as_mut_ptr(), num_transmits as u32, 0) }; + #[cfg(target_os = "freebsd")] + let n = + unsafe { libc::sendmmsg(io.as_raw_fd(), msgs.as_mut_ptr(), num_transmits as usize, 0) }; if n == -1 { let e = io::Error::last_os_error(); match e.kind() { @@ -846,7 +966,7 @@ fn send_msg>( } #[cfg(any(target_os = "macos", target_os = "ios"))] -fn send>( +fn send, const BATCH_SIZE: usize>( _state: &UdpState, io: SockRef<'_>, last_send_error: LastSendError, @@ -887,7 +1007,11 @@ fn send>( } #[cfg(not(any(target_os = "macos", target_os = "ios")))] -fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { 
+fn recv( + io: SockRef<'_>, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], +) -> io::Result { use std::ptr; let mut names = [MaybeUninit::::uninit(); BATCH_SIZE]; @@ -903,6 +1027,7 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> ); } let msg_count = loop { + #[cfg(target_os = "linux")] let n = unsafe { libc::recvmmsg( io.as_raw_fd(), @@ -912,6 +1037,16 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> ptr::null_mut(), ) }; + #[cfg(target_os = "freebsd")] + let n = unsafe { + libc::recvmmsg( + io.as_raw_fd(), + hdrs.as_mut_ptr(), + bufs.len().min(BATCH_SIZE) as usize, + 0, + ptr::null_mut(), + ) + }; if n == -1 { let e = io::Error::last_os_error(); if e.kind() == io::ErrorKind::Interrupted { @@ -953,7 +1088,11 @@ fn recv_msg(io: SockRef<'_>, bufs: &mut IoSliceMut<'_>) -> io::Result } #[cfg(any(target_os = "macos", target_os = "ios"))] -fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { +fn recv( + io: SockRef<'_>, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], +) -> io::Result { let mut name = MaybeUninit::::uninit(); let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); let mut hdr = unsafe { mem::zeroed::() }; @@ -1076,6 +1215,7 @@ fn prepare_msg>( }; encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo); } + #[cfg(not(target_os = "freebsd"))] Source::Interface(i) => { let pktinfo = libc::in_pktinfo { ipi_ifindex: *i as _, // i32 linux, u32 mac @@ -1084,6 +1224,8 @@ fn prepare_msg>( }; encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo); } + #[cfg(target_os = "freebsd")] + Source::Interface(_) => (), // Not yet supported on FreeBSD Source::InterfaceV6(i, ip) => { let pktinfo = libc::in6_pktinfo { ipi6_ifindex: *i, @@ -1122,9 +1264,12 @@ fn decode_recv( let name = unsafe { name.assume_init() }; let mut ecn_bits = 0; let mut dst_ip = None; + // Only mutated on Linux + #[allow(unused_mut)] let mut dst_local_ip 
= None; let mut ifindex = 0; - #[allow(unused_mut)] // only mutable on Linux + // Only mutated on Linux + #[allow(unused_mut)] let mut stride = len; let cmsg_iter = unsafe { cmsg::Iter::new(hdr) }; @@ -1145,6 +1290,7 @@ fn decode_recv( ecn_bits = cmsg::decode::(cmsg) as u8; } }, + #[cfg(not(target_os = "freebsd"))] (libc::IPPROTO_IP, libc::IP_PKTINFO) => { let pktinfo = unsafe { cmsg::decode::(cmsg) }; dst_ip = Some(IpAddr::V4(Ipv4Addr::from( @@ -1208,13 +1354,6 @@ fn decode_recv( } } -#[cfg(not(any(target_os = "macos", target_os = "ios")))] -// Chosen somewhat arbitrarily; might benefit from additional tuning. -pub const BATCH_SIZE: usize = 32; - -#[cfg(any(target_os = "macos", target_os = "ios"))] -pub const BATCH_SIZE: usize = 1; - #[cfg(target_os = "linux")] mod gso { use super::*;