Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignite-22197 Move ifExists flag handling inside a Catalog #5121

Merged
merged 16 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;


import java.util.BitSet;

/** Represent result of applying Catalog command. */
public class CatalogApplyResult {
private final int catalogVersion;
private final long catalogTime;
private final BitSet applied;

/**
* Creates a result with a bitset of applied results and version of a catalog when the changes was visible.
*
* @param appliedResults Set of bits representing the result of applying commands. Every {@code 1} bit indicates that the
* corresponding command was applied, {@code 0} indicates that the corresponding command has not been applied. Order of bits the
* same as commands given for execution to Catalog.
* @param catalogVersion Version of a catalog when the changes was visible.
*/
CatalogApplyResult(BitSet appliedResults, int catalogVersion, long catalogTime) {
assert appliedResults != null;

this.applied = appliedResults;
this.catalogVersion = catalogVersion;
this.catalogTime = catalogTime;
}

/** Returns catalog version since applied result is available. */
public int getCatalogVersion() {
return catalogVersion;
}

/** Returns catalog time since applied result is available. */
public long getCatalogTime() {
return catalogTime;
}

/**
* Returns whether the specified command was applied or not.
*
* @param idx Index of command in the result.
* @return {@code True} if command has been successfully applied or {@code false} otherwise.
*/
public boolean isApplied(int idx) {
return applied.get(idx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ public interface CatalogManager extends IgniteComponent, CatalogService {
* Executes given command.
*
* @param command Command to execute.
* @return Future representing result of execution (it will be completed with the created catalog version).
* @return Future representing result of execution with the created catalog version.
*/
CompletableFuture<Integer> execute(CatalogCommand command);
CompletableFuture<CatalogApplyResult> execute(CatalogCommand command);

/**
* Executes given list of commands atomically. That is, either all commands will be applied at once
* or neither of them. The whole bulk will increment catalog's version by a single point.
*
* @param commands Commands to execute.
* @return Future representing result of execution (it will be completed with the created catalog version).
* @return Future representing result of execution with the created catalog version.
*/
CompletableFuture<Integer> execute(List<CatalogCommand> commands);
CompletableFuture<CatalogApplyResult> execute(List<CatalogCommand> commands);

/**
* Returns a future, which completes when empty catalog is initialised. Otherwise this future completes upon startup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
Expand Down Expand Up @@ -116,8 +117,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

/**
* Future used to chain local appends to UpdateLog to avoid useless concurrency (as all concurrent attempts to append compete for
* the same catalog version, hence only one will win and the rest will have to retry).
* Future used to chain local appends to UpdateLog to avoid useless concurrency (as all concurrent attempts to append compete for the
* same catalog version, hence only one will win and the rest will have to retry).
*
* <p>Guarded by {@link #lastSaveUpdateFutureMutex}.
*/
Expand Down Expand Up @@ -230,21 +231,17 @@ private Catalog catalogAt(long timestamp) {
}

@Override
public CompletableFuture<Integer> execute(CatalogCommand command) {
return saveUpdateAndWaitForActivation(command);
public CompletableFuture<CatalogApplyResult> execute(CatalogCommand command) {
return saveUpdateAndWaitForActivation(List.of(command));
}

@Override
public CompletableFuture<Integer> execute(List<CatalogCommand> commands) {
public CompletableFuture<CatalogApplyResult> execute(List<CatalogCommand> commands) {
if (nullOrEmpty(commands)) {
return nullCompletedFuture();
}

if (commands.size() == 1) {
return execute(commands.get(0));
}

return saveUpdateAndWaitForActivation(new BulkUpdateProducer(List.copyOf(commands)));
return saveUpdateAndWaitForActivation(List.copyOf(commands));
}

/**
Expand Down Expand Up @@ -310,12 +307,12 @@ private void truncateUpTo(Catalog catalog) {
LOG.info("Catalog history was truncated up to version=" + catalog.version());
}

private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
CompletableFuture<Integer> resultFuture = new CompletableFuture<>();
private CompletableFuture<CatalogApplyResult> saveUpdateAndWaitForActivation(List<UpdateProducer> updateProducers) {
CompletableFuture<CatalogApplyResult> resultFuture = new CompletableFuture<>();

saveUpdateEliminatingLocalConcurrency(updateProducer)
saveUpdateEliminatingLocalConcurrency(updateProducers)
.thenCompose(this::awaitVersionActivation)
.whenComplete((newVersion, err) -> {
.whenComplete((result, err) -> {
if (err != null) {
Throwable errUnwrapped = ExceptionUtils.unwrapCause(err);

Expand All @@ -342,17 +339,17 @@ private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer
resultFuture.completeExceptionally(err);
}
} else {
resultFuture.complete(newVersion);
resultFuture.complete(result);
}
});

return resultFuture;
}

private CompletableFuture<Integer> saveUpdateEliminatingLocalConcurrency(UpdateProducer updateProducer) {
private CompletableFuture<CatalogApplyResult> saveUpdateEliminatingLocalConcurrency(List<UpdateProducer> updateProducer) {
// Avoid useless and wasteful competition for the save catalog version by enforcing an order.
synchronized (lastSaveUpdateFutureMutex) {
CompletableFuture<Integer> chainedFuture = lastSaveUpdateFuture
CompletableFuture<CatalogApplyResult> chainedFuture = lastSaveUpdateFuture
.thenCompose(unused -> saveUpdate(updateProducer, 0));

// Suppressing any exception to make sure it doesn't ruin subsequent appends. The suppression is not a problem
Expand All @@ -363,6 +360,10 @@ private CompletableFuture<Integer> saveUpdateEliminatingLocalConcurrency(UpdateP
}
}

private CompletableFuture<CatalogApplyResult> awaitVersionActivation(CatalogApplyResult catalogApplyResult) {
return awaitVersionActivation(catalogApplyResult.getCatalogVersion()).thenApply(unused -> catalogApplyResult);
}

private CompletableFuture<Integer> awaitVersionActivation(int version) {
Catalog catalog = catalogByVer.get(version);

Expand All @@ -376,14 +377,15 @@ private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog catalog) {
}

/**
* Attempts to save a versioned update using a CAS-like logic. If the attempt fails, makes more attempts
* until the max retry count is reached.
* Attempts to save a versioned update using a CAS-like logic. If the attempt fails, makes more attempts until the max retry count is
* reached.
*
* @param updateProducer Supplies simple updates to include into a versioned update to install.
* @param updateProducers List of simple updates to include into a versioned update to install.
* @param attemptNo Ordinal number of an attempt.
* @return Future that completes with the new Catalog version (if update was saved successfully) or an exception, otherwise.
* @return Future that completes with the result of applying updates, contains the Catalog version when the updates are visible or an
* exception in case of any error.
*/
private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int attemptNo) {
private CompletableFuture<CatalogApplyResult> saveUpdate(List<UpdateProducer> updateProducers, int attemptNo) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
Expand All @@ -395,17 +397,34 @@ private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int

Catalog catalog = catalogByVer.lastEntry().getValue();

List<UpdateEntry> updates;
BitSet applyResults = new BitSet(updateProducers.size());
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();
try {
updates = updateProducer.get(new UpdateContext(catalog));
UpdateContext updateContext = new UpdateContext(catalog);
for (int i = 0; i < updateProducers.size(); i++) {
UpdateProducer update = updateProducers.get(i);
List<UpdateEntry> entries = update.get(updateContext);

if (entries.isEmpty()) {
continue;
}

for (UpdateEntry entry : entries) {
updateContext.updateCatalog(cat -> entry.applyUpdate(cat, INITIAL_CAUSALITY_TOKEN));
}

applyResults.set(i);
bulkUpdateEntries.addAll(entries);
}
} catch (CatalogValidationException ex) {
return failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version()));
} catch (Exception ex) {
return failedFuture(ex);
}

if (updates.isEmpty()) {
return completedFuture(catalog.version());
// all results are empty.
if (applyResults.cardinality() == 0) {
xtern marked this conversation as resolved.
Show resolved Hide resolved
return completedFuture(new CatalogApplyResult(applyResults, catalog.version(), catalog.time()));
}

int newVersion = catalog.version() + 1;
Expand All @@ -415,14 +434,15 @@ private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int
// after all reactions to that event will be completed through the catalog event notifications mechanism.
// This is important for the distribution zones recovery purposes:
// we guarantee recovery for a zones' catalog actions only if that actions were completed.
return updateLog.append(new VersionedUpdate(newVersion, delayDurationMsSupplier.getAsLong(), updates))
return updateLog.append(new VersionedUpdate(newVersion, delayDurationMsSupplier.getAsLong(), bulkUpdateEntries))
.thenCompose(result -> versionTracker.waitFor(newVersion).thenApply(none -> result))
.thenCompose(result -> {
if (result) {
return completedFuture(newVersion);
long newCatalogTime = catalogByVer.get(newVersion).time();
return completedFuture(new CatalogApplyResult(applyResults, newVersion, newCatalogTime));
}

return saveUpdate(updateProducer, attemptNo + 1);
return saveUpdate(updateProducers, attemptNo + 1);
});
} finally {
busyLock.leaveBusy();
Expand Down Expand Up @@ -503,7 +523,7 @@ private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update,

assert activationTimestamp > catalog.time()
: "Activation timestamp " + activationTimestamp + " must be greater than previous catalog version activation timestamp "
+ catalog.time();
+ catalog.time();

return new Catalog(
update.version(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.ensureNoTableIndexOrSysViewExistsWithGivenName;
import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.table;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.util.CollectionUtils.copyOrNull;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
Expand Down Expand Up @@ -83,9 +83,13 @@ public List<UpdateEntry> get(UpdateContext context) {
Catalog catalog = context.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

if (ifNotExists && schema.aliveIndex(indexName) != null) {
return List.of();
}

ensureNoTableIndexOrSysViewExistsWithGivenName(schema, indexName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
CatalogTableDescriptor table = table(schema, tableName, true);

assert columns != null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParams;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.util.CollectionUtils.copyOrNull;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
Expand Down Expand Up @@ -76,7 +75,10 @@ public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
CatalogTableDescriptor table = CatalogUtils.table(schema, tableName, !ifTableExists);
if (table == null) {
return List.of();
}

List<CatalogTableColumnDescriptor> columnDescriptors = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.table;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;

import java.util.List;
Expand Down Expand Up @@ -93,7 +93,10 @@ public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
CatalogTableDescriptor table = table(schema, tableName, !ifTableExists);
if (table == null) {
return List.of();
}

CatalogTableColumnDescriptor origin = table.column(columnName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.table;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.util.CollectionUtils.copyOrNull;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
Expand Down Expand Up @@ -80,7 +80,10 @@ public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
CatalogTableDescriptor table = table(schema, tableName, !ifTableExists);
if (table == null) {
return List.of();
}

Set<String> indexedColumns = aliveIndexesForTable(catalog, table.id())
.flatMap(AlterTableDropColumnCommand::indexColumnNames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateZoneFilter;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParams;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.zoneOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.zone;

import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -107,7 +107,11 @@ public boolean ifExists() {
@Override
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor zone = zone(catalog, zoneName, !ifExists);
if (zone == null) {
return List.of();
}

CatalogZoneDescriptor descriptor = fromParamsAndPreviousValue(zone);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.ignite.internal.catalog.commands;

import static org.apache.ignite.internal.catalog.commands.CatalogUtils.zoneOrThrow;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.zone;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -56,7 +56,11 @@ private AlterZoneSetDefaultCommand(String zoneName, boolean ifExists) throws Cat
@Override
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor zone = zone(catalog, zoneName, !ifExists);
if (zone == null) {
return List.of();
}

CatalogZoneDescriptor defaultZone = catalog.defaultZone();
if (defaultZone != null && zone.id() == defaultZone.id()) {
Expand Down
Loading