Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CALCITE-6728] Introduce new methods to lookup tables and schemas inside schemas #4100

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.MaterializedViewTable;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.SqlWriterConfig;
Expand Down Expand Up @@ -325,8 +326,8 @@ private void addMaterializedViews() {
query = buf.toString();

// Add the view for this query
String viewName = "$" + getTableNames().size();
SchemaPlus schema = parentSchema.getSubSchema(name);
String viewName = "$" + tables().getNames(LikePattern.any()).size();
SchemaPlus schema = parentSchema.subSchemas().get(name);
if (schema == null) {
throw new IllegalStateException("Cannot find schema " + name
+ " in parent schema " + parentSchema.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -68,8 +70,9 @@ public CloneSchema(SchemaPlus sourceSchema) {

@Override protected Map<String, Table> getTableMap() {
final Map<String, Table> map = new LinkedHashMap<>();
for (String name : sourceSchema.getTableNames()) {
final Table table = sourceSchema.getTable(name);
final Lookup<Table> tables = sourceSchema.tables();
for (String name : tables.getNames(LikePattern.any())) {
final Table table = tables.get(name);
if (table instanceof QueryableTable) {
final QueryableTable sourceTable = (QueryableTable) table;
map.put(name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.jdbc;

import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;

import static java.util.Objects.requireNonNull;

/**
* Base class for JDBC schemas.
*/
public abstract class JdbcBaseSchema implements Schema {

@Override public abstract Lookup<Table> tables();


@Override public @Nullable Table getTable(String name) {
return tables().get(name);
}

@Override public Set<String> getTableNames() {
return tables().getNames(LikePattern.any());
}

@Override public abstract Lookup<? extends Schema> subSchemas();

@Override public @Nullable Schema getSubSchema(String name) {
return subSchemas().get(name);
}

@Override public Set<String> getSubSchemaNames() {
return subSchemas().getNames(LikePattern.any());
}


@Override public @Nullable RelProtoDataType getType(String name) {
return null;
}

@Override public Set<String> getTypeNames() {
return Collections.emptySet();
}

@Override public final Collection<Function> getFunctions(String name) {
return Collections.emptyList();
}

@Override public final Set<String> getFunctionNames() {
return Collections.emptySet();
}

@Override public Expression getExpression(final @Nullable SchemaPlus parentSchema,
final String name) {
requireNonNull(parentSchema, "parentSchema");
return Schemas.subSchemaExpression(parentSchema, name, getClass());
}

@Override public boolean isMutable() {
return false;
}

@Override public Schema snapshot(final SchemaVersion version) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.Wrapper;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.lookup.IgnoreCaseLookup;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.LoadingCacheLookup;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
import org.apache.calcite.util.BuiltInMethod;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import javax.sql.DataSource;

Expand All @@ -51,20 +56,21 @@
* an instance of {@link JdbcSchema}.
*
* <p>This schema is lazy: it does not compute the list of schema names until
* the first call to {@link #getSubSchemaMap()}. Then it creates a
* {@link JdbcSchema} for each schema name. Each JdbcSchema will populate its
* the first call to {@link #subSchemas()} and {@link Lookup#get(String)}. Then it creates a
* {@link JdbcSchema} for this schema name. Each JdbcSchema will populate its
* tables on demand.
*/
public class JdbcCatalogSchema extends AbstractSchema implements Wrapper {
public class JdbcCatalogSchema extends JdbcBaseSchema implements Wrapper {
final DataSource dataSource;
public final SqlDialect dialect;
final JdbcConvention convention;
final String catalog;
private final Lookup<JdbcSchema> subSchemas;

/** Sub-schemas by name, lazily initialized. */
/** default schema name, lazily initialized. */
@SuppressWarnings({"method.invocation.invalid", "Convert2MethodRef"})
final Supplier<SubSchemaMap> subSchemaMapSupplier =
Suppliers.memoize(() -> computeSubSchemaMap());
private final Supplier<Optional<String>> defaultSchemaName =
Suppliers.memoize(() -> Optional.ofNullable(computeDefaultSchemaName()));

/** Creates a JdbcCatalogSchema. */
public JdbcCatalogSchema(DataSource dataSource, SqlDialect dialect,
Expand All @@ -73,6 +79,40 @@ public JdbcCatalogSchema(DataSource dataSource, SqlDialect dialect,
this.dialect = requireNonNull(dialect, "dialect");
this.convention = requireNonNull(convention, "convention");
this.catalog = catalog;
this.subSchemas = new LoadingCacheLookup<>(new IgnoreCaseLookup<JdbcSchema>() {
@Override public @Nullable JdbcSchema get(String name) {
try (Connection connection = dataSource.getConnection();
ResultSet resultSet =
connection.getMetaData().getSchemas(catalog, name)) {
while (resultSet.next()) {
final String schemaName =
requireNonNull(resultSet.getString(1),
"got null schemaName from the database");
return new JdbcSchema(dataSource, dialect, convention, catalog, schemaName);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return null;
}

@Override public Set<String> getNames(LikePattern pattern) {
final ImmutableSet.Builder<String> builder =
ImmutableSet.builder();
try (Connection connection = dataSource.getConnection();
ResultSet resultSet =
connection.getMetaData().getSchemas(catalog, pattern.pattern)) {
while (resultSet.next()) {
builder.add(
requireNonNull(resultSet.getString(1),
"got null schemaName from the database"));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return builder.build();
}
});
}

public static JdbcCatalogSchema create(
Expand Down Expand Up @@ -103,34 +143,25 @@ public static JdbcCatalogSchema create(
return new JdbcCatalogSchema(dataSource, dialect, convention, catalog);
}

private SubSchemaMap computeSubSchemaMap() {
final ImmutableMap.Builder<String, Schema> builder =
ImmutableMap.builder();
@Nullable String defaultSchemaName;
try (Connection connection = dataSource.getConnection();
ResultSet resultSet =
connection.getMetaData().getSchemas(catalog, null)) {
defaultSchemaName = connection.getSchema();
while (resultSet.next()) {
final String schemaName =
requireNonNull(resultSet.getString(1),
"got null schemaName from the database");
builder.put(schemaName,
new JdbcSchema(dataSource, dialect, convention, catalog, schemaName));
}
@Override public Lookup<Table> tables() {
return Lookup.empty();
}

@Override public Lookup<? extends Schema> subSchemas() {
return subSchemas;
}

private @Nullable String computeDefaultSchemaName() {
try (Connection connection = dataSource.getConnection()) {
return connection.getSchema();
} catch (SQLException e) {
throw new RuntimeException(e);
}
return new SubSchemaMap(defaultSchemaName, builder.build());
}

@Override protected Map<String, Schema> getSubSchemaMap() {
return subSchemaMapSupplier.get().map;
}

/** Returns the name of the default sub-schema. */
public @Nullable String getDefaultSubSchemaName() {
return subSchemaMapSupplier.get().defaultSchemaName;
return defaultSchemaName.get().orElse(null);
}

/** Returns the data source. */
Expand All @@ -148,16 +179,4 @@ public DataSource getDataSource() {
}
return null;
}

/** Contains sub-schemas by name, and the name of the default schema. */
private static class SubSchemaMap {
final @Nullable String defaultSchemaName;
final ImmutableMap<String, Schema> map;

private SubSchemaMap(@Nullable String defaultSchemaName,
ImmutableMap<String, Schema> map) {
this.defaultSchemaName = defaultSchemaName;
this.map = map;
}
}
}
Loading
Loading