diff --git a/qmdb/examples/v1_fuzz/main.rs b/qmdb/examples/v1_fuzz/main.rs
index 03a0d50..30a0e7a 100644
--- a/qmdb/examples/v1_fuzz/main.rs
+++ b/qmdb/examples/v1_fuzz/main.rs
@@ -111,7 +111,8 @@ fn run_fuzz_single_round(
     let task_count = task_list.len() as i64;
     println!("AA height={} task_count={:#08x}", height, task_count);
     let last_task_id = (height << IN_BLOCK_IDX_BITS) | (task_count - 1);
-    let success = ads.start_block(height, Arc::new(TasksManager::new(task_list, last_task_id)));
+    let (success, _) =
+        ads.start_block(height, Arc::new(TasksManager::new(task_list, last_task_id)));
     if !success {
         unsafe {
             if ROOT_BEFORE_SET[0] == [0u8; 32] {
diff --git a/qmdb/src/def.rs b/qmdb/src/def.rs
index 60fc348..52b1198 100644
--- a/qmdb/src/def.rs
+++ b/qmdb/src/def.rs
@@ -17,6 +17,7 @@
 pub const DEFAULT_ENTRY_SIZE: usize = 300;
 pub const SENTRY_COUNT: usize = (1 << 16) / SHARD_COUNT;
 pub const PRUNE_EVERY_NBLOCKS: i64 = 500;
+pub const MAX_PROOF_REQ: usize = 1000;
 pub const JOB_COUNT: usize = 2000;
 pub const SUB_JOB_RESERVE_COUNT: usize = 50;
 
diff --git a/qmdb/src/flusher.rs b/qmdb/src/flusher.rs
index c473bc8..dceaaa3 100644
--- a/qmdb/src/flusher.rs
+++ b/qmdb/src/flusher.rs
@@ -1,10 +1,16 @@
-use crate::def::{LEAF_COUNT_IN_TWIG, MIN_PRUNE_COUNT, PRUNE_EVERY_NBLOCKS, TWIG_SHIFT};
+use crate::def::{
+    LEAF_COUNT_IN_TWIG, MAX_PROOF_REQ, MIN_PRUNE_COUNT, PRUNE_EVERY_NBLOCKS, SHARD_COUNT,
+    TWIG_SHIFT,
+};
 use crate::entryfile::{EntryBufferReader, EntryFile};
+use crate::merkletree::proof::ProofPath;
 use crate::merkletree::Tree;
-use crate::metadb::MetaDB;
+#[cfg(feature = "slow_hashing")]
+use crate::merkletree::UpperTree;
+use crate::metadb::{MetaDB, MetaInfo};
 use parking_lot::RwLock;
-use std::sync::mpsc::SyncSender;
-use std::sync::{Arc, Barrier};
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+use std::sync::{Arc, Barrier, Condvar, Mutex};
 use std::thread;
 
 type RocksMetaDB = MetaDB;
@@ -23,12 +29,14 @@ impl BarrierSet {
     }
 }
 
+pub type ProofReqElem = Arc<(Mutex<(u64, Option<Result<ProofPath, String>>)>, Condvar)>;
+
 pub struct Flusher {
     shards: Vec<Box<FlusherShard>>,
     meta: Arc<RwLock<RocksMetaDB>>,
     curr_height: i64,
     max_kept_height: i64,
-    end_block_chan: SyncSender<i64>,
+    end_block_chan: SyncSender<Arc<MetaInfo>>,
 }
 
 impl Flusher {
@@ -37,7 +45,7 @@
         meta: Arc<RwLock<RocksMetaDB>>,
         curr_height: i64,
         max_kept_height: i64,
-        end_block_chan: SyncSender<i64>,
+        end_block_chan: SyncSender<Arc<MetaInfo>>,
     ) -> Self {
         Self {
             shards,
@@ -48,6 +56,14 @@
         }
     }
 
+    pub fn get_proof_req_senders(&self) -> Vec<SyncSender<ProofReqElem>> {
+        let mut v = Vec::with_capacity(SHARD_COUNT);
+        for i in 0..SHARD_COUNT {
+            v.push(self.shards[i].proof_req_sender.clone());
+        }
+        v
+    }
+
     pub fn flush(&mut self, shard_count: usize) {
         loop {
             self.curr_height += 1;
@@ -82,22 +98,27 @@ pub struct FlusherShard {
     tree: Tree,
     last_compact_done_sn: u64,
     shard_id: usize,
+    proof_req_sender: SyncSender<ProofReqElem>,
+    proof_req_receiver: Receiver<ProofReqElem>,
     #[cfg(feature = "slow_hashing")]
-    upper_tree_sender: SyncSender<crate::merkletree::UpperTree>,
+    upper_tree_sender: SyncSender<UpperTree>,
     #[cfg(feature = "slow_hashing")]
-    upper_tree_receiver: std::sync::mpsc::Receiver<crate::merkletree::UpperTree>,
+    upper_tree_receiver: Receiver<UpperTree>,
 }
 
 impl FlusherShard {
     pub fn new(tree: Tree, oldest_active_sn: u64, shard_id: usize) -> Self {
         #[cfg(feature = "slow_hashing")]
-        let (ut_sender, ut_receiver) = std::sync::mpsc::sync_channel(2);
+        let (ut_sender, ut_receiver) = sync_channel(2);
+        let (pr_sender, pr_receiver) = sync_channel(MAX_PROOF_REQ);
 
         Self {
             buf_read: None,
             tree,
             last_compact_done_sn: oldest_active_sn,
            shard_id,
+            proof_req_sender: pr_sender,
+            proof_req_receiver: pr_receiver,
             #[cfg(feature = "slow_hashing")]
             upper_tree_sender: ut_sender,
             #[cfg(feature = "slow_hashing")]
@@ -105,13 +126,28 @@ impl FlusherShard {
         }
     }
 
+    pub fn handle_proof_req(&self) {
+        loop {
+            let pair = self.proof_req_receiver.try_recv();
+            if pair.is_err() {
+                break;
+            }
+            let pair = pair.unwrap();
+            let (lock, cvar) = &*pair;
+            let mut sn_proof = lock.lock().unwrap();
+            let proof = self.tree.get_proof(sn_proof.0);
+            sn_proof.1 = Some(proof);
+            cvar.notify_one();
+        }
+    }
+
     pub fn flush(
         &mut self,
         prune_to_height: i64,
         curr_height: i64,
         meta: Arc<RwLock<RocksMetaDB>>,
         bar_set: Arc<BarrierSet>,
-        end_block_chan: SyncSender<i64>,
+        end_block_chan: SyncSender<Arc<MetaInfo>>,
     ) {
         let buf_read = self.buf_read.as_mut().unwrap();
         loop {
@@ -172,7 +208,7 @@
 
             let youngest_twig_id = self.tree.youngest_twig_id;
             let shard_id = self.shard_id;
-            let mut upper_tree = crate::merkletree::UpperTree::empty();
+            let mut upper_tree = UpperTree::empty();
             std::mem::swap(&mut self.tree.upper_tree, &mut upper_tree);
             let upper_tree_sender = self.upper_tree_sender.clone();
             thread::spawn(move || {
@@ -191,6 +227,7 @@
                    edge_nodes_bytes =
                        upper_tree.prune_nodes(start_twig_id, end_twig_id, youngest_twig_id);
                }
+
                //shard#0 must wait other shards to finish
                if shard_id == 0 {
                    bar_set.metadb_bar.wait();
@@ -218,9 +255,9 @@
 
                if shard_id == 0 {
                    meta.set_curr_height(curr_height);
-                   meta.commit();
+                   let meta_info = meta.commit();
                    drop(meta);
-                   match end_block_chan.send(curr_height) {
+                   match end_block_chan.send(meta_info) {
                        Ok(_) => {
                            //println!("{} end block", curr_height);
                        }
@@ -290,6 +327,8 @@
                 upper_tree.prune_nodes(start_twig_id, end_twig_id, youngest_twig_id);
         }
 
+        self.handle_proof_req();
+
         //shard#0 must wait other shards to finish
         if shard_id == 0 {
             bar_set.metadb_bar.wait();
@@ -317,9 +356,9 @@
 
         if shard_id == 0 {
             meta.set_curr_height(curr_height);
-            meta.commit();
+            let meta_info = meta.commit();
             drop(meta);
-            match end_block_chan.send(curr_height) {
+            match end_block_chan.send(meta_info) {
                 Ok(_) => {
                     //println!("{} end block", curr_height);
                 }
@@ -468,7 +507,7 @@ mod flusher_tests {
         );
         assert_eq!(recovered_root, root);
         check_hash_consistency(&tree);
-        let mut proof_path = tree.get_proof(SENTRY_COUNT as u64);
+        let mut proof_path = tree.get_proof(SENTRY_COUNT as u64).unwrap();
         check_proof(&mut proof_path).unwrap();
     }
 }
diff --git a/qmdb/src/lib.rs b/qmdb/src/lib.rs
index b9e1823..27ebb04 100644
--- a/qmdb/src/lib.rs
+++ b/qmdb/src/lib.rs
@@ -23,12 +23,13 @@ pub mod test_helper;
 
 use aes_gcm::{Aes256Gcm, Key, KeyInit};
 use byteorder::{BigEndian, ByteOrder};
 use entryfile::entrybuffer;
+use merkletree::proof::ProofPath;
 use parking_lot::RwLock;
 use std::collections::VecDeque;
 use std::fs;
 use std::path::Path;
 use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
-use std::sync::Arc;
+use std::sync::{Arc, Condvar, Mutex};
 use std::thread;
 use threadpool::ThreadPool;
 
@@ -38,13 +39,13 @@
     TWIG_SHIFT,
 };
 use crate::entryfile::{entry::sentry_entry, EntryBz, EntryCache, EntryFile};
-use crate::flusher::{Flusher, FlusherShard};
+use crate::flusher::{Flusher, FlusherShard, ProofReqElem};
 use crate::indexer::Indexer;
 use crate::merkletree::{
     recover::{bytes_to_edge_nodes, recover_tree},
     Tree,
 };
-use crate::metadb::MetaDB;
+use crate::metadb::{MetaDB, MetaInfo};
 use log::{debug, error, info};
 
 #[cfg(all(target_os = "linux", feature = "directio"))]
@@ -65,6 +66,7 @@ pub struct AdsCore {
     entry_files: Vec<Arc<EntryFile>>,
     meta: Arc<RwLock<MetaDB>>,
     wrbuf_size: usize,
+    proof_req_senders: Vec<SyncSender<ProofReqElem>>,
 }
 
 fn get_ciphers(
@@ -112,7 +114,7 @@ impl AdsCore {
     pub fn new(
         task_hub: Arc<dyn TaskHub>,
         config: &config::Config,
-    ) -> (Self, Receiver<i64>, Flusher) {
+    ) -> (Self, Receiver<Arc<MetaInfo>>, Flusher) {
         #[cfg(feature = "tee_cipher")]
         assert!(config.aes_keys.unwrap().len() == 96);
 
@@ -133,7 +135,7 @@
         file_segment_size: usize,
         with_twig_file: bool,
         aes_keys: &Option<[u8; 96]>,
-    ) -> (Self, Receiver<i64>, Flusher) {
+    ) -> (Self, Receiver<Arc<MetaInfo>>, Flusher) {
         let (ciphers, idx_cipher, meta_db_cipher) = get_ciphers(aes_keys);
 
         let (data_dir, meta_dir, _indexer_dir) = Self::get_sub_dirs(dir);
@@ -185,10 +187,35 @@ impl AdsCore {
             entry_files,
             meta: meta.clone(),
             wrbuf_size,
+            proof_req_senders: flusher.get_proof_req_senders(),
         };
         (ads_core, eb_receiver, flusher)
     }
 
+    pub fn get_proof(&self, shard_id: usize, sn: u64) -> Result<ProofPath, String> {
+        if cfg!(feature = "slow_hashing") {
+            return Err("do not support proof in slow hashing mode".to_owned());
+        }
+
+        let pair = Arc::new((Mutex::new((sn, Option::None)), Condvar::new()));
+
+        if let Err(er) = self.proof_req_senders[shard_id].send(Arc::clone(&pair)) {
+            return Err(format!("send proof request failed: {:?}", er));
+        }
+
+        // wait for the request to be handled
+        let (lock, cvar) = &*pair;
+        let mut sn_proof = lock.lock().unwrap();
+        while sn_proof.1.is_none() {
+            sn_proof = cvar.wait(sn_proof).unwrap();
+        }
+
+        if let Err(er) = sn_proof.1.as_ref().unwrap() {
+            return Err(format!("get proof failed: {:?}", er));
+        }
+        sn_proof.1.take().unwrap()
+    }
+
     pub fn get_entry_files(&self) -> Vec<Arc<EntryFile>> {
         let mut res = Vec::with_capacity(self.entry_files.len());
         for ef in self.entry_files.iter() {
@@ -401,7 +428,7 @@
             meta.set_next_serial_num(shard_id, SENTRY_COUNT as u64);
         }
         meta.insert_extra_data(0, "".to_owned());
-        meta.commit()
+        meta.commit();
     }
 
     pub fn check_entry(key_hash: &[u8], key: &[u8], entry_bz: &EntryBz) -> bool {
@@ -598,7 +625,8 @@ pub struct AdsWrap<T: Task> {
     ads: Arc<AdsCore>,
     cache: Arc<EntryCache>,
     cache_list: Vec<Arc<EntryCache>>,
-    end_block_chan: Receiver<i64>, // when ads finish the prev block disk job, there will receive something.
+    // when ads finishes the prev block's disk job, end_block_chan will receive a MetaInfo
+    end_block_chan: Receiver<Arc<MetaInfo>>,
     stop_height: i64,
 }
 
@@ -689,11 +717,18 @@ impl AdsWrap {
         self.ads.get_entry_files()
     }
 
-    pub fn flush(&mut self) {
+    pub fn get_proof(&self, shard_id: usize, sn: u64) -> Result<ProofPath, String> {
+        self.ads.get_proof(shard_id, sn)
+    }
+
+    pub fn flush(&mut self) -> Vec<Arc<MetaInfo>> {
+        let mut v = Vec::with_capacity(2);
         while self.task_hub.free_slot_count() < 2 {
-            let height = self.end_block_chan.recv().unwrap();
-            self.task_hub.end_block(height);
+            let meta_info = self.end_block_chan.recv().unwrap();
+            self.task_hub.end_block(meta_info.curr_height);
+            v.push(meta_info);
         }
+        v
     }
 
     fn allocate_cache(&mut self) -> Arc<EntryCache> {
@@ -718,21 +753,27 @@ impl AdsWrap {
         self.stop_height = height;
     }
 
-    pub fn start_block(&mut self, height: i64, tasks_manager: Arc<TasksManager<T>>) -> bool {
+    pub fn start_block(
+        &mut self,
+        height: i64,
+        tasks_manager: Arc<TasksManager<T>>,
+    ) -> (bool, Option<Arc<MetaInfo>>) {
         if height == self.stop_height + 1 {
-            return false;
+            return (false, Option::None);
         }
         self.cache = self.allocate_cache();
 
+        let mut meta_info = Option::None;
         if self.task_hub.free_slot_count() == 0 {
             // adscore and task_hub are busy, wait for them to finish an old block
-            let height = self.end_block_chan.recv().unwrap();
-            self.task_hub.end_block(height);
+            let _meta_info = self.end_block_chan.recv().unwrap();
+            self.task_hub.end_block(_meta_info.curr_height);
+            meta_info = Some(_meta_info);
        }
 
         self.task_hub
             .start_block(height, tasks_manager, self.cache.clone());
-        true
+        (true, meta_info)
     }
 
     pub fn get_shared(&self) -> SharedAdsWrap {
diff --git a/qmdb/src/merkletree/tree.rs b/qmdb/src/merkletree/tree.rs
index d810f0b..96e61d2 100644
--- a/qmdb/src/merkletree/tree.rs
+++ b/qmdb/src/merkletree/tree.rs
@@ -824,25 +824,28 @@ impl Tree {
         (upper_path, root)
     }
 
-    pub fn get_proof(&self, sn: u64) -> proof::ProofPath {
+    pub fn get_proof(&self, sn: u64) -> Result<proof::ProofPath, String> {
         let twig_id = sn >> TWIG_SHIFT;
         let mut path = proof::ProofPath::new();
         path.serial_num = sn;
         if twig_id > self.youngest_twig_id {
-            panic!("twig_id > self.youngest_twig_id");
+            return Err(format!("twig_id > self.youngest_twig_id"));
         }
         (path.upper_path, path.root) = self.get_upper_path_and_root(twig_id);
         if path.upper_path.is_empty() {
-            panic!("Cannot find upper path");
+            return Err(format!("Cannot find upper path"));
         }
         if twig_id == self.youngest_twig_id {
             path.left_of_twig = proof::get_left_path_in_mem(&self.mtree_for_youngest_twig, sn);
         } else {
-            path.left_of_twig =
-                proof::get_left_path_on_disk(&self.twig_file_wr.twig_file, twig_id, sn);
+            let twig_file = &self.twig_file_wr.twig_file;
+            if twig_file.is_empty() {
+                return Err(format!("twig_file is empty"));
+            }
+            path.left_of_twig = proof::get_left_path_on_disk(twig_file, twig_id, sn);
         }
         let (s, k) = get_shard_idx_and_key(twig_id);
         let twig = self.upper_tree.active_twig_shards[s]
@@ -853,7 +856,7 @@ impl Tree {
             .unwrap_or(&twig::NULL_ACTIVE_BITS);
         path.right_of_twig = proof::get_right_path(twig, active_bits, sn);
 
-        path
+        Ok(path)
     }
 
     pub fn get_hashes_by_pos_list(&self, pos_list: &Vec<(u8, u64)>) -> Vec<[u8; 32]> {
diff --git a/qmdb/src/metadb.rs b/qmdb/src/metadb.rs
index a327796..07fc055 100644
--- a/qmdb/src/metadb.rs
+++ b/qmdb/src/metadb.rs
@@ -15,20 +15,20 @@ use std::{
     os::unix::fs::FileExt,
 };
 
-#[derive(serde::Serialize, serde::Deserialize)]
+#[derive(serde::Serialize, serde::Deserialize, Clone)]
 pub struct MetaInfo {
-    curr_height: i64,
-    last_pruned_twig: [(u64, i64); SHARD_COUNT],
-    next_serial_num: [u64; SHARD_COUNT],
-    oldest_active_sn: [u64; SHARD_COUNT],
-    oldest_active_file_pos: [i64; SHARD_COUNT],
-    root_hash: [[u8; 32]; SHARD_COUNT],
-    root_hash_by_height: Vec<[u8; 32]>,
-    edge_nodes: [Vec<u8>; SHARD_COUNT],
-    twig_file_sizes: [i64; SHARD_COUNT],
-    entry_file_sizes: [i64; SHARD_COUNT],
-    first_twig_at_height: [(u64, i64); SHARD_COUNT],
-    extra_data: String,
+    pub curr_height: i64,
+    pub last_pruned_twig: [(u64, i64); SHARD_COUNT],
+    pub next_serial_num: [u64; SHARD_COUNT],
+    pub oldest_active_sn: [u64; SHARD_COUNT],
+    pub oldest_active_file_pos: [i64; SHARD_COUNT],
+    pub root_hash: [[u8; 32]; SHARD_COUNT],
+    pub root_hash_by_height: Vec<[u8; 32]>,
+    pub edge_nodes: [Vec<u8>; SHARD_COUNT],
+    pub twig_file_sizes: [i64; SHARD_COUNT],
+    pub entry_file_sizes: [i64; SHARD_COUNT],
+    pub first_twig_at_height: [(u64, i64); SHARD_COUNT],
+    pub extra_data: String,
 }
 
 impl MetaInfo {
@@ -150,7 +150,7 @@ impl MetaDB {
         self.extra_data_map.insert(height, data);
     }
 
-    pub fn commit(&mut self) {
+    pub fn commit(&mut self) -> Arc<MetaInfo> {
         let kv = self.extra_data_map.remove(&self.info.curr_height).unwrap();
         self.info.extra_data = kv.1;
         let name = format!("{}.{}", self.meta_file_name, self.info.curr_height % 2);
@@ -202,6 +202,7 @@ impl MetaDB {
                 self.history_file.write(&data[..]).unwrap();
             }
         }
+        Arc::new(self.info.clone())
     }
 
     pub fn set_curr_height(&mut self, h: i64) {
diff --git a/qmdb/tests/test_proof.rs b/qmdb/tests/test_proof.rs
index f0e8c9b..abc696d 100644
--- a/qmdb/tests/test_proof.rs
+++ b/qmdb/tests/test_proof.rs
@@ -1,12 +1,19 @@
+use std::{sync::Arc, thread};
+
+use parking_lot::RwLock;
 use qmdb::{
-    def::{ENTRY_BASE_LENGTH, TWIG_MASK},
+    config::Config,
+    def::{ENTRY_BASE_LENGTH, IN_BLOCK_IDX_BITS, TWIG_MASK},
     entryfile::entry,
     merkletree::{
         check,
         helpers::build_test_tree,
-        proof::{self, ProofPath},
+        proof::{self, check_proof, ProofPath},
     },
+    seqads::task::{SingleCsTask, TaskBuilder},
+    tasks::TasksManager,
     test_helper::TempDir,
+    AdsCore, AdsWrap, ADS,
 };
 
 fn check_equal(pp: &ProofPath, other: &ProofPath) -> String {
@@ -71,7 +78,7 @@ fn test_tree_proof() {
 
     let max_sn = TWIG_MASK as i32 * 4 + 1600;
     for i in 0..max_sn {
-        let mut proof_path = tree.get_proof(i as u64);
+        let mut proof_path = tree.get_proof(i as u64).unwrap();
         proof_path.check(false).unwrap();
 
         let bz = proof_path.to_bytes();
@@ -86,7 +93,7 @@
     let mut bz = [0u8; ENTRY_BASE_LENGTH + 8];
     let null_hash = entry::null_entry(&mut bz[..]).hash();
     for i in max_sn.._max_sn {
-        let mut proof_path = tree.get_proof(i as u64);
+        let mut proof_path = tree.get_proof(i as u64).unwrap();
         assert_eq!(proof_path.left_of_twig[0].self_hash, null_hash);
 
         proof_path.check(false).unwrap();
@@ -97,3 +104,84 @@
         path2.check(true).unwrap();
     }
 }
+
+#[test]
+fn test_tree_get_proof() {
+    let ads_dir = "./test_tree_get_proof";
+    let _tmp_dir = TempDir::new(ads_dir);
+
+    let mut config = Config::from_dir(ads_dir);
+    AdsCore::init_dir(&config);
+    config.set_with_twig_file(true);
+
+    let mut ads = Box::new(AdsWrap::new(&config));
+    let ads_p = &mut *ads as *mut AdsWrap<SingleCsTask>;
+
+    let handle = thread::spawn(move || {
+        let mut proof = ads.get_proof(5, 4096).unwrap();
+        assert_eq!(
+            proof.left_of_twig[0].self_hash,
+            [
+                93, 178, 212, 56, 37, 103, 172, 205, 255, 184, 39, 231, 94, 228, 14, 210, 209, 165,
+                122, 253, 187, 100, 5, 13, 55, 169, 246, 181, 247, 201, 159, 152
+            ]
+        );
+        check_proof(&mut proof).unwrap();
+    });
+
+    unsafe {
+        let ads = &mut (*ads_p);
+        let task_id = 1 << IN_BLOCK_IDX_BITS;
+        ads.start_block(
+            1,
+            Arc::new(TasksManager::new(
+                vec![RwLock::new(Some(
+                    TaskBuilder::new().create(b"k12", b"v1").build(),
+                ))],
+                task_id,
+            )),
+        );
+        let shared_ads = ads.get_shared();
+        shared_ads.insert_extra_data(1, "".to_owned());
+        shared_ads.add_task(task_id);
+    }
+    handle.join().unwrap();
+}
+
+#[test]
+fn test_tree_get_proof_err() {
+    let ads_dir = "./test_tree_get_proof_err";
+    let _tmp_dir = TempDir::new(ads_dir);
+
+    let config = Config::from_dir(ads_dir);
+    AdsCore::init_dir(&config);
+
+    let mut ads = Box::new(AdsWrap::new(&config));
+    let ads_p = &mut *ads as *mut AdsWrap<SingleCsTask>;
+
+    let handle = thread::spawn(move || {
+        let proof = ads.get_proof(5, 4095);
+        assert!(proof.is_err());
+        if let Err(e) = proof {
+            assert_eq!(e, format!("get proof failed: \"twig_file is empty\""));
+        }
+    });
+
+    unsafe {
+        let ads = &mut (*ads_p);
+        let task_id = 1 << IN_BLOCK_IDX_BITS;
+        ads.start_block(
+            1,
+            Arc::new(TasksManager::new(
+                vec![RwLock::new(Some(
+                    TaskBuilder::new().create(b"k12", b"v1").build(),
+                ))],
+                task_id,
+            )),
+        );
+        let shared_ads = ads.get_shared();
+        shared_ads.insert_extra_data(1, "".to_owned());
+        shared_ads.add_task(task_id);
+    }
+    handle.join().unwrap();
+}
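
For reference (not part of the patch): a minimal, standalone sketch of the handshake that ProofReqElem encodes. A requester allocates an Arc<(Mutex<(sn, Option<result>)>, Condvar)>, pushes it into a bounded channel, and parks on the condvar; the worker fills the slot under the lock and notifies, which is the role FlusherShard::handle_proof_req and AdsCore::get_proof play above. Names and the toy "proof" string are local to this sketch; the real handle_proof_req drains the queue with try_recv once per flush, while this worker blocks on recv so the example stays short.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Same shape as ProofReqElem, with the ProofPath type replaced by a String.
type ReqElem = Arc<(Mutex<(u64, Option<Result<String, String>>)>, Condvar)>;

fn main() {
    let (sender, receiver) = sync_channel::<ReqElem>(1000);

    // Stands in for handle_proof_req on the flusher thread: answer each
    // queued request under the lock, then wake the waiting requester.
    let worker = thread::spawn(move || {
        while let Ok(pair) = receiver.recv() {
            let (lock, cvar) = &*pair;
            let mut sn_result = lock.lock().unwrap();
            let sn = sn_result.0;
            sn_result.1 = Some(Ok(format!("proof for sn {}", sn))); // toy "proof"
            cvar.notify_one();
        }
    });

    // Stands in for get_proof: publish the request, then block until the
    // worker has stored a result behind the shared mutex.
    let pair: ReqElem = Arc::new((Mutex::new((4096, None)), Condvar::new()));
    sender.send(Arc::clone(&pair)).unwrap();

    let (lock, cvar) = &*pair;
    let mut sn_result = lock.lock().unwrap();
    while sn_result.1.is_none() {
        sn_result = cvar.wait(sn_result).unwrap();
    }
    println!("{:?}", sn_result.1.take().unwrap());
    drop(sn_result); // release the lock before shutting down

    drop(sender); // closing the channel lets the worker loop exit
    worker.join().unwrap();
}
```

Because requests are only drained inside flush, a get_proof caller is woken no earlier than the end of the next flushed block, which is presumably why both new tests issue get_proof from a spawned thread while the main thread drives block 1 through start_block / add_task.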