Skip to content

Commit

Permalink
Refactor torii grpc (#1186)
Browse files Browse the repository at this point in the history
  • Loading branch information
broody authored and kariy committed Nov 16, 2023
1 parent ae67b1a commit 9d7a2e2
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 244 deletions.
56 changes: 0 additions & 56 deletions crates/dojo-types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,6 @@ impl Member {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct EntityQuery {
pub model: String,
pub clause: Clause,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub enum Clause {
Keys(KeysClause),
Attribute(AttributeClause),
Composite(CompositeClause),
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct KeysClause {
pub keys: Vec<FieldElement>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct AttributeClause {
pub attribute: String,
pub operator: ComparisonOperator,
pub value: Value,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct CompositeClause {
pub operator: LogicalOperator,
pub clauses: Vec<Clause>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub enum LogicalOperator {
And,
Or,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub enum ComparisonOperator {
Eq,
Neq,
Gt,
Gte,
Lt,
Lte,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub enum Value {
String(String),
Int(i64),
UInt(u64),
Bool(bool),
Bytes(Vec<u8>),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelMetadata {
pub schema: Ty,
Expand Down
89 changes: 38 additions & 51 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use dojo_types::packing::unpack;
use dojo_types::schema::{Clause, EntityQuery, Ty};
use dojo_types::schema::Ty;
use dojo_types::WorldMetadata;
use dojo_world::contracts::WorldContractReader;
use parking_lot::{RwLock, RwLockReadGuard};
Expand All @@ -17,6 +17,7 @@ use starknet::providers::JsonRpcClient;
use starknet_crypto::FieldElement;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::EntityUpdateStreaming;
use torii_grpc::types::KeysClause;

use self::error::{Error, ParseError};
use self::storage::ModelStorage;
Expand Down Expand Up @@ -46,7 +47,7 @@ impl Client {
torii_url: String,
rpc_url: String,
world: FieldElement,
queries: Option<Vec<EntityQuery>>,
entities_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

Expand All @@ -61,23 +62,18 @@ impl Client {
let provider = JsonRpcClient::new(HttpTransport::new(rpc_url));
let world_reader = WorldContractReader::new(world, provider);

if let Some(queries) = queries {
subbed_entities.add_entities(queries)?;
if let Some(keys) = entities_keys {
subbed_entities.add_entities(keys)?;

// TODO: change this to querying the gRPC url instead
let subbed_entities = subbed_entities.entities.read().clone();
for EntityQuery { model, clause } in subbed_entities {
let model_reader = world_reader.model(&model).await?;
let keys = if let Clause::Keys(clause) = clause {
clause.keys
} else {
return Err(Error::UnsupportedQuery);
};
let values = model_reader.entity_storage(&keys).await?;
let subbed_entities = subbed_entities.entities_keys.read().clone();
for keys in subbed_entities {
let model_reader = world_reader.model(&keys.model).await?;
let values = model_reader.entity_storage(&keys.keys).await?;

client_storage.set_entity_storage(
cairo_short_string_to_felt(&model).unwrap(),
keys,
cairo_short_string_to_felt(&keys.model).unwrap(),
keys.keys,
values,
)?;
}
Expand All @@ -98,8 +94,8 @@ impl Client {
self.metadata.read()
}

pub fn subscribed_entities(&self) -> RwLockReadGuard<'_, HashSet<EntityQuery>> {
self.subscribed_entities.entities.read()
pub fn subscribed_entities(&self) -> RwLockReadGuard<'_, HashSet<KeysClause>> {
self.subscribed_entities.entities_keys.read()
}

/// Returns the model value of an entity.
Expand All @@ -109,40 +105,33 @@ impl Client {
///
/// If the requested entity is not among the synced entities, it will attempt to fetch it from
/// the RPC.
pub async fn entity(&self, entity: &EntityQuery) -> Result<Option<Ty>, Error> {
let Some(mut schema) = self.metadata.read().model(&entity.model).map(|m| m.schema.clone())
pub async fn entity(&self, keys: &KeysClause) -> Result<Option<Ty>, Error> {
let Some(mut schema) = self.metadata.read().model(&keys.model).map(|m| m.schema.clone())
else {
return Ok(None);
};

let keys = if let Clause::Keys(clause) = entity.clone().clause {
clause.keys
} else {
return Err(Error::UnsupportedQuery);
};

if !self.subscribed_entities.is_synced(entity) {
let model = self.world_reader.model(&entity.model).await?;
return Ok(Some(model.entity(&keys).await?));
if !self.subscribed_entities.is_synced(keys) {
let model = self.world_reader.model(&keys.model).await?;
return Ok(Some(model.entity(&keys.keys).await?));
}

let Ok(Some(raw_values)) = self.storage.get_entity_storage(
cairo_short_string_to_felt(&entity.model)
.map_err(ParseError::CairoShortStringToFelt)?,
&keys,
cairo_short_string_to_felt(&keys.model).map_err(ParseError::CairoShortStringToFelt)?,
&keys.keys,
) else {
return Ok(Some(schema));
};

let layout = self
.metadata
.read()
.model(&entity.model)
.model(&keys.model)
.map(|m| m.layout.clone())
.expect("qed; layout should exist");

let unpacked = unpack(raw_values, layout).unwrap();
let mut keys_and_unpacked = [keys.to_vec(), unpacked].concat();
let mut keys_and_unpacked = [keys.keys.to_vec(), unpacked].concat();

schema.deserialize(&mut keys_and_unpacked).unwrap();

Expand All @@ -152,8 +141,9 @@ impl Client {
/// Initiate the entity subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&self) -> Result<SubscriptionService, Error> {
let entities = self.subscribed_entities.entities.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(entities).await?;
let entities_keys =
self.subscribed_entities.entities_keys.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(entities_keys).await?;

let (service, handle) = SubscriptionService::new(
Arc::clone(&self.storage),
Expand All @@ -169,21 +159,15 @@ impl Client {
/// Adds entities to the list of entities to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn add_entities_to_sync(&self, entities: Vec<EntityQuery>) -> Result<(), Error> {
for entity in &entities {
let keys = if let Clause::Keys(clause) = entity.clone().clause {
clause.keys
} else {
return Err(Error::UnsupportedQuery);
};

self.initiate_entity(&entity.model, keys.clone()).await?;
pub async fn add_entities_to_sync(&self, entities_keys: Vec<KeysClause>) -> Result<(), Error> {
for keys in &entities_keys {
self.initiate_entity(&keys.model, keys.keys.clone()).await?;
}

self.subscribed_entities.add_entities(entities)?;
self.subscribed_entities.add_entities(entities_keys)?;

let updated_entities =
self.subscribed_entities.entities.read().clone().into_iter().collect();
self.subscribed_entities.entities_keys.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(updated_entities).await?;

match self.sub_client_handle.get() {
Expand All @@ -196,11 +180,14 @@ impl Client {
/// Removes entities from the list of entities to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn remove_entities_to_sync(&self, entities: Vec<EntityQuery>) -> Result<(), Error> {
self.subscribed_entities.remove_entities(entities)?;
pub async fn remove_entities_to_sync(
&self,
entities_keys: Vec<KeysClause>,
) -> Result<(), Error> {
self.subscribed_entities.remove_entities(entities_keys)?;

let updated_entities =
self.subscribed_entities.entities.read().clone().into_iter().collect();
self.subscribed_entities.entities_keys.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(updated_entities).await?;

match self.sub_client_handle.get() {
Expand All @@ -216,10 +203,10 @@ impl Client {

async fn initiate_subscription(
&self,
entities: Vec<EntityQuery>,
keys: Vec<KeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(entities).await?;
let stream = grpc_client.subscribe_entities(keys).await?;
Ok(stream)
}

Expand Down
25 changes: 5 additions & 20 deletions crates/torii/client/src/client/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use dojo_types::schema::{KeysClause, Ty};
use dojo_types::schema::Ty;
use dojo_types::WorldMetadata;
use parking_lot::RwLock;
use starknet::core::utils::cairo_short_string_to_felt;
Expand Down Expand Up @@ -202,13 +202,8 @@ mod tests {
fn err_if_set_values_too_many() {
let storage = create_dummy_storage();
let keys = vec![felt!("0x12345")];
let entity = dojo_types::schema::EntityQuery {
model: "Position".into(),
clause: dojo_types::schema::Clause::Keys(KeysClause { keys: keys.clone() }),
};

let values = vec![felt!("1"), felt!("2"), felt!("3"), felt!("4"), felt!("5")];
let model = cairo_short_string_to_felt(&entity.model).unwrap();
let model = cairo_short_string_to_felt("Position").unwrap();
let result = storage.set_entity_storage(model, keys, values);

assert!(storage.storage.read().is_empty());
Expand All @@ -222,13 +217,8 @@ mod tests {
fn err_if_set_values_too_few() {
let storage = create_dummy_storage();
let keys = vec![felt!("0x12345")];
let entity = dojo_types::schema::EntityQuery {
model: "Position".into(),
clause: dojo_types::schema::Clause::Keys(KeysClause { keys: keys.clone() }),
};

let values = vec![felt!("1"), felt!("2")];
let model = cairo_short_string_to_felt(&entity.model).unwrap();
let model = cairo_short_string_to_felt("Position").unwrap();
let result = storage.set_entity_storage(model, keys, values);

assert!(storage.storage.read().is_empty());
Expand All @@ -242,23 +232,18 @@ mod tests {
fn set_and_get_entity_value() {
let storage = create_dummy_storage();
let keys = vec![felt!("0x12345")];
let entity = dojo_types::schema::EntityQuery {
model: "Position".into(),
clause: dojo_types::schema::Clause::Keys(KeysClause { keys: keys.clone() }),
};

assert!(storage.storage.read().is_empty(), "storage must be empty initially");

let model = storage.metadata.read().model(&entity.model).cloned().unwrap();

let model = storage.metadata.read().model("Position").cloned().unwrap();
let expected_storage_addresses = compute_all_storage_addresses(
cairo_short_string_to_felt(&model.name).unwrap(),
&keys,
model.packed_size,
);

let expected_values = vec![felt!("1"), felt!("2"), felt!("3"), felt!("4")];
let model_name_in_felt = cairo_short_string_to_felt(&entity.model).unwrap();
let model_name_in_felt = cairo_short_string_to_felt("Position").unwrap();

storage
.set_entity_storage(model_name_in_felt, keys.clone(), expected_values.clone())
Expand Down
Loading

0 comments on commit 9d7a2e2

Please sign in to comment.