Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Commit

Permalink
Adds failfast mode for unroutable requests.
Browse files Browse the repository at this point in the history
* Fixes Issue #26
* Adds a failfast mode for currently unroutable requests.
* Adds an integration test failfast.
* Adds a mock namerd.
* Adds a mock static webserver.
* Improves some comments.
  • Loading branch information
Steve Jenson committed Apr 10, 2017
1 parent 94d6410 commit 38fc35f
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,10 @@ tokio-io = "0.1"
tokio-service = "0.1"
tokio-timer = "0.1"
url = "1.4"

[dev-dependencies]
env_logger = { version = "0.3", default-features = false }
futures = "0.1"
# We use not-yet-released tokio integration on master:
hyper = { git = "https://github.com/hyperium/hyper", rev = "5a3743c1" }
tokio-core = "0.1"
79 changes: 45 additions & 34 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::cell::RefCell;
use std::collections::{VecDeque, HashMap};
use std::fs::File;
use std::io::{self, BufReader};
use std::net;
use std::net::{self, SocketAddr};
use std::rc::Rc;
use std::time::Duration;
use tacho::{self, Tacho};
Expand All @@ -21,12 +21,12 @@ mod admin_http;
mod sni;
pub mod config;

use self::config::*;
use self::sni::Sni;
use WeightedAddr;
use lb::{Balancer, Acceptor, Connector, PlainAcceptor, PlainConnector, SecureAcceptor,
SecureConnector};
use namerd;
use self::config::*;
use self::sni::Sni;

const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;
const DEFAULT_MAX_WAITERS: usize = 8;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub fn configure(app: AppConfig) -> (Admin, Proxies) {

pub trait Loader: Sized {
type Run: Future<Item = (), Error = io::Error>;
fn load(self, handle: Handle) -> io::Result<Self::Run>;
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Self::Run)>;
}
pub trait Runner: Sized {
fn run(self) -> io::Result<()>;
Expand All @@ -104,7 +104,7 @@ pub trait Runner: Sized {
impl<L: Loader> Runner for L {
fn run(self) -> io::Result<()> {
let mut core = Core::new()?;
let fut = self.load(core.handle())?;
let (_, fut) = self.load(core.handle())?;
core.run(fut)
}
}
Expand All @@ -118,12 +118,12 @@ pub struct Admin {
}
impl Loader for Admin {
type Run = Running;
fn load(self, handle: Handle) -> io::Result<Running> {
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> {
let mut running = Running::new();
{
let mut namerds = self.namerds;
for _ in 0..namerds.len() {
let f = namerds.pop_front().unwrap().load(handle.clone())?;
let (_, f) = namerds.pop_front().unwrap().load(handle.clone())?;
running.register(f.map_err(|_| io::ErrorKind::Other.into()));
}
}
Expand Down Expand Up @@ -163,19 +163,19 @@ impl Loader for Admin {
});
running.register(srv);
}
Ok(running)
Ok((self.addr, running))
}
}


struct Namerd {
config: NamerdConfig,
sender: mpsc::Sender<Vec<WeightedAddr>>,
metrics: tacho::Metrics,
pub struct Namerd {
pub config: NamerdConfig,
pub sender: mpsc::Sender<Vec<WeightedAddr>>,
pub metrics: tacho::Metrics,
}
impl Loader for Namerd {
type Run = Box<Future<Item = (), Error = io::Error>>;
fn load(self, handle: Handle) -> io::Result<Self::Run> {
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Self::Run)> {
let path = self.config.path;
let addr = self.config.addr;
let interval_secs = self.config.interval_secs.unwrap_or(DEFAULT_NAMERD_SECONDS);
Expand All @@ -194,7 +194,7 @@ impl Loader for Namerd {
let sink = self.sender.sink_map_err(|_| error!("sink error"));
addrs.forward(sink).map_err(|_| io::ErrorKind::Other.into()).map(|_| {})
};
Ok(Box::new(driver))
Ok((addr, Box::new(driver)))
}
}

Expand All @@ -203,29 +203,32 @@ pub struct Proxies {
}
impl Loader for Proxies {
type Run = Running;
fn load(self, handle: Handle) -> io::Result<Running> {
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> {
let mut running = Running::new();
let mut proxies = self.proxies;
let mut addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
for _ in 0..proxies.len() {
let p = proxies.pop_front().unwrap();
let f = p.load(handle.clone())?;
let (_addr, f) = p.load(handle.clone())?;
addr = _addr;
running.register(f);
}
Ok(running)
Ok((addr, running))
}
}

struct Proxy {
client: Option<ClientConfig>,
server: ProxyServer,
pub struct Proxy {
pub client: Option<ClientConfig>,
pub server: ProxyServer,
}
impl Loader for Proxy {
type Run = Running;
fn load(self, handle: Handle) -> io::Result<Running> {
fn load(self, handle: Handle) -> io::Result<(SocketAddr, Running)> {
match self.client.and_then(|c| c.tls) {
None => {
let conn = PlainConnector::new(handle.clone());
self.server.load(&handle, conn)
let f = self.server.load(&handle, conn).expect("b");
Ok(f)
}
Some(ref c) => {
let mut tls = rustls::ClientConfig::new();
Expand All @@ -238,29 +241,33 @@ impl Loader for Proxy {
}
};
let conn = SecureConnector::new(c.dns_name.clone(), tls, handle.clone());
self.server.load(&handle, conn)
let f = self.server.load(&handle, conn).expect("a");
Ok(f)
}
}
}
}

struct ProxyServer {
label: String,
servers: Vec<ServerConfig>,
addrs: Box<Stream<Item = Vec<WeightedAddr>, Error = ()>>,
buf: Rc<RefCell<Vec<u8>>>,
max_waiters: usize,
metrics: tacho::Metrics,
pub struct ProxyServer {
pub label: String,
pub servers: Vec<ServerConfig>,
pub addrs: Box<Stream<Item = Vec<WeightedAddr>, Error = ()>>,
pub buf: Rc<RefCell<Vec<u8>>>,
pub max_waiters: usize,
pub metrics: tacho::Metrics,
}
impl ProxyServer {
fn load<C>(self, handle: &Handle, conn: C) -> io::Result<Running>
fn load<C>(self, handle: &Handle, conn: C) -> io::Result<(SocketAddr, Running)>
where C: Connector + 'static
{
let addrs = self.addrs.map_err(|_| io::ErrorKind::Other.into());
let metrics = self.metrics.clone().labeled("proxy".into(), self.label.into());
let bal = Balancer::new(addrs, conn, self.buf.clone(), metrics.clone())
.into_shared(self.max_waiters, handle.clone());

// Placeholder for our local listening SocketAddr.
let mut local_addr: SocketAddr = "127.0.0.1:0".parse().expect("unable to parse addr");

// TODO scope/tag stats for servers.

let mut running = Running::new();
Expand All @@ -271,7 +278,9 @@ impl ProxyServer {
ServerConfig::Tcp { ref addr } => {
let metrics = metrics.clone().labeled("srv".into(), format!("{}", addr));
let acceptor = PlainAcceptor::new(handle, metrics);
let f = acceptor.accept(addr).forward(bal).map(|_| {});
let (bound_addr, forwarder) = acceptor.accept(addr);
local_addr = bound_addr;
let f = forwarder.forward(bal).map(|_| {});
running.register(f);
}
ServerConfig::Tls { ref addr,
Expand All @@ -287,12 +296,14 @@ impl ProxyServer {

let metrics = metrics.clone().labeled("srv".into(), format!("{}", addr));
let acceptor = SecureAcceptor::new(handle, tls, metrics);
let f = acceptor.accept(addr).forward(bal).map(|_| {});
let (bound_addr, forwarder) = acceptor.accept(addr);
local_addr = bound_addr;
let f = forwarder.forward(bal).map(|_| {});
running.register(f);
}
}
}
Ok(running)
Ok((local_addr, running))
}
}

Expand Down
18 changes: 15 additions & 3 deletions src/lb/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct Balancer<A, C> {
retired: VecDeque<Endpoint>,

stats: Stats,

fail_fast_mode: bool,
}

impl<A, C> Balancer<A, C>
Expand All @@ -64,6 +66,7 @@ impl<A, C> Balancer<A, C>
ready: VecDeque::new(),
retired: VecDeque::new(),
stats: Stats::new(metrics),
fail_fast_mode: false,
}
}

Expand Down Expand Up @@ -140,6 +143,9 @@ impl<A, C> Balancer<A, C>
if let Async::Ready(addrs) = self.addrs.poll()? {
trace!("addr update");
let addrs = addrs.expect("addr stream must be infinite");
// If there are no addrs to route to, drop requests quickly.
// TODO: validate that fail_fast_mode is being disabled once addrs exist.
self.fail_fast_mode = addrs.is_empty();
let new = addr_weight_map(&addrs);
self.update_endpoints(&new);
}
Expand Down Expand Up @@ -357,12 +363,18 @@ impl<A, C> Sink for Balancer<A, C>
self.evict_retirees(&mut rec)?;
self.promote_unready(&mut rec)?;
self.discover_and_retire()?;
trace!("retrying {} unready={} ready={} retired={}",
trace!("retrying {} unready={} ready={} retired={} failfast={}",
src_addr,
self.unready.len(),
self.ready.len(),
self.retired.len());
self.dispatch(src, &mut rec)
self.retired.len(),
self.fail_fast_mode);
if self.fail_fast_mode {
trace!("in fail fast mode, dropping traffic");
Err(io::ErrorKind::Other.into())
} else {
self.dispatch(src, &mut rec)
}
}
};

Expand Down
29 changes: 19 additions & 10 deletions src/lb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ impl WithAddr for Src {
}
}

/// Binds on `addr` and produces `U`-typed src connections.
/// Binds on `addr` and produces the bound `SocketAddr` and a` Stream` of `Src` connections.
pub trait Acceptor {
fn accept(&self, addr: &SocketAddr) -> Box<Stream<Item = Src, Error = io::Error>>;
fn accept(&self,
addr: &SocketAddr)
-> (SocketAddr, Box<Stream<Item = Src, Error = io::Error>>);
}

/// Establishes a `D`-typed connection to `addr`.
Expand All @@ -71,17 +73,20 @@ impl PlainAcceptor {
}
}
impl Acceptor for PlainAcceptor {
fn accept(&self, addr: &SocketAddr) -> Box<Stream<Item = Src, Error = io::Error>> {
fn accept(&self,
addr: &SocketAddr)
-> (SocketAddr, Box<Stream<Item = Src, Error = io::Error>>) {
let metrics = self.metrics.clone();
let connects_key = self.connects_key.clone();
TcpListener::bind(addr, &self.handle)
.unwrap()
.incoming()
let listener = TcpListener::bind(addr, &self.handle).expect("could not bind to address");
let local_addr = listener.local_addr().expect("could not get local_addr from listener");
let worker = listener.incoming()
.map(move |(s, a)| {
metrics.recorder().incr(&connects_key, 1);
Src(Socket::plain(a, s))
})
.boxed()
.boxed();
(local_addr, worker)
}
}

Expand Down Expand Up @@ -119,9 +124,13 @@ impl SecureAcceptor {
}
}
impl Acceptor for SecureAcceptor {
fn accept(&self, addr: &SocketAddr) -> Box<Stream<Item = Src, Error = io::Error>> {
fn accept(&self,
addr: &SocketAddr)
-> (SocketAddr, Box<Stream<Item = Src, Error = io::Error>>) {
let tls = self.config.clone();
let l = TcpListener::bind(addr, &self.handle).unwrap();
let l = TcpListener::bind(addr, &self.handle)
.expect("could not bind listener for SecureAcceptor");
let local_addr = l.local_addr().expect("could not get local_addr from listener");

let metrics = self.metrics.clone();
let connects_key = self.connects_key.clone();
Expand All @@ -143,7 +152,7 @@ impl Acceptor for SecureAcceptor {
}
}
});
Box::new(srcs)
(local_addr, Box::new(srcs))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/lb/proxy_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use tokio_io::AsyncWrite;
/// A future representing reading all data from one side of a proxy connection and writing
/// it to another.
///
/// In the typical case, nothing allocations are required. If the write side exhibits
/// backpressure, however, a buffer is allocated to
/// In the typical case, no allocations are required. If the write side exhibits
/// backpressure, however, a buffer is allocated.
pub struct ProxyStream {
reader: Rc<RefCell<Socket>>,
writer: Rc<RefCell<Socket>>,
Expand Down
7 changes: 5 additions & 2 deletions src/namerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn request<C: Connect>(client: Rc<Client<C>>, url: Url, stats: Stats) -> AddrsFu
}
}
Err(e) => {
error!("failed to read response: {}", e);
error!("failed to read response from remote namerd: {}", e);
future::ok(None).boxed()
}
})
Expand Down Expand Up @@ -136,9 +136,10 @@ fn parse_chunks(chunks: &[Chunk]) -> Option<Vec<::WeightedAddr>> {
let result: json::Result<NamerdResponse> = json::from_reader(r);
match result {
Ok(ref nrsp) if nrsp.kind == "bound" => Some(to_weighted_addrs(&nrsp.addrs)),
Ok(ref nrsp) if nrsp.kind == "neg" => Some(vec![]),
Ok(_) => Some(vec![]),
Err(e) => {
info!("error parsing response: {}", e);
error!("error parsing response: {}", e);
None
}
}
Expand All @@ -159,7 +160,9 @@ fn to_weighted_addrs(namerd_addrs: &[NamerdAddr]) -> Vec<::WeightedAddr> {
struct NamerdResponse {
#[serde(rename = "type")]
kind: String,
#[serde(default)]
addrs: Vec<NamerdAddr>,
#[serde(default)]
meta: HashMap<String, String>,
}

Expand Down
13 changes: 13 additions & 0 deletions tests/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#[cfg(tests)]
mod tests;
extern crate log;

extern crate env_logger;
extern crate futures;
extern crate hyper;
extern crate tokio_core;
extern crate tokio_io;
extern crate linkerd_tcp;

mod mocks;
pub use mocks::MockNamerd;
Loading

0 comments on commit 38fc35f

Please sign in to comment.