feat(su): deduplicate deep hash fixes, and message retrieval fixes
VinceJuliano committed Jan 18, 2025
1 parent ea0fd72 commit 003661c
Showing 7 changed files with 324 additions and 147 deletions.
86 changes: 71 additions & 15 deletions servers/su/src/domain/clients/gateway.rs
@@ -28,19 +28,9 @@ impl From<GatewayErrorType> for String {
}
}

-/*
-Right now we dont need all the fields
-but later we can add to these types to
-pull more data from gql responses
-*/
-#[derive(Deserialize, Debug, Clone)]
-struct Node {
-    id: String,
-}

#[derive(Deserialize, Debug)]
struct Edge {
-    node: Node,
+    node: GatewayTx,
}

#[derive(Deserialize, Debug)]
@@ -219,14 +209,82 @@ impl Gateway for ArweaveGateway {
}
}

+async fn raw(&self, tx_id: &String) -> Result<Vec<u8>, String> {
+    let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration");
+    let arweave_url = config.arweave_url;
+
+    let url = match Url::parse(&arweave_url) {
+        Ok(u) => u,
+        Err(e) => return Err(format!("{}", e)),
+    };
+
+    let client = Client::new();
+
+    let response = client
+        .get(
+            url.join(&format!("raw/{}", tx_id))
+                .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?,
+        )
+        .send()
+        .await
+        .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?;
+
+    if response.status().is_success() {
+        let body = response
+            .bytes()
+            .await
+            .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?;
+        Ok(body.to_vec())
+    } else {
+        Err(format!(
+            "Failed to fetch raw transaction. Status code: {}",
+            response.status()
+        ))
+    }
+}

async fn gql_tx(&self, tx_id: &String) -> Result<GatewayTx, String> {
let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration");
let graphql_url = config.graphql_url;
let client = Client::new();

+/*
+    id
+    signature
+    anchor
+    owner {
+        address
+        key
+    }
+    tags {
+        name
+        value
+    }
+    recipient
+*/

let query = serde_json::json!({
"query": format!(
"query {{ transactions (ids: [\"{}\"]){{ edges {{ node {{ id }} }} }} }}",
"query {{
transactions(ids: [\"{}\"]) {{
edges {{
node {{
id
signature
anchor
owner {{
address
key
}}
tags {{
name
value
}}
recipient
}}
}}
}}
}}",
tx_id
),
"variables": {}
@@ -250,9 +308,7 @@ impl Gateway for ArweaveGateway {
.map_err(|e| GatewayErrorType::JsonParseError(e.to_string()))?;

if let Some(edge) = body.data.transactions.edges.get(0) {
-    Ok(GatewayTx {
-        id: edge.node.clone().id,
-    })
+    Ok(edge.node.clone())
} else {
Err("Transaction not found".to_string())
}
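Together, the two changes to this file give callers the full material for a transaction: gql_tx now returns the complete GatewayTx (signature, anchor, tags, recipient) rather than just the id, and raw fetches the transaction body from the gateway's raw/{tx_id} endpoint. A minimal sketch of how a caller might combine them — the function name and wiring here are illustrative, not part of this commit:

// Sketch only: `Gateway` and `GatewayTx` as defined in core/dal.rs.
async fn fetch_tx_material(
    gateway: &dyn Gateway,
    tx_id: &String,
) -> Result<(GatewayTx, Vec<u8>), String> {
    // Metadata (signature, anchor, tags, recipient) via GraphQL...
    let tx = gateway.gql_tx(tx_id).await?;
    // ...and the raw body bytes via the /raw endpoint.
    let data = gateway.raw(tx_id).await?;
    Ok((tx, data))
}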
32 changes: 30 additions & 2 deletions servers/su/src/domain/clients/local_store/store.rs
@@ -228,7 +228,7 @@ impl LocalStoreClient {
if let Some(ref to_str) = to {
if let Ok(to_timestamp) = to_str.parse::<i64>() {
if timestamp > to_timestamp {
-has_next_page = true;
+has_next_page = false;
break;
}
}
@@ -425,6 +425,7 @@ impl DataStore for LocalStoreClient {
let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| {
StoreErrorType::DatabaseError("Column family 'deep_hash' not found".to_string())
})?;

let deep_hash_key = self.deep_hash_key(process_id, deep_hash)?;
match self.index_db.get_cf(cf, deep_hash_key) {
Ok(dh) => {
@@ -462,7 +463,9 @@
*/
let include_process = process_in.assignment.is_some()
&& match from {
-    Some(ref from_nonce) => from_nonce == &process_in.nonce()?.to_string(),
+    Some(ref from_timestamp) => {
+        from_timestamp != &process_in.timestamp()?.to_string()
+    },
/*
No 'from' means it's the first page
*/
@@ -479,6 +482,26 @@
actual_limit -= 1;
}

+/*
+    Handles an edge case where "to" is the message
+    right after the process, and the limit is 1.
+*/
+if include_process && actual_limit == 0 {
+    match to {
+        Some(t) => {
+            let timestamp: i64 = t.parse()?;
+            if timestamp == process_in.timestamp()? {
+                return Ok(PaginatedMessages::from_messages(messages, false)?);
+            } else if timestamp > process_in.timestamp()? {
+                return Ok(PaginatedMessages::from_messages(messages, true)?);
+            }
+        },
+        None => {
+            return Ok(PaginatedMessages::from_messages(messages, false)?);
+        }
+    }
+}

let (paginated_keys, has_next_page) = self
.fetch_message_range(process_id, from, to, &Some(actual_limit))
.await?;
@@ -491,6 +514,11 @@
for (_, assignment_id) in paginated_keys {
let assignment_key = self.msg_assignment_key(&assignment_id);

+/*
+    It is possible the file isn't finished saving and
+    available in the file db yet; that is why this
+    retry loop is here.
+*/
for _ in 0..10 {
if let Some(message_data) = self.file_db.get(assignment_key.as_bytes())? {
let message: Message = Message::from_bytes(message_data)?;
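The new comment documents a read-after-write race: an assignment key can land in the index DB before the message body has finished persisting to the file DB. The visible hunk only shows the loop bound of 10; a stripped-down sketch of the pattern, with an assumed backoff interval:

// Hypothetical shape of the retry read; `file_db` and `Message` as in this file.
// The 100ms sleep interval is an assumption, not taken from this commit.
let mut message: Option<Message> = None;
for _ in 0..10 {
    if let Some(bytes) = file_db.get(assignment_key.as_bytes())? {
        message = Some(Message::from_bytes(bytes)?);
        break;
    }
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}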
2 changes: 1 addition & 1 deletion servers/su/src/domain/clients/store.rs
@@ -920,7 +920,7 @@ impl DataStore for StoreClient {
let from_timestamp = from_timestamp_str
.parse::<i64>()
.map_err(StoreErrorType::from)?;
-from_timestamp == process_in.process.timestamp
+from_timestamp != process_in.process.timestamp
}
None => true, // No 'from' timestamp means it's the first page
};
32 changes: 14 additions & 18 deletions servers/su/src/domain/core/builder.rs
@@ -1,17 +1,15 @@
use std::sync::Arc;

use super::tags::Tag;
-use dashmap::DashMap;

use super::bytes::{ByteErrorType, DataBundle, DataItem};
-use super::dal::{Gateway, Log, ScheduleProvider, Signer, TxStatus};
+use super::dal::{Gateway, Log, ScheduleProvider, Signer, TxStatus, GatewayTx};
use super::json::Process;

pub struct Builder<'a> {
gateway: Arc<dyn Gateway>,
signer: Arc<dyn Signer>,
-    logger: &'a Arc<dyn Log>,
-    cache: Arc<DashMap<String, Result<(), BuilderErrorType>>>,
+    logger: &'a Arc<dyn Log>
}

pub struct BuildResult {
@@ -52,8 +50,7 @@ impl<'a> Builder<'a> {
Ok(Builder {
gateway,
signer,
-    logger,
-    cache: Arc::new(DashMap::new()),
+    logger
})
}

@@ -205,12 +202,7 @@ impl<'a> Builder<'a> {
tx_id: &String,
process: &Process,
base_layer: &Option<String>,
-) -> Result<(), BuilderErrorType> {
-    // Check if the result is in the DashMap cache
-    if let Some(cached_result) = self.cache.get(tx_id) {
-        return cached_result.clone();
-    }
-
+) -> Result<Option<GatewayTx>, BuilderErrorType> {
// Process the assignment verification
let result = match base_layer {
Some(_) => {
@@ -230,21 +222,17 @@
};

match status.number_of_confirmations {
-    n if n >= threshold => Ok(()),
+    n if n >= threshold => Ok(None),
_ => Err(BuilderErrorType::BuilderError(
"Not enough confirmations to assign".to_string(),
)),
}
}
None => {
-    self.gateway.gql_tx(&tx_id).await?;
-    Ok(())
+    Ok(Some(self.gateway.gql_tx(&tx_id).await?))
}
};

-// Store the result in the DashMap cache
-self.cache.insert(tx_id.clone(), result.clone());

result
}

@@ -281,8 +269,16 @@ mod tests {
async fn gql_tx(&self, _tx_id: &String) -> Result<GatewayTx, String> {
Ok(GatewayTx {
id: "id".to_string(),
+    signature: "sig".to_string(),
+    anchor: None,
+    tags: vec![],
+    recipient: None,
})
}

+async fn raw(&self, _tx_id: &String) -> Result<Vec<u8>, String> {
+    Ok(vec![])
+}
}

struct MockSigner;
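Two behavioral changes land in this file: the per-tx_id DashMap cache is removed, so each assignment is verified afresh, and the verifier now returns Option<GatewayTx> (None for base-layer assignments, Some otherwise), letting the caller reuse the fetched transaction instead of issuing a second gql_tx query. A sketch of a call site — the method name verify_assignment is hypothetical, since the actual name sits outside the visible hunk:

// Sketch: reusing the GatewayTx returned by the verifier (names assumed).
let verified = builder.verify_assignment(&tx_id, &process, &base_layer).await?;
if let Some(tx) = verified {
    // Non-base-layer case: tags, anchor and recipient are now available
    // locally, e.g. as inputs to DataItem::deep_hash_fields in bytes.rs.
    let _ = (&tx.tags, &tx.anchor, &tx.recipient);
}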
42 changes: 42 additions & 0 deletions servers/su/src/domain/core/bytes.rs
@@ -38,6 +38,12 @@ impl From<String> for ByteErrorType {
}
}

+impl From<base64_url::base64::DecodeError> for ByteErrorType {
+    fn from(error: base64_url::base64::DecodeError) -> Self {
+        ByteErrorType::ByteError(format!("Byte error: {:?}", error))
+    }
+}

#[derive(Clone)]
pub struct DataBundle {
pub items: Vec<DataItem>,
@@ -560,6 +566,42 @@ impl DataItem {
Ok(base64_url::encode(&deep_hash_vec))
}

+pub fn deep_hash_fields(
+    target: Option<String>,
+    anchor: Option<String>,
+    tags: Vec<Tag>,
+    data: Vec<u8>
+) -> Result<String, ByteErrorType> {
+    let target_chunk = match target {
+        None => DeepHashChunk::Chunk(Bytes::new()),
+        Some(t) => DeepHashChunk::Chunk(base64_url::decode(&t)?.into())
+    };
+
+    let anchor_chunk = match anchor {
+        None => DeepHashChunk::Chunk(Bytes::new()),
+        Some(a) => DeepHashChunk::Chunk(base64_url::decode(&a)?.into())
+    };
+
+    let encoded_tags = if !tags.is_empty() {
+        tags.encode()?
+    } else {
+        Bytes::default()
+    };
+
+    let deep_hash_vec = deep_hash_sync(DeepHashChunk::Chunks(vec![
+        DeepHashChunk::Chunk(DATAITEM_AS_BUFFER.into()),
+        DeepHashChunk::Chunk(ONE_AS_BUFFER.into()),
+        DeepHashChunk::Chunk(ONE_AS_BUFFER.into()),
+        DeepHashChunk::Chunk(Bytes::new()),
+        target_chunk,
+        anchor_chunk,
+        DeepHashChunk::Chunk(encoded_tags.clone()),
+        DeepHashChunk::Chunk(Bytes::from(data)),
+    ]))?;
+
+    Ok(base64_url::encode(&deep_hash_vec))
+}

pub fn raw_id(&self) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(&self.signature);
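deep_hash_fields mirrors the data-item deep hash computed just above it in this file, but takes loose fields instead of a parsed DataItem (note the empty owner chunk). A plausible use, assuming the fields come from a GatewayTx and the body from raw — this wiring is an assumption, not shown in the commit:

// Hypothetical helper: derive a dedup key for an assigned transaction.
async fn dedup_key(gateway: &dyn Gateway, tx_id: &String) -> Result<String, String> {
    let tx = gateway.gql_tx(tx_id).await?; // GatewayTx from core/dal.rs
    let data = gateway.raw(tx_id).await?;  // transaction body bytes
    // recipient maps to the deep hash's target chunk.
    DataItem::deep_hash_fields(tx.recipient, tx.anchor, tx.tags, data)
        .map_err(|_| "could not compute deep hash".to_string())
}

The resulting key would then be checked against the "deep_hash" column family touched in local_store/store.rs to reject duplicate assignments.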
8 changes: 7 additions & 1 deletion servers/su/src/domain/core/dal.rs
@@ -4,6 +4,7 @@ use serde::Deserialize;
pub use super::bytes::DataItem;
pub use super::json::{JsonErrorType, Message, PaginatedMessages, Process};
pub use super::router::{ProcessScheduler, Scheduler};
+pub use super::tags::Tag;

/*
Interfaces for core dependencies. Implement these traits
@@ -22,9 +23,13 @@ pub struct TxStatus {
pub number_of_confirmations: i32,
}

-#[derive(Deserialize)]
+#[derive(Deserialize, Debug, Clone)]
pub struct GatewayTx {
pub id: String,
+    pub signature: String,
+    pub anchor: Option<String>,
+    pub tags: Vec<Tag>,
+    pub recipient: Option<String>,
}

#[async_trait]
Expand All @@ -33,6 +38,7 @@ pub trait Gateway: Send + Sync {
async fn network_info(&self) -> Result<NetworkInfo, String>;
async fn status(&self, tx_id: &String) -> Result<TxStatus, String>;
async fn gql_tx(&self, tx_id: &String) -> Result<GatewayTx, String>;
+    async fn raw(&self, tx_id: &String) -> Result<Vec<u8>, String>;
}

pub trait Wallet: Send + Sync {
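The widened GatewayTx now carries everything the GraphQL query in gateway.rs selects except owner, which serde simply ignores on deserialization. A hand-written example of the expected node shape (assuming Tag deserializes from { name, value }):

// Hand-written payload for illustration; not taken from a live gateway.
let node = serde_json::json!({
    "id": "tx-1",
    "signature": "base64url-signature",
    "anchor": null,
    "tags": [{ "name": "Data-Protocol", "value": "ao" }],
    "recipient": null
});
let tx: GatewayTx = serde_json::from_value(node).expect("node deserializes");
assert_eq!(tx.id, "tx-1");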