Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement high contention allocator #122

Merged
merged 7 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions foundationdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ lazy_static = "1.0"
byteorder = "1.2"
log = "0.4"
uuid = { version = "0.7", optional = true }
rand = "0.6.5"

[dev-dependencies]
rand = "0.6"
220 changes: 220 additions & 0 deletions foundationdb/src/hca.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright 2018 foundationdb-rs developers, https://github.com/bluejekyll/foundationdb-rs/graphs/contributors
// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

//! The directory layer offers subspace indirection, where logical application subspaces are mapped to short, auto-generated key prefixes. This prefix assignment is done by the High Contention Allocator, which allows many clients to allocate short directory prefixes efficiently.
//!
//! The allocation process works over candidate value windows. It uses two subspaces to operate, the "counters" subspace and "recents" subspace (derived from the subspace used to create the HCA).
//!
//! "counters" contains a single key : "counters : window_start", whose value is the number of allocations in the current window. "window_start" is an integer that marks the lower bound of values that can be assigned from the current window.
//! "recents" can contain many keys : "recents : <candidate>", where each "candidate" is an integer that has been assigned to some client
//!
//! Assignment has two stages that are executed in a loop until they both succeed.
//!
//! 1. Find the current window. The client scans "counters : *" to get the current "window_start" and how many allocations have been made in the current window.
//! If the window is more than half-full (using pre-defined window sizes), the window is advanced: "counters : *" and "recents : *" are both cleared, and a new "counters : window_start + window_size" key is created with a value of 0. (1) is retried
//! If the window still has space, it moves to (2).
//!
//! 2. Find a candidate value inside that window. The client picks a candidate number between "[window_start, window_start + window_size)" and tries to set the key "recents : <candidate>".
//! If the write succeeds, the candidate is returned as the allocated value. Success!
//! If the write fails because the window has been advanced, it repeats (1).
//! If the write fails because the value was already set, it repeats (2).

use std::sync::Mutex;

use byteorder::ByteOrder;
use futures::future::Future;
use futures::stream::Stream;
use rand::Rng;

use error::Error;
use keyselector::KeySelector;
use options::{ConflictRangeType, MutationType, TransactionOption};
use subspace::Subspace;
use transaction::{RangeOptionBuilder, Transaction};
use tuple::Element;

const ONE_BYTES: &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];

/// Represents a High Contention Allocator for a given subspace
#[derive(Debug)]
pub struct HighContentionAllocator {
counters: Subspace,
recent: Subspace,
allocation_mutex: Mutex<()>,
}

impl HighContentionAllocator {
/// Constructs an allocator that will use the input subspace for assigning values.
/// The given subspace should not be used by anything other than the allocator
pub fn new(subspace: Subspace) -> HighContentionAllocator {
HighContentionAllocator {
counters: subspace.subspace(0),
recent: subspace.subspace(1),
allocation_mutex: Mutex::new(()),
}
}

/// Returns a byte string that
/// 1) has never and will never be returned by another call to this method on the same subspace
/// 2) is nearly as short as possible given the above
pub fn allocate(&self, transaction: &mut Transaction) -> Result<i64, Error> {
let (begin, end) = self.counters.range();

loop {
let counters_begin = KeySelector::first_greater_or_equal(&begin);
let counters_end = KeySelector::first_greater_than(&end);
let range_option = RangeOptionBuilder::new(counters_begin, counters_end)
.reverse(true)
.limit(1)
.snapshot(true)
.build();

let kvs: Vec<i64> = transaction
.get_ranges(range_option)
.map_err(|(_, e)| e)
.fold(Vec::new(), move |mut out, range_result| {
let kvs = range_result.key_values();

for kv in kvs.as_ref() {
if let Element::I64(counter) = self.counters.unpack(kv.key())? {
out.push(counter);
}
}

Ok::<_, Error>(out)
})
.wait()?;

let mut start: i64 = 0;
let mut window: i64;

if kvs.len() == 1 {
start = kvs[0];
}

let mut window_advanced = false;

loop {
let mutex_guard = self.allocation_mutex.lock().unwrap();

if window_advanced {
transaction
.clear_range(self.counters.bytes(), self.counters.subspace(start).bytes());
transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
transaction
.clear_range(self.recent.bytes(), self.recent.subspace(start).bytes());
}

let counters_subspace_with_start = self.counters.subspace(start);

// Increment the allocation count for the current window
transaction.atomic_op(
counters_subspace_with_start.bytes(),
ONE_BYTES,
MutationType::Add,
);

let subspace_start_trx = transaction
.get(counters_subspace_with_start.bytes(), true)
.wait()?;
let count = byteorder::LittleEndian::read_i64(subspace_start_trx.value().unwrap());

drop(mutex_guard);

window = HighContentionAllocator::window_size(start);

if count * 2 < window {
break;
}

start += window;
window_advanced = true;
}

loop {
// As of the snapshot being read from, the window is less than half
// full, so this should be expected to take 2 tries. Under high
// contention (and when the window advances), there is an additional
// subsequent risk of conflict for this transaction.
let mut rng = rand::thread_rng();

let candidate: i64 = rng.gen_range(0, window) + start;
let recent_subspace_for_candidate = self.recent.subspace(candidate);
let candidate_subspace = recent_subspace_for_candidate.bytes();

let mutex_guard = self.allocation_mutex.lock().unwrap();

let counters_begin = KeySelector::first_greater_or_equal(&begin);
let counters_end = KeySelector::first_greater_than(&end);
let range_option = RangeOptionBuilder::new(counters_begin, counters_end)
.reverse(true)
.limit(1)
.snapshot(true)
.build();

let kvs: Vec<i64> = transaction
.get_ranges(range_option)
.map_err(|(_, e)| e)
.fold(Vec::new(), move |mut out, range_result| {
let kvs = range_result.key_values();

for kv in kvs.as_ref() {
if let Element::I64(counter) = self.counters.unpack(kv.key())? {
out.push(counter);
}
}

Ok::<_, Error>(out)
})
.wait()?;

let candidate_value_trx = transaction.get(candidate_subspace, false);

transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
transaction.set(candidate_subspace, &[]);

drop(mutex_guard);

if !kvs.is_empty() {
let current_start = kvs[0];

if current_start > start {
break;
}
}

let candidate_value = candidate_value_trx.wait()?;

match candidate_value.value() {
Some(_) => (),
None => {
transaction.add_conflict_range(
candidate_subspace,
candidate_subspace,
ConflictRangeType::Write,
)?;
return Ok::<_, Error>(candidate);
}
};
}
}
}

fn window_size(start: i64) -> i64 {
// Larger window sizes are better for high contention, smaller sizes for
// keeping the keys small. But if there are many allocations, the keys
// can't be too small. So start small and scale up. We don't want this to
// ever get *too* big because we have to store about window_size/2 recent
// items.
match start {
_ if start < 255 => 64,
_ if start < 65535 => 1024,
_ => 8192,
}
}
}
6 changes: 5 additions & 1 deletion foundationdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ extern crate byteorder;
extern crate foundationdb_sys;
#[macro_use]
extern crate futures;
extern crate core;
extern crate lazy_static;
extern crate rand;
#[cfg(feature = "uuid")]
extern crate uuid;

Expand All @@ -126,12 +129,13 @@ pub mod database;
pub mod error;
pub mod fdb_api;
pub mod future;
pub mod hca;
pub mod keyselector;
pub mod network;
/// Generated configuration types for use with the various `set_option` functions
#[allow(missing_docs)]
pub mod options;
mod subspace;
pub mod subspace;
pub mod transaction;
pub mod tuple;

Expand Down
5 changes: 4 additions & 1 deletion foundationdb/src/subspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

//! Implements the FDB Subspace Layer

use tuple::{Decode, Encode, Error, Result};

/// Represents a well-defined region of keyspace in a FoundationDB database
Expand Down Expand Up @@ -99,9 +101,10 @@ impl Subspace {

#[cfg(test)]
mod tests {
use super::*;
use tuple::Tuple;

use super::*;

#[test]
fn sub() {
let ss0: Subspace = 1.into();
Expand Down
117 changes: 117 additions & 0 deletions foundationdb/tests/hca.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2018 foundationdb-rs developers, https://github.com/bluejekyll/foundationdb-rs/graphs/contributors
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

extern crate foundationdb;
extern crate futures;
#[macro_use]
extern crate lazy_static;

use std::collections::HashSet;
use std::iter::FromIterator;

use futures::future::*;

use foundationdb::error::Error;
use foundationdb::hca::HighContentionAllocator;
use foundationdb::*;

mod common;

#[test]
fn test_hca_many_sequential_allocations() {
common::setup_static();
const N: usize = 6000;
const KEY: &[u8] = b"test-hca-allocate";

let fut = Cluster::new(foundationdb::default_config_path())
.and_then(|cluster| cluster.create_database())
.and_then(|db: Database| {
let cleared_range = db
.transact(move |tx| {
tx.clear_subspace_range(Subspace::from_bytes(KEY));
futures::future::result(Ok::<(), failure::Error>(()))
})
.wait();

cleared_range.expect("unable to clear hca test range");

let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY));

let mut all_ints = Vec::new();

for _ in 0..N {
let mut tx: Transaction = db.create_trx()?;

let next_int: i64 = hca.allocate(&mut tx)?;
all_ints.push(next_int);

tx.commit().wait()?;
}

Ok::<_, Error>(all_ints)
});

let all_ints: Vec<i64> = fut.wait().expect("failed to run");
check_hca_result_uniqueness(&all_ints);

eprintln!("ran test {:?}", all_ints);
}

#[test]
fn test_hca_concurrent_allocations() {
common::setup_static();
const N: usize = 1000;
const KEY: &[u8] = b"test-hca-allocate-concurrent";

let fut = Cluster::new(foundationdb::default_config_path())
.and_then(|cluster| cluster.create_database())
.and_then(|db: Database| {
let cleared_range = db
.transact(move |tx| {
tx.clear_subspace_range(Subspace::from_bytes(KEY));
futures::future::result(Ok::<(), failure::Error>(()))
})
.wait();

cleared_range.expect("unable to clear hca test range");

let mut futures = Vec::new();
let mut all_ints: Vec<i64> = Vec::new();

for _ in 0..N {
let f = db.transact(move |mut tx| {
HighContentionAllocator::new(Subspace::from_bytes(KEY)).allocate(&mut tx)
});

futures.push(f);
}

for allocation in futures {
let i = allocation.wait().expect("unable to get allocation");
all_ints.push(i);
}

Ok::<_, Error>(all_ints)
});

let all_ints: Vec<i64> = fut.wait().expect("failed to run");
check_hca_result_uniqueness(&all_ints);

eprintln!("ran test {:?}", all_ints);
}

fn check_hca_result_uniqueness(results: &Vec<i64>) {
let result_set: HashSet<i64> = HashSet::from_iter(results.clone());

if results.len() != result_set.len() {
panic!(
"Set size does not much, got duplicates from HCA. Set: {:?}, List: {:?}",
result_set.len(),
results.len(),
);
}
}