Skip to content

Commit

Permalink
Merge pull request #4142 from joboet/apple-futex
Browse files Browse the repository at this point in the history
shim Apple's futex primitives
  • Loading branch information
RalfJung authored Feb 2, 2025
2 parents df9fbcd + 3954d5c commit 1c797a2
Show file tree
Hide file tree
Showing 7 changed files with 626 additions and 53 deletions.
64 changes: 34 additions & 30 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ struct Condvar {
/// The futex state.
#[derive(Default, Debug)]
struct Futex {
waiters: VecDeque<FutexWaiter>,
waiters: Vec<FutexWaiter>,
/// Tracks the happens-before relationship
/// between a futex-wake and a futex-wait
/// during a non-spurious wake event.
Expand All @@ -140,6 +140,12 @@ struct Futex {
#[derive(Default, Clone)]
pub struct FutexRef(Rc<RefCell<Futex>>);

impl FutexRef {
pub fn waiters(&self) -> usize {
self.0.borrow().waiters.len()
}
}

impl VisitProvenance for FutexRef {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// No provenance in `Futex`.
Expand Down Expand Up @@ -728,25 +734,21 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interp_ok(true)
}

/// Wait for the futex to be signaled, or a timeout.
/// On a signal, `retval_succ` is written to `dest`.
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
/// Wait for the futex to be signaled, or a timeout. Once the thread is
/// unblocked, `callback` is called with the unblock reason.
fn futex_wait(
&mut self,
futex_ref: FutexRef,
bitset: u32,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
retval_succ: Scalar,
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
callback: DynUnblockCallback<'tcx>,
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
let waiters = &mut futex.waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(FutexWaiter { thread, bitset });
waiters.push(FutexWaiter { thread, bitset });
drop(futex);

this.block_thread(
Expand All @@ -755,10 +757,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
callback!(
@capture<'tcx> {
futex_ref: FutexRef,
retval_succ: Scalar,
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
callback: DynUnblockCallback<'tcx>,
}
|this, unblock: UnblockKind| {
match unblock {
Expand All @@ -768,29 +767,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
},
UnblockKind::TimedOut => {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
},
}

callback.call(this, unblock)
}
),
);
}

/// Wake up the first thread in the queue that matches any of the bits in the bitset.
/// Returns whether anything was woken.
fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
/// Wake up `count` of the threads in the queue that match any of the bits
/// in the bitset. Returns how many threads were woken.
fn futex_wake(
&mut self,
futex_ref: &FutexRef,
bitset: u32,
count: usize,
) -> InterpResult<'tcx, usize> {
let this = self.eval_context_mut();
let mut futex = futex_ref.0.borrow_mut();
let data_race = &this.machine.data_race;
Expand All @@ -800,13 +799,18 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
data_race.release_clock(&this.machine.threads, |clock| futex.clock.clone_from(clock));
}

// Wake up the first thread in the queue that matches any of the bits in the bitset.
let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else {
return interp_ok(false);
};
let waiter = futex.waiters.remove(i).unwrap();
// Remove `count` of the threads in the queue that match any of the bits in the bitset.
// We collect all of them before unblocking because the unblock callback may access the
// futex state to retrieve the remaining number of waiters on macOS.
let waiters: Vec<_> =
futex.waiters.extract_if(.., |w| w.bitset & bitset != 0).take(count).collect();
drop(futex);
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
interp_ok(true)

let woken = waiters.len();
for waiter in waiters {
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
}

interp_ok(woken)
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#![feature(unqualified_local_imports)]
#![feature(derive_coerce_pointee)]
#![feature(arbitrary_self_types)]
#![feature(unsigned_is_multiple_of)]
#![feature(extract_if)]
// Configure clippy and other lints
#![allow(
clippy::collapsible_else_if,
Expand Down
30 changes: 16 additions & 14 deletions src/shims/unix/linux_like/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,24 @@ pub fn futex<'tcx>(
.futex
.clone();

let dest = dest.clone();
ecx.futex_wait(
futex_ref,
bitset,
timeout,
Scalar::from_target_isize(0, ecx), // retval_succ
Scalar::from_target_isize(-1, ecx), // retval_timeout
dest.clone(),
LibcError("ETIMEDOUT"), // errno_timeout
callback!(
@capture<'tcx> {
dest: MPlaceTy<'tcx>,
}
|ecx, unblock: UnblockKind| match unblock {
UnblockKind::Ready => {
ecx.write_int(0, &dest)
}
UnblockKind::TimedOut => {
ecx.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest)
}
}
),
);
} else {
// The futex value doesn't match the expected value, so we return failure
Expand Down Expand Up @@ -209,16 +219,8 @@ pub fn futex<'tcx>(
// will see the latest value on addr which could be changed by our caller
// before doing the syscall.
ecx.atomic_fence(AtomicFenceOrd::SeqCst)?;
let mut n = 0;
#[expect(clippy::arithmetic_side_effects)]
for _ in 0..val {
if ecx.futex_wake(&futex_ref, bitset)? {
n += 1;
} else {
break;
}
}
ecx.write_scalar(Scalar::from_target_isize(n, ecx), dest)?;
let woken = ecx.futex_wake(&futex_ref, bitset, val.try_into().unwrap())?;
ecx.write_scalar(Scalar::from_target_isize(woken.try_into().unwrap(), ecx), dest)?;
}
op => throw_unsup_format!("Miri does not support `futex` syscall with op={}", op),
}
Expand Down
66 changes: 63 additions & 3 deletions src/shims/unix/macos/foreign_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ use rustc_middle::ty::Ty;
use rustc_span::Symbol;
use rustc_target::callconv::{Conv, FnAbi};

use super::sync::EvalContextExt as _;
use super::sync::{EvalContextExt as _, MacOsFutexTimeout};
use crate::shims::unix::*;
use crate::*;

pub fn is_dyn_sym(_name: &str) -> bool {
false
pub fn is_dyn_sym(name: &str) -> bool {
match name {
// These only became available with macOS 11.0, so std looks them up dynamically.
"os_sync_wait_on_address"
| "os_sync_wait_on_address_with_deadline"
| "os_sync_wait_on_address_with_timeout"
| "os_sync_wake_by_address_any"
| "os_sync_wake_by_address_all" => true,
_ => false,
}
}

impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
Expand Down Expand Up @@ -214,6 +222,58 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
this.write_scalar(res, dest)?;
}

// Futex primitives
"os_sync_wait_on_address" => {
let [addr_op, value_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::None,
dest,
)?;
}
"os_sync_wait_on_address_with_deadline" => {
let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::Absolute { clock_op, timeout_op },
dest,
)?;
}
"os_sync_wait_on_address_with_timeout" => {
let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::Relative { clock_op, timeout_op },
dest,
)?;
}
"os_sync_wake_by_address_any" => {
let [addr_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wake_by_address(
addr_op, size_op, flags_op, /* all */ false, dest,
)?;
}
"os_sync_wake_by_address_all" => {
let [addr_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wake_by_address(
addr_op, size_op, flags_op, /* all */ true, dest,
)?;
}

"os_unfair_lock_lock" => {
let [lock_op] = this.check_shim(abi, Conv::C, link_name, args)?;
this.os_unfair_lock_lock(lock_op)?;
Expand Down
Loading

0 comments on commit 1c797a2

Please sign in to comment.