diff --git a/servers/su/src/domain/clients/gateway.rs b/servers/su/src/domain/clients/gateway.rs index 07aacd29f..c9fdd4644 100644 --- a/servers/su/src/domain/clients/gateway.rs +++ b/servers/su/src/domain/clients/gateway.rs @@ -28,9 +28,19 @@ impl From for String { } } +/* + Right now we dont need all the fields + but later we can add to these types to + pull more data from gql responses +*/ +#[derive(Deserialize, Debug, Clone)] +struct Node { + id: String, +} + #[derive(Deserialize, Debug)] struct Edge { - node: GatewayTx, + node: Node, } #[derive(Deserialize, Debug)] @@ -209,82 +219,14 @@ impl Gateway for ArweaveGateway { } } - async fn raw(&self, tx_id: &String) -> Result, String> { - let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration"); - let arweave_url = config.arweave_url; - - let url = match Url::parse(&arweave_url) { - Ok(u) => u, - Err(e) => return Err(format!("{}", e)), - }; - - let client = Client::new(); - - let response = client - .get( - url.join(&format!("raw/{}", tx_id)) - .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?, - ) - .send() - .await - .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?; - - if response.status().is_success() { - let body = response - .bytes() - .await - .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?; - Ok(body.to_vec()) - } else { - Err(format!( - "Failed to get status. Status code: {}", - response.status() - )) - } - } - async fn gql_tx(&self, tx_id: &String) -> Result { let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration"); let graphql_url = config.graphql_url; let client = Client::new(); - /* - id - signature - anchor - owner { - address - key - } - tags { - name - value - } - recipient - */ - let query = serde_json::json!({ "query": format!( - "query {{ - transactions(ids: [\"{}\"]) {{ - edges {{ - node {{ - id - signature - anchor - owner {{ - address - key - }} - tags {{ - name - value - }} - recipient - }} - }} - }} - }}", + "query {{ transactions (ids: [\"{}\"]){{ edges {{ node {{ id }} }} }} }}", tx_id ), "variables": {} @@ -308,7 +250,9 @@ impl Gateway for ArweaveGateway { .map_err(|e| GatewayErrorType::JsonParseError(e.to_string()))?; if let Some(edge) = body.data.transactions.edges.get(0) { - Ok(edge.node.clone()) + Ok(GatewayTx { + id: edge.node.clone().id, + }) } else { Err("Transaction not found".to_string()) } diff --git a/servers/su/src/domain/clients/local_store/store.rs b/servers/su/src/domain/clients/local_store/store.rs index ca2a9b891..ae598a5f6 100644 --- a/servers/su/src/domain/clients/local_store/store.rs +++ b/servers/su/src/domain/clients/local_store/store.rs @@ -111,7 +111,6 @@ impl LocalStoreClient { ("process_ordering".to_string(), opts_index.clone()), ("message".to_string(), opts_index.clone()), ("message_ordering".to_string(), opts_index.clone()), - ("deep_hash".to_string(), opts_index.clone()), ] } @@ -163,13 +162,6 @@ impl LocalStoreClient { )) } - fn deep_hash_key(&self, process_id: &String, deep_hash: &String) -> Result { - Ok(format!( - "deep_hash:{}:{}", - process_id, deep_hash - )) - } - /* This is the core method of this program used for querying message ranges for the /processid @@ -228,7 +220,7 @@ impl LocalStoreClient { if let Some(ref to_str) = to { if let Ok(to_timestamp) = to_str.parse::() { if timestamp > to_timestamp { - has_next_page = false; + has_next_page = true; break; } } @@ -296,7 +288,6 @@ impl DataStore for LocalStoreClient { &self, message: &Message, bundle_in: &[u8], - deep_hash: Option<&String>, ) -> Result { let message_id = message.message_id()?; let assignment_id = message.assignment_id()?; @@ -323,19 +314,6 @@ impl DataStore for LocalStoreClient { let assignment_key = self.msg_assignment_key(&assignment_id); self.file_db.put(assignment_key.as_bytes(), bundle_in)?; - let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| { - StoreErrorType::DatabaseError("Column family 'message_ordering' not found".to_string()) - })?; - - match deep_hash { - Some(dh) => { - let deep_hash_key = self.deep_hash_key(&message.process_id()?, dh)?; - self.index_db - .put_cf(cf, deep_hash_key.as_bytes(), message.process_id()?.as_bytes())?; - }, - None => () - }; - Ok("Message saved".to_string()) } @@ -421,23 +399,6 @@ impl DataStore for LocalStoreClient { } } - async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType> { - let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| { - StoreErrorType::DatabaseError("Column family 'deep_hash' not found".to_string()) - })?; - - let deep_hash_key = self.deep_hash_key(process_id, deep_hash)?; - match self.index_db.get_cf(cf, deep_hash_key) { - Ok(dh) => { - match dh { - Some(_) => return Err(StoreErrorType::MessageExists("Deep hash already exists".to_string())), - None => return Ok(()) - } - }, - Err(_) => return Ok(()) - } - } - /* Message list retrieval for the /processid query, this returns a paginated list of messages @@ -463,9 +424,7 @@ impl DataStore for LocalStoreClient { */ let include_process = process_in.assignment.is_some() && match from { - Some(ref from_timestamp) => { - from_timestamp != &process_in.timestamp()?.to_string() - }, + Some(ref from_nonce) => from_nonce == &process_in.nonce()?.to_string(), /* No 'from' means it's the first page */ @@ -482,26 +441,6 @@ impl DataStore for LocalStoreClient { actual_limit -= 1; } - /* - handles an edge case where "to" is the message right - after the process, and limit is 1 - */ - if include_process && actual_limit == 0 { - match to { - Some(t) => { - let timestamp: i64 = t.parse()?; - if timestamp == process_in.timestamp()? { - return Ok(PaginatedMessages::from_messages(messages, false)?); - } else if timestamp > process_in.timestamp()? { - return Ok(PaginatedMessages::from_messages(messages, true)?); - } - }, - None => { - return Ok(PaginatedMessages::from_messages(messages, false)?); - } - } - } - let (paginated_keys, has_next_page) = self .fetch_message_range(process_id, from, to, &Some(actual_limit)) .await?; @@ -514,11 +453,6 @@ impl DataStore for LocalStoreClient { for (_, assignment_id) in paginated_keys { let assignment_key = self.msg_assignment_key(&assignment_id); - /* - It is possible the file isnt finished saving and - available on the file db yet that is why this retry loop - is here. - */ for _ in 0..10 { if let Some(message_data) = self.file_db.get(assignment_key.as_bytes())? { let message: Message = Message::from_bytes(message_data)?; diff --git a/servers/su/src/domain/clients/local_store/tests.rs b/servers/su/src/domain/clients/local_store/tests.rs index 61f351f86..a4dfbd51d 100644 --- a/servers/su/src/domain/clients/local_store/tests.rs +++ b/servers/su/src/domain/clients/local_store/tests.rs @@ -67,7 +67,7 @@ mod tests { let message_bundle = create_test_message_bundle(); let test_message = Message::from_bytes(message_bundle.clone())?; - client.save_message(&test_message, &message_bundle, None).await?; + client.save_message(&test_message, &message_bundle).await?; let retrieved_message = client.get_message(&test_message.assignment.id)?; assert_eq!(retrieved_message.assignment.id, test_message.assignment.id); @@ -86,7 +86,7 @@ mod tests { // Save all messages for bundle in message_bundles.iter() { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle, None).await?; + client.save_message(&test_message, &bundle).await?; } // Retrieve messages and check nonce order and continuity @@ -124,7 +124,7 @@ mod tests { for bundle in message_bundles.iter() { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle, None).await?; + client.save_message(&test_message, &bundle).await?; } // Case 1: Default parameters @@ -202,7 +202,7 @@ mod tests { // Save half of the messages for bundle in message_bundles.iter().take(message_bundles.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle, None).await?; + client.save_message(&test_message, &bundle).await?; } let (process_bundle_2, message_bundles_2) = bundle_list_2(); @@ -212,19 +212,19 @@ mod tests { // Save half of the messages of next process for bundle in message_bundles_2.iter().take(message_bundles_2.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle, None).await?; + client.save_message(&test_message, &bundle).await?; } // Save second half of messages for the first process for bundle in message_bundles.iter().skip(message_bundles.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle, None).await?; + client.save_message(&test_message, &bundle).await?; } // Save second half of messages for the second process for bundle in message_bundles_2.iter().skip(message_bundles_2.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle, None).await?; + client.save_message(&test_message, &bundle).await?; } // Retrieve messages and check length, nonce order, and continuity diff --git a/servers/su/src/domain/clients/store.rs b/servers/su/src/domain/clients/store.rs index c5f970d76..80a441501 100644 --- a/servers/su/src/domain/clients/store.rs +++ b/servers/su/src/domain/clients/store.rs @@ -814,21 +814,10 @@ impl DataStore for StoreClient { } } - async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType> { - if self.bytestore.is_ready() { - match self.bytestore.deep_hash_exists(process_id, deep_hash) { - true => return Err(StoreErrorType::MessageExists("Deep hash already exists".to_string())), - false => return Ok(()) - } - } - Ok(()) - } - async fn save_message( &self, message: &Message, bundle_in: &[u8], - deep_hash: Option<&String>, ) -> Result { use super::schema::messages::dsl::*; let conn = &mut self.get_conn()?; @@ -864,15 +853,6 @@ impl DataStore for StoreClient { message.timestamp()?.to_string(), bundle_in.to_vec(), )?; - match deep_hash { - Some(dh) => { - bytestore.save_deep_hash( - &message.process_id()?, - dh - )?; - }, - None => () - }; } Ok("saved".to_string()) } @@ -920,7 +900,7 @@ impl DataStore for StoreClient { let from_timestamp = from_timestamp_str .parse::() .map_err(StoreErrorType::from)?; - from_timestamp != process_in.process.timestamp + from_timestamp == process_in.process.timestamp } None => true, // No 'from' timestamp means it's the first page }; @@ -1561,62 +1541,6 @@ mod bytestore { false } } - - pub fn save_deep_hash( - &self, - process_id: &String, - deep_hash: &String - ) -> Result<(), String> { - let key = format!( - "deephash___{}___{}", - process_id, - deep_hash - ).into_bytes(); - - let value = format!( - "{}", - process_id - ).into_bytes(); - - let db = match self.db.read() { - Ok(r) => r, - Err(_) => return Err("Failed to acquire read lock".into()), - }; - - if let Some(ref db) = *db { - db.put(key, value) - .map_err(|e| format!("Failed to write to RocksDB: {:?}", e))?; - Ok(()) - } else { - Err("Database is not initialized".into()) - } - } - - pub fn deep_hash_exists( - &self, - process_id: &String, - deep_hash: &String, - ) -> bool { - let key = format!( - "deephash___{}___{}", - process_id, - deep_hash - ).into_bytes(); - - let db = match self.db.read() { - Ok(r) => r, - Err(_) => return false, - }; - - if let Some(ref db) = *db { - match db.get(&key) { - Ok(Some(_)) => true, - _ => false, - } - } else { - false - } - } } } diff --git a/servers/su/src/domain/config.rs b/servers/su/src/domain/config.rs index 804a98605..749f0892f 100644 --- a/servers/su/src/domain/config.rs +++ b/servers/su/src/domain/config.rs @@ -39,8 +39,6 @@ pub struct AoConfig { pub use_local_store: bool, pub su_file_db_dir: String, pub su_index_db_dir: String, - - pub enable_deep_hash_checks: bool, } fn get_db_dirs() -> (String, String) { @@ -141,10 +139,6 @@ impl AoConfig { Err(_e) => false, }; let (su_file_db_dir, su_index_db_dir) = get_db_dirs(); - let enable_deep_hash_checks = match env::var("ENABLE_DEEP_HASH_CHECKS") { - Ok(val) => val == "true", - Err(_e) => false, - }; Ok(AoConfig { database_url: env::var("DATABASE_URL")?, database_read_url, @@ -167,7 +161,6 @@ impl AoConfig { use_local_store, su_file_db_dir, su_index_db_dir, - enable_deep_hash_checks }) } } @@ -182,7 +175,4 @@ impl Config for AoConfig { fn enable_process_assignment(&self) -> bool { self.enable_process_assignment.clone() } - fn enable_deep_hash_checks(&self) -> bool { - self.enable_deep_hash_checks.clone() - } } diff --git a/servers/su/src/domain/core/builder.rs b/servers/su/src/domain/core/builder.rs index 02f2f2dfc..3d277ec80 100644 --- a/servers/su/src/domain/core/builder.rs +++ b/servers/su/src/domain/core/builder.rs @@ -1,15 +1,17 @@ use std::sync::Arc; use super::tags::Tag; +use dashmap::DashMap; use super::bytes::{ByteErrorType, DataBundle, DataItem}; -use super::dal::{Gateway, Log, ScheduleProvider, Signer, TxStatus, GatewayTx}; +use super::dal::{Gateway, Log, ScheduleProvider, Signer, TxStatus}; use super::json::Process; pub struct Builder<'a> { gateway: Arc, signer: Arc, - logger: &'a Arc + logger: &'a Arc, + cache: Arc>>, } pub struct BuildResult { @@ -50,7 +52,8 @@ impl<'a> Builder<'a> { Ok(Builder { gateway, signer, - logger + logger, + cache: Arc::new(DashMap::new()), }) } @@ -202,7 +205,12 @@ impl<'a> Builder<'a> { tx_id: &String, process: &Process, base_layer: &Option, - ) -> Result, BuilderErrorType> { + ) -> Result<(), BuilderErrorType> { + // Check if the result is in the DashMap cache + if let Some(cached_result) = self.cache.get(tx_id) { + return cached_result.clone(); + } + // Process the assignment verification let result = match base_layer { Some(_) => { @@ -222,20 +230,23 @@ impl<'a> Builder<'a> { }; match status.number_of_confirmations { - n if n >= threshold => Ok(None), + n if n >= threshold => Ok(()), _ => Err(BuilderErrorType::BuilderError( "Not enough confirmations to assign".to_string(), )), } } None => { - Ok(Some(self.gateway.gql_tx(&tx_id).await?)) + self.gateway.gql_tx(&tx_id).await?; + Ok(()) } }; + // Store the result in the DashMap cache + self.cache.insert(tx_id.clone(), result.clone()); + result } - } #[cfg(test)] @@ -269,16 +280,8 @@ mod tests { async fn gql_tx(&self, _tx_id: &String) -> Result { Ok(GatewayTx { id: "id".to_string(), - signature: "sig".to_string(), - anchor: None, - tags: vec![], - recipient: None, }) } - - async fn raw(&self, _tx_id: &String) -> Result, String> { - Ok(vec![]) - } } struct MockSigner; diff --git a/servers/su/src/domain/core/bytes.rs b/servers/su/src/domain/core/bytes.rs index e30bc7a9a..e7bf1187a 100644 --- a/servers/su/src/domain/core/bytes.rs +++ b/servers/su/src/domain/core/bytes.rs @@ -38,12 +38,6 @@ impl From for ByteErrorType { } } -impl From for ByteErrorType { - fn from(error: base64_url::base64::DecodeError) -> Self { - ByteErrorType::ByteError(format!("Byte error: {:?}", error)) - } -} - #[derive(Clone)] pub struct DataBundle { pub items: Vec, @@ -332,7 +326,7 @@ impl DataItem { Ok(DataItem { signature_type: SignerMap::Arweave, signature: vec![], - owner, + owner: owner, target, anchor, tags, @@ -535,73 +529,6 @@ impl DataItem { Ok(b) } - /* - Utilized for deduplicating incoming messages even - if they have the same id - */ - pub fn deep_hash(&mut self) -> Result { - let data_chunk = match &mut self.data { - Data::None => DeepHashChunk::Chunk(Bytes::new()), - Data::Bytes(data) => DeepHashChunk::Chunk(data.clone().into()) - }; - - let encoded_tags = if !self.tags.is_empty() { - self.tags.encode()? - } else { - Bytes::default() - }; - - let deep_hash_vec = deep_hash_sync(DeepHashChunk::Chunks(vec![ - DeepHashChunk::Chunk(DATAITEM_AS_BUFFER.into()), - DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), - DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), - // this is where the owner normally would be - DeepHashChunk::Chunk(Bytes::new()), - DeepHashChunk::Chunk(self.target.to_vec().into()), - DeepHashChunk::Chunk(self.anchor.to_vec().into()), - DeepHashChunk::Chunk(encoded_tags.clone()), - data_chunk, - ]))?; - - Ok(base64_url::encode(&deep_hash_vec)) - } - - pub fn deep_hash_fields( - target: Option, - anchor: Option, - tags: Vec, - data: Vec - ) -> Result { - let target_chunk = match target { - None => DeepHashChunk::Chunk(Bytes::new()), - Some(t) => DeepHashChunk::Chunk(base64_url::decode(&t)?.into()) - }; - - let anchor_chunk = match anchor { - None => DeepHashChunk::Chunk(Bytes::new()), - Some(a) => DeepHashChunk::Chunk(base64_url::decode(&a)?.into()) - }; - - let encoded_tags = if !tags.is_empty() { - tags.encode()? - } else { - Bytes::default() - }; - - let deep_hash_vec = deep_hash_sync(DeepHashChunk::Chunks(vec![ - DeepHashChunk::Chunk(DATAITEM_AS_BUFFER.into()), - DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), - DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), - DeepHashChunk::Chunk(Bytes::new()), - target_chunk, - anchor_chunk, - DeepHashChunk::Chunk(encoded_tags.clone()), - DeepHashChunk::Chunk(Bytes::from(data)), - ]))?; - - Ok(base64_url::encode(&deep_hash_vec)) - } - pub fn raw_id(&self) -> Vec { let mut hasher = Sha256::new(); hasher.update(&self.signature); diff --git a/servers/su/src/domain/core/dal.rs b/servers/su/src/domain/core/dal.rs index f7b400a44..4a3edb0fc 100644 --- a/servers/su/src/domain/core/dal.rs +++ b/servers/su/src/domain/core/dal.rs @@ -4,7 +4,6 @@ use serde::Deserialize; pub use super::bytes::DataItem; pub use super::json::{JsonErrorType, Message, PaginatedMessages, Process}; pub use super::router::{ProcessScheduler, Scheduler}; -pub use super::tags::Tag; /* Interfaces for core dependencies. Implement these traits @@ -23,13 +22,9 @@ pub struct TxStatus { pub number_of_confirmations: i32, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize)] pub struct GatewayTx { pub id: String, - pub signature: String, - pub anchor: Option, - pub tags: Vec, - pub recipient: Option, } #[async_trait] @@ -38,7 +33,6 @@ pub trait Gateway: Send + Sync { async fn network_info(&self) -> Result; async fn status(&self, tx_id: &String) -> Result; async fn gql_tx(&self, tx_id: &String) -> Result; - async fn raw(&self, tx_id: &String) -> Result, String>; } pub trait Wallet: Send + Sync { @@ -68,7 +62,6 @@ pub trait Config: Send + Sync { fn mode(&self) -> String; fn scheduler_list_path(&self) -> String; fn enable_process_assignment(&self) -> bool; - fn enable_deep_hash_checks(&self) -> bool; } #[derive(Debug)] @@ -128,7 +121,6 @@ pub trait DataStore: Send + Sync { &self, message: &Message, bundle_in: &[u8], - deep_hash: Option<&String>, ) -> Result; async fn get_messages( &self, @@ -143,7 +135,6 @@ pub trait DataStore: Send + Sync { process_id_in: &str, ) -> Result, StoreErrorType>; fn check_existing_message(&self, message_id: &String) -> Result<(), StoreErrorType>; - async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType>; } #[async_trait] diff --git a/servers/su/src/domain/core/flows.rs b/servers/su/src/domain/core/flows.rs index 56fb9fcdf..b76232207 100644 --- a/servers/su/src/domain/core/flows.rs +++ b/servers/su/src/domain/core/flows.rs @@ -9,7 +9,6 @@ use simd_json::to_string as simd_to_string; use super::builder::Builder; use super::json::{Message, Process}; use super::scheduler; -use super::bytes::DataItem; use super::dal::{ Config, CoreMetrics, DataStore, Gateway, Log, RouterDataStore, Signer, Uploader, Wallet, @@ -133,8 +132,7 @@ pub async fn write_item( set cache that gets set before the lock is released */ if let Some(ref item) = data_item { - deps.data_store.check_existing_message(&item.id())?; - + deps.data_store.check_existing_message(&item.id())? }; deps.logger.log(format!("checked for message existence- {}", &target_id)); @@ -150,11 +148,7 @@ pub async fn write_item( deps.logger.log(format!("incrememted scheduler - {}", &target_id)); - /* - XOR, if we have one of these, we must have both. - The else if condition here contains the flow for a - POST of an assignment - */ + // XOR, if we have one of these, we must have both. if process_id.is_some() ^ assign.is_some() { return Err("If sending assign or process-id, you must send both.".to_string()); } else if let (Some(process_id), Some(assign)) = (process_id.clone(), assign.clone()) { @@ -168,49 +162,18 @@ pub async fn write_item( .await?; let process = deps.data_store.get_process(&process_id).await?; - - let gateway_tx = match builder + builder .verify_assignment(&assign, &process, &base_layer) - .await? { - Some(g) => g, - None => return Err("Invalid gateway tx for assignming".to_string()) - }; + .await?; - /* - If this is an assignment of an AO Message, - check for a duplicate deep hash and throw - an error if we find one - */ - let deep_hash = match &base_layer { - Some(_) => None, - None => { - let tx_data = deps.gateway.raw(&assign).await?; - let dh = DataItem::deep_hash_fields( - gateway_tx.recipient, - gateway_tx.anchor, - gateway_tx.tags, - tx_data, - ) - .map_err(|_| "Unable to calculate deep hash".to_string())?; - - if deps.config.enable_deep_hash_checks() { - deps.data_store - .check_existing_deep_hash(&process_id, &dh) - .await?; - } - - Some(dh) - } - }; - let aid = assignment.id(); let return_aid = assignment.id(); let build_result = builder.bundle_items(vec![assignment]).await?; let message = Message::from_bundle(&build_result.bundle)?; deps.data_store - .save_message(&message, &build_result.binary, deep_hash.as_ref()) + .save_message(&message, &build_result.binary) .await?; - deps.logger.log(format!("saved message")); + deps.logger.log(format!("saved message - {:?}", &message)); /* we set the id of the previous assignment @@ -229,11 +192,6 @@ pub async fn write_item( return id_res(&deps, return_aid, start_top_level); } - /* - The rest of this function handles writing a Process - or a Message data item. - */ - let data_item = match data_item { Some(d) => d, None => return Err("Unable to parse data item".to_string()), @@ -246,146 +204,124 @@ pub async fn write_item( return Err("Data-Protocol tag not present".to_string()); } - let type_tag = match type_tag { - Some(t) => t, - None => return Err("Invalid Type Tag".to_string()) - }; + deps.logger.log(format!("tags cloned - {}", &target_id)); - if type_tag.value == "Process" { - let mod_tag_exists = tags.iter().any(|tag| tag.name == "Module"); - let sched_tag_exists = tags.iter().any(|tag| tag.name == "Scheduler"); + if let Some(type_tag) = type_tag { + if type_tag.value == "Process" { + let mod_tag_exists = tags.iter().any(|tag| tag.name == "Module"); + let sched_tag_exists = tags.iter().any(|tag| tag.name == "Scheduler"); - if !mod_tag_exists || !sched_tag_exists { - return Err( - "Required Module and Scheduler tags for Process type not present".to_string(), - ); - } + if !mod_tag_exists || !sched_tag_exists { + return Err( + "Required Module and Scheduler tags for Process type not present".to_string(), + ); + } - /* - If we dont enable_process_assignment, the - su will follow the old flow and not generate - an assignment for the process. + /* + If we dont enable_process_assignment, the + su will follow the old flow and not generate + an assignment for the process. - As a result, no process will be returned - in the messages list either, and the Nonce - will start at 0 for the first message - */ - if deps.config.enable_process_assignment() { - match data_item.tags().iter().find(|tag| tag.name == "On-Boot") { - Some(boot_tag) => match boot_tag.value.as_str() { - "Data" => (), - tx_id => { - if !deps.gateway.check_head(tx_id.to_string()).await? { - return Err("Invalid tx id for On-Boot tag".to_string()); + As a result, no process will be returned + in the messages list either, and the Nonce + will start at 0 for the first message + */ + if deps.config.enable_process_assignment() { + match data_item.tags().iter().find(|tag| tag.name == "On-Boot") { + Some(boot_tag) => match boot_tag.value.as_str() { + "Data" => (), + tx_id => { + if !deps.gateway.check_head(tx_id.to_string()).await? { + return Err("Invalid tx id for On-Boot tag".to_string()); + } } - } - }, - None => (), - }; + }, + None => (), + }; - let assignment = builder - .gen_assignment(None, data_item.id(), &next_schedule_info, &None) - .await?; + deps.logger.log(format!("boot load check complete - {}", &target_id)); - let aid = assignment.id(); - let did = data_item.id(); - let build_result = builder.bundle_items(vec![assignment, data_item]).await?; + let assignment = builder + .gen_assignment(None, data_item.id(), &next_schedule_info, &None) + .await?; - let process = Process::from_bundle(&build_result.bundle)?; - deps.data_store - .save_process(&process, &build_result.binary)?; + deps.logger.log(format!("assignment generated - {}", &target_id)); - deps.scheduler - .commit(&mut *schedule_info, &next_schedule_info, did, aid); - drop(schedule_info); + let aid = assignment.id(); + let did = data_item.id(); + let build_result = builder.bundle_items(vec![assignment, data_item]).await?; - upload(&deps, build_result.binary.to_vec()).await?; + deps.logger.log(format!("data bundled - {}", &target_id)); - return id_res(&deps, process.process.process_id.clone(), start_top_level); - } else { - let build_result = builder.build_process(input, &next_schedule_info).await?; - let process = Process::from_bundle_no_assign( - &build_result.bundle, - &build_result.bundle_data_item, - )?; - deps.data_store - .save_process(&process, &build_result.binary)?; - deps.logger.log(format!("saved process")); + let process = Process::from_bundle(&build_result.bundle)?; + deps.data_store + .save_process(&process, &build_result.binary)?; + deps.logger.log(format!("saved process - {:?}", &process)); - /* - We dont commit and schedule info change here - because the process is not getting a Nonce. - However we dont drop the lock until the Process - is successfully saved to the database - */ - drop(schedule_info); + deps.scheduler + .commit(&mut *schedule_info, &next_schedule_info, did, aid); + drop(schedule_info); - upload(&deps, build_result.binary.to_vec()).await?; - return id_res(&deps, process.process.process_id.clone(), start_top_level); - } - } else if type_tag.value == "Message" { - let assignment = builder - .gen_assignment( - Some(data_item.id()), - data_item.target(), - &next_schedule_info, - &None, - ) - .await?; + deps.logger.log(format!("scheduler committed cloned - {}", &target_id)); - let aid = assignment.id(); - let dtarget = data_item.target(); - - let deep_hash = match tags.iter().find(|tag| tag.name == "From-Process") { - /* - If the Message contains a From-Process tag it is - a pushed message so we should dedupe it, otherwise - it is a user message and we should not - */ - Some(_) => { - let mut mutable_item = data_item.clone(); - let deep_hash = match mutable_item.deep_hash() { - Ok(d) => d, - Err(_) => return Err("Unable to calculate deep hash".to_string()) - }; - - /* - Throw an error if we detect a duplicated pushed - message - */ - if deps.config.enable_deep_hash_checks() { + upload(&deps, build_result.binary.to_vec()).await?; + + deps.logger.log(format!("upload triggered - {}", &target_id)); + return id_res(&deps, process.process.process_id.clone(), start_top_level); + } else { + let build_result = builder.build_process(input, &next_schedule_info).await?; + let process = Process::from_bundle_no_assign( + &build_result.bundle, + &build_result.bundle_data_item, + )?; deps.data_store - .check_existing_deep_hash(&dtarget, &deep_hash) - .await?; + .save_process(&process, &build_result.binary)?; + deps.logger.log(format!("saved process - {:?}", &process)); + + /* + We dont commit and schedule info change here + because the process is not getting a Nonce. + However we dont drop the lock until the Process + is successfully saved to the database + */ + drop(schedule_info); + + upload(&deps, build_result.binary.to_vec()).await?; + return id_res(&deps, process.process.process_id.clone(), start_top_level); } + } else if type_tag.value == "Message" { + let assignment = builder + .gen_assignment( + Some(data_item.id()), + data_item.target(), + &next_schedule_info, + &None, + ) + .await?; - Some(deep_hash) - }, - None => { - None - } - }; - - let build_result = builder.bundle_items(vec![assignment, data_item]).await?; - let message = Message::from_bundle(&build_result.bundle)?; - - deps.data_store - .save_message(&message, &build_result.binary, deep_hash.as_ref()) - .await?; - - deps.logger.log(format!("saved message")); + let aid = assignment.id(); + let dtarget = data_item.target(); + let build_result = builder.bundle_items(vec![assignment, data_item]).await?; + let message = Message::from_bundle(&build_result.bundle)?; + deps.data_store + .save_message(&message, &build_result.binary) + .await?; + deps.logger.log(format!("saved message - {:?}", &message)); - /* - we set the id of the previous assignment - for the next message to be able to use - in its Hash Chain - */ - deps.scheduler - .commit(&mut *schedule_info, &next_schedule_info, dtarget, aid); - drop(schedule_info); + /* + we set the id of the previous assignment + for the next message to be able to use + in its Hash Chain + */ + deps.scheduler + .commit(&mut *schedule_info, &next_schedule_info, dtarget, aid); + drop(schedule_info); - upload(&deps, build_result.binary.to_vec()).await?; - return id_res(&deps, message.message_id()?, start_top_level); + upload(&deps, build_result.binary.to_vec()).await?; + return id_res(&deps, message.message_id()?, start_top_level); + } else { + return Err("Type tag not present".to_string()); + } } else { return Err("Type tag not present".to_string()); } diff --git a/servers/su/su b/servers/su/su index 9f910b66f..703b5177a 100644 Binary files a/servers/su/su and b/servers/su/su differ