Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24247 Indexes created with table must be created in AVAILABLE state #5113

Merged
merged 16 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;

/**
* Update producer that is used to group updates
* when executing a batch of catalog commands.
*/
class BulkUpdateProducer implements UpdateProducer {
private final List<? extends UpdateProducer> commands;

BulkUpdateProducer(List<? extends UpdateProducer> producers) {
this.commands = producers;
}

@Override
public List<UpdateEntry> get(UpdateContext updateContext) {
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();

for (UpdateProducer producer : commands) {
List<UpdateEntry> entries = producer.get(updateContext);

for (UpdateEntry entry : entries) {
updateContext.updateCatalog(
catalog -> entry.applyUpdate(catalog, CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN)
);
}

bulkUpdateEntries.addAll(entries);
}

return bulkUpdateEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ public CompletableFuture<Integer> execute(List<CatalogCommand> commands) {
return nullCompletedFuture();
}

if (commands.size() == 1) {
return execute(commands.get(0));
}

return saveUpdateAndWaitForActivation(new BulkUpdateProducer(List.copyOf(commands)));
}

Expand Down Expand Up @@ -282,7 +286,7 @@ private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
);

List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(emptyCatalog);
List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(new UpdateContext(emptyCatalog));

return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries))
.handle((result, error) -> {
Expand Down Expand Up @@ -393,7 +397,7 @@ private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int

List<UpdateEntry> updates;
try {
updates = updateProducer.get(catalog);
updates = updateProducer.get(new UpdateContext(catalog));
} catch (CatalogValidationException ex) {
return failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version()));
} catch (Exception ex) {
Expand Down Expand Up @@ -511,28 +515,4 @@ private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update,
);
}

private static class BulkUpdateProducer implements UpdateProducer {
private final List<? extends UpdateProducer> commands;

BulkUpdateProducer(List<? extends UpdateProducer> producers) {
this.commands = producers;
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();

for (UpdateProducer producer : commands) {
List<UpdateEntry> entries = producer.get(catalog);

for (UpdateEntry entry : entries) {
catalog = entry.applyUpdate(catalog, INITIAL_CAUSALITY_TOKEN);
}

bulkUpdateEntries.addAll(entries);
}

return bulkUpdateEntries;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

import java.util.function.Function;

/**
* Context contains two instances of the catalog: the base one and the updated one.
*
* <p>During batch command processing, changes are generated and applied to
* the updated instance. The base catalog instance can be used by a command
* to determine whether certain changes have been made to the catalog during
* processing of the current batch of commands.
*
* @see BulkUpdateProducer
*/
public class UpdateContext {
/** The base catalog descriptor. */
private final Catalog baseCatalog;

/** The updatable catalog descriptor. */
private Catalog updatableCatalog;

/** Constructor. */
public UpdateContext(Catalog catalog) {
this.baseCatalog = catalog;
this.updatableCatalog = catalog;
}

/**
* Returns the catalog descriptor on the basis of which the command generates the list of updates.
*
* <p>In the case of batch processing, this catalog instance contains the updates made by previous
* commands in the batch.
*
* @return Catalog descriptor.
*/
public Catalog catalog() {
return updatableCatalog;
}

/** Returns base catalog as it was before any updates from the batch were applied. */
public Catalog baseCatalog() {
return baseCatalog;
}

/** Applies specified action to the catalog. */
public void updateCatalog(Function<Catalog, Catalog> updater) {
updatableCatalog = updater.apply(updatableCatalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public interface UpdateProducer {
* Returns list of {@link UpdateEntry entries} to be applied to catalog to bring it to the state
* described in the command.
*
* @param catalog Catalog on the basis of which to generate the list of updates.
* @param updateContext Context containing the catalog on the basis of which to generate the list of updates.
* @return List of updates. Should be empty if no updates actually required.
*/
List<UpdateEntry> get(Catalog catalog);
List<UpdateEntry> get(UpdateContext updateContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
Expand All @@ -50,8 +51,6 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {

protected final List<String> columns;

protected final boolean isCreatedWithTable;

private final boolean ifNotExists;

AbstractCreateIndexCommand(
Expand All @@ -60,8 +59,7 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {
boolean ifNotExists,
String tableName,
boolean unique,
List<String> columns,
boolean isCreatedWithTable
List<String> columns
) throws CatalogValidationException {
super(schemaName, indexName);

Expand All @@ -71,17 +69,18 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {
this.tableName = tableName;
this.unique = unique;
this.columns = copyOrNull(columns);
this.isCreatedWithTable = isCreatedWithTable;
}

public boolean ifNotExists() {
return ifNotExists;
}

protected abstract CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status);
protected abstract CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status,
boolean createdWithTable);

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext context) {
Catalog catalog = context.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

ensureNoTableIndexOrSysViewExistsWithGivenName(schema, indexName);
Expand All @@ -101,10 +100,14 @@ public List<UpdateEntry> get(Catalog catalog) {
throw new CatalogValidationException("Unique index must include all colocation columns");
}

CatalogIndexStatus status = isCreatedWithTable ? CatalogIndexStatus.AVAILABLE : CatalogIndexStatus.REGISTERED;
boolean indexCreatedWithTable = context.baseCatalog().table(table.id()) == null;

CatalogIndexStatus status = indexCreatedWithTable
? CatalogIndexStatus.AVAILABLE
: CatalogIndexStatus.REGISTERED;

return List.of(
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), status)),
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), status, indexCreatedWithTable)),
new ObjectIdGenUpdateEntry(1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,4 @@ interface AbstractCreateIndexCommandBuilder<T extends AbstractIndexCommandBuilde

/** List of the columns to index. There must be at least one column. */
T columns(List<String> columns);

/** Flag indicating that this index has been created at the same time as its table. */
T isCreatedWithTable(boolean isCreatedWithTable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
Expand Down Expand Up @@ -71,7 +72,8 @@ private AlterTableAddColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
Expand Down Expand Up @@ -88,7 +89,8 @@ private AlterTableAlterColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
Expand Down Expand Up @@ -75,7 +76,8 @@ private AlterTableDropColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfilesDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.AlterZoneEntry;
Expand Down Expand Up @@ -104,7 +105,8 @@ public boolean ifExists() {
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor descriptor = fromParamsAndPreviousValue(zone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.SetDefaultZoneEntry;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;
Expand Down Expand Up @@ -53,7 +54,8 @@ private AlterZoneSetDefaultCommand(String zoneName, boolean ifExists) throws Cat
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor defaultZone = catalog.defaultZone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ private CreateHashIndexCommand(
boolean ifNotExists,
String tableName,
boolean unique,
List<String> columns,
boolean isCreatedWithTable
List<String> columns
) throws CatalogValidationException {
super(schemaName, indexName, ifNotExists, tableName, unique, columns, isCreatedWithTable);
super(schemaName, indexName, ifNotExists, tableName, unique, columns);
}

@Override
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status) {
return new CatalogHashIndexDescriptor(indexId, indexName, tableId, unique, status, columns, isCreatedWithTable);
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status, boolean createdWithTable) {
return new CatalogHashIndexDescriptor(indexId, indexName, tableId, unique, status, columns, createdWithTable);
}

private static class Builder implements CreateHashIndexCommandBuilder {
Expand All @@ -68,7 +67,6 @@ private static class Builder implements CreateHashIndexCommandBuilder {
private String tableName;
private List<String> columns;
private boolean unique;
private boolean isCreatedWithTable;

@Override
public Builder tableName(String tableName) {
Expand Down Expand Up @@ -112,16 +110,9 @@ public CreateHashIndexCommandBuilder ifNotExists(boolean ifNotExists) {
return this;
}

@Override
public CreateHashIndexCommandBuilder isCreatedWithTable(boolean isCreatedWithTable) {
this.isCreatedWithTable = isCreatedWithTable;

return this;
}

@Override
public CatalogCommand build() {
return new CreateHashIndexCommand(schemaName, indexName, ifNotExists, tableName, unique, columns, isCreatedWithTable);
return new CreateHashIndexCommand(schemaName, indexName, ifNotExists, tableName, unique, columns);
}
}
}
Loading