Skip to content

Commit

Permalink
feat: wip background compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
jdockerty committed Apr 23, 2024
1 parent f1decd7 commit ecf190f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 14 deletions.
10 changes: 8 additions & 2 deletions src/bin/sqrl-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,16 @@ async fn main() -> anyhow::Result<()> {
if clients.len() > 3 {
warn!("Replicating to many followers can greatly impact write performance");
}
ReplicatedServer::new(clients.into(), app.log_file, app.addr)?
ReplicatedServer::new(clients.into(), app.log_file, app.addr)
.await?
.run()
.await
}
replication::Mode::Follower => {
StandaloneServer::new(app.log_file, app.addr)
.await?
.run()
.await
}
replication::Mode::Follower => StandaloneServer::new(app.log_file, app.addr)?.run().await,
}
}
4 changes: 2 additions & 2 deletions src/replication/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct ReplicatedServer {
}

impl ReplicatedServer {
pub fn new<P>(
pub async fn new<P>(
clients: Mutex<Vec<RemoteNodeClient>>,
path: P,
addr: SocketAddr,
Expand All @@ -37,7 +37,7 @@ impl ReplicatedServer {
{
Ok(Self {
addr,
server: Arc::new(StandaloneServer::new(path, addr)?),
server: Arc::new(StandaloneServer::new(path, addr).await?),
remote_replicas: Arc::new(clients),
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ pub struct StandaloneServer {
}

impl StandaloneServer {
pub fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
pub async fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
where
P: Into<std::path::PathBuf>,
{
let store = Arc::new(KvStore::open(path)?);
let store = Arc::new(KvStore::open(path).await?);
Ok(Self { store, addr })
}

Expand Down
54 changes: 46 additions & 8 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::usize;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::level_filters::LevelFilter;
use tracing::{self, debug, info, warn};
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
Expand Down Expand Up @@ -62,6 +65,12 @@ pub struct KvStore {
/// The maximum size of a log file in bytes.
max_log_file_size: u64,

/// Channel on which compaction notifications are sent.
///
/// This allows the [`KvStore`] to signal to the background task which receives
/// compaction jobs to begin its work.
compaction_tx: Sender<()>,

_tracing: Option<Arc<tracing::subscriber::DefaultGuard>>,
}

Expand Down Expand Up @@ -96,10 +105,11 @@ struct KeydirEntry {
timestamp: i64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug)]
struct StoreConfig {
log_location: PathBuf,
max_log_file_size: u64,
compaction_tx: Sender<()>,
}

impl KvsEngine for KvStore {
Expand Down Expand Up @@ -230,28 +240,31 @@ impl KvStore {
/// Create a new KvStore.
///
/// The store is created in memory and is not persisted to disk.
fn new(config: StoreConfig) -> KvStore {
fn new(config: &StoreConfig) -> KvStore {
KvStore {
writer: Arc::new(RwLock::new(StoreWriter::default())),
log_location: config.log_location,
log_location: config.log_location.clone(),
keydir: Arc::new(DashMap::new()),
max_log_file_size: config.max_log_file_size,
compaction_tx: config.compaction_tx.clone(),
_tracing: None,
}
}

/// Open a KvStore at the given path.
pub fn open<P>(path: P) -> Result<KvStore>
pub async fn open<P>(path: P) -> Result<KvStore>
where
P: Into<PathBuf>,
{
let path = path.into();
let (tx, mut rx) = tokio::sync::mpsc::channel(5);
let store_config = StoreConfig {
log_location: path.clone(),
max_log_file_size: MAX_LOG_FILE_SIZE.with(|f| *f),
compaction_tx: tx,
};

let mut store = KvStore::new(store_config.clone());
let mut store = KvStore::new(&store_config);
let log_level = std::env::var("KVS_LOG").unwrap_or("info".to_string());
store.setup_logging(log_level)?;
info!("Initialising store");
Expand All @@ -260,6 +273,24 @@ impl KvStore {

debug!("Creating initial log file");
store.set_active_log_handle()?;
tokio::spawn(async move {
let interval_ms = std::env::var("KVS_COMPACTION_INTERVAL_MS")
.unwrap_or_else(|_| "200".to_string())
.parse::<u64>()
.expect("Unable to parse default compaction interval");
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
info!(interval_ms, "Background compaction polling");
loop {
interval.tick().await;
match rx.try_recv() {
Ok(_trigger) => {
info!("Compaction triggered");
}
Err(TryRecvError::Empty) => debug!("No compaction required"),
Err(TryRecvError::Disconnected) => panic!("Compaction channel unavailable"),
}
}
});
Ok(store)
}

Expand Down Expand Up @@ -486,10 +517,17 @@ impl KvStore {

pub(crate) fn setup_logging(&mut self, level: String) -> anyhow::Result<()> {
let level = LevelFilter::from_str(&level)?;
let layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
let layer = tracing_subscriber::fmt::layer()
.compact()
.with_writer(std::io::stderr)
.with_thread_ids(true)
.with_line_number(true)
.with_file(true)
.with_target(false);
let subscriber = tracing_subscriber::registry().with(level).with(layer);
let tracing_guard = tracing::subscriber::set_default(subscriber);
self._tracing = Some(Arc::new(tracing_guard));
tracing::subscriber::set_global_default(subscriber)?;
//let tracing_guard = tracing::subscriber::set_default(subscriber);
//self._tracing = Some(Arc::new(tracing_guard));
Ok(())
}
}
Expand Down

0 comments on commit ecf190f

Please sign in to comment.