From 792bd9a089c7a5a2b8645606ec50a5230a426fda Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Fri, 24 May 2019 21:29:33 -0400 Subject: [PATCH 1/7] hca outline --- foundationdb/src/hca.rs | 240 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 240 insertions(+) create mode 100644 foundationdb/src/hca.rs diff --git a/foundationdb/src/hca.rs b/foundationdb/src/hca.rs new file mode 100644 index 00000000..44f26804 --- /dev/null +++ b/foundationdb/src/hca.rs @@ -0,0 +1,240 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/bluejekyll/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! Most functions in the FoundationDB API are asynchronous, meaning that they may return to the caller before actually delivering their result. +//! +//! These functions always return FDBFuture*. An FDBFuture object represents a result value or error to be delivered at some future time. You can wait for a Future to be “ready” – to have a value or error delivered – by setting a callback function, or by blocking a thread, or by polling. Once a Future is ready, you can extract either an error code or a value of the appropriate type (the documentation for the original function will tell you which fdb_future_get_*() function you should call). +//! +//! Futures make it easy to do multiple operations in parallel, by calling several asynchronous functions before waiting for any of the results. This can be important for reducing the latency of transactions. +//! +//! The Rust API Client has been implemented to use the Rust futures crate, and should work within that ecosystem (suchas Tokio). See Rust [futures](https://docs.rs/crate/futures/0.1.21) documentation. 
+ +use std::io::Bytes; +use std::sync::Mutex; + +use byteorder::ByteOrder; +use futures::future::{Future, IntoFuture}; +use futures::stream::Stream; +use rand::{random, Rng}; + +use error::{self, Error, Result}; +use future::{KeyValue, KeyValues}; +use options::{ConflictRangeType, MutationType, TransactionOption}; +use subspace::Subspace; +use transaction::{GetRangeResult, RangeOption, RangeOptionBuilder, RangeStream, Transaction, TrxGetRange}; +use tuple::{Decode, Element, Tuple}; + +lazy_static! { + static ref LOCK: Mutex = Mutex::new(0); +} + +static ONE_BYTES : &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; + +/// Represents a well-defined region of keyspace in a FoundationDB database +/// +/// It provides a convenient way to use FoundationDB tuples to define namespaces for +/// different categories of data. The namespace is specified by a prefix tuple which is prepended +/// to all tuples packed by the subspace. When unpacking a key with the subspace, the prefix tuple +/// will be removed from the result. +/// +/// As a best practice, API clients should use at least one subspace for application data. For +/// general guidance on subspace usage, see the Subspaces section of the [Developer Guide]. +/// +/// [Developer Guide]: https://apple.github.io/foundationdb/developer-guide.html#subspaces +/// +/// +#[derive(Debug, Clone)] +pub struct HighContentionAllocator { + counters: Subspace, + recent: Subspace, +} + +impl HighContentionAllocator { + fn new(subspace: Subspace) -> HighContentionAllocator { + HighContentionAllocator { + counters: subspace.subspace(0), + recent: subspace.subspace(1), + } + } + + fn window_size(start : i64) -> i64 { + // Larger window sizes are better for high contention, smaller sizes for + // keeping the keys small. But if there are many allocations, the keys + // can't be too small. So start small and scale up. We don't want this to + // ever get *too* big because we have to store about window_size/2 recent + // items. 
+ if start < 255 { + return 64 + } + if start < 65535 { + return 1024 + } + return 8192 + } + + fn allocate( + &self, + transaction: &mut Transaction, + subspace: &Subspace, + ) -> Result{ + let range_option = RangeOptionBuilder::from(self.counters.range()) + .reverse(true) + .limit(1) + .snapshot(true) + .build(); + + let kvs : Vec = transaction.get_ranges(range_option) + .map_err(|(_, e)| e) + .filter_map(|range_result| { + for kv in range_result.key_values().as_ref() { + if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("hello") { + return Some(counter); + } + } + + return None; + }) + .collect() + .wait() + .expect("failed to fetch HCA counter range"); + + let mut start : i64 = 0; + let mut window : i64 = 0; + + if kvs.len() == 1 { + start = kvs[0]; + } + + let mut window_advanced = false; + + loop { + let mutex_guard = LOCK.lock().unwrap(); + + if window_advanced { + transaction.clear_range(self.counters.bytes(), self.counters.subspace(start).bytes()); + transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange); + transaction.clear_range(self.recent.bytes(), self.recent.subspace(start).bytes()); + } + + let counters_subspace_with_start = self.counters.subspace(start); + + // Increment the allocation count for the current window + transaction.atomic_op(counters_subspace_with_start.bytes(), ONE_BYTES, MutationType::Add); + + let get_result = transaction.get(counters_subspace_with_start.bytes(), true) + .wait() + .expect("get request failed"); + + let count : i64 = match get_result.value() { + Some(x) => byteorder::LittleEndian::read_i64(x), + None => 2 // return failure + }; + + drop(mutex_guard); + + window = HighContentionAllocator::window_size(start); + + if count * 2 < window { + break + } + + start += window; + window_advanced = true; + } + + loop { + // As of the snapshot being read from, the window is less than half + // full, so this should be expected to take 2 tries. 
Under high + // contention (and when the window advances), there is an additional + // subsequent risk of conflict for this transaction. + let mut rng = rand::thread_rng(); + + let candidate = rng.gen::() + start; + let key = self.recent.subspace(candidate); + let key_bytes = key.bytes(); + + let mutex_guard = LOCK.lock().unwrap(); + + let range_option = RangeOptionBuilder::from(self.counters.range()) + .reverse(true) + .limit(1) + .snapshot(true) + .build(); + + let kvs : Vec = transaction.get_ranges(range_option) + .map_err(|(_, e)| e) + .filter_map(|range_result| { + for kv in range_result.key_values().as_ref() { + if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("hello") { + return Some(counter); + } + } + + return None; + }) + .collect() + .wait() + .expect("failed to fetch HCA counter range"); + + let candidate_value_trx = transaction.get(key_bytes, false); + + transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange); + transaction.set(key_bytes, &[]); + + drop(mutex_guard); + + if kvs.len() > 0 { + let current_start = kvs[0]; + + if current_start > start { + break + } + } + + let candidate_value = candidate_value_trx.wait().expect("unable to get candidate value"); + + match candidate_value.value() { + Some(x) => { + continue + }, + None => { + transaction.add_conflict_range(key_bytes, key_bytes, ConflictRangeType::Write); + // return subspace + } + }; + + /* + kvs, e = latestCounter.GetSliceWithError() + if e != nil { + return nil, e + } + if len(kvs) > 0 { + t, e := hca.counters.Unpack(kvs[0].Key) + if e != nil { + return nil, e + } + currentStart := t[0].(int64) + if currentStart > start { + break + } + } + + v, e := candidateValue.Get() + if e != nil { + return nil, e + } + if v == nil { + tr.AddWriteConflictKey(key) + return s.Sub(candidate), nil + } + */ + + } + + } +} From e0a4a314c007bb504d4762dc589c818da3337cb8 Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Fri, 24 May 2019 21:27:57 -0400 Subject: [PATCH 2/7] 
lib updates + tweaks to hca --- foundationdb/Cargo.toml | 1 + foundationdb/src/hca.rs | 280 ++++++++++++++++------------------------ foundationdb/src/lib.rs | 9 +- 3 files changed, 123 insertions(+), 167 deletions(-) diff --git a/foundationdb/Cargo.toml b/foundationdb/Cargo.toml index 5c61235a..a9c2d699 100644 --- a/foundationdb/Cargo.toml +++ b/foundationdb/Cargo.toml @@ -38,6 +38,7 @@ lazy_static = "1.0" byteorder = "1.2" log = "0.4" uuid = { version = "0.7", optional = true } +rand = "0.6.5" [dev-dependencies] rand = "0.6" diff --git a/foundationdb/src/hca.rs b/foundationdb/src/hca.rs index 44f26804..2248ece1 100644 --- a/foundationdb/src/hca.rs +++ b/foundationdb/src/hca.rs @@ -6,47 +6,29 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. -//! Most functions in the FoundationDB API are asynchronous, meaning that they may return to the caller before actually delivering their result. +//! Docs! //! -//! These functions always return FDBFuture*. An FDBFuture object represents a result value or error to be delivered at some future time. You can wait for a Future to be “ready” – to have a value or error delivered – by setting a callback function, or by blocking a thread, or by polling. Once a Future is ready, you can extract either an error code or a value of the appropriate type (the documentation for the original function will tell you which fdb_future_get_*() function you should call). -//! -//! Futures make it easy to do multiple operations in parallel, by calling several asynchronous functions before waiting for any of the results. This can be important for reducing the latency of transactions. -//! -//! The Rust API Client has been implemented to use the Rust futures crate, and should work within that ecosystem (suchas Tokio). See Rust [futures](https://docs.rs/crate/futures/0.1.21) documentation. 
-use std::io::Bytes; use std::sync::Mutex; use byteorder::ByteOrder; -use futures::future::{Future, IntoFuture}; +use futures::future::Future; use futures::stream::Stream; -use rand::{random, Rng}; +use rand::Rng; -use error::{self, Error, Result}; -use future::{KeyValue, KeyValues}; +use error::Error; use options::{ConflictRangeType, MutationType, TransactionOption}; use subspace::Subspace; -use transaction::{GetRangeResult, RangeOption, RangeOptionBuilder, RangeStream, Transaction, TrxGetRange}; -use tuple::{Decode, Element, Tuple}; +use transaction::{RangeOptionBuilder, Transaction}; +use tuple::Element; lazy_static! { static ref LOCK: Mutex = Mutex::new(0); } -static ONE_BYTES : &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; +const ONE_BYTES: &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; -/// Represents a well-defined region of keyspace in a FoundationDB database -/// -/// It provides a convenient way to use FoundationDB tuples to define namespaces for -/// different categories of data. The namespace is specified by a prefix tuple which is prepended -/// to all tuples packed by the subspace. When unpacking a key with the subspace, the prefix tuple -/// will be removed from the result. -/// -/// As a best practice, API clients should use at least one subspace for application data. For -/// general guidance on subspace usage, see the Subspaces section of the [Developer Guide]. -/// -/// [Developer Guide]: https://apple.github.io/foundationdb/developer-guide.html#subspaces -/// +/// High Contention Allocator /// #[derive(Debug, Clone)] pub struct HighContentionAllocator { @@ -62,115 +44,23 @@ impl HighContentionAllocator { } } - fn window_size(start : i64) -> i64 { - // Larger window sizes are better for high contention, smaller sizes for - // keeping the keys small. But if there are many allocations, the keys - // can't be too small. So start small and scale up. 
We don't want this to - // ever get *too* big because we have to store about window_size/2 recent - // items. - if start < 255 { - return 64 - } - if start < 65535 { - return 1024 - } - return 8192 - } - fn allocate( &self, transaction: &mut Transaction, subspace: &Subspace, - ) -> Result{ - let range_option = RangeOptionBuilder::from(self.counters.range()) - .reverse(true) - .limit(1) - .snapshot(true) - .build(); - - let kvs : Vec = transaction.get_ranges(range_option) - .map_err(|(_, e)| e) - .filter_map(|range_result| { - for kv in range_result.key_values().as_ref() { - if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("hello") { - return Some(counter); - } - } - - return None; - }) - .collect() - .wait() - .expect("failed to fetch HCA counter range"); - - let mut start : i64 = 0; - let mut window : i64 = 0; - - if kvs.len() == 1 { - start = kvs[0]; - } - - let mut window_advanced = false; - - loop { - let mutex_guard = LOCK.lock().unwrap(); - - if window_advanced { - transaction.clear_range(self.counters.bytes(), self.counters.subspace(start).bytes()); - transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange); - transaction.clear_range(self.recent.bytes(), self.recent.subspace(start).bytes()); - } - - let counters_subspace_with_start = self.counters.subspace(start); - - // Increment the allocation count for the current window - transaction.atomic_op(counters_subspace_with_start.bytes(), ONE_BYTES, MutationType::Add); - - let get_result = transaction.get(counters_subspace_with_start.bytes(), true) - .wait() - .expect("get request failed"); - - let count : i64 = match get_result.value() { - Some(x) => byteorder::LittleEndian::read_i64(x), - None => 2 // return failure - }; - - drop(mutex_guard); - - window = HighContentionAllocator::window_size(start); - - if count * 2 < window { - break - } - - start += window; - window_advanced = true; - } - + ) -> Result { loop { - // As of the snapshot being read from, the window is less 
than half - // full, so this should be expected to take 2 tries. Under high - // contention (and when the window advances), there is an additional - // subsequent risk of conflict for this transaction. - let mut rng = rand::thread_rng(); - - let candidate = rng.gen::() + start; - let key = self.recent.subspace(candidate); - let key_bytes = key.bytes(); - - let mutex_guard = LOCK.lock().unwrap(); - let range_option = RangeOptionBuilder::from(self.counters.range()) .reverse(true) .limit(1) .snapshot(true) .build(); - let kvs : Vec = transaction.get_ranges(range_option) + let kvs: Vec = transaction.get_ranges(range_option) .map_err(|(_, e)| e) .filter_map(|range_result| { for kv in range_result.key_values().as_ref() { - if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("hello") { + if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("unable to unpack counter key") { return Some(counter); } } @@ -178,63 +68,121 @@ impl HighContentionAllocator { return None; }) .collect() - .wait() - .expect("failed to fetch HCA counter range"); + .wait()?; + + let mut start: i64 = 0; + let mut window: i64 = 0; + + if kvs.len() == 1 { + start = kvs[0]; + } - let candidate_value_trx = transaction.get(key_bytes, false); + let mut window_advanced = false; - transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange); - transaction.set(key_bytes, &[]); + loop { + let mutex_guard = LOCK.lock().unwrap(); - drop(mutex_guard); + if window_advanced { + transaction.clear_range(self.counters.bytes(), self.counters.subspace(start).bytes()); + transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange)?; + transaction.clear_range(self.recent.bytes(), self.recent.subspace(start).bytes()); + } + + let counters_subspace_with_start = self.counters.subspace(start); + + // Increment the allocation count for the current window + transaction.atomic_op(counters_subspace_with_start.bytes(), ONE_BYTES, MutationType::Add); + + let subspace_start_trx 
= transaction.get(counters_subspace_with_start.bytes(), true).wait()?; + let count = byteorder::LittleEndian::read_i64(subspace_start_trx.value().unwrap()); + + drop(mutex_guard); - if kvs.len() > 0 { - let current_start = kvs[0]; + window = HighContentionAllocator::window_size(start); - if current_start > start { - break + if count * 2 < window { + break; } + + start += window; + window_advanced = true; } - let candidate_value = candidate_value_trx.wait().expect("unable to get candidate value"); + loop { + // As of the snapshot being read from, the window is less than half + // full, so this should be expected to take 2 tries. Under high + // contention (and when the window advances), there is an additional + // subsequent risk of conflict for this transaction. + let mut rng = rand::thread_rng(); + + let candidate = self.recent.subspace(rng.gen::() + start); + let candidate_subspace = candidate.bytes(); + + let mutex_guard = LOCK.lock().unwrap(); + + let range_option = RangeOptionBuilder::from(self.counters.range()) + .reverse(true) + .limit(1) + .snapshot(true) + .build(); + + let kvs: Vec = transaction.get_ranges(range_option) + .map_err(|(_, e)| e) + .filter_map(|range_result| { + for kv in range_result.key_values().as_ref() { + if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("unable to unpack counter key") { + return Some(counter); + } + } + return None; + }) + .collect() + .wait()?; + + let candidate_value_trx = transaction.get(candidate_subspace, false); + + transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange)?; + transaction.set(candidate_subspace, &[]); - match candidate_value.value() { - Some(x) => { - continue - }, - None => { - transaction.add_conflict_range(key_bytes, key_bytes, ConflictRangeType::Write); - // return subspace + drop(mutex_guard); + + if kvs.len() > 0 { + let current_start = kvs[0]; + + if current_start > start { + break; + } } - }; - - /* - kvs, e = latestCounter.GetSliceWithError() - if e != 
nil { - return nil, e - } - if len(kvs) > 0 { - t, e := hca.counters.Unpack(kvs[0].Key) - if e != nil { - return nil, e - } - currentStart := t[0].(int64) - if currentStart > start { - break - } - } - - v, e := candidateValue.Get() - if e != nil { - return nil, e - } - if v == nil { - tr.AddWriteConflictKey(key) - return s.Sub(candidate), nil - } - */ + let candidate_value = candidate_value_trx.wait()?; + + match candidate_value.value() { + Some(x) => { + if x.len() == 0 { + transaction.add_conflict_range(candidate_subspace, candidate_subspace, ConflictRangeType::Write)?; + return Ok(self.counters.subspace(candidate_subspace)); + } + } + None => { + continue; + } + }; + } } + } + fn window_size(start: i64) -> i64 { + // Larger window sizes are better for high contention, smaller sizes for + // keeping the keys small. But if there are many allocations, the keys + // can't be too small. So start small and scale up. We don't want this to + // ever get *too* big because we have to store about window_size/2 recent + // items. 
+ if start < 255 { + return 64; + } + if start < 65535 { + return 1024; + } + return 8192; } } diff --git a/foundationdb/src/lib.rs b/foundationdb/src/lib.rs index 32dedead..537bc6e6 100644 --- a/foundationdb/src/lib.rs +++ b/foundationdb/src/lib.rs @@ -118,20 +118,27 @@ extern crate byteorder; extern crate foundationdb_sys; #[macro_use] extern crate futures; +extern crate core; #[cfg(feature = "uuid")] extern crate uuid; +#[macro_use] +extern crate lazy_static; +extern crate rand; pub mod cluster; pub mod database; pub mod error; pub mod fdb_api; pub mod future; +pub mod hca; pub mod keyselector; pub mod network; /// Generated configuration types for use with the various `set_option` functions #[allow(missing_docs)] pub mod options; -mod subspace; +/// TODO: @phemberger +#[allow(missing_docs)] +pub mod subspace; pub mod transaction; pub mod tuple; From 46de9e4699ff4023265a5608696f43a7137082dc Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Fri, 24 May 2019 22:22:29 -0400 Subject: [PATCH 3/7] consider each key via fold --- foundationdb/src/hca.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/foundationdb/src/hca.rs b/foundationdb/src/hca.rs index 2248ece1..eb6442d2 100644 --- a/foundationdb/src/hca.rs +++ b/foundationdb/src/hca.rs @@ -58,16 +58,17 @@ impl HighContentionAllocator { let kvs: Vec = transaction.get_ranges(range_option) .map_err(|(_, e)| e) - .filter_map(|range_result| { - for kv in range_result.key_values().as_ref() { + .fold(Vec::new(), move |mut out, range_result| { + let kvs = range_result.key_values(); + + for kv in kvs.as_ref() { if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("unable to unpack counter key") { - return Some(counter); + out.push(counter); } } - return None; + Ok::<_, Error>(out) }) - .collect() .wait()?; let mut start: i64 = 0; @@ -128,15 +129,17 @@ impl HighContentionAllocator { let kvs: Vec = transaction.get_ranges(range_option) .map_err(|(_, e)| e) - 
.filter_map(|range_result| { - for kv in range_result.key_values().as_ref() { + .fold(Vec::new(), move |mut out, range_result| { + let kvs = range_result.key_values(); + + for kv in kvs.as_ref() { if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("unable to unpack counter key") { - return Some(counter); + out.push(counter); } } - return None; + + Ok::<_, Error>(out) }) - .collect() .wait()?; let candidate_value_trx = transaction.get(candidate_subspace, false); From 39bf5a7c3d01eaaa3ca6e8abfc40a560476aefba Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Wed, 29 May 2019 22:53:41 -0400 Subject: [PATCH 4/7] bahgawd it works --- foundationdb/tests/hca.rs | 84 +++++++++++++++++++++++++++++++++++ foundationdb/tests/range_2.rs | 63 ++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 foundationdb/tests/hca.rs create mode 100644 foundationdb/tests/range_2.rs diff --git a/foundationdb/tests/hca.rs b/foundationdb/tests/hca.rs new file mode 100644 index 00000000..bde296d7 --- /dev/null +++ b/foundationdb/tests/hca.rs @@ -0,0 +1,84 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/bluejekyll/foundationdb-rs/graphs/contributors +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. 
+ +extern crate foundationdb; +extern crate futures; +#[macro_use] +extern crate lazy_static; + +use std::collections::HashSet; + +use futures::future::*; +use futures::prelude::*; + +use foundationdb::error::Error; +use foundationdb::hca::HighContentionAllocator; +use foundationdb::tuple::{Element, Tuple}; +use foundationdb::*; + +mod common; + +#[test] +fn test_allocate() { + use foundationdb::keyselector::KeySelector; + + common::setup_static(); + const N: usize = 100; + const KEY: &[u8] = b"h"; + + let fut = Cluster::new(foundationdb::default_config_path()) + .and_then(|cluster| cluster.create_database()) + // .and_then(|db| result(db.create_trx())) + .and_then(|db: Database| { + let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY)); + + db.transact(move |tx| { + tx.clear_range(KEY, KEY); + futures::future::result(Ok::<(), failure::Error>(())) + }) + .wait(); + + let mut all_ints = Vec::new(); + + // let mut tx: Transaction = db.create_trx()?; + // tx.clear_subspace_range(Subspace::from_bytes(KEY)); + // tx.commit().wait()?; + + for _ in 0..N { + let mut tx: Transaction = db.create_trx()?; + + let next_int: i64 = hca.allocate(&mut tx)?; + println!("next: {:?}", next_int); + all_ints.push(next_int); + + tx.commit().wait(); + } + + Ok::<_, Error>(all_ints) + }); + // .and_then(|mut trx| { + // println!("starting to run"); + // let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY)); + // + // trx.clear_range(KEY, KEY); + // + // let mut all_ints = Vec::new(); + // + // for _ in 0..N { + // + // let next_int : i64 = hca.allocate(&mut trx)?; + // all_ints.push(next_int); + //// println!("next: {:?}", next_int); + // } + // + // Ok::<_, Error>(all_ints) + // }); + + println!("running test"); + let all_ints: Vec = fut.wait().expect("failed to run"); + println!("ran test {:?}", all_ints); +} diff --git a/foundationdb/tests/range_2.rs b/foundationdb/tests/range_2.rs new file mode 100644 index 00000000..b02ecaff --- /dev/null +++ 
b/foundationdb/tests/range_2.rs @@ -0,0 +1,63 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/bluejekyll/foundationdb-rs/graphs/contributors +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +extern crate foundationdb; +extern crate futures; +#[macro_use] +extern crate lazy_static; + +use foundationdb::error::Error; +use foundationdb::*; +use futures::future::*; +use futures::prelude::*; + +mod common; + +#[test] +fn test_get_range() { + use foundationdb::keyselector::KeySelector; + + common::setup_static(); + const N: usize = 10000; + + let fut = Cluster::new(foundationdb::default_config_path()) + .and_then(|cluster| cluster.create_database()) + .and_then(|db| result(db.create_trx())) + .and_then(|trx| { + let key_begin = "test-range-"; + let key_end = "test-range."; + + trx.clear_range(key_begin.as_bytes(), key_end.as_bytes()); + + for _ in 0..N { + let key = format!("{}-{}", key_begin, common::random_str(10)); + let value = common::random_str(10); + trx.set(key.as_bytes(), value.as_bytes()); + } + + let begin = KeySelector::first_greater_or_equal(key_begin.as_bytes()); + let end = KeySelector::first_greater_than(key_end.as_bytes()); + let opt = transaction::RangeOptionBuilder::new(begin, end) + .reverse(true) + .build(); + + trx.get_ranges(opt) + .map_err(|(_opt, e)| e) + .fold(0, |count, item| { + let kvs = item.key_values(); + Ok::<_, Error>(count + kvs.as_ref().len()) + }) + .map(|count| { + if count != N { + panic!("count expected={}, found={}", N, count); + } + eprintln!("count: {:?}", count); + }) + }); + + fut.wait().expect("failed to run") +} From 67507d16598fe5a0884751947da5fff06d4a80db Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Thu, 30 May 2019 23:29:29 -0400 Subject: [PATCH 5/7] working tests --- foundationdb/src/hca.rs | 87 +++++++++++++++----------- foundationdb/tests/hca.rs | 113 
++++++++++++++++++++++------------ foundationdb/tests/range_2.rs | 63 ------------------- 3 files changed, 125 insertions(+), 138 deletions(-) delete mode 100644 foundationdb/tests/range_2.rs diff --git a/foundationdb/src/hca.rs b/foundationdb/src/hca.rs index eb6442d2..95e7bdf9 100644 --- a/foundationdb/src/hca.rs +++ b/foundationdb/src/hca.rs @@ -17,13 +17,14 @@ use futures::stream::Stream; use rand::Rng; use error::Error; +use keyselector::KeySelector; use options::{ConflictRangeType, MutationType, TransactionOption}; use subspace::Subspace; use transaction::{RangeOptionBuilder, Transaction}; use tuple::Element; lazy_static! { - static ref LOCK: Mutex = Mutex::new(0); + static ref LOCK: Mutex = Mutex::new(0); } const ONE_BYTES: &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; @@ -37,32 +38,38 @@ pub struct HighContentionAllocator { } impl HighContentionAllocator { - fn new(subspace: Subspace) -> HighContentionAllocator { + /// New HCA + pub fn new(subspace: Subspace) -> HighContentionAllocator { HighContentionAllocator { counters: subspace.subspace(0), recent: subspace.subspace(1), } } - fn allocate( - &self, - transaction: &mut Transaction, - subspace: &Subspace, - ) -> Result { + /// Returns a byte string that + /// 1) has never and will never be returned by another call to this + /// method on the same subspace + /// 2) is nearly as short as possible given the above + pub fn allocate(&self, transaction: &mut Transaction) -> Result { + let (begin, end) = self.counters.range(); + loop { - let range_option = RangeOptionBuilder::from(self.counters.range()) + let counters_begin = KeySelector::first_greater_or_equal(&begin); + let counters_end = KeySelector::first_greater_than(&end); + let range_option = RangeOptionBuilder::new(counters_begin, counters_end) .reverse(true) .limit(1) .snapshot(true) .build(); - let kvs: Vec = transaction.get_ranges(range_option) + let kvs: Vec = transaction + .get_ranges(range_option) .map_err(|(_, e)| e) .fold(Vec::new(), 
move |mut out, range_result| { let kvs = range_result.key_values(); for kv in kvs.as_ref() { - if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("unable to unpack counter key") { + if let Element::I64(counter) = self.counters.unpack(kv.key())? { out.push(counter); } } @@ -72,7 +79,7 @@ impl HighContentionAllocator { .wait()?; let mut start: i64 = 0; - let mut window: i64 = 0; + let mut window: i64; if kvs.len() == 1 { start = kvs[0]; @@ -84,17 +91,25 @@ impl HighContentionAllocator { let mutex_guard = LOCK.lock().unwrap(); if window_advanced { - transaction.clear_range(self.counters.bytes(), self.counters.subspace(start).bytes()); + transaction + .clear_range(self.counters.bytes(), self.counters.subspace(start).bytes()); transaction.set_option(TransactionOption::NextWriteNoWriteConflictRange)?; - transaction.clear_range(self.recent.bytes(), self.recent.subspace(start).bytes()); + transaction + .clear_range(self.recent.bytes(), self.recent.subspace(start).bytes()); } let counters_subspace_with_start = self.counters.subspace(start); // Increment the allocation count for the current window - transaction.atomic_op(counters_subspace_with_start.bytes(), ONE_BYTES, MutationType::Add); - - let subspace_start_trx = transaction.get(counters_subspace_with_start.bytes(), true).wait()?; + transaction.atomic_op( + counters_subspace_with_start.bytes(), + ONE_BYTES, + MutationType::Add, + ); + + let subspace_start_trx = transaction + .get(counters_subspace_with_start.bytes(), true) + .wait()?; let count = byteorder::LittleEndian::read_i64(subspace_start_trx.value().unwrap()); drop(mutex_guard); @@ -116,24 +131,28 @@ impl HighContentionAllocator { // subsequent risk of conflict for this transaction. 
let mut rng = rand::thread_rng(); - let candidate = self.recent.subspace(rng.gen::() + start); - let candidate_subspace = candidate.bytes(); + let candidate: i64 = rng.gen_range(0, window) + start; + let recent_subspace_for_candidate = self.recent.subspace(candidate); + let candidate_subspace = recent_subspace_for_candidate.bytes(); let mutex_guard = LOCK.lock().unwrap(); - let range_option = RangeOptionBuilder::from(self.counters.range()) + let counters_begin = KeySelector::first_greater_or_equal(&begin); + let counters_end = KeySelector::first_greater_than(&end); + let range_option = RangeOptionBuilder::new(counters_begin, counters_end) .reverse(true) .limit(1) .snapshot(true) .build(); - let kvs: Vec = transaction.get_ranges(range_option) + let kvs: Vec = transaction + .get_ranges(range_option) .map_err(|(_, e)| e) .fold(Vec::new(), move |mut out, range_result| { let kvs = range_result.key_values(); for kv in kvs.as_ref() { - if let Element::I64(counter) = self.counters.unpack(kv.key()).expect("unable to unpack counter key") { + if let Element::I64(counter) = self.counters.unpack(kv.key())? { out.push(counter); } } @@ -149,7 +168,7 @@ impl HighContentionAllocator { drop(mutex_guard); - if kvs.len() > 0 { + if !kvs.is_empty() { let current_start = kvs[0]; if current_start > start { @@ -160,14 +179,14 @@ impl HighContentionAllocator { let candidate_value = candidate_value_trx.wait()?; match candidate_value.value() { - Some(x) => { - if x.len() == 0 { - transaction.add_conflict_range(candidate_subspace, candidate_subspace, ConflictRangeType::Write)?; - return Ok(self.counters.subspace(candidate_subspace)); - } - } + Some(_) => (), None => { - continue; + transaction.add_conflict_range( + candidate_subspace, + candidate_subspace, + ConflictRangeType::Write, + )?; + return Ok::<_, Error>(candidate); } }; } @@ -180,12 +199,10 @@ impl HighContentionAllocator { // can't be too small. So start small and scale up. 
We don't want this to // ever get *too* big because we have to store about window_size/2 recent // items. - if start < 255 { - return 64; - } - if start < 65535 { - return 1024; + match start { + _ if start < 255 => 64, + _ if start < 65535 => 1024, + _ => 8192, } - return 8192; } } diff --git a/foundationdb/tests/hca.rs b/foundationdb/tests/hca.rs index bde296d7..d5aae70d 100644 --- a/foundationdb/tests/hca.rs +++ b/foundationdb/tests/hca.rs @@ -11,74 +11,107 @@ extern crate futures; extern crate lazy_static; use std::collections::HashSet; +use std::iter::FromIterator; use futures::future::*; -use futures::prelude::*; use foundationdb::error::Error; use foundationdb::hca::HighContentionAllocator; -use foundationdb::tuple::{Element, Tuple}; use foundationdb::*; mod common; #[test] -fn test_allocate() { - use foundationdb::keyselector::KeySelector; - +fn test_hca_many_sequential_allocations() { common::setup_static(); - const N: usize = 100; - const KEY: &[u8] = b"h"; + const N: usize = 6000; + const KEY: &[u8] = b"test-hca-allocate"; let fut = Cluster::new(foundationdb::default_config_path()) .and_then(|cluster| cluster.create_database()) - // .and_then(|db| result(db.create_trx())) .and_then(|db: Database| { - let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY)); + let cleared_range = db + .transact(move |tx| { + tx.clear_subspace_range(Subspace::from_bytes(KEY)); + futures::future::result(Ok::<(), failure::Error>(())) + }) + .wait(); - db.transact(move |tx| { - tx.clear_range(KEY, KEY); - futures::future::result(Ok::<(), failure::Error>(())) - }) - .wait(); + cleared_range.expect("unable to clear hca test range"); - let mut all_ints = Vec::new(); + let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY)); - // let mut tx: Transaction = db.create_trx()?; - // tx.clear_subspace_range(Subspace::from_bytes(KEY)); - // tx.commit().wait()?; + let mut all_ints = Vec::new(); for _ in 0..N { let mut tx: Transaction = db.create_trx()?; let next_int: 
i64 = hca.allocate(&mut tx)?; - println!("next: {:?}", next_int); all_ints.push(next_int); - tx.commit().wait(); + tx.commit().wait()?; + } + + Ok::<_, Error>(all_ints) + }); + + let all_ints: Vec = fut.wait().expect("failed to run"); + check_hca_result_uniqueness(&all_ints); + + eprintln!("ran test {:?}", all_ints); +} + +#[test] +fn test_hca_concurrent_allocations() { + common::setup_static(); + const N: usize = 1000; + const KEY: &[u8] = b"test-hca-allocate-concurrent"; + + let fut = Cluster::new(foundationdb::default_config_path()) + .and_then(|cluster| cluster.create_database()) + .and_then(|db: Database| { + let cleared_range = db + .transact(move |tx| { + tx.clear_subspace_range(Subspace::from_bytes(KEY)); + futures::future::result(Ok::<(), failure::Error>(())) + }) + .wait(); + + cleared_range.expect("unable to clear hca test range"); + + let mut futures = Vec::new(); + let mut all_ints: Vec = Vec::new(); + + for _ in 0..N { + let f = db.transact(move |mut tx| { + HighContentionAllocator::new(Subspace::from_bytes(KEY)).allocate(&mut tx) + }); + + futures.push(f); + } + + for allocation in futures { + let i = allocation.wait().expect("unable to get allocation"); + all_ints.push(i); } Ok::<_, Error>(all_ints) }); - // .and_then(|mut trx| { - // println!("starting to run"); - // let hca = HighContentionAllocator::new(Subspace::from_bytes(KEY)); - // - // trx.clear_range(KEY, KEY); - // - // let mut all_ints = Vec::new(); - // - // for _ in 0..N { - // - // let next_int : i64 = hca.allocate(&mut trx)?; - // all_ints.push(next_int); - //// println!("next: {:?}", next_int); - // } - // - // Ok::<_, Error>(all_ints) - // }); - - println!("running test"); + let all_ints: Vec = fut.wait().expect("failed to run"); - println!("ran test {:?}", all_ints); + check_hca_result_uniqueness(&all_ints); + + eprintln!("ran test {:?}", all_ints); +} + +fn check_hca_result_uniqueness(results: &Vec) { + let result_set: HashSet = HashSet::from_iter(results.clone()); + + if 
results.len() != result_set.len() { + panic!( + "Set size does not much, got duplicates from HCA. Set: {:?}, List: {:?}", + result_set.len(), + results.len(), + ); + } } diff --git a/foundationdb/tests/range_2.rs b/foundationdb/tests/range_2.rs deleted file mode 100644 index b02ecaff..00000000 --- a/foundationdb/tests/range_2.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2018 foundationdb-rs developers, https://github.com/bluejekyll/foundationdb-rs/graphs/contributors -// -// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be -// copied, modified, or distributed except according to those terms. - -extern crate foundationdb; -extern crate futures; -#[macro_use] -extern crate lazy_static; - -use foundationdb::error::Error; -use foundationdb::*; -use futures::future::*; -use futures::prelude::*; - -mod common; - -#[test] -fn test_get_range() { - use foundationdb::keyselector::KeySelector; - - common::setup_static(); - const N: usize = 10000; - - let fut = Cluster::new(foundationdb::default_config_path()) - .and_then(|cluster| cluster.create_database()) - .and_then(|db| result(db.create_trx())) - .and_then(|trx| { - let key_begin = "test-range-"; - let key_end = "test-range."; - - trx.clear_range(key_begin.as_bytes(), key_end.as_bytes()); - - for _ in 0..N { - let key = format!("{}-{}", key_begin, common::random_str(10)); - let value = common::random_str(10); - trx.set(key.as_bytes(), value.as_bytes()); - } - - let begin = KeySelector::first_greater_or_equal(key_begin.as_bytes()); - let end = KeySelector::first_greater_than(key_end.as_bytes()); - let opt = transaction::RangeOptionBuilder::new(begin, end) - .reverse(true) - .build(); - - trx.get_ranges(opt) - .map_err(|(_opt, e)| e) - .fold(0, |count, item| { - let kvs = item.key_values(); - Ok::<_, Error>(count + kvs.as_ref().len()) - }) - .map(|count| { - if count != N { - panic!("count expected={}, found={}", N, count); - } - eprintln!("count: {:?}", count); - 
}) - }); - - fut.wait().expect("failed to run") -} From 6759b187db1f999d807a11c73c425aaedcac686d Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Sat, 1 Jun 2019 09:42:53 -0400 Subject: [PATCH 6/7] add docs and description of algorithm --- foundationdb/src/hca.rs | 38 ++++++++++++++++++++++++------------ foundationdb/src/lib.rs | 7 ++----- foundationdb/src/subspace.rs | 5 ++++- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/foundationdb/src/hca.rs b/foundationdb/src/hca.rs index 95e7bdf9..6b31513c 100644 --- a/foundationdb/src/hca.rs +++ b/foundationdb/src/hca.rs @@ -6,8 +6,23 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. -//! Docs! +//! The directory layer offers subspace indirection, where logical application subspaces are mapped to short, auto-generated key prefixes. This prefix assignment is done by the High Contention Allocator, which allows many clients to allocate short directory prefixes efficiently. //! +//! The allocation process works over candidate value windows. It uses two subspaces to operate, the "counters" subspace and "recents" subspace (derived from the subspace used to create the HCA). +//! +//! "counters" contains a single key : "counters : window_start", whose value is the # of allocations in the current window. "window_start" is an integer that marks the lower bound of values that can be assigned from the current window. +//! "recents" can contain many keys : "recents : ", where each "candidate" is an integer that has been assigned to some client +//! +//! Assignment has two stages that are executed in a loop until they both succeed. +//! +//! 1. Find the current window. The client scans "counters : *" to get the current "window_start" and how many allocations have been made in the current window. +//! 
If the window is more than half-full (using pre-defined window sizes), the window is advanced: "counters : *" and "recents : *" are both cleared, and a new "counters : window_start + window_size" key is created with a value of 0. (1) is retried +//! If the window still has space, it moves to (2). +//! +//! 2. Find a candidate value inside that window. The client picks a candidate number between "[window_start, window_start + window_size)" and tries to set the key "recents : ". +//! If the write succeeds, the candidate is returned as the allocated value. Success! +//! If the write fails because the window has been advanced, it repeats (1). +//! If the write fails because the value was already set, it repeats (2). use std::sync::Mutex; @@ -23,32 +38,29 @@ use subspace::Subspace; use transaction::{RangeOptionBuilder, Transaction}; use tuple::Element; -lazy_static! { - static ref LOCK: Mutex = Mutex::new(0); -} - const ONE_BYTES: &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; -/// High Contention Allocator -/// -#[derive(Debug, Clone)] +/// Represents a High Contention Allocator for a given subspace +#[derive(Debug)] pub struct HighContentionAllocator { counters: Subspace, recent: Subspace, + allocation_mutex: Mutex<()>, } impl HighContentionAllocator { - /// New HCA + /// Constructs an allocator that will use the input subspace for assigning values. 
+ /// The given subspace should not be used by anything other than the allocator pub fn new(subspace: Subspace) -> HighContentionAllocator { HighContentionAllocator { counters: subspace.subspace(0), recent: subspace.subspace(1), + allocation_mutex: Mutex::new(()), } } /// Returns a byte string that - /// 1) has never and will never be returned by another call to this - /// method on the same subspace + /// 1) has never and will never be returned by another call to this method on the same subspace /// 2) is nearly as short as possible given the above pub fn allocate(&self, transaction: &mut Transaction) -> Result { let (begin, end) = self.counters.range(); @@ -88,7 +100,7 @@ impl HighContentionAllocator { let mut window_advanced = false; loop { - let mutex_guard = LOCK.lock().unwrap(); + let mutex_guard = self.allocation_mutex.lock().unwrap(); if window_advanced { transaction @@ -135,7 +147,7 @@ impl HighContentionAllocator { let recent_subspace_for_candidate = self.recent.subspace(candidate); let candidate_subspace = recent_subspace_for_candidate.bytes(); - let mutex_guard = LOCK.lock().unwrap(); + let mutex_guard = self.allocation_mutex.lock().unwrap(); let counters_begin = KeySelector::first_greater_or_equal(&begin); let counters_end = KeySelector::first_greater_than(&end); diff --git a/foundationdb/src/lib.rs b/foundationdb/src/lib.rs index 537bc6e6..d2cf925d 100644 --- a/foundationdb/src/lib.rs +++ b/foundationdb/src/lib.rs @@ -119,11 +119,10 @@ extern crate foundationdb_sys; #[macro_use] extern crate futures; extern crate core; -#[cfg(feature = "uuid")] -extern crate uuid; -#[macro_use] extern crate lazy_static; extern crate rand; +#[cfg(feature = "uuid")] +extern crate uuid; pub mod cluster; pub mod database; @@ -136,8 +135,6 @@ pub mod network; /// Generated configuration types for use with the various `set_option` functions #[allow(missing_docs)] pub mod options; -/// TODO: @phemberger -#[allow(missing_docs)] pub mod subspace; pub mod transaction; pub mod 
tuple; diff --git a/foundationdb/src/subspace.rs b/foundationdb/src/subspace.rs index 85fdc684..cb6f61da 100644 --- a/foundationdb/src/subspace.rs +++ b/foundationdb/src/subspace.rs @@ -6,6 +6,8 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. +//! Implements the FDB Subspace Layer + use tuple::{Decode, Encode, Error, Result}; /// Represents a well-defined region of keyspace in a FoundationDB database @@ -99,9 +101,10 @@ impl Subspace { #[cfg(test)] mod tests { - use super::*; use tuple::Tuple; + use super::*; + #[test] fn sub() { let ss0: Subspace = 1.into(); From f760c335a9c3b8ad5d30ce9affd42cabf06fd346 Mon Sep 17 00:00:00 2001 From: Paul Hemberger Date: Sun, 2 Jun 2019 09:10:34 -0400 Subject: [PATCH 7/7] tweaks to doc formatting --- foundationdb/src/hca.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/foundationdb/src/hca.rs b/foundationdb/src/hca.rs index 6b31513c..ab1e6f14 100644 --- a/foundationdb/src/hca.rs +++ b/foundationdb/src/hca.rs @@ -10,19 +10,19 @@ //! //! The allocation process works over candidate value windows. It uses two subspaces to operate, the "counters" subspace and "recents" subspace (derived from the subspace used to create the HCA). //! -//! "counters" contains a single key : "counters : window_start", whose value is the # of allocations in the current window. "window_start" is an integer that marks the lower bound of values that can be assigned from the current window. -//! "recents" can contain many keys : "recents : ", where each "candidate" is an integer that has been assigned to some client +//! "counters" contains a single key : "counters : window_start", whose value is the number of allocations in the current window. "window_start" is an integer that marks the lower bound of values that can be assigned from the current window. +//! 
"recents" can contain many keys : "recents : <candidate>", where each "candidate" is an integer that has been assigned to some client. //! //! Assignment has two stages that are executed in a loop until they both succeed. //! //! 1. Find the current window. The client scans "counters : *" to get the current "window_start" and how many allocations have been made in the current window. //! If the window is more than half-full (using pre-defined window sizes), the window is advanced: "counters : *" and "recents : *" are both cleared, and a new "counters : window_start + window_size" key is created with a value of 0. (1) is retried. //! If the window still has space, it moves to (2). //! //! 2. Find a candidate value inside that window. The client picks a candidate number between "[window_start, window_start + window_size)" and tries to set the key "recents : <candidate>". //! If the write succeeds, the candidate is returned as the allocated value. Success! //! If the write fails because the window has been advanced, it repeats (1). //! If the write fails because the value was already set, it repeats (2). 
use std::sync::Mutex;