From dd7bea4fa067ff70c560b78ca5e0af438ffb5e7f Mon Sep 17 00:00:00 2001
From: shamb0
Date: Tue, 24 Dec 2024 12:08:26 +0530
Subject: [PATCH] Add callback support to FileDescription

- Implementing atomic reads for contiguous buffers
- Supports read operations with callback-based completion.

Signed-off-by: shamb0
---
 src/shims/files.rs   |  21 ++++++
 src/shims/unix/fd.rs |  25 +++++++-
 src/shims/unix/fs.rs | 149 +++++++++++++++++++++++++++++++++++++++----
 3 files changed, 180 insertions(+), 15 deletions(-)

diff --git a/src/shims/files.rs b/src/shims/files.rs
index 73425eee51..aa78bcd4da 100644
--- a/src/shims/files.rs
+++ b/src/shims/files.rs
@@ -121,6 +121,9 @@ impl FileDescriptionExt for T {
 
 pub type DynFileDescriptionRef = FileDescriptionRef;
 
+/// Callback invoked when a file read finishes: `Ok(n)` for `n` bytes read, `Err` on failure.
+pub type DynFileIOCallback<'tcx> = DynMachineCallback<'tcx, Result<usize, std::io::Error>>;
+
 impl FileDescriptionRef {
     pub fn downcast(self) -> Option> {
         let inner = self.into_rc_any().downcast::>().ok()?;
@@ -135,6 +138,7 @@ pub trait FileDescription: std::fmt::Debug + FileDescriptionExt {
     /// Reads as much as possible into the given buffer `ptr`.
     /// `len` indicates how many bytes we should try to read.
     /// `dest` is where the return value should be stored: number of bytes read, or `-1` in case of error.
+    #[allow(dead_code)] // presumably unused once reads go via `read_with_callback` — TODO confirm
     fn read<'tcx>(
         self: FileDescriptionRef,
         _communicate_allowed: bool,
@@ -146,6 +150,23 @@ pub trait FileDescription: std::fmt::Debug + FileDescriptionExt {
         throw_unsup_format!("cannot read from {}", self.name());
     }
 
+    /// Reads up to `len` bytes into the buffer at `ptr` and reports the outcome through
+    /// `completion_callback`: `Ok(n)` with the number of bytes read, or `Err` with the I/O error.
+    /// The callback may run after this method returns, e.g. once a blocked thread is woken.
+    ///
+    /// The default implementation rejects the read as unsupported.
+    /// NOTE(review): types that only override `read` end up here; confirm that is intended.
+    fn read_with_callback<'tcx>(
+        self: FileDescriptionRef<Self>,
+        _communicate_allowed: bool,
+        _ptr: Pointer,
+        _len: usize,
+        _completion_callback: DynFileIOCallback<'tcx>,
+        _ecx: &mut MiriInterpCx<'tcx>,
+    ) -> InterpResult<'tcx> {
+        throw_unsup_format!("cannot read from {}", self.name());
+    }
+
     /// Writes as much as possible from the given buffer `ptr`.
     /// `len` indicates how many bytes we should try to write.
     /// `dest` is where the return value should be stored: number of bytes written, or `-1` in case of error.
diff --git a/src/shims/unix/fd.rs b/src/shims/unix/fd.rs
index 0b59490308..f0e6dfe498 100644
--- a/src/shims/unix/fd.rs
+++ b/src/shims/unix/fd.rs
@@ -7,7 +7,7 @@ use std::io::ErrorKind;
 use rustc_abi::Size;
 
 use crate::helpers::check_min_arg_count;
-use crate::shims::files::FileDescription;
+use crate::shims::files::{DynFileIOCallback, FileDescription};
 use crate::shims::unix::linux_like::epoll::EpollReadyEvents;
 use crate::shims::unix::*;
 use crate::*;
@@ -203,7 +203,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         interp_ok(Scalar::from_i32(this.try_unwrap_io_result(result)?))
     }
 
-    /// Read data from `fd` into buffer specified by `buf` and `count`.
+    /// Reads data from `fd` into the buffer given by `buf` and `count`, completing via a callback.
     ///
     /// If `offset` is `None`, reads data from current cursor position associated with `fd`
     /// and updates cursor position on completion. Otherwise, reads from the specified offset
@@ -244,8 +244,27 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         // because it was a target's `usize`. Also we are sure that its smaller than
         // `usize::MAX` because it is bounded by the host's `isize`.
 
+        // The callback can run after this call returns, so it needs its own copy of `dest`.
+        let result_destination = dest.clone();
+
+        let completion_callback: DynFileIOCallback<'tcx> = callback!(
+            @capture<'tcx> {
+                result_destination: MPlaceTy<'tcx>,
+            }
+            |this, read_result: Result<usize, std::io::Error>| {
+                match read_result {
+                    Ok(read_size) => {
+                        this.write_int(u64::try_from(read_size).unwrap(), &result_destination)
+                    }
+                    // Forward the host error so the guest observes the matching `errno`
+                    // rather than a blanket `EIO`.
+                    Err(err) => this.set_last_error_and_return(err, &result_destination),
+                }
+            }
+        );
+
         match offset {
-            None => fd.read(communicate, buf, count, dest, this)?,
+            None => fd.read_with_callback(communicate, buf, count, completion_callback, this)?,
             Some(offset) => {
                 let Ok(offset) = u64::try_from(offset) else {
                     return this.set_last_error_and_return(LibcError("EINVAL"), dest);
diff --git a/src/shims/unix/fs.rs b/src/shims/unix/fs.rs
index 25594b7803..d796a739f5 100644
--- a/src/shims/unix/fs.rs
+++ b/src/shims/unix/fs.rs
@@ -14,7 +14,10 @@ use rustc_data_structures::fx::FxHashMap;
 
 use self::shims::time::system_time_to_duration;
 use crate::helpers::check_min_arg_count;
-use crate::shims::files::{EvalContextExt as _, FileDescription, FileDescriptionRef};
+use crate::shims::files::{
+    DynFileIOCallback, EvalContextExt as _, FileDescription, FileDescriptionRef,
+    WeakFileDescriptionRef,
+};
 use crate::shims::os_str::bytes_to_os_str;
 use crate::shims::unix::fd::{FlockOp, UnixFileDescription};
 use crate::*;
@@ -23,6 +26,86 @@ use crate::*;
 struct FileHandle {
     file: File,
     writable: bool,
+    /// Mutex for synchronizing file access across threads.
+    file_lock: MutexRef,
+}
+
+impl VisitProvenance for FileHandle {
+    fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
+        // A host `File`, a `bool` and a mutex handle: nothing here is expected to carry
+        // pointer provenance, so there is nothing to visit. TODO confirm for `MutexRef`.
+    }
+}
+
+impl FileHandle {
+    /// Creates a handle for `file` that carries `file_lock` for serializing reads.
+    fn new(file: File, writable: bool, file_lock: MutexRef) -> Self {
+        Self { file, writable, file_lock }
+    }
+
+    /// Duplicates the host file; the copy keeps `writable` and gets a clone of `file_lock`.
+    ///
+    /// # Errors
+    /// Throws an "unsupported" interpreter error if the host cannot clone the file.
+    fn try_clone<'tcx>(&self) -> InterpResult<'tcx, FileHandle> {
+        let cloned_file = self
+            .file
+            .try_clone()
+            .map_err(|e| err_unsup_format!("Failed to clone file handle: {}", e))?;
+
+        interp_ok(FileHandle {
+            file: cloned_file,
+            writable: self.writable,
+            file_lock: self.file_lock.clone(),
+        })
+    }
+
+    /// Reads up to `length` bytes from the host file into guest memory at `buffer_ptr`,
+    /// holding `file_lock` for the duration, and passes the outcome to `completion_callback`.
+    /// The unlock is attempted on every path, whether or not the read succeeded.
+    fn perform_read<'tcx>(
+        this: &mut MiriInterpCx<'tcx>,
+        completion_callback: DynFileIOCallback<'tcx>,
+        file_handle: FileHandle,
+        weak_fd: WeakFileDescriptionRef<FileHandle>,
+        buffer_ptr: Pointer,
+        length: usize,
+    ) -> InterpResult<'tcx> {
+        this.mutex_lock(&file_handle.file_lock);
+        // The fallible work lives in a helper so that its early exits (`throw_*!`, `?`)
+        // return here and cannot skip the unlock below.
+        let result =
+            Self::read_locked(this, completion_callback, &file_handle, weak_fd, buffer_ptr, length);
+        this.mutex_unlock(&file_handle.file_lock)?;
+        result
+    }
+
+    /// Does the actual read for `perform_read`; the caller holds `file_lock`.
+    fn read_locked<'tcx>(
+        this: &mut MiriInterpCx<'tcx>,
+        completion_callback: DynFileIOCallback<'tcx>,
+        file_handle: &FileHandle,
+        weak_fd: WeakFileDescriptionRef<FileHandle>,
+        buffer_ptr: Pointer,
+        length: usize,
+    ) -> InterpResult<'tcx> {
+        // Verify file descriptor is still valid
+        if weak_fd.upgrade().is_none() {
+            throw_unsup_format!("file got closed while blocking")
+        }
+
+        let mut bytes = vec![0; length];
+        match (&mut &file_handle.file).read(&mut bytes) {
+            Ok(read_size) => {
+                // Let a failed guest-memory write surface as the interpreter error it is,
+                // rather than relabelling it as "unsupported".
+                this.write_bytes_ptr(buffer_ptr, bytes[..read_size].iter().copied())?;
+                completion_callback.call(this, Ok(read_size))
+            }
+            // A host I/O error is handed to the callback, which decides how to report it.
+            Err(err_code) => completion_callback.call(this, Err(err_code)),
+        }
+    }
 }
 
 impl FileDescription for FileHandle {
@@ -30,20 +113,54 @@ impl FileDescription for
FileHandle {
         "file"
     }
 
-    fn read<'tcx>(
+    fn read_with_callback<'tcx>(
         self: FileDescriptionRef,
         communicate_allowed: bool,
         ptr: Pointer,
         len: usize,
-        dest: &MPlaceTy<'tcx>,
+        completion_callback: DynFileIOCallback<'tcx>,
         ecx: &mut MiriInterpCx<'tcx>,
     ) -> InterpResult<'tcx> {
+        let this = ecx;
         assert!(communicate_allowed, "isolation should have prevented even opening a file");
-        let mut bytes = vec![0; len];
-        let result = (&mut &self.file).read(&mut bytes);
-        match result {
-            Ok(read_size) => ecx.return_read_success(ptr, &bytes, read_size, dest),
-            Err(e) => ecx.set_last_error_and_return(e, dest),
+
+        // The read may be deferred until this thread is woken, so give it its own
+        // duplicate of the host handle. `try_clone` already reports a failed clone
+        // as an "unsupported" error, so just propagate it.
+        let clone_file_handle = self.try_clone()?;
+
+        let weak_fd = FileDescriptionRef::downgrade(&self);
+
+        // NOTE(review): nothing here enqueues the thread on `file_lock` — confirm it is ever woken.
+        if this.mutex_is_locked(&self.file_lock) {
+            this.block_thread(
+                BlockReason::Mutex,
+                None,
+                callback!(
+                    @capture<'tcx> {
+                        completion_callback: DynFileIOCallback<'tcx>,
+                        clone_file_handle: FileHandle,
+                        weak_fd: WeakFileDescriptionRef<FileHandle>,
+                        ptr: Pointer,
+                        len: usize,
+                    }
+                    |this, unblock: UnblockKind| {
+                        assert_eq!(unblock, UnblockKind::Ready);
+                        FileHandle::perform_read(this, completion_callback, clone_file_handle, weak_fd, ptr, len)
+                    }
+                ),
+            );
+            // `block_thread` only parks the thread and returns; the read happens in the callback.
+            interp_ok(())
+        } else {
+            FileHandle::perform_read(
+                this,
+                completion_callback,
+                clone_file_handle,
+                weak_fd,
+                ptr,
+                len,
+            )
         }
     }
 
@@ -584,9 +701,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
             return this.set_last_error_and_return_i32(ErrorKind::PermissionDenied);
         }
 
-        let fd = options
-            .open(path)
-            .map(|file| this.machine.fds.insert_new(FileHandle { file, writable }));
+        let fd = options.open(path).map(|file| {
+            this.machine.fds.insert_new(FileHandle::new(
+                file,
+                writable,
+                this.machine.sync.mutex_create(),
+            ))
+        });
 
         interp_ok(Scalar::from_i32(this.try_unwrap_io_result(fd)?))
     }
@@ -1645,7 +1766,11 @@ pub trait
EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { match file { Ok(f) => { - let fd = this.machine.fds.insert_new(FileHandle { file: f, writable: true }); + let fd = this.machine.fds.insert_new(FileHandle::new( + f, + true, + this.machine.sync.mutex_create(), + )); return interp_ok(Scalar::from_i32(fd)); } Err(e) =>