Skip to content

Commit

Permalink
Revert "fix(su): revert to pre dedupe"
Browse files Browse the repository at this point in the history
This reverts commit d9b8144.
  • Loading branch information
VinceJuliano committed Jan 21, 2025
1 parent d9b8144 commit 247e2f1
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 150 deletions.
86 changes: 71 additions & 15 deletions servers/su/src/domain/clients/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,9 @@ impl From<GatewayErrorType> for String {
}
}

/*
Right now we dont need all the fields
but later we can add to these types to
pull more data from gql responses
*/
#[derive(Deserialize, Debug, Clone)]
struct Node {
id: String,
}

#[derive(Deserialize, Debug)]
struct Edge {
node: Node,
node: GatewayTx,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -219,14 +209,82 @@ impl Gateway for ArweaveGateway {
}
}

async fn raw(&self, tx_id: &String) -> Result<Vec<u8>, String> {
let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration");
let arweave_url = config.arweave_url;

let url = match Url::parse(&arweave_url) {
Ok(u) => u,
Err(e) => return Err(format!("{}", e)),
};

let client = Client::new();

let response = client
.get(
url.join(&format!("raw/{}", tx_id))
.map_err(|e| GatewayErrorType::StatusError(e.to_string()))?,
)
.send()
.await
.map_err(|e| GatewayErrorType::StatusError(e.to_string()))?;

if response.status().is_success() {
let body = response
.bytes()
.await
.map_err(|e| GatewayErrorType::StatusError(e.to_string()))?;
Ok(body.to_vec())
} else {
Err(format!(
"Failed to get status. Status code: {}",
response.status()
))
}
}

async fn gql_tx(&self, tx_id: &String) -> Result<GatewayTx, String> {
let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration");
let graphql_url = config.graphql_url;
let client = Client::new();

/*
id
signature
anchor
owner {
address
key
}
tags {
name
value
}
recipient
*/

let query = serde_json::json!({
"query": format!(
"query {{ transactions (ids: [\"{}\"]){{ edges {{ node {{ id }} }} }} }}",
"query {{
transactions(ids: [\"{}\"]) {{
edges {{
node {{
id
signature
anchor
owner {{
address
key
}}
tags {{
name
value
}}
recipient
}}
}}
}}
}}",
tx_id
),
"variables": {}
Expand All @@ -250,9 +308,7 @@ impl Gateway for ArweaveGateway {
.map_err(|e| GatewayErrorType::JsonParseError(e.to_string()))?;

if let Some(edge) = body.data.transactions.edges.get(0) {
Ok(GatewayTx {
id: edge.node.clone().id,
})
Ok(edge.node.clone())
} else {
Err("Transaction not found".to_string())
}
Expand Down
70 changes: 68 additions & 2 deletions servers/su/src/domain/clients/local_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl LocalStoreClient {
("process_ordering".to_string(), opts_index.clone()),
("message".to_string(), opts_index.clone()),
("message_ordering".to_string(), opts_index.clone()),
("deep_hash".to_string(), opts_index.clone()),
]
}

Expand Down Expand Up @@ -162,6 +163,13 @@ impl LocalStoreClient {
))
}

fn deep_hash_key(&self, process_id: &String, deep_hash: &String) -> Result<String, StoreErrorType> {
Ok(format!(
"deep_hash:{}:{}",
process_id, deep_hash
))
}

/*
This is the core method of this program used
for querying message ranges for the /processid
Expand Down Expand Up @@ -220,7 +228,7 @@ impl LocalStoreClient {
if let Some(ref to_str) = to {
if let Ok(to_timestamp) = to_str.parse::<i64>() {
if timestamp > to_timestamp {
has_next_page = true;
has_next_page = false;
break;
}
}
Expand Down Expand Up @@ -288,6 +296,7 @@ impl DataStore for LocalStoreClient {
&self,
message: &Message,
bundle_in: &[u8],
deep_hash: Option<&String>,
) -> Result<String, StoreErrorType> {
let message_id = message.message_id()?;
let assignment_id = message.assignment_id()?;
Expand All @@ -314,6 +323,19 @@ impl DataStore for LocalStoreClient {
let assignment_key = self.msg_assignment_key(&assignment_id);
self.file_db.put(assignment_key.as_bytes(), bundle_in)?;

let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| {
StoreErrorType::DatabaseError("Column family 'message_ordering' not found".to_string())
})?;

match deep_hash {
Some(dh) => {
let deep_hash_key = self.deep_hash_key(&message.process_id()?, dh)?;
self.index_db
.put_cf(cf, deep_hash_key.as_bytes(), message.process_id()?.as_bytes())?;
},
None => ()
};

Ok("Message saved".to_string())
}

Expand Down Expand Up @@ -399,6 +421,23 @@ impl DataStore for LocalStoreClient {
}
}

async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType> {
let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| {
StoreErrorType::DatabaseError("Column family 'deep_hash' not found".to_string())
})?;

let deep_hash_key = self.deep_hash_key(process_id, deep_hash)?;
match self.index_db.get_cf(cf, deep_hash_key) {
Ok(dh) => {
match dh {
Some(_) => return Err(StoreErrorType::MessageExists("Deep hash already exists".to_string())),
None => return Ok(())
}
},
Err(_) => return Ok(())
}
}

/*
Message list retrieval for the /processid
query, this returns a paginated list of messages
Expand All @@ -424,7 +463,9 @@ impl DataStore for LocalStoreClient {
*/
let include_process = process_in.assignment.is_some()
&& match from {
Some(ref from_nonce) => from_nonce == &process_in.nonce()?.to_string(),
Some(ref from_timestamp) => {
from_timestamp != &process_in.timestamp()?.to_string()
},
/*
No 'from' means it's the first page
*/
Expand All @@ -441,6 +482,26 @@ impl DataStore for LocalStoreClient {
actual_limit -= 1;
}

/*
handles an edge case where "to" is the message right
after the process, and limit is 1
*/
if include_process && actual_limit == 0 {
match to {
Some(t) => {
let timestamp: i64 = t.parse()?;
if timestamp == process_in.timestamp()? {
return Ok(PaginatedMessages::from_messages(messages, false)?);
} else if timestamp > process_in.timestamp()? {
return Ok(PaginatedMessages::from_messages(messages, true)?);
}
},
None => {
return Ok(PaginatedMessages::from_messages(messages, false)?);
}
}
}

let (paginated_keys, has_next_page) = self
.fetch_message_range(process_id, from, to, &Some(actual_limit))
.await?;
Expand All @@ -453,6 +514,11 @@ impl DataStore for LocalStoreClient {
for (_, assignment_id) in paginated_keys {
let assignment_key = self.msg_assignment_key(&assignment_id);

/*
It is possible the file isnt finished saving and
available on the file db yet that is why this retry loop
is here.
*/
for _ in 0..10 {
if let Some(message_data) = self.file_db.get(assignment_key.as_bytes())? {
let message: Message = Message::from_bytes(message_data)?;
Expand Down
14 changes: 7 additions & 7 deletions servers/su/src/domain/clients/local_store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod tests {
let message_bundle = create_test_message_bundle();
let test_message = Message::from_bytes(message_bundle.clone())?;

client.save_message(&test_message, &message_bundle).await?;
client.save_message(&test_message, &message_bundle, None).await?;
let retrieved_message = client.get_message(&test_message.assignment.id)?;

assert_eq!(retrieved_message.assignment.id, test_message.assignment.id);
Expand All @@ -86,7 +86,7 @@ mod tests {
// Save all messages
for bundle in message_bundles.iter() {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Retrieve messages and check nonce order and continuity
Expand Down Expand Up @@ -124,7 +124,7 @@ mod tests {

for bundle in message_bundles.iter() {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Case 1: Default parameters
Expand Down Expand Up @@ -202,7 +202,7 @@ mod tests {
// Save half of the messages
for bundle in message_bundles.iter().take(message_bundles.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

let (process_bundle_2, message_bundles_2) = bundle_list_2();
Expand All @@ -212,19 +212,19 @@ mod tests {
// Save half of the messages of next process
for bundle in message_bundles_2.iter().take(message_bundles_2.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Save second half of messages for the first process
for bundle in message_bundles.iter().skip(message_bundles.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Save second half of messages for the second process
for bundle in message_bundles_2.iter().skip(message_bundles_2.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Retrieve messages and check length, nonce order, and continuity
Expand Down
Loading

0 comments on commit 247e2f1

Please sign in to comment.