Skip to content

Commit

Permalink
Fix torii commit updates before publish (#1172)
Browse files Browse the repository at this point in the history
* Fix torii commit updates before publish

* use serial_test
  • Loading branch information
broody authored Nov 9, 2023
1 parent afa501c commit 4dd2167
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 28 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ impl Sql {
.fetch_one(&self.pool)
.await?;

SimpleBroker::publish(model_registered);

let mut model_idx = 0_i64;
self.build_register_queries_recursive(&model, vec![model.name()], &mut model_idx);
self.query_queue.execute_all().await?;

SimpleBroker::publish(model_registered);

Ok(())
}
Expand Down Expand Up @@ -149,10 +150,11 @@ impl Sql {
.fetch_one(&self.pool)
.await?;

SimpleBroker::publish(entity_updated);

let path = vec![entity.name()];
self.build_set_entity_queries_recursive(path, event_id, &entity_id, &entity);
self.query_queue.execute_all().await?;

SimpleBroker::publish(entity_updated);

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/torii/graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ scarb.workspace = true
sozo = { path = "../../sozo" }
starknet-crypto.workspace = true
starknet.workspace = true
serial_test = "2.0.0"
83 changes: 59 additions & 24 deletions crates/torii/graphql/src/tests/subscription_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@ mod tests {
use async_graphql::value;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty};
use serial_test::serial;
use sqlx::SqlitePool;
use starknet::core::types::Event;
use starknet_crypto::{poseidon_hash_many, FieldElement};
use tokio::sync::mpsc;
// use tokio_util::sync::CancellationToken;
use torii_core::sql::Sql;

use crate::tests::{model_fixtures, run_graphql_subscription};

#[sqlx::test(migrations = "../migrations")]
#[serial]
async fn test_entity_subscription(pool: SqlitePool) {
// Sleep in order to run this test in a single thread
tokio::time::sleep(Duration::from_secs(1)).await;
// let cts = CancellationToken::new();
let mut db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap();

model_fixtures(&mut db).await;
Expand All @@ -28,7 +26,16 @@ mod tests {
let entity_id = format!("{:#x}", poseidon_hash_many(&key));
let keys_str = key.iter().map(|k| format!("{:#x}", k)).collect::<Vec<String>>().join(",");
let expected_value: async_graphql::Value = value!({
"entityUpdated": { "id": entity_id, "keys":vec![keys_str], "model_names": "Moves" }
"entityUpdated": {
"id": entity_id,
"keys":vec![keys_str],
"model_names": "Moves",
"models" : [{
"player": format!("{:#x}", FieldElement::ONE),
"remaining": 10,
"last_direction": "Left"
}]
}
});
let (tx, mut rx) = mpsc::channel(10);

Expand Down Expand Up @@ -80,12 +87,20 @@ mod tests {
// 2. The subscription is executed and it is listeing, waiting for publish() to be executed
let response_value = run_graphql_subscription(
&pool,
r#"
subscription {
entityUpdated {
id, keys, model_names
}
}"#,
r#"subscription {
entityUpdated {
id
keys
model_names
models {
... on Moves {
player
remaining
last_direction
}
}
}
}"#,
)
.await;
// 4. The subcription has received the message from publish()
Expand All @@ -95,10 +110,8 @@ mod tests {
}

#[sqlx::test(migrations = "../migrations")]
#[serial]
async fn test_entity_subscription_with_id(pool: SqlitePool) {
// Sleep in order to run this test in a single thread
tokio::time::sleep(Duration::from_secs(1)).await;
// let cts = CancellationToken::new();
let mut db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap();

model_fixtures(&mut db).await;
Expand All @@ -107,7 +120,16 @@ mod tests {
let entity_id = format!("{:#x}", poseidon_hash_many(&key));
let keys_str = key.iter().map(|k| format!("{:#x}", k)).collect::<Vec<String>>().join(",");
let expected_value: async_graphql::Value = value!({
"entityUpdated": { "id": entity_id, "keys":vec![keys_str], "model_names": "Moves" }
"entityUpdated": {
"id": entity_id,
"keys":vec![keys_str],
"model_names": "Moves",
"models" : [{
"player": format!("{:#x}", FieldElement::ONE),
"remaining": 10,
"last_direction": "Left"
}]
}
});
let (tx, mut rx) = mpsc::channel(10);

Expand All @@ -125,6 +147,11 @@ mod tests {
key: true,
ty: Ty::Primitive(Primitive::ContractAddress(Some(FieldElement::ONE))),
},
Member {
name: "remaining".to_string(),
key: false,
ty: Ty::Primitive(Primitive::U8(Some(10))),
},
Member {
name: "last_direction".to_string(),
key: false,
Expand Down Expand Up @@ -154,12 +181,20 @@ mod tests {
// 2. The subscription is executed and it is listeing, waiting for publish() to be executed
let response_value = run_graphql_subscription(
&pool,
r#"
subscription {
entityUpdated(id: "0x579e8877c7755365d5ec1ec7d3a94a457eff5d1f40482bbe9729c064cdead2") {
id, keys, model_names
}
}"#,
r#"subscription {
entityUpdated(id: "0x579e8877c7755365d5ec1ec7d3a94a457eff5d1f40482bbe9729c064cdead2") {
id
keys
model_names
models {
... on Moves {
player
remaining
last_direction
}
}
}
}"#,
)
.await;
// 4. The subscription has received the message from publish()
Expand All @@ -169,6 +204,7 @@ mod tests {
}

#[sqlx::test(migrations = "../migrations")]
#[serial]
async fn test_model_subscription(pool: SqlitePool) {
let mut db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap();
// 0. Preprocess model value
Expand Down Expand Up @@ -217,10 +253,8 @@ mod tests {
}

#[sqlx::test(migrations = "../migrations")]
#[serial]
async fn test_model_subscription_with_id(pool: SqlitePool) {
// Sleep in order to run this test at the end in a single thread
tokio::time::sleep(Duration::from_secs(2)).await;

let mut db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap();
// 0. Preprocess model value
let name = "Test".to_string();
Expand Down Expand Up @@ -267,6 +301,7 @@ mod tests {
}

#[sqlx::test(migrations = "../migrations")]
#[serial]
async fn test_event_emitted(pool: SqlitePool) {
let mut db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap();

Expand Down

0 comments on commit 4dd2167

Please sign in to comment.