Skip to content

Commit

Permalink
add get_proof function to AdsCore
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute0kelvin committed Jan 20, 2025
1 parent 34aa582 commit 9b7f6b4
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 56 deletions.
3 changes: 2 additions & 1 deletion qmdb/examples/v1_fuzz/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ fn run_fuzz_single_round(
let task_count = task_list.len() as i64;
println!("AA height={} task_count={:#08x}", height, task_count);
let last_task_id = (height << IN_BLOCK_IDX_BITS) | (task_count - 1);
let success = ads.start_block(height, Arc::new(TasksManager::new(task_list, last_task_id)));
let (success, _) =
ads.start_block(height, Arc::new(TasksManager::new(task_list, last_task_id)));
if !success {
unsafe {
if ROOT_BEFORE_SET[0] == [0u8; 32] {
Expand Down
1 change: 1 addition & 0 deletions qmdb/src/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub const DEFAULT_ENTRY_SIZE: usize = 300;
pub const SENTRY_COUNT: usize = (1 << 16) / SHARD_COUNT;

pub const PRUNE_EVERY_NBLOCKS: i64 = 500;
pub const MAX_PROOF_REQ: usize = 1000;

pub const JOB_COUNT: usize = 2000;
pub const SUB_JOB_RESERVE_COUNT: usize = 50;
Expand Down
71 changes: 55 additions & 16 deletions qmdb/src/flusher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use crate::def::{LEAF_COUNT_IN_TWIG, MIN_PRUNE_COUNT, PRUNE_EVERY_NBLOCKS, TWIG_SHIFT};
use crate::def::{
LEAF_COUNT_IN_TWIG, MAX_PROOF_REQ, MIN_PRUNE_COUNT, PRUNE_EVERY_NBLOCKS, SHARD_COUNT,
TWIG_SHIFT,
};
use crate::entryfile::{EntryBufferReader, EntryFile};
use crate::merkletree::proof::ProofPath;
use crate::merkletree::Tree;
use crate::metadb::MetaDB;
#[cfg(feature = "slow_hashing")]
use crate::merkletree::UpperTree;
use crate::metadb::{MetaDB, MetaInfo};
use parking_lot::RwLock;
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Barrier};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread;

type RocksMetaDB = MetaDB;
Expand All @@ -23,12 +29,14 @@ impl BarrierSet {
}
}

pub type ProofReqElem = Arc<(Mutex<(u64, Option<Result<ProofPath, String>>)>, Condvar)>;

pub struct Flusher {
shards: Vec<Box<FlusherShard>>,
meta: Arc<RwLock<RocksMetaDB>>,
curr_height: i64,
max_kept_height: i64,
end_block_chan: SyncSender<i64>,
end_block_chan: SyncSender<Arc<MetaInfo>>,
}

impl Flusher {
Expand All @@ -37,7 +45,7 @@ impl Flusher {
meta: Arc<RwLock<RocksMetaDB>>,
curr_height: i64,
max_kept_height: i64,
end_block_chan: SyncSender<i64>,
end_block_chan: SyncSender<Arc<MetaInfo>>,
) -> Self {
Self {
shards,
Expand All @@ -48,6 +56,14 @@ impl Flusher {
}
}

pub fn get_proof_req_senders(&self) -> Vec<SyncSender<ProofReqElem>> {
let mut v = Vec::with_capacity(SHARD_COUNT);
for i in 0..SHARD_COUNT {
v.push(self.shards[i].proof_req_sender.clone());
}
v
}

pub fn flush(&mut self, shard_count: usize) {
loop {
self.curr_height += 1;
Expand Down Expand Up @@ -82,36 +98,56 @@ pub struct FlusherShard {
tree: Tree,
last_compact_done_sn: u64,
shard_id: usize,
proof_req_sender: SyncSender<ProofReqElem>,
proof_req_receiver: Receiver<ProofReqElem>,
#[cfg(feature = "slow_hashing")]
upper_tree_sender: SyncSender<crate::merkletree::UpperTree>,
upper_tree_sender: SyncSender<UpperTree>,
#[cfg(feature = "slow_hashing")]
upper_tree_receiver: std::sync::mpsc::Receiver<crate::merkletree::UpperTree>,
upper_tree_receiver: Receiver<UpperTree>,
}

impl FlusherShard {
pub fn new(tree: Tree, oldest_active_sn: u64, shard_id: usize) -> Self {
#[cfg(feature = "slow_hashing")]
let (ut_sender, ut_receiver) = std::sync::mpsc::sync_channel(2);
let (ut_sender, ut_receiver) = sync_channel(2);
let (pr_sender, pr_receiver) = sync_channel(MAX_PROOF_REQ);

Self {
buf_read: None,
tree,
last_compact_done_sn: oldest_active_sn,
shard_id,
proof_req_sender: pr_sender,
proof_req_receiver: pr_receiver,
#[cfg(feature = "slow_hashing")]
upper_tree_sender: ut_sender,
#[cfg(feature = "slow_hashing")]
upper_tree_receiver: ut_receiver,
}
}

pub fn handle_proof_req(&self) {
loop {
let pair = self.proof_req_receiver.try_recv();
if pair.is_err() {
break;
}
let pair = pair.unwrap();
let (lock, cvar) = &*pair;
let mut sn_proof = lock.lock().unwrap();
let proof = self.tree.get_proof(sn_proof.0);
sn_proof.1 = Some(proof);
cvar.notify_one();
}
}

pub fn flush(
&mut self,
prune_to_height: i64,
curr_height: i64,
meta: Arc<RwLock<RocksMetaDB>>,
bar_set: Arc<BarrierSet>,
end_block_chan: SyncSender<i64>,
end_block_chan: SyncSender<Arc<MetaInfo>>,
) {
let buf_read = self.buf_read.as_mut().unwrap();
loop {
Expand Down Expand Up @@ -172,7 +208,7 @@ impl FlusherShard {

let youngest_twig_id = self.tree.youngest_twig_id;
let shard_id = self.shard_id;
let mut upper_tree = crate::merkletree::UpperTree::empty();
let mut upper_tree = UpperTree::empty();
std::mem::swap(&mut self.tree.upper_tree, &mut upper_tree);
let upper_tree_sender = self.upper_tree_sender.clone();
thread::spawn(move || {
Expand All @@ -191,6 +227,7 @@ impl FlusherShard {
edge_nodes_bytes =
upper_tree.prune_nodes(start_twig_id, end_twig_id, youngest_twig_id);
}

//shard#0 must wait other shards to finish
if shard_id == 0 {
bar_set.metadb_bar.wait();
Expand Down Expand Up @@ -218,9 +255,9 @@ impl FlusherShard {

if shard_id == 0 {
meta.set_curr_height(curr_height);
meta.commit();
let meta_info = meta.commit();
drop(meta);
match end_block_chan.send(curr_height) {
match end_block_chan.send(meta_info) {
Ok(_) => {
//println!("{} end block", curr_height);
}
Expand Down Expand Up @@ -290,6 +327,8 @@ impl FlusherShard {
upper_tree.prune_nodes(start_twig_id, end_twig_id, youngest_twig_id);
}

self.handle_proof_req();

//shard#0 must wait other shards to finish
if shard_id == 0 {
bar_set.metadb_bar.wait();
Expand Down Expand Up @@ -317,9 +356,9 @@ impl FlusherShard {

if shard_id == 0 {
meta.set_curr_height(curr_height);
meta.commit();
let meta_info = meta.commit();
drop(meta);
match end_block_chan.send(curr_height) {
match end_block_chan.send(meta_info) {
Ok(_) => {
//println!("{} end block", curr_height);
}
Expand Down Expand Up @@ -468,7 +507,7 @@ mod flusher_tests {
);
assert_eq!(recovered_root, root);
check_hash_consistency(&tree);
let mut proof_path = tree.get_proof(SENTRY_COUNT as u64);
let mut proof_path = tree.get_proof(SENTRY_COUNT as u64).unwrap();
check_proof(&mut proof_path).unwrap();
}
}
71 changes: 56 additions & 15 deletions qmdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ pub mod test_helper;
use aes_gcm::{Aes256Gcm, Key, KeyInit};
use byteorder::{BigEndian, ByteOrder};
use entryfile::entrybuffer;
use merkletree::proof::ProofPath;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::fs;
use std::path::Path;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use threadpool::ThreadPool;

Expand All @@ -38,13 +39,13 @@ use crate::def::{
TWIG_SHIFT,
};
use crate::entryfile::{entry::sentry_entry, EntryBz, EntryCache, EntryFile};
use crate::flusher::{Flusher, FlusherShard};
use crate::flusher::{Flusher, FlusherShard, ProofReqElem};
use crate::indexer::Indexer;
use crate::merkletree::{
recover::{bytes_to_edge_nodes, recover_tree},
Tree,
};
use crate::metadb::MetaDB;
use crate::metadb::{MetaDB, MetaInfo};
use log::{debug, error, info};

#[cfg(all(target_os = "linux", feature = "directio"))]
Expand All @@ -65,6 +66,7 @@ pub struct AdsCore {
entry_files: Vec<Arc<EntryFile>>,
meta: Arc<RwLock<MetaDB>>,
wrbuf_size: usize,
proof_req_senders: Vec<SyncSender<ProofReqElem>>,
}

fn get_ciphers(
Expand Down Expand Up @@ -112,7 +114,7 @@ impl AdsCore {
pub fn new(
task_hub: Arc<dyn TaskHub>,
config: &config::Config,
) -> (Self, Receiver<i64>, Flusher) {
) -> (Self, Receiver<Arc<MetaInfo>>, Flusher) {
#[cfg(feature = "tee_cipher")]
assert!(config.aes_keys.unwrap().len() == 96);

Expand All @@ -133,7 +135,7 @@ impl AdsCore {
file_segment_size: usize,
with_twig_file: bool,
aes_keys: &Option<[u8; 96]>,
) -> (Self, Receiver<i64>, Flusher) {
) -> (Self, Receiver<Arc<MetaInfo>>, Flusher) {
let (ciphers, idx_cipher, meta_db_cipher) = get_ciphers(aes_keys);
let (data_dir, meta_dir, _indexer_dir) = Self::get_sub_dirs(dir);

Expand Down Expand Up @@ -185,10 +187,35 @@ impl AdsCore {
entry_files,
meta: meta.clone(),
wrbuf_size,
proof_req_senders: flusher.get_proof_req_senders(),
};
(ads_core, eb_receiver, flusher)
}

pub fn get_proof(&self, shard_id: usize, sn: u64) -> Result<ProofPath, String> {
if cfg!(feature = "slow_hashing") {
return Err("do not support proof in slow hashing mode".to_owned());
}

let pair = Arc::new((Mutex::new((sn, Option::None)), Condvar::new()));

if let Err(er) = self.proof_req_senders[shard_id].send(Arc::clone(&pair)) {
return Err(format!("send proof request failed: {:?}", er));
}

// wait for the request to be handled
let (lock, cvar) = &*pair;
let mut sn_proof = lock.lock().unwrap();
while sn_proof.1.is_none() {
sn_proof = cvar.wait(sn_proof).unwrap();
}

if let Err(er) = sn_proof.1.as_ref().unwrap() {
return Err(format!("get proof failed: {:?}", er));
}
sn_proof.1.take().unwrap()
}

pub fn get_entry_files(&self) -> Vec<Arc<EntryFile>> {
let mut res = Vec::with_capacity(self.entry_files.len());
for ef in self.entry_files.iter() {
Expand Down Expand Up @@ -401,7 +428,7 @@ impl AdsCore {
meta.set_next_serial_num(shard_id, SENTRY_COUNT as u64);
}
meta.insert_extra_data(0, "".to_owned());
meta.commit()
meta.commit();
}

pub fn check_entry(key_hash: &[u8], key: &[u8], entry_bz: &EntryBz) -> bool {
Expand Down Expand Up @@ -598,7 +625,8 @@ pub struct AdsWrap<T: Task> {
ads: Arc<AdsCore>,
cache: Arc<EntryCache>,
cache_list: Vec<Arc<EntryCache>>,
end_block_chan: Receiver<i64>, // when ads finish the prev block disk job, there will receive something.
// when ads finish the prev block disk job, end_block_chan will receive MetaInfo
end_block_chan: Receiver<Arc<MetaInfo>>,
stop_height: i64,
}

Expand Down Expand Up @@ -689,11 +717,18 @@ impl<T: Task + 'static> AdsWrap<T> {
self.ads.get_entry_files()
}

pub fn flush(&mut self) {
pub fn get_proof(&self, shard_id: usize, sn: u64) -> Result<ProofPath, String> {
self.ads.get_proof(shard_id, sn)
}

pub fn flush(&mut self) -> Vec<Arc<MetaInfo>> {
let mut v = Vec::with_capacity(2);
while self.task_hub.free_slot_count() < 2 {
let height = self.end_block_chan.recv().unwrap();
self.task_hub.end_block(height);
let meta_info = self.end_block_chan.recv().unwrap();
self.task_hub.end_block(meta_info.curr_height);
v.push(meta_info);
}
v
}

fn allocate_cache(&mut self) -> Arc<EntryCache> {
Expand All @@ -718,21 +753,27 @@ impl<T: Task + 'static> AdsWrap<T> {
self.stop_height = height;
}

pub fn start_block(&mut self, height: i64, tasks_manager: Arc<TasksManager<T>>) -> bool {
pub fn start_block(
&mut self,
height: i64,
tasks_manager: Arc<TasksManager<T>>,
) -> (bool, Option<Arc<MetaInfo>>) {
if height == self.stop_height + 1 {
return false;
return (false, Option::None);
}
self.cache = self.allocate_cache();

let mut meta_info = Option::None;
if self.task_hub.free_slot_count() == 0 {
// adscore and task_hub are busy, wait for them to finish an old block
let height = self.end_block_chan.recv().unwrap();
self.task_hub.end_block(height);
let _meta_info = self.end_block_chan.recv().unwrap();
self.task_hub.end_block(_meta_info.curr_height);
meta_info = Some(_meta_info);
}

self.task_hub
.start_block(height, tasks_manager, self.cache.clone());
true
(true, meta_info)
}

pub fn get_shared(&self) -> SharedAdsWrap {
Expand Down
Loading

1 comment on commit 9b7f6b4

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 9b7f6b4 Previous: 34aa582 Ratio
Reads Throughput 385582.27666438324 ops/s 973184.9210757734 ops/s 2.52
Deletes Throughput 904468.9085194821 ops/s 2276840.747762771 ops/s 2.52

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.