Commit 43ed2be: WIP

yito88 committed Nov 8, 2024
1 parent a865827 commit 43ed2be

Showing 7 changed files with 280 additions and 47 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -6,6 +6,7 @@ An embedded key-value store
# TODO
- API
- [x] get()
- [ ] scan()
- [x] put()
- including insert and update
- [x] delete()
24 changes: 24 additions & 0 deletions src/compactor.rs
@@ -0,0 +1,24 @@
mod leveled;

use std::sync::{Arc, RwLock};

use leveled::LeveledCompactor;

use crate::config::Config;
use crate::sstable_manager::{LeveledTables, TableId};

trait CompactionContext {
    /// Merges the tables at `level` and returns the IDs of the tables that were
    /// merged away together with the updated table map.
    fn compact(
        &self,
        level: usize,
        deleted_tables: Vec<Vec<TableId>>,
        new_tables: LeveledTables,
    ) -> Result<(Vec<Vec<TableId>>, LeveledTables), std::io::Error>;
}

pub enum Compactor {
    /// Leveled compaction
    Leveled(LeveledCompactor),
    //Tiered(TieredCompactor),
}

impl Compactor {
    pub fn leveled_compactor(
        name: String,
        config: Config,
        tables: Arc<RwLock<LeveledTables>>,
    ) -> Self {
        Self::Leveled(LeveledCompactor::new(name, config, tables))
    }
}
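The enum has no compaction entry point yet; a minimal dispatch sketch, assuming the `CompactionContext` trait above remains the internal interface (the method name `run` is hypothetical and not part of this commit):

impl Compactor {
    /// Hypothetical dispatch to the concrete compaction strategy.
    fn run(
        &self,
        level: usize,
        deleted_tables: Vec<Vec<TableId>>,
        new_tables: LeveledTables,
    ) -> Result<(Vec<Vec<TableId>>, LeveledTables), std::io::Error> {
        match self {
            Self::Leveled(compactor) => compactor.compact(level, deleted_tables, new_tables),
        }
    }
}

A `Tiered` variant would slot into the same match once `TieredCompactor` exists.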
180 changes: 180 additions & 0 deletions src/compactor/leveled.rs
@@ -0,0 +1,180 @@
use bloomfilter::Bloom;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, RwLock};

use super::CompactionContext;
use crate::config::Config;
use crate::sparse_index::SparseIndex;
use crate::sstable_manager::{read_data, LeveledTables, TableId, TableInfo, Tables};
use crate::util::data_util;

const WRITE_BUFFER_SIZE: usize = 1 << 18;
const READ_BUFFER_SIZE: usize = 1 << 16;

pub struct LeveledCompactor {
    name: String,
    /// Configurations
    config: Config,
    /// Shared table info.
    tables: Arc<RwLock<LeveledTables>>,
}

impl LeveledCompactor {
    pub fn new(name: String, config: Config, tables: Arc<RwLock<LeveledTables>>) -> Self {
        Self {
            name,
            config,
            tables,
        }
    }
}

impl CompactionContext for LeveledCompactor {
    fn compact(
        &self,
        level: usize,
        deleted_tables: Vec<Vec<TableId>>,
        new_tables: LeveledTables,
    ) -> Result<(Vec<Vec<TableId>>, LeveledTables), std::io::Error> {
        let target_tables = match new_tables.get(level) {
            Some(tables) => tables,
            None => return Ok((deleted_tables, new_tables)),
        };
        // keep the read guard alive; `current_tables` borrows from it
        let tables_guard = self.tables.read().unwrap();
        let current_tables = match tables_guard.get(level) {
            Some(tables) => tables,
            None => return Ok((deleted_tables, new_tables)),
        };
        let head_key = target_tables
            .first_key_value()
            .map(|(_, table)| table.range.0.clone())
            .expect("Smallest table should exist");
        let tail_key = target_tables
            .last_key_value()
            .map(|(_, table)| table.range.1.clone())
            .expect("Largest table should exist");

        let new_table_id = current_tables
            .last_key_value()
            .map(|(_, table)| table.id + 1)
            .expect("Largest table should exist");
        let table_file_path = self.config.get_table_file_path(&self.name, new_table_id);
        let table_file = File::create(table_file_path)?;
        let mut writer = BufWriter::with_capacity(WRITE_BUFFER_SIZE, &table_file);

        // only current tables whose key ranges overlap the merged range take part
        let mut current_tables = current_tables
            .iter()
            .filter(|(_, table)| table.range.0 <= tail_key && head_key <= table.range.1);
        let deleted_table_ids: Vec<TableId> =
            current_tables.clone().map(|(id, _)| *id).collect();

        let mut offset = 0;
        let mut index = SparseIndex::new();
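        // NOTE: the filter is sized for one table's configured item count; a merged
        // table can exceed that, which only raises the false-positive rate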
        let mut filter = Bloom::new_for_fp_rate(
            self.config.get_filter_items_count(),
            self.config.get_filter_fp_rate(),
        );
        let mut key_range = (None, None);

        let next_table_reader =
            |tables_iter: &mut dyn Iterator<Item = (&TableId, &TableInfo)>| -> Option<BufReader<File>> {
                tables_iter.next().map(|(id, _)| {
                    let path = self.config.get_table_file_path(&self.name, *id);
                    let file = File::open(path).expect("File should exist");
                    BufReader::with_capacity(READ_BUFFER_SIZE, file)
                })
            };

        // TODO: hard to read the big loop. Need refactoring
        let mut target_tables = target_tables.iter();
        let mut target_reader = next_table_reader(&mut target_tables);
        let mut current_reader = next_table_reader(&mut current_tables);

        // read each (key, value) pair through `as_mut` so the readers stay in
        // their `Option`s instead of being moved out
        let mut target_pair = match target_reader.as_mut() {
            Some(reader) => read_data(reader)?.zip(read_data(reader)?),
            None => None,
        };
        let mut current_pair = match current_reader.as_mut() {
            Some(reader) => read_data(reader)?.zip(read_data(reader)?),
            None => None,
        };
        loop {
            // `take` moves both pairs out of their options; each arm restores
            // or advances them before the next iteration
            let (key, value) = match (target_pair.take(), current_pair.take()) {
                (Some((target_key, target_val)), Some((current_key, current_val))) => {
                    if target_key <= current_key {
                        if target_key == current_key {
                            // same key on both sides: the target (newer) entry
                            // wins and the older current entry is skipped
                            let reader = current_reader.as_mut().expect("Reader should exist");
                            current_pair = read_data(reader)?.zip(read_data(reader)?);
                        } else {
                            current_pair = Some((current_key, current_val));
                        }
                        let reader = target_reader.as_mut().expect("Reader should exist");
                        target_pair = read_data(reader)?.zip(read_data(reader)?);
                        (target_key, target_val)
                    } else {
                        target_pair = Some((target_key, target_val));
                        let reader = current_reader.as_mut().expect("Reader should exist");
                        current_pair = read_data(reader)?.zip(read_data(reader)?);
                        (current_key, current_val)
                    }
                }
                (Some((target_key, target_val)), None) => {
                    // the current side is drained: check the next table
                    current_reader = next_table_reader(&mut current_tables);
                    match current_reader.as_mut() {
                        Some(reader) => {
                            current_pair = read_data(reader)?.zip(read_data(reader)?);
                            // compare the target and the current again
                            target_pair = Some((target_key, target_val));
                            continue;
                        }
                        None => {
                            // only the target side remains; keep advancing it
                            let reader = target_reader.as_mut().expect("Reader should exist");
                            target_pair = read_data(reader)?.zip(read_data(reader)?);
                            (target_key, target_val)
                        }
                    }
                }
                (None, Some((current_key, current_val))) => {
                    // the target side is drained: check the next table
                    target_reader = next_table_reader(&mut target_tables);
                    match target_reader.as_mut() {
                        Some(reader) => {
                            target_pair = read_data(reader)?.zip(read_data(reader)?);
                            // compare the target and the current again
                            current_pair = Some((current_key, current_val));
                            continue;
                        }
                        None => {
                            // only the current side remains; keep advancing it
                            let reader = current_reader.as_mut().expect("Reader should exist");
                            current_pair = read_data(reader)?.zip(read_data(reader)?);
                            (current_key, current_val)
                        }
                    }
                }
                (None, None) => {
                    // both readers are drained: move on to the next tables, if any
                    target_reader = next_table_reader(&mut target_tables);
                    current_reader = next_table_reader(&mut current_tables);
                    if target_reader.is_none() && current_reader.is_none() {
                        break;
                    }
                    if let Some(reader) = target_reader.as_mut() {
                        target_pair = read_data(reader)?.zip(read_data(reader)?);
                    }
                    if let Some(reader) = current_reader.as_mut() {
                        current_pair = read_data(reader)?.zip(read_data(reader)?);
                    }
                    continue;
                }
            };
            if key_range.0.is_none() {
                key_range.0 = Some(key.clone());
            }
            filter.set(&key);
            index.insert(&key, offset);
            offset += data_util::get_data_size(key.len(), value.len());
            writer.write_all(&data_util::format_data_with_crc(&key, &value))?;
            key_range.1 = Some(key);
        }

        // flush buffered data so sync_all persists everything (fix: the
        // BufWriter still held unsynced bytes at this point)
        writer.flush()?;
        table_file.sync_all()?;
        let range = (
            key_range.0.expect("Head key should exist"),
            key_range.1.expect("Tail key should exist"),
        );

        let table_info = TableInfo {
            id: new_table_id,
            size: table_file.metadata()?.len() as _,
            level,
            range,
            filter,
            index,
        };

        let mut new_tables = new_tables;
        match new_tables.get_mut(level) {
            Some(tables) => {
                tables.insert(new_table_id, table_info);
            }
            None => {
                let mut tables = BTreeMap::new();
                tables.insert(new_table_id, table_info);
                new_tables.push(tables);
            }
        }

        // record the current tables that were merged away at this level
        let mut deleted_tables = deleted_tables;
        while deleted_tables.len() <= level {
            deleted_tables.push(Vec::new());
        }
        deleted_tables[level].extend(deleted_table_ids);

        Ok((deleted_tables, new_tables))
    }
}
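For reference, the loop above is a two-way merge of two sorted streams in which the target (newer) side wins on duplicate keys. A self-contained sketch of the same rule over in-memory slices (illustrative only, not code from this crate):

fn merge_newer_wins(
    target: &[(Vec<u8>, Vec<u8>)],
    current: &[(Vec<u8>, Vec<u8>)],
) -> Vec<(Vec<u8>, Vec<u8>)> {
    let (mut i, mut j) = (0, 0);
    let mut merged = Vec::with_capacity(target.len() + current.len());
    while i < target.len() && j < current.len() {
        if target[i].0 == current[j].0 {
            // duplicate key: keep the newer (target) entry, skip the older one
            merged.push(target[i].clone());
            i += 1;
            j += 1;
        } else if target[i].0 < current[j].0 {
            merged.push(target[i].clone());
            i += 1;
        } else {
            merged.push(current[j].clone());
            j += 1;
        }
    }
    merged.extend_from_slice(&target[i..]);
    merged.extend_from_slice(&current[j..]);
    merged
}

A key present on both sides appears once in the output, with the target's value; everything else is emitted in key order.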
41 changes: 29 additions & 12 deletions src/flush_writer.rs
@@ -4,7 +4,7 @@ use log::{debug, trace};
use mockall_double::double;
use std::fs::File;
use std::io::{BufWriter, Write};
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, JoinHandle};

use crate::config::Config;
@@ -52,23 +52,23 @@ pub fn spawn_flush_writer(
pub struct FlushWriter {
    name: String,
    config: Config,
-    table_id: TableId,
+    last_table_id: Arc<Mutex<TableId>>,
}

impl FlushWriter {
-    pub fn new(name: &str, config: Config, table_id: usize) -> Self {
+    pub fn new(name: &str, config: Config, last_table_id: Arc<Mutex<TableId>>) -> Self {
        FlushWriter {
            name: name.to_string(),
            config,
-            table_id,
+            last_table_id,
        }
    }

    /// flush the current tree
    pub fn flush(&mut self, first_leaf: Arc<RwLock<Leaf>>) -> Result<TableInfo, std::io::Error> {
        debug!(
-            "Starting flush FPTree of {} to SSTable ID {}",
-            self.name, self.table_id
+            "Starting flush FPTree of {} to SSTable ID {:?}",
+            self.name, self.last_table_id
        );
        let leaf_manager = first_leaf.read().unwrap().get_leaf_manager();
        let id_list = leaf_manager.read().unwrap().get_leaf_id_chain();
@@ -91,14 +91,14 @@ impl FlushWriter {
}

    fn create_new_table(&mut self) -> Result<(TableId, File), std::io::Error> {
-        let id = self.table_id;
-        let table_file_path = self.config.get_table_file_path(&self.name, id);
+        let mut id = self.last_table_id.lock().unwrap();
+        // remember the ID used for the file; `*id` is advanced below (fix: the
+        // committed version returned the incremented ID, not the one the file
+        // was created with)
+        let table_id = *id;
+        let table_file_path = self.config.get_table_file_path(&self.name, table_id);
        let table_file = File::create(table_file_path)?;

        // odd IDs are used by compactions
-        self.table_id += 2;
+        *id += 2;

-        Ok((id, table_file))
+        Ok((table_id, table_file))
    }

    fn flush_kv(
@@ -113,12 +113,14 @@
            self.config.get_filter_items_count(),
            self.config.get_filter_fp_rate(),
        );
+        let mut head_key = None;
+        let mut tail_key = None;
        let mut writer = BufWriter::with_capacity(WRITE_BUFFER_SIZE, &table_file);
-        for id in id_list {
+        for id in &id_list {
            let header = leaf_manager
                .read()
                .unwrap()
-                .get_header(id)
+                .get_header(*id)
                .expect("The header doesn't exist");
            let mut kv_pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(NUM_SLOT);
            for slot in 0..NUM_SLOT {
@@ -135,6 +137,16 @@
            }
            // it is enough to sort only kv_pairs since all leaves are ordered
            kv_pairs.sort();

+            if !kv_pairs.is_empty() {
+                // leaves are ordered: the head key comes from the first non-empty
+                // leaf and the tail key from the last one seen (comparing against
+                // id_list.first()/last() would miss them if those leaves were empty)
+                if head_key.is_none() {
+                    head_key = kv_pairs.first().map(|(k, _)| k.clone());
+                }
+                tail_key = kv_pairs.last().map(|(k, _)| k.clone());
+            }

            for (key, value) in kv_pairs {
                filter.set(&key);
                index.insert(&key, offset);
@@ -144,10 +156,15 @@
        }
        // flush buffered data so sync_all persists everything (added fix; not
        // part of this commit)
        writer.flush()?;
        table_file.sync_all()?;

+        let range = (
+            head_key.expect("Head key should exist"),
+            tail_key.expect("Tail key should exist"),
+        );
        Ok(TableInfo {
            id: table_id,
            size: table_file.metadata()?.len() as _,
            level: 0,
+            range,
            filter,
            index,
        })
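The new `range` field is what the compactor's overlap filter relies on: a table joins a compaction only when its key range intersects the range being merged. A standalone sketch of that inclusive intersection test (it mirrors the filter closure in leveled.rs; not a separate API in this crate):

fn overlaps(a: &(Vec<u8>, Vec<u8>), b: &(Vec<u8>, Vec<u8>)) -> bool {
    // ranges are inclusive on both ends, so touching endpoints count as overlap
    a.0 <= b.1 && b.0 <= a.1
}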
5 changes: 3 additions & 2 deletions src/kvs.rs
@@ -1,7 +1,7 @@
use crossbeam_channel::Sender;
use log::{debug, error, info, trace};
use std::path::Path;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

//use crate::amphis_error::CrudError;
@@ -26,7 +26,8 @@ impl KVS {
        let (sstable_manager, next_table_id) = SstableManager::new(name, config.clone())?;
        let sstable_manager = Arc::new(sstable_manager);

-        let mut flush_writer = FlushWriter::new(name, config.clone(), next_table_id);
+        let mut flush_writer =
+            FlushWriter::new(name, config.clone(), Arc::new(Mutex::new(next_table_id)));
        if Path::new(&path).exists() {
            // flush the existing trees
            for entry in std::fs::read_dir(path)? {
1 change: 1 addition & 0 deletions src/lib.rs
@@ -1,6 +1,7 @@
pub mod config;
pub mod kvs;

mod compactor;
mod flush_writer;
mod fptree;
mod fptree_manager;
