diff --git a/src/bin/sqrl-server.rs b/src/bin/sqrl-server.rs index bea37ea..c270cd8 100644 --- a/src/bin/sqrl-server.rs +++ b/src/bin/sqrl-server.rs @@ -84,10 +84,16 @@ async fn main() -> anyhow::Result<()> { if clients.len() > 3 { warn!("Replicating to many followers can greatly impact write performance"); } - ReplicatedServer::new(clients.into(), app.log_file, app.addr)? + ReplicatedServer::new(clients.into(), app.log_file, app.addr) + .await? + .run() + .await + } + replication::Mode::Follower => { + StandaloneServer::new(app.log_file, app.addr) + .await? .run() .await } - replication::Mode::Follower => StandaloneServer::new(app.log_file, app.addr)?.run().await, } } diff --git a/src/replication/server.rs b/src/replication/server.rs index 9c584b6..60d2bc9 100644 --- a/src/replication/server.rs +++ b/src/replication/server.rs @@ -27,7 +27,7 @@ pub struct ReplicatedServer { } impl ReplicatedServer { - pub fn new

( + pub async fn new

( clients: Mutex>, path: P, addr: SocketAddr, @@ -37,7 +37,7 @@ impl ReplicatedServer { { Ok(Self { addr, - server: Arc::new(StandaloneServer::new(path, addr)?), + server: Arc::new(StandaloneServer::new(path, addr).await?), remote_replicas: Arc::new(clients), }) } diff --git a/src/server.rs b/src/server.rs index cfadd1f..9839c19 100644 --- a/src/server.rs +++ b/src/server.rs @@ -16,11 +16,11 @@ pub struct StandaloneServer { } impl StandaloneServer { - pub fn new

(path: P, addr: SocketAddr) -> anyhow::Result + pub async fn new

(path: P, addr: SocketAddr) -> anyhow::Result where P: Into, { - let store = Arc::new(KvStore::open(path)?); + let store = Arc::new(KvStore::open(path).await?); Ok(Self { store, addr }) } diff --git a/src/store.rs b/src/store.rs index a351dde..9eb36a0 100644 --- a/src/store.rs +++ b/src/store.rs @@ -12,7 +12,10 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; +use std::time::Duration; use std::usize; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::mpsc::{Receiver, Sender}; use tracing::level_filters::LevelFilter; use tracing::{self, debug, info, warn}; use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; @@ -62,6 +65,12 @@ pub struct KvStore { /// The maximum size of a log file in bytes. max_log_file_size: u64, + /// Channel on which compaction notifications are sent. + /// + /// This allows the [`KvStore`] to signal to the background task which receives + /// compaction jobs to begin its work. + compaction_tx: Sender<()>, + _tracing: Option>, } @@ -96,10 +105,11 @@ struct KeydirEntry { timestamp: i64, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug)] struct StoreConfig { log_location: PathBuf, max_log_file_size: u64, + compaction_tx: Sender<()>, } impl KvsEngine for KvStore { @@ -230,28 +240,31 @@ impl KvStore { /// Create a new KvStore. /// /// The store is created in memory and is not persisted to disk. - fn new(config: StoreConfig) -> KvStore { + fn new(config: &StoreConfig) -> KvStore { KvStore { writer: Arc::new(RwLock::new(StoreWriter::default())), - log_location: config.log_location, + log_location: config.log_location.clone(), keydir: Arc::new(DashMap::new()), max_log_file_size: config.max_log_file_size, + compaction_tx: config.compaction_tx.clone(), _tracing: None, } } /// Open a KvStore at the given path. - pub fn open

(path: P) -> Result + pub async fn open

(path: P) -> Result where P: Into, { let path = path.into(); + let (tx, mut rx) = tokio::sync::mpsc::channel(5); let store_config = StoreConfig { log_location: path.clone(), max_log_file_size: MAX_LOG_FILE_SIZE.with(|f| *f), + compaction_tx: tx, }; - let mut store = KvStore::new(store_config.clone()); + let mut store = KvStore::new(&store_config); let log_level = std::env::var("KVS_LOG").unwrap_or("info".to_string()); store.setup_logging(log_level)?; info!("Initialising store"); @@ -260,6 +273,24 @@ impl KvStore { debug!("Creating initial log file"); store.set_active_log_handle()?; + tokio::spawn(async move { + let interval_ms = std::env::var("KVS_COMPACTION_INTERVAL_MS") + .unwrap_or_else(|_| "200".to_string()) + .parse::() + .expect("Unable to parse default compaction interval"); + let mut interval = tokio::time::interval(Duration::from_millis(interval_ms)); + info!(interval_ms, "Background compaction polling"); + loop { + interval.tick().await; + match rx.try_recv() { + Ok(_trigger) => { + info!("Compaction triggered"); + } + Err(TryRecvError::Empty) => debug!("No compaction required"), + Err(TryRecvError::Disconnected) => panic!("Compaction channel unavailable"), + } + } + }); Ok(store) } @@ -486,10 +517,17 @@ impl KvStore { pub(crate) fn setup_logging(&mut self, level: String) -> anyhow::Result<()> { let level = LevelFilter::from_str(&level)?; - let layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr); + let layer = tracing_subscriber::fmt::layer() + .compact() + .with_writer(std::io::stderr) + .with_thread_ids(true) + .with_line_number(true) + .with_file(true) + .with_target(false); let subscriber = tracing_subscriber::registry().with(level).with(layer); - let tracing_guard = tracing::subscriber::set_default(subscriber); - self._tracing = Some(Arc::new(tracing_guard)); + tracing::subscriber::set_global_default(subscriber)?; + //let tracing_guard = tracing::subscriber::set_default(subscriber); + //self._tracing = Some(Arc::new(tracing_guard)); Ok(()) } }