diff --git a/.env b/.env new file mode 100644 index 0000000..c152c23 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +RUST_LOG=debug \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index bfe8688..eb073c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,6 +165,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "env_logger" version = "0.10.0" @@ -267,6 +273,7 @@ version = "0.1.0" dependencies = [ "assert_cmd", "clap", + "dotenv", "env_logger", "lazy_static", "log", diff --git a/Cargo.toml b/Cargo.toml index 0960e96..1e47101 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ test = false [dependencies] clap = { version = "4.1.6", features = ["derive"] } +dotenv = "0.15.0" env_logger = "0.10.0" lazy_static = "1.4.0" log = "0.4.20" diff --git a/src/bin/main.rs b/src/bin/main.rs index d41a385..f00cc87 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -3,7 +3,8 @@ use kvs::cli; use log::{error, info}; use std::env; fn main() -> kvs::Result<()> { - // Read kv_00001.log file into BufReader + ::dotenv::dotenv().ok(); + // Read kv_00001.log file into BufReader // let file = std::fs::File::open("kv_00001.log")?; // let buf = std::io::BufReader::new(&file); // let buf2 = std::io::BufRead::lines(buf) @@ -22,6 +23,10 @@ fn main() -> kvs::Result<()> { // create a local kvs instance let mut kvs = kvs::KvStore::open(env::current_dir()?)?; + if cli.compact { + kvs.compaction()?; + } + if let Some(action) = cli.action { match action { Action::Set(SetCmd { key, value }) => { @@ -43,7 +48,7 @@ fn main() -> kvs::Result<()> { } Action::Remove(RmCmd { key }) => { info!("Removing \"{key}\""); - let _ = kvs.remove(key); + let _ = kvs.remove(key); exit_program(0); } } diff --git a/src/cli.rs b/src/cli.rs index c886c91..e53fc49 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -14,6 +14,10 @@ pub struct KvsCLI { /// Turn debugging information on #[arg(short, long, action = clap::ArgAction::Count)] pub debug: u8, + + /// Run compaction + #[arg(short, long)] + pub compact: bool, } #[derive(clap::Args, Serialize, Deserialize, Debug)] diff --git a/src/lib.rs b/src/lib.rs index 665a8d1..16c6c58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,7 +32,7 @@ use ron::ser::PrettyConfig; use serde::Deserialize; use std::{ cell::RefCell, - collections::HashMap, + collections::{BTreeMap, HashMap}, fs::{File, OpenOptions}, io::{BufRead, BufReader, BufWriter, Read, Seek, Write}, path::{Path, PathBuf}, @@ -124,23 +124,7 @@ impl KvStore { Err(err) => error!("Cannot load in memory index: {:?}", err), } } else { - let mut buf = String::new(); - // Reads the entire file at once - // Alternatively you can use BufReader on the file and read it line by line to perform deserialization - // and replay the `Action` - let _bytes_read = (*disk).read_to_string(&mut buf).map_err(|e| { - error!("Cannot load log file into memory"); - e - })?; - let mut de = ron::Deserializer::from_str(&buf).expect("RON: deserializer init error"); - let log: Vec = std::iter::from_fn({ - move || { - de.end() - .is_err() - .then_some(Action::deserialize::<_>(&mut de)) - } - }) - .collect::, _>>()?; + let log = read_action_from_log(&mut disk)?; // -- Replay the commands in the log -- let mut offset: Offset = 0; for (idx, action) in log.iter().enumerate() { @@ -170,12 +154,108 @@ impl KvStore { drop(disk); Ok(store) } + + /// Run compaction on the disk log + pub fn compaction(&mut self) -> Result<()> { + let mut file = self + .disk + .as_ref() + .ok_or(DbError::Uninitialized)? + .borrow_mut() + .try_clone()?; + file.rewind()?; + + let log: Vec = read_action_from_log(&mut file)?; + // Hold unique keys last set value, None in case it was removed + let mut unique_keys: BTreeMap> = BTreeMap::new(); + let mut compacted_log: Vec = Vec::with_capacity(log.capacity()); + // debug!("Log to compact : {log:?}"); + log.iter().rev().for_each(|action| { + // debug!("compact OFFSET: {offset}"); + match action { + Action::Set(SetCmd { key, value }) => { + if !unique_keys.contains_key(key) { + unique_keys.insert(key.to_string(), Some(value.to_string())); + } + } + Action::Get(_) => (), + Action::Remove(RmCmd { key }) => { + if !unique_keys.contains_key(key) { + unique_keys.insert(key.to_string(), None); + } + } + } + }); + debug!("Unique Keys : {:?}", unique_keys); + // Rebuild log + compacted_log.extend(unique_keys.into_iter().rev().map(|(key, value)| { + if let Some(value) = value { + Action::Set(SetCmd { key, value }) + } else { + Action::Remove(RmCmd { key }) + } + })); + // Recompute offsets + self.map.clear(); + let mut offset: Offset = 1; + compacted_log.iter().for_each(|action| { + match action { + Action::Set(SetCmd { key, .. }) => { + self.map.insert(key.clone(), offset); + } + Action::Get(_) => {} + Action::Remove(RmCmd { key }) => { + self.map.remove(key); + } + }; + offset += 1; + }); + self.offset = offset; + debug!("Post compaction, current offset {}", self.offset); + // Write compacted_log to self.disk + let mut file = self + .disk + .as_ref() + .ok_or(DbError::Uninitialized)? + .borrow_mut(); + file.rewind()?; + // Clear file contents + file.set_len(0)?; + // Write serialized to file but one entry at a time instead of as a Vec + for action in compacted_log { + let serialized = ron::ser::to_string_pretty(&action, RON_CONFIG.to_owned())? + "\n"; + file.write_all(serialized.as_bytes())?; + } + Ok(()) + } +} +/// Deserialize on disk log +fn read_action_from_log(disk: &mut File) -> Result> { + let mut buf = String::new(); + let _bytes_read = (*disk).read_to_string(&mut buf).map_err(|e| { + error!("Cannot load log file into memory"); + e + })?; + let mut de = ron::Deserializer::from_str(&buf).expect("RON: deserializer init error"); + let log: Vec = std::iter::from_fn({ + move || { + de.end() + .is_err() + .then_some(Action::deserialize::<_>(&mut de)) + } + }) + .collect::, _>>()?; + Ok(log) } impl KvStore { /// Set : When setting a key to a value, kvs writes the set command to disk in a sequential log, /// then stores the log pointer (file offset) of that command in the in-memory index from key to pointer. pub fn set(&mut self, key: String, value: String) -> Result<()> { + if self.map.len() > 500 { + // trigger compaction + (*self).compaction()?; + } let mut file = self .disk .as_ref() @@ -193,6 +273,7 @@ impl KvStore { // write serialized to self.disk // TODO : Maybe think about optimizing this? file sys-call on every set cmd? file.write_all(serialized.as_bytes())?; + Ok(()) } /// Get : When retrieving a value for a key with the get command, it searches the index,