Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignite-22197 Move ifExists flag handling inside a Catalog #5121

Merged
merged 16 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .idea/codeStyles/Project.xml
korlov42 marked this conversation as resolved.
Show resolved Hide resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;


import java.util.BitSet;

/** Represent result of applying Catalog command. */
public class CatalogApplyResult {
private final int catalogVersion;
private final BitSet applied;

/**
* Creates an result with a bitset of applied results and version of a catalog when the changes was visible.
xtern marked this conversation as resolved.
Show resolved Hide resolved
*
* @param appliedResults Result of applying of commands. Every 1 bit say about applied result, otherwise 0 says that command has
xtern marked this conversation as resolved.
Show resolved Hide resolved
* not been applied. Order of bits the same as commands given for execution to Catalog.
* @param catalogVersion Version of a catalog when the changes was visible.
*/
public CatalogApplyResult(BitSet appliedResults, int catalogVersion) {
assert appliedResults != null;

this.applied = appliedResults;
this.catalogVersion = catalogVersion;
}

/** Returns catalog version since applied result is available. */
public int getCatalogVersion() {
return catalogVersion;
}

/**
* Returns has been applied command by given index or not.
xtern marked this conversation as resolved.
Show resolved Hide resolved
*
* @param idx Index of command in the result.
* @return By given index return {@code true} if command has been successfully applied or {@code false} otherwise.
xtern marked this conversation as resolved.
Show resolved Hide resolved
*/
public boolean isApplied(int idx) {
return applied.get(idx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ public interface CatalogManager extends IgniteComponent, CatalogService {
* Executes given command.
*
* @param command Command to execute.
* @return Future representing result of execution (it will be completed with the created catalog version).
* @return Future representing result of execution with the created catalog version.
*/
CompletableFuture<Integer> execute(CatalogCommand command);
CompletableFuture<CatalogApplyResult> execute(CatalogCommand command);

/**
* Executes given list of commands atomically. That is, either all commands will be applied at once
* or neither of them. The whole bulk will increment catalog's version by a single point.
*
* @param commands Commands to execute.
* @return Future representing result of execution (it will be completed with the created catalog version).
* @return Future representing result of execution with the created catalog version.
*/
CompletableFuture<Integer> execute(List<CatalogCommand> commands);
CompletableFuture<CatalogApplyResult> execute(List<CatalogCommand> commands);

/**
* Returns a future, which completes when empty catalog is initialised. Otherwise this future completes upon startup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
Expand Down Expand Up @@ -116,8 +117,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

/**
* Future used to chain local appends to UpdateLog to avoid useless concurrency (as all concurrent attempts to append compete for
* the same catalog version, hence only one will win and the rest will have to retry).
* Future used to chain local appends to UpdateLog to avoid useless concurrency (as all concurrent attempts to append compete for the
* same catalog version, hence only one will win and the rest will have to retry).
*
* <p>Guarded by {@link #lastSaveUpdateFutureMutex}.
*/
Expand Down Expand Up @@ -230,17 +231,17 @@ private Catalog catalogAt(long timestamp) {
}

@Override
public CompletableFuture<Integer> execute(CatalogCommand command) {
return saveUpdateAndWaitForActivation(command);
public CompletableFuture<CatalogApplyResult> execute(CatalogCommand command) {
return saveUpdateAndWaitForActivation(List.of(command));
}

@Override
public CompletableFuture<Integer> execute(List<CatalogCommand> commands) {
public CompletableFuture<CatalogApplyResult> execute(List<CatalogCommand> commands) {
if (nullOrEmpty(commands)) {
return nullCompletedFuture();
}

return saveUpdateAndWaitForActivation(new BulkUpdateProducer(List.copyOf(commands)));
return saveUpdateAndWaitForActivation(List.copyOf(commands));
}

/**
Expand Down Expand Up @@ -306,10 +307,10 @@ private void truncateUpTo(Catalog catalog) {
LOG.info("Catalog history was truncated up to version=" + catalog.version());
}

private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
CompletableFuture<Integer> resultFuture = new CompletableFuture<>();
private CompletableFuture<CatalogApplyResult> saveUpdateAndWaitForActivation(List<UpdateProducer> updateProducers) {
CompletableFuture<CatalogApplyResult> resultFuture = new CompletableFuture<>();

saveUpdateEliminatingLocalConcurrency(updateProducer)
saveUpdateEliminatingLocalConcurrency(updateProducers)
.thenCompose(this::awaitVersionActivation)
.whenComplete((newVersion, err) -> {
if (err != null) {
Expand Down Expand Up @@ -345,10 +346,10 @@ private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer
return resultFuture;
}

private CompletableFuture<Integer> saveUpdateEliminatingLocalConcurrency(UpdateProducer updateProducer) {
private CompletableFuture<CatalogApplyResult> saveUpdateEliminatingLocalConcurrency(List<UpdateProducer> updateProducer) {
// Avoid useless and wasteful competition for the save catalog version by enforcing an order.
synchronized (lastSaveUpdateFutureMutex) {
CompletableFuture<Integer> chainedFuture = lastSaveUpdateFuture
CompletableFuture<CatalogApplyResult> chainedFuture = lastSaveUpdateFuture
.thenCompose(unused -> saveUpdate(updateProducer, 0));

// Suppressing any exception to make sure it doesn't ruin subsequent appends. The suppression is not a problem
Expand All @@ -359,6 +360,10 @@ private CompletableFuture<Integer> saveUpdateEliminatingLocalConcurrency(UpdateP
}
}

private CompletableFuture<CatalogApplyResult> awaitVersionActivation(CatalogApplyResult catalogApplyResult) {
return awaitVersionActivation(catalogApplyResult.getCatalogVersion()).thenApply(unused -> catalogApplyResult);
}

private CompletableFuture<Integer> awaitVersionActivation(int version) {
Catalog catalog = catalogByVer.get(version);

Expand All @@ -372,14 +377,15 @@ private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog catalog) {
}

/**
* Attempts to save a versioned update using a CAS-like logic. If the attempt fails, makes more attempts
* until the max retry count is reached.
* Attempts to save a versioned update using a CAS-like logic. If the attempt fails, makes more attempts until the max retry count is
* reached.
*
* @param updateProducer Supplies simple updates to include into a versioned update to install.
* @param attemptNo Ordinal number of an attempt.
* @return Future that completes with the new Catalog version (if update was saved successfully) or an exception, otherwise.
* @return Future that completes with the result of applying updates, contains the Catalog version when the updates are visible or an
* exception in case of any error.
*/
private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int attemptNo) {
private CompletableFuture<CatalogApplyResult> saveUpdate(List<UpdateProducer> batchUpdateProducers, int attemptNo) {
xtern marked this conversation as resolved.
Show resolved Hide resolved
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
Expand All @@ -391,17 +397,31 @@ private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int

Catalog catalog = catalogByVer.lastEntry().getValue();

List<UpdateEntry> updates;
BitSet applyResults = new BitSet(batchUpdateProducers.size());
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();
try {
updates = updateProducer.get(catalog);
for (int i = 0; i < batchUpdateProducers.size(); i++) {
UpdateProducer update = batchUpdateProducers.get(i);
List<UpdateEntry> entries = update.get(catalog);

for (UpdateEntry entry : entries) {
catalog = entry.applyUpdate(catalog, INITIAL_CAUSALITY_TOKEN);
}

if (!entries.isEmpty()) {
applyResults.set(i);
bulkUpdateEntries.addAll(entries);
}
}
} catch (CatalogValidationException ex) {
return failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version()));
} catch (Exception ex) {
return failedFuture(ex);
}

if (updates.isEmpty()) {
return completedFuture(catalog.version());
// all results are empty.
if (applyResults.cardinality() == 0) {
xtern marked this conversation as resolved.
Show resolved Hide resolved
return completedFuture(new CatalogApplyResult(applyResults, catalog.version()));
}

int newVersion = catalog.version() + 1;
Expand All @@ -411,14 +431,14 @@ private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int
// after all reactions to that event will be completed through the catalog event notifications mechanism.
// This is important for the distribution zones recovery purposes:
// we guarantee recovery for a zones' catalog actions only if that actions were completed.
return updateLog.append(new VersionedUpdate(newVersion, delayDurationMsSupplier.getAsLong(), updates))
return updateLog.append(new VersionedUpdate(newVersion, delayDurationMsSupplier.getAsLong(), bulkUpdateEntries))
.thenCompose(result -> versionTracker.waitFor(newVersion).thenApply(none -> result))
.thenCompose(result -> {
if (result) {
return completedFuture(newVersion);
return completedFuture(new CatalogApplyResult(applyResults, newVersion));
}

return saveUpdate(updateProducer, attemptNo + 1);
return saveUpdate(batchUpdateProducers, attemptNo + 1);
});
} finally {
busyLock.leaveBusy();
Expand Down Expand Up @@ -499,7 +519,7 @@ private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update,

assert activationTimestamp > catalog.time()
: "Activation timestamp " + activationTimestamp + " must be greater than previous catalog version activation timestamp "
+ catalog.time();
+ catalog.time();

return new Catalog(
update.version(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,13 @@ public boolean ifNotExists() {
public List<UpdateEntry> get(Catalog catalog) {
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

if (ifNotExists && schema.aliveIndex(indexName) != null) {
return List.of();
}

ensureNoTableIndexOrSysViewExistsWithGivenName(schema, indexName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
CatalogTableDescriptor table = tableOrThrow(schema, tableName, false).orElseThrow();
korlov42 marked this conversation as resolved.
Show resolved Hide resolved

assert columns != null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
Expand Down Expand Up @@ -74,7 +75,11 @@ private AlterTableAddColumnCommand(
public List<UpdateEntry> get(Catalog catalog) {
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Optional<CatalogTableDescriptor> tableOpt = tableOrThrow(schema, tableName, ifTableExists);
if (tableOpt.isEmpty()) {
return List.of();
}
CatalogTableDescriptor table = tableOpt.get();

List<CatalogTableColumnDescriptor> columnDescriptors = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
Expand Down Expand Up @@ -91,7 +92,11 @@ private AlterTableAlterColumnCommand(
public List<UpdateEntry> get(Catalog catalog) {
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Optional<CatalogTableDescriptor> tableOpt = tableOrThrow(schema, tableName, ifTableExists);
if (tableOpt.isEmpty()) {
return List.of();
}
CatalogTableDescriptor table = tableOpt.get();

CatalogTableColumnDescriptor origin = table.column(columnName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -78,7 +79,11 @@ private AlterTableDropColumnCommand(
public List<UpdateEntry> get(Catalog catalog) {
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Optional<CatalogTableDescriptor> tableOpt = tableOrThrow(schema, tableName, ifTableExists);
if (tableOpt.isEmpty()) {
return List.of();
}
CatalogTableDescriptor table = tableOpt.get();

Set<String> indexedColumns = aliveIndexesForTable(catalog, table.id())
.flatMap(AlterTableDropColumnCommand::indexColumnNames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
Expand Down Expand Up @@ -105,7 +106,11 @@ public boolean ifExists() {

@Override
public List<UpdateEntry> get(Catalog catalog) {
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);
Optional<CatalogZoneDescriptor> zoneOpt = zoneOrThrow(catalog, zoneName, ifExists);
if (zoneOpt.isEmpty()) {
return List.of();
}
CatalogZoneDescriptor zone = zoneOpt.get();

CatalogZoneDescriptor descriptor = fromParamsAndPreviousValue(zone);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
Expand Down Expand Up @@ -54,7 +55,11 @@ private AlterZoneSetDefaultCommand(String zoneName, boolean ifExists) throws Cat

@Override
public List<UpdateEntry> get(Catalog catalog) {
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);
Optional<CatalogZoneDescriptor> zoneOpt = zoneOrThrow(catalog, zoneName, ifExists);
if (zoneOpt.isEmpty()) {
return List.of();
}
CatalogZoneDescriptor zone = zoneOpt.get();

CatalogZoneDescriptor defaultZone = catalog.defaultZone();
if (defaultZone != null && zone.id() == defaultZone.id()) {
Expand Down
Loading