Skip to content

Commit

Permalink
IGNITE-24247 Indexes created with table must be created in AVAILABLE …
Browse files Browse the repository at this point in the history
…state (wip).
  • Loading branch information
xtern committed Jan 24, 2025
1 parent 044313d commit cc914e9
Show file tree
Hide file tree
Showing 51 changed files with 415 additions and 342 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;

/**
* Update producer that is used to group updates
* when executing a batch of catalog commands.
*/
class BulkUpdateProducer implements UpdateProducer {
private final List<? extends UpdateProducer> commands;

BulkUpdateProducer(List<? extends UpdateProducer> producers) {
this.commands = producers;
}

@Override
public List<UpdateEntry> get(UpdateContext updateContext) {
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();

for (UpdateProducer producer : commands) {
List<UpdateEntry> entries = producer.get(updateContext);

for (UpdateEntry entry : entries) {
updateContext.updateCatalog(
catalog -> entry.applyUpdate(catalog, CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN)
);
}

bulkUpdateEntries.addAll(entries);
}

return bulkUpdateEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ public CompletableFuture<Integer> execute(List<CatalogCommand> commands) {
return nullCompletedFuture();
}

if (commands.size() == 1) {
return execute(commands.get(0));
}

return saveUpdateAndWaitForActivation(new BulkUpdateProducer(List.copyOf(commands)));
}

Expand Down Expand Up @@ -390,7 +394,7 @@ private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
);

List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(emptyCatalog);
List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(new UpdateContext(emptyCatalog));

return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries))
.handle((result, error) -> {
Expand Down Expand Up @@ -501,7 +505,7 @@ private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int

List<UpdateEntry> updates;
try {
updates = updateProducer.get(catalog);
updates = updateProducer.get(new UpdateContext(catalog));
} catch (CatalogValidationException ex) {
return failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version()));
} catch (Exception ex) {
Expand Down Expand Up @@ -619,28 +623,4 @@ private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update,
);
}

private static class BulkUpdateProducer implements UpdateProducer {
private final List<? extends UpdateProducer> commands;

BulkUpdateProducer(List<? extends UpdateProducer> producers) {
this.commands = producers;
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();

for (UpdateProducer producer : commands) {
List<UpdateEntry> entries = producer.get(catalog);

for (UpdateEntry entry : entries) {
catalog = entry.applyUpdate(catalog, INITIAL_CAUSALITY_TOKEN);
}

bulkUpdateEntries.addAll(entries);
}

return bulkUpdateEntries;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;

/**
* Context contains the catalog to be updated.
* It is used by {@link BulkUpdateProducer} when executing a batch of catalog commands
*
* @see BulkUpdateProducer
*/
public class UpdateContext {
/** Catalog descriptor. */
private Catalog catalog;

/** Identifiers of created tables. */
private @Nullable Set<Integer> tableIds;

/** Constructor. */
public UpdateContext(Catalog catalog) {
this.catalog = catalog;
}

/** Returns catalog descriptor. */
public Catalog catalog() {
return catalog;
}

/** Registers the table being created in the context. */
public void registerTableCreation(int tableId) {
if (tableIds == null) {
tableIds = new HashSet<>();
}

tableIds.add(tableId);
}

/** Returns whether the command to create a table with the specified identifier was processed. */
public boolean containsTableCreation(int tableId) {
return tableIds != null && tableIds.contains(tableId);
}

/** Applies specified action to the catalog. */
void updateCatalog(Function<Catalog, Catalog> updater) {
catalog = updater.apply(catalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public interface UpdateProducer {
* Returns list of {@link UpdateEntry entries} to be applied to catalog to bring it to the state
* described in the command.
*
* @param catalog Catalog on the basis of which to generate the list of updates.
* @param updateContext TODO Catalog on the basis of which to generate the list of updates.
* @return List of updates. Should be empty if no updates actually required.
*/
List<UpdateEntry> get(Catalog catalog);
List<UpdateEntry> get(UpdateContext updateContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
Expand All @@ -50,8 +51,6 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {

protected final List<String> columns;

protected final boolean isCreatedWithTable;

private final boolean ifNotExists;

AbstractCreateIndexCommand(
Expand All @@ -60,8 +59,7 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {
boolean ifNotExists,
String tableName,
boolean unique,
List<String> columns,
boolean isCreatedWithTable
List<String> columns
) throws CatalogValidationException {
super(schemaName, indexName);

Expand All @@ -71,7 +69,6 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {
this.tableName = tableName;
this.unique = unique;
this.columns = copyOrNull(columns);
this.isCreatedWithTable = isCreatedWithTable;
}

public boolean ifNotExists() {
Expand All @@ -81,7 +78,8 @@ public boolean ifNotExists() {
protected abstract CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status);

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext context) {
Catalog catalog = context.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

ensureNoTableIndexOrSysViewExistsWithGivenName(schema, indexName);
Expand All @@ -101,7 +99,9 @@ public List<UpdateEntry> get(Catalog catalog) {
throw new CatalogValidationException("Unique index must include all colocation columns");
}

CatalogIndexStatus status = isCreatedWithTable ? CatalogIndexStatus.AVAILABLE : CatalogIndexStatus.REGISTERED;
CatalogIndexStatus status = context.containsTableCreation(table.id())
? CatalogIndexStatus.AVAILABLE
: CatalogIndexStatus.REGISTERED;

return List.of(
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), status)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,4 @@ interface AbstractCreateIndexCommandBuilder<T extends AbstractIndexCommandBuilde

/** List of the columns to index. There must be at least one column. */
T columns(List<String> columns);

/** Flag indicating that this index has been created at the same time as its table. */
T isCreatedWithTable(boolean isCreatedWithTable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
Expand Down Expand Up @@ -71,7 +72,8 @@ private AlterTableAddColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
Expand Down Expand Up @@ -88,7 +89,8 @@ private AlterTableAlterColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
Expand Down Expand Up @@ -75,7 +76,8 @@ private AlterTableDropColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfilesDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.AlterZoneEntry;
Expand Down Expand Up @@ -104,7 +105,8 @@ public boolean ifExists() {
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor descriptor = fromParamsAndPreviousValue(zone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.SetDefaultZoneEntry;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;
Expand Down Expand Up @@ -53,7 +54,8 @@ private AlterZoneSetDefaultCommand(String zoneName, boolean ifExists) throws Cat
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor defaultZone = catalog.defaultZone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ private CreateHashIndexCommand(
boolean ifNotExists,
String tableName,
boolean unique,
List<String> columns,
boolean isCreatedWithTable
List<String> columns
) throws CatalogValidationException {
super(schemaName, indexName, ifNotExists, tableName, unique, columns, isCreatedWithTable);
super(schemaName, indexName, ifNotExists, tableName, unique, columns);
}

@Override
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status) {
return new CatalogHashIndexDescriptor(indexId, indexName, tableId, unique, status, columns, isCreatedWithTable);
return new CatalogHashIndexDescriptor(indexId, indexName, tableId, unique, status, columns, false);
}

private static class Builder implements CreateHashIndexCommandBuilder {
Expand All @@ -68,7 +67,6 @@ private static class Builder implements CreateHashIndexCommandBuilder {
private String tableName;
private List<String> columns;
private boolean unique;
private boolean isCreatedWithTable;

@Override
public Builder tableName(String tableName) {
Expand Down Expand Up @@ -112,16 +110,9 @@ public CreateHashIndexCommandBuilder ifNotExists(boolean ifNotExists) {
return this;
}

@Override
public CreateHashIndexCommandBuilder isCreatedWithTable(boolean isCreatedWithTable) {
this.isCreatedWithTable = isCreatedWithTable;

return this;
}

@Override
public CatalogCommand build() {
return new CreateHashIndexCommand(schemaName, indexName, ifNotExists, tableName, unique, columns, isCreatedWithTable);
return new CreateHashIndexCommand(schemaName, indexName, ifNotExists, tableName, unique, columns);
}
}
}
Loading

0 comments on commit cc914e9

Please sign in to comment.