Commit

wip: raft
jdockerty committed Apr 1, 2024
1 parent 9ef9957 commit 63db935
Showing 5 changed files with 241 additions and 113 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -43,6 +43,7 @@ bytes = "1.6.0"
reqwest = { version = "0.12.2", features = ["json"] }
tracing-slog = "0.2.0"
slog = "2.7.0"
protobuf = { version = "2", features = ["with-bytes"] }

[build-dependencies]
prost-build = "0.12.3"
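The new `protobuf` dependency is what lets the server code below decode raft `ConfChange` payloads through the `protobuf::Message` trait (the `with-bytes` feature presumably just matches the `bytes` crate already in use). A minimal sketch of the decoding this enables; the helper name is illustrative and not part of this commit:

use protobuf::Message as _; // brings `merge_from_bytes` into scope
use raft::eraftpb::ConfChange;

// Decode a committed conf-change entry's payload back into a `ConfChange`.
fn decode_conf_change(data: &[u8]) -> protobuf::ProtobufResult<ConfChange> {
    let mut cc = ConfChange::default();
    cc.merge_from_bytes(data)?;
    Ok(cc)
}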
280 changes: 187 additions & 93 deletions src/bin/sqrl-server.rs
@@ -1,14 +1,18 @@
use clap::Parser;
use dashmap::DashMap;
use protobuf::Message as _;
use raft::eraftpb::ConfChange;
use raft::eraftpb::Message;
use raft::storage::MemStorage;
use raft::{prelude::*, StateRole};
use sqrl::client::{self, Action};
use sqrl::raft::{Msg, ProposeCallback};
use sqrl::raft::{Msg, Node, ProposeCallback};
use sqrl::Cluster;
use sqrl::KvStore;
use sqrl::KvStoreError;
use sqrl::KvsEngine;
use sqrl::ENGINE_FILE;
use std::collections::VecDeque;
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::time::Duration;
@@ -17,6 +21,7 @@ use std::{fmt::Display, net::SocketAddr};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::signal::ctrl_c;
use tokio::sync::Mutex;
use tracing::{debug, error, info};

#[derive(Debug, Parser)]
@@ -74,7 +79,7 @@ async fn main() -> anyhow::Result<()> {
.with_max_level(app.log_level)
.init();
let kv = std::sync::Arc::new(KvStore::open(app.log_file)?.with_raft(tx.clone()));
let mut c = Cluster::new(app.node_id, app.peers)?;
let mut c = Cluster::new(app.node_id, app.peers.clone())?;

info!(
"sqrl-server version: {}, engine: {}",
@@ -93,38 +98,66 @@ }
}
});

tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(100));
let mut cbs = DashMap::new();
loop {
interval.tick().await;

match rx.recv_timeout(interval.period()) {
Ok(Msg::Propose { id, callback }) => {
cbs.insert(id, callback);
c.node.propose(vec![], vec![id]).unwrap();
}
Ok(Msg::Set { id, key, value }) => {
if c.node.raft.state == StateRole::Leader {
info!("Leader sending to peers");
let mut stream = TcpStream::connect("127.0.0.1:4001").await.unwrap();
client::set(&mut stream, key.clone(), value.clone())
.await
.unwrap();
stream.shutdown().await.unwrap();
match app.peers {
Some(peers) => {
info!(node_id = app.node_id, "Starting Raft node");
tokio::spawn(async move {
// A global queue of pending proposals. New proposals are pushed onto the back of the
// queue and popped from the front once the raft cluster has committed them.
let proposals = Arc::new(Mutex::new(VecDeque::<Proposal>::new()));

let mut interval = tokio::time::interval(Duration::from_millis(100));
let mut cbs = DashMap::new();
loop {
interval.tick().await;

let node = match c.node.0 {
Some(ref mut c) => c,
// Node not initialized
None => continue,
};

match rx.recv_timeout(interval.period()) {
Ok(Msg::Propose { id, callback }) => {
cbs.insert(id, callback);
node.propose(vec![], vec![id]).unwrap();
}
Ok(Msg::Set { id, key, value }) => {
match node.raft.state {
StateRole::Leader => {
info!("Leader sending to peers");
for p in &peers {
info!("sending to {p}");
let mut proposals = proposals.lock().await;
for p in proposals.iter_mut().skip_while(|p| p.proposed > 0)
{
propose(node, p);
}
let mut stream = TcpStream::connect(p).await.unwrap();
client::set(&mut stream, key.clone(), value.clone())
.await
.unwrap();
}
}
StateRole::Follower => {
info!("Follower receiving from leader");
}
_ => {}
}

//node.propose(vec![id], format!("{}={}", key, value).into_bytes())
// .unwrap();
}
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return,
}
c.node
.propose(vec![id], format!("{}={}", key, value).into_bytes())
.unwrap();
node.tick();
on_ready(&mut c.node, &mut cbs);
}
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return,
}
//println!("Ticking");
c.node.tick();
on_ready(&mut c.node, &mut cbs);
});
}
});
None => info!("Starting single node store"),
};

// TODO: add cancellation tokens
match ctrl_c().await {
@@ -135,81 +168,92 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
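The driver loop above is fed through an mpsc channel of `Msg` values coming from the storage layer. `Msg` and `ProposeCallback` live in `sqrl::raft` and are not part of this diff; inferred only from how they are matched and stored in the `DashMap<u8, ProposeCallback>` here, they are shaped roughly as follows (field types are assumptions, not the crate's actual definitions):

// Assumed shape, reconstructed from usage in this file only.
pub type ProposeCallback = Box<dyn Fn() + Send>;

pub enum Msg {
    // Propose an opaque entry identified by `id`; `callback` fires once it is committed.
    Propose { id: u8, callback: ProposeCallback },
    // Replicate a key/value write through the cluster.
    Set { id: u8, key: String, value: String },
}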

fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut DashMap<u8, ProposeCallback>) {
if !raft_group.has_ready() {
return;
}
let store = raft_group.raft.raft_log.store.clone();

// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();

let handle_messages = |msgs: Vec<Message>| {
for msg in msgs {
// Send messages to other peers.
fn on_ready(raft_group: &mut Node, cbs: &mut DashMap<u8, ProposeCallback>) {
if let Some(ref mut raft_group) = &mut raft_group.0 {
if !raft_group.has_ready() {
return;
}
};
let store = raft_group.raft.raft_log.store.clone();

if !ready.messages().is_empty() {
// Send out the messages that come from the node.
handle_messages(ready.take_messages());
}
// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();

if !ready.snapshot().is_empty() {
// This is a snapshot; we need to apply it first.
store.wl().apply_snapshot(ready.snapshot().clone()).unwrap();
}
let handle_messages = |msgs: Vec<Message>| {
for msg in msgs {
// Send messages to other peers.
}
};

let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
// Normally you would save the last applied index so that applying can resume
// after a restart. We ignore that here because we use in-memory storage.
_last_apply_index = entry.index;
if !ready.messages().is_empty() {
// Send out the messages that come from the node.
handle_messages(ready.take_messages());
}

if entry.data.is_empty() {
// Empty entry; a peer sends one when it becomes leader.
continue;
}
if !ready.snapshot().is_empty() {
// This is a snapshot; we need to apply it first.
store.wl().apply_snapshot(ready.snapshot().clone()).unwrap();
}

if entry.get_entry_type() == EntryType::EntryNormal {
if let Some((_, cb)) = cbs.remove(entry.data.first().unwrap()) {
cb();
let mut _last_apply_index = 0;
let handle_committed_entries =
|rn: &mut RawNode<MemStorage>, committed_entries: Vec<Entry>| {
for entry in committed_entries {
if entry.data.is_empty() {
// From newly elected leaders.
continue;
}
if let EntryType::EntryConfChange = entry.get_entry_type() {
info!("ConfChange!");
// For conf change messages, make them effective.
let mut cc = ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
let cs = rn.apply_conf_change(&cc).unwrap();
store.wl().set_conf_state(cs);
} else {
// For normal proposals, extract the key-value pair and then
// insert them into the kv engine.
}
if rn.raft.state == StateRole::Leader {
info!("Leader proposing to new clients");
// The leader should respond to the clients, telling them whether their proposals
// succeeded or not.
// let proposal = proposals.lock().unwrap().pop_front().unwrap();
// proposal.propose_success.send(true).unwrap();
}
}
}
};
handle_committed_entries(raft_group, ready.take_committed_entries());

// TODO: handle EntryConfChange
// Persist the raft logs. This is necessary because in `RawNode::advance` we stabilize
// the raft logs to the latest position.
if let Err(e) = store.wl().append(ready.entries()) {
error!("persist raft log fail: {:?}, need to retry or panic", e);
return;
}
};
handle_committed_entries(ready.take_committed_entries());

if !ready.entries().is_empty() {
// Append entries to the Raft log.
store.wl().append(ready.entries()).unwrap();
}

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
store.wl().set_hardstate(hs.clone());
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
store.wl().set_hardstate(hs.clone());
}

if !ready.persisted_messages().is_empty() {
// Send out the persisted messages that come from the node.
handle_messages(ready.take_persisted_messages());
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages that come from the node.
handle_messages(ready.take_persisted_messages());
}

// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
store.wl().mut_hard_state().set_commit(commit);
// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
store.wl().mut_hard_state().set_commit(commit);
}
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
handle_committed_entries(raft_group, light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
}
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
handle_committed_entries(light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
}
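`on_ready` now takes the `Node` wrapper from `sqrl::raft` instead of a bare `RawNode`, and returns early while the node is still uninitialized. The wrapper's definition is not part of this diff; judging from the `.0` accesses above it is roughly the following newtype (an assumption, not the crate's actual code):

use raft::{storage::MemStorage, RawNode};

// Assumed shape: `None` until the raft node has been bootstrapped or joined to a cluster.
pub struct Node(pub Option<RawNode<MemStorage>>);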

async fn handle_connection(
@@ -256,3 +300,53 @@ async fn handle_connection(

Ok(())
}

struct Proposal {
normal: Option<(u16, String)>, // key is a u16 integer and value is a string.
conf_change: Option<ConfChange>, // conf change.
transfer_leader: Option<u64>,
// If it's proposed, it will be set to the index of the entry.
proposed: u64,
}

impl Proposal {
fn conf_change(cc: &ConfChange) -> Self {
Proposal {
normal: None,
conf_change: Some(cc.clone()),
transfer_leader: None,
proposed: 0,
}
}

fn normal(key: u16, value: String) -> Self {
Proposal {
normal: Some((key, value)),
conf_change: None,
transfer_leader: None,
proposed: 0,
}
}
}

fn propose(raft_group: &mut RawNode<MemStorage>, proposal: &mut Proposal) {
let last_index1 = raft_group.raft.raft_log.last_index() + 1;
if let Some((ref key, ref value)) = proposal.normal {
let data = format!("put {} {}", key, value).into_bytes();
let _ = raft_group.propose(vec![], data);
} else if let Some(ref cc) = proposal.conf_change {
let _ = raft_group.propose_conf_change(vec![], cc.clone());
} else if let Some(_transferee) = proposal.transfer_leader {
// TODO: implement transfer leader.
unimplemented!();
}

let last_index2 = raft_group.raft.raft_log.last_index() + 1;
if last_index2 == last_index1 {
// The proposal failed; don't forget to respond to the client.
// TODO: respond to other peers about this failure
error!("Proposal failed!");
} else {
proposal.proposed = last_index1;
}
}
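A short usage sketch of how these pieces fit the proposal queue drained in the leader branch of the driver loop: enqueue a `Proposal`, propose everything not yet proposed, and treat a non-zero `proposed` index as in flight until it shows up in `on_ready`'s committed entries. The function below is illustrative and not part of this commit:

fn enqueue_and_propose(node: &mut RawNode<MemStorage>, queue: &mut VecDeque<Proposal>) {
    // Queue a normal key/value write (key 1 is an arbitrary example).
    queue.push_back(Proposal::normal(1, "value-1".to_string()));
    // Skip the prefix that has already been proposed, mirroring the driver loop above.
    for p in queue.iter_mut().skip_while(|p| p.proposed > 0) {
        propose(node, p);
    }
}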