Skip to content

Commit

Permalink
Implement Unix domain sockets support
Browse files Browse the repository at this point in the history
  • Loading branch information
w1ldptr authored and carllerche committed Dec 1, 2014
1 parent fff8edd commit ff6d5af
Show file tree
Hide file tree
Showing 6 changed files with 420 additions and 9 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ little overhead as possible over the OS abstractions.

* Event loop backed by epoll, kqueue.
* Zero allocations at runtime
* Non-blocking TCP sockets
* Non-blocking TCP, UDP and Unix domain sockets
* High performance timer system
* Thread safe message channel for cross thread communication

__Coming soon__

* UDP and Unix domain sockets
* Signal handling

## Non goals
Expand Down
6 changes: 5 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::io;
use nix::errno::{SysError, EAGAIN};
use nix::errno::{SysError, EAGAIN, EADDRINUSE};

use self::MioErrorKind::{
Eof,
BufUnderflow,
BufOverflow,
WouldBlock,
AddrInUse,
EventLoopTerminated,
OtherError
};
Expand All @@ -22,6 +23,7 @@ pub struct MioError {
pub enum MioErrorKind {
Eof, // End of file or socket closed
WouldBlock, // The operation would have blocked
AddrInUse, // Inet socket address or domain socket path already in use
BufUnderflow, // Buf does not contain enough data to perform read op
BufOverflow, // Buf does not contain enough capacity to perform write op
EventLoopTerminated, // The event loop is not running anymore
Expand Down Expand Up @@ -53,6 +55,7 @@ impl MioError {
pub fn from_sys_error(err: SysError) -> MioError {
let kind = match err.kind {
EAGAIN => WouldBlock,
EADDRINUSE => AddrInUse,
_ => OtherError
};

Expand Down Expand Up @@ -96,6 +99,7 @@ impl MioError {
match self.kind {
Eof | BufUnderflow | BufOverflow => io::standard_error(io::EndOfFile),
WouldBlock => io::standard_error(io::ResourceUnavailable),
AddrInUse => io::standard_error(io::PathAlreadyExists),
OtherError => match self.sys {
Some(err) => io::IoError::from_errno(err.kind as uint, false),
None => io::standard_error(io::OtherIoError)
Expand Down
110 changes: 108 additions & 2 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ impl SockAddr {
}
}

pub fn from_path(p: Path) -> SockAddr {
UnixAddr(p)
}

#[inline]
pub fn consume_std(addr: StdSocketAddr) -> SockAddr {
InetAddr(addr.ip, addr.port)
Expand Down Expand Up @@ -387,21 +391,123 @@ pub mod udp {

pub mod pipe {
use os;
use io::{IoHandle};
use net::Socket;
use error::MioResult;
use buf::{Buf, MutBuf};
use io;
use io::{IoHandle, IoAcceptor, IoReader, IoWriter, NonBlock};
use io::NonBlock::{Ready, WouldBlock};
use net::{Socket, SockAddr, SocketType};
use net::SocketType::Stream;
use net::AddressFamily::Unix;

#[deriving(Show)]
pub struct UnixSocket {
desc: os::IoDesc
}

impl UnixSocket {
pub fn stream() -> MioResult<UnixSocket> {
UnixSocket::new(Stream)
}

fn new(socket_type: SocketType) -> MioResult<UnixSocket> {
Ok(UnixSocket { desc: try!(os::socket(Unix, socket_type)) })
}

pub fn connect(&self, addr: &SockAddr) -> MioResult<()> {
debug!("socket connect; addr={}", addr);

// Attempt establishing the context. This may not complete immediately.
if try!(os::connect(&self.desc, addr)) {
// On some OSs, connecting to localhost succeeds immediately. In
// this case, queue the writable callback for execution during the
// next event loop tick.
debug!("socket connected immediately; addr={}", addr);
}

Ok(())
}

pub fn bind(self, addr: &SockAddr) -> MioResult<UnixListener> {
try!(os::bind(&self.desc, addr))
Ok(UnixListener { desc: self.desc })
}
}

impl IoHandle for UnixSocket {
fn desc(&self) -> &os::IoDesc {
&self.desc
}
}

impl IoReader for UnixSocket {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<(uint)>> {
io::read(self, buf)
}
}

impl IoWriter for UnixSocket {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<(uint)>> {
io::write(self, buf)
}
}

impl Socket for UnixSocket {
}

#[deriving(Show)]
pub struct UnixListener {
desc: os::IoDesc,
}

impl UnixListener {
pub fn listen(self, backlog: uint) -> MioResult<UnixAcceptor> {
try!(os::listen(self.desc(), backlog));
Ok(UnixAcceptor { desc: self.desc })
}
}

impl IoHandle for UnixListener {
fn desc(&self) -> &os::IoDesc {
&self.desc
}
}

#[deriving(Show)]
pub struct UnixAcceptor {
desc: os::IoDesc,
}

impl UnixAcceptor {
pub fn new(addr: &SockAddr, backlog: uint) -> MioResult<UnixAcceptor> {
let sock = try!(UnixSocket::stream());
let listener = try!(sock.bind(addr));
listener.listen(backlog)
}
}

impl IoHandle for UnixAcceptor {
fn desc(&self) -> &os::IoDesc {
&self.desc
}
}

impl Socket for UnixAcceptor {
}

impl IoAcceptor<UnixSocket> for UnixAcceptor {
fn accept(&mut self) -> MioResult<NonBlock<UnixSocket>> {
match os::accept(self.desc()) {
Ok(sock) => Ok(Ready(UnixSocket { desc: sock })),
Err(e) => {
if e.is_would_block() {
return Ok(WouldBlock);
}

return Err(e);
}
}
}
}
}

29 changes: 25 additions & 4 deletions src/os/posix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::num::Int;
use error::{MioResult, MioError};
use net::{AddressFamily, SockAddr, IPv4Addr, SocketType};
use net::SocketType::{Dgram, Stream};
use net::SockAddr::InetAddr;
use net::AddressFamily::{Inet, Inet6};
use net::SockAddr::{InetAddr, UnixAddr};
use net::AddressFamily::{Inet, Inet6, Unix};
pub use std::io::net::ip::IpAddr;

mod nix {
Expand Down Expand Up @@ -94,7 +94,7 @@ pub fn socket(af: AddressFamily, sock_type: SocketType) -> MioResult<IoDesc> {
let family = match af {
Inet => nix::AF_INET,
Inet6 => nix::AF_INET6,
_ => unimplemented!()
Unix => nix::AF_UNIX
};

let socket_type = match sock_type {
Expand Down Expand Up @@ -262,6 +262,15 @@ fn to_sockaddr(addr: &nix::SockAddr) -> SockAddr {
nix::SockAddr::SockIpV4(sin) => {
InetAddr(u32be_to_ipv4(sin.sin_addr.s_addr), Int::from_be(sin.sin_port))
}
nix::SockAddr::SockUnix(addr) => {
let mut str_path = String::new();
for c in addr.sun_path.iter() {
if *c == 0 { break; }
str_path.push(*c as u8 as char);
}

UnixAddr(Path::new(str_path))
}
_ => unimplemented!()
}
}
Expand All @@ -284,7 +293,19 @@ fn from_sockaddr(addr: &SockAddr) -> nix::SockAddr {
_ => unimplemented!()
}
}
_ => unimplemented!()
UnixAddr(ref path) => {
let mut addr: nix::sockaddr_un = unsafe { mem::zeroed() };

addr.sun_family = nix::AF_UNIX as nix::sa_family_t;

let c_path_ptr = path.to_c_str();
assert!(c_path_ptr.len() < addr.sun_path.len());
for (sp_iter, path_iter) in addr.sun_path.iter_mut().zip(c_path_ptr.iter()) {
*sp_iter = path_iter as i8;
}

nix::SockAddr::SockUnix(addr)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions test/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod test_timer;
mod test_udp_socket;
mod test_udp_socket_connectionless;
mod test_register_deregister;
mod test_unix_echo_server;

mod ports {
use std::sync::atomic::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
Expand Down
Loading

0 comments on commit ff6d5af

Please sign in to comment.