Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subgraph composition spec version #5782

Open
wants to merge 2 commits into
base: zoran/subgraph-composition-rework-vid-wrap2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions graph/src/schema/entity_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ impl EntityType {
pub fn is_object_type(&self) -> bool {
self.schema.is_object_type(self.atom)
}

pub fn new_vid_form(&self) -> bool {
self.schema.new_vid_form()
}
}

impl fmt::Display for EntityType {
Expand Down
7 changes: 7 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::data::graphql::{DirectiveExt, DocumentExt, ObjectTypeExt, TypeExt, Va
use crate::data::store::{
self, EntityValidationError, IdType, IntoEntityIterator, TryIntoEntityIterator, ValueType, ID,
};
use crate::data::subgraph::SPEC_VERSION_1_3_0;
use crate::data::value::Word;
use crate::derive::CheapClone;
use crate::prelude::q::Value;
Expand Down Expand Up @@ -955,6 +956,7 @@ pub struct Inner {
pool: Arc<AtomPool>,
/// A list of all timeseries types by interval
agg_mappings: Box<[AggregationMapping]>,
spec_version: Version,
}

impl InputSchema {
Expand Down Expand Up @@ -1042,6 +1044,7 @@ impl InputSchema {
enum_map,
pool,
agg_mappings,
spec_version: spec_version.clone(),
}),
})
}
Expand Down Expand Up @@ -1585,6 +1588,10 @@ impl InputSchema {
}?;
Some(EntityType::new(self.cheap_clone(), obj_type.name))
}

pub fn new_vid_form(&self) -> bool {
self.inner.spec_version >= SPEC_VERSION_1_3_0
}
}

/// Create a new pool that contains the names of all the types defined
Expand Down
30 changes: 28 additions & 2 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ use graph::{
};
use itertools::Itertools;

use crate::{catalog, copy::AdaptiveBatchSize, deployment, relational::Table};
use crate::{
catalog,
copy::AdaptiveBatchSize,
deployment,
relational::{Table, VID_COLUMN},
};

use super::{Catalog, Layout, Namespace};

Expand Down Expand Up @@ -68,6 +73,7 @@ struct TablePair {
// has the same name as `src` but is in a different namespace
dst: Arc<Table>,
src_nsp: Namespace,
dst_nsp: Namespace,
}

impl TablePair {
Expand All @@ -94,7 +100,12 @@ impl TablePair {
}
conn.batch_execute(&query)?;

Ok(TablePair { src, dst, src_nsp })
Ok(TablePair {
src,
dst,
src_nsp,
dst_nsp,
})
}

/// Copy all entity versions visible between `earliest_block` and
Expand Down Expand Up @@ -228,6 +239,12 @@ impl TablePair {
let src_qname = &self.src.qualified_name;
let dst_qname = &self.dst.qualified_name;
let src_nsp = &self.src_nsp;
let dst_nsp = &self.dst_nsp;

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let old_vid_form = !self.src.object.new_vid_form();

let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
Expand All @@ -237,6 +254,15 @@ impl TablePair {
"src" => src_nsp.as_str(), "error" => e.to_string());
}

// Make sure the vid sequence
// continues from where it was
if old_vid_form {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
)?;
}

writeln!(query, "drop table {src_qname};")?;
writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?;
conn.transaction(|conn| conn.batch_execute(&query))?;
Expand Down
22 changes: 17 additions & 5 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2548,6 +2548,8 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
let out = &mut out;
out.unsafe_to_cache_prepared();

let new_vid_form = self.table.object.new_vid_form();

// Construct a query
// insert into schema.table(column, ...)
// values
Expand All @@ -2573,7 +2575,9 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(CAUSALITY_REGION_COLUMN);
};

out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}
out.push_sql(") values\n");

for (i, row) in self.rows.iter().enumerate() {
Expand All @@ -2591,8 +2595,10 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&row.causality_region)?;
};
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
if new_vid_form {
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
}
out.push_sql(")");
}

Expand Down Expand Up @@ -5090,6 +5096,8 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let new_vid_form = self.src.object.new_vid_form();

// Construct a query
// insert into {dst}({columns})
// select {columns} from {src}
Expand All @@ -5110,7 +5118,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(")\nselect ");
for column in &self.columns {
Expand Down Expand Up @@ -5176,7 +5186,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(" from ");
out.push_sql(self.src.qualified_name.as_str());
Expand Down
Loading