Skip to content

Commit

Permalink
wip: raft
Browse files Browse the repository at this point in the history
  • Loading branch information
jdockerty committed Mar 30, 2024
1 parent e914394 commit e30c4e8
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 56 deletions.
61 changes: 14 additions & 47 deletions src/bin/sqrl-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use dashmap::DashMap;
use raft::prelude::*;
use raft::storage::MemStorage;
use sqrl::client::Action;
use sqrl::raft::{Msg, ProposeCallback};
use sqrl::Cluster;
use sqrl::KvStore;
use sqrl::KvStoreError;
use sqrl::KvsEngine;
use sqrl::ENGINE_FILE;
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;
use std::{ffi::OsString, path::PathBuf};
use std::{fmt::Display, net::SocketAddr};
Expand All @@ -16,21 +18,18 @@ use tokio::net::TcpListener;
use tokio::signal::ctrl_c;
use tracing::{debug, error, info};

type ProposeCallback = Box<dyn Fn() + Send>;
enum Msg {
Propose {
id: u8,
callback: Box<dyn Fn() + Send>,
},
Raft(Message),
}

#[derive(Debug, Parser)]
#[command(author, version, about, long_about = None)]
struct App {
#[clap(long, default_value = "127.0.0.1:4000")]
addr: SocketAddr,

#[clap(long, default_value = "1")]
node_id: u64,

#[clap(long, value_delimiter = ',')]
peers: Option<Vec<SocketAddr>>,

#[clap(name = "engine", short, long, default_value = "sqrl")]
engine_name: Engine,

Expand Down Expand Up @@ -67,10 +66,11 @@ impl Display for Engine {
async fn main() -> anyhow::Result<()> {
let app = App::parse();

let (tx, rx) = channel();
// We must error if the previous storage engine was not 'sqrl' as it is incompatible.
KvStore::engine_is_sqrl(app.engine_name.to_string(), app.log_file.join(ENGINE_FILE))?;
let kv = std::sync::Arc::new(KvStore::open(app.log_file)?);
let mut c = Cluster::new()?;
let kv = std::sync::Arc::new(KvStore::open(app.log_file)?.with_raft(tx.clone()));
let mut c = Cluster::new(app.node_id, app.peers)?;

info!(
"sqrl-server version: {}, engine: {}",
Expand All @@ -79,7 +79,7 @@ async fn main() -> anyhow::Result<()> {
);

let listener = TcpListener::bind(app.addr).await?;
info!("listening on {}", app.addr);
info!("k-v store server on {}", app.addr);

// Handle incoming connections.
tokio::spawn(async move {
Expand All @@ -89,12 +89,6 @@ async fn main() -> anyhow::Result<()> {
}
});

use std::{
sync::mpsc::{channel, RecvTimeoutError},
time::Duration,
};
let (tx, rx) = channel();
send_propose(tx);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(250));
let mut cbs = DashMap::new();
Expand All @@ -103,7 +97,6 @@ async fn main() -> anyhow::Result<()> {

match rx.recv_timeout(interval.period()) {
Ok(Msg::Propose { id, callback }) => {
println!("Proposal {}", id);
cbs.insert(id, callback);
c.node.propose(vec![], vec![id]).unwrap();
}
Expand All @@ -124,6 +117,7 @@ async fn main() -> anyhow::Result<()> {

Ok(())
}

fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut DashMap<u8, ProposeCallback>) {
if !raft_group.has_ready() {
return;
Expand All @@ -134,7 +128,7 @@ fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut DashMap<u8, ProposeC
let mut ready = raft_group.ready();

let handle_messages = |msgs: Vec<Message>| {
for _msg in msgs {
for msg in msgs {
// Send messages to other peers.
}
};
Expand Down Expand Up @@ -201,33 +195,6 @@ fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut DashMap<u8, ProposeC
raft_group.advance_apply();
}

fn send_propose(sender: std::sync::mpsc::Sender<Msg>) {
std::thread::spawn(move || {
// Wait some time and send the request to the Raft.
std::thread::sleep(Duration::from_secs(3));

let (s1, r1) = std::sync::mpsc::channel::<u8>();

println!("propose a request");

// Send a command to the Raft, wait for the Raft to apply it
// and get the result.
sender
.send(Msg::Propose {
id: 1,
callback: Box::new(move || {
s1.send(0).unwrap();
}),
})
.unwrap();

let n = r1.recv().unwrap();
assert_eq!(n, 0);

println!("receive the propose callback");
});
}

async fn handle_connection(
mut stream: tokio::net::TcpStream,
kv: std::sync::Arc<KvStore>,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub mod client;
mod engine;
/// Errors that may originate from operating the store.
mod error;
mod raft;
pub mod raft;
/// Implementation of the key-value store.
mod store;
pub use engine::KvsEngine;
Expand Down
10 changes: 8 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
use crate::KvsEngine;
use raft::{storage::MemStorage, Config, RawNode};
use raft::prelude::Message;

pub type ProposeCallback = Box<dyn Fn() + Send>;

pub enum Msg {
Propose { id: u8, callback: ProposeCallback },
Raft(Message),
}
33 changes: 27 additions & 6 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::engine::KvsEngine;
use crate::raft::Msg;
use crate::{KvStoreError, Result};
use crate::{LOG_PREFIX, MAX_LOG_FILE_SIZE};
use dashmap::DashMap;
Expand All @@ -9,6 +10,7 @@ use raft::RawNode;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{prelude::*, BufReader, BufWriter, SeekFrom};
use std::net::SocketAddr;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::str::FromStr;
Expand All @@ -20,21 +22,22 @@ use tracing::{self, debug, info, warn};
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

pub struct Cluster {
pub peers: Option<Vec<SocketAddr>>,
pub node: raft::RawNode<raft::storage::MemStorage>,
}

impl Cluster {
pub fn new() -> anyhow::Result<Cluster> {
let node = Self::raft_init()?;
Ok(Cluster { node })
pub fn new(node_id: u64, peers: Option<Vec<SocketAddr>>) -> anyhow::Result<Cluster> {
let node = Self::raft_init(node_id)?;
Ok(Cluster { node, peers })
}
fn raft_init() -> anyhow::Result<RawNode<MemStorage>> {
let id = 1;

fn raft_init(node_id: u64) -> anyhow::Result<RawNode<MemStorage>> {
let election_tick = 10;
let heartbeat_tick = 3;
let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
let config = Config {
id,
id: node_id,
election_tick,
heartbeat_tick,
..Default::default()
Expand Down Expand Up @@ -89,6 +92,8 @@ pub struct KvStore {
/// The maximum size of a log file in bytes.
max_log_file_size: u64,

raft_tx: Option<std::sync::mpsc::Sender<Msg>>,

_tracing: Option<Arc<tracing::subscriber::DefaultGuard>>,
}

Expand Down Expand Up @@ -124,6 +129,16 @@ struct StoreConfig {
impl KvsEngine for KvStore {
/// Set the value of a key by inserting the value into the store for the given key.
async fn set(&self, key: String, value: String) -> Result<()> {
self.raft_tx
.as_ref()
.unwrap()
.send(Msg::Propose {
id: 1,
callback: Box::new(move || {
println!("Set callback");
}),
})
.unwrap();
debug!(key, value, "Setting key");
let timestamp = chrono::Utc::now().timestamp();
let entry = LogEntry {
Expand Down Expand Up @@ -244,9 +259,15 @@ impl KvStore {
keydir: Arc::new(DashMap::new()),
max_log_file_size: config.max_log_file_size,
_tracing: None,
raft_tx: None,
}
}

pub fn with_raft(&mut self, tx: std::sync::mpsc::Sender<Msg>) -> Self {
self.raft_tx = Some(tx);
self.clone()
}

/// Open a KvStore at the given path.
pub fn open<P>(path: P) -> Result<KvStore>
where
Expand Down

0 comments on commit e30c4e8

Please sign in to comment.