Skip to content

Commit

Permalink
[PLAT-14303][dr] Support rollback of switchover operation
Browse files Browse the repository at this point in the history
Summary:
This diff adds the rollback api `POST   /customers/:cUUID/tasks/:tUUID/rollback` with empty body. Then it reads the task params associated with task uuid `:tUUID` from the YBA database and creates a task to roll it back.

It also adds the rollback button for the task in the UI.

This diff specifically utilizes the above api to implement the rollback support for a failed switchover task. During switchover, a new xCluster config in the reverse direction is created and the old xCluster config is deleted. The rollback of a switchover is doable only if the previous xCluster config has not yet been deleted; otherwise, the user has to retry the failed task. This diff makes sure the right error is shown to the user in this case.

Test Plan:
- Aborted the switchover task in the middle, called the rollback api on the task, and made sure the rollback task is created properly and the DR config ends up in the right state after the rollback task finishes.

...TBD more tests...

Reviewers: #yba-api-review!, nsingh, cwang, jmak, yshchetinin, nbhatia

Reviewed By: cwang

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D40514
  • Loading branch information
shahrooz1997 committed Jan 22, 2025
1 parent c15f41e commit 3c52b9e
Show file tree
Hide file tree
Showing 28 changed files with 645 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ public static boolean isTaskTypeRetryable(TaskType taskType) {
return TaskExecutor.isTaskRetryable(taskType.getTaskClass());
}

/**
 * Tells whether tasks of the given type support rollback, by delegating to {@link
 * TaskExecutor#canTaskRollback(Class)} with the task class behind the type.
 *
 * @param taskType the task type to inspect.
 * @return true if the task class of this type can rollback.
 */
public static boolean canTaskTypeRollback(TaskType taskType) {
  final boolean rollbackSupported = TaskExecutor.canTaskRollback(taskType.getTaskClass());
  return rollbackSupported;
}

/**
* Creates a new task runnable to run the required task, and submits it to the TaskExecutor.
*
Expand Down Expand Up @@ -353,6 +363,7 @@ public Optional<ObjectNode> buildTaskStatus(
return taskUuidsToAllowRetry.contains(taskInfo.getUuid().toString());
});
responseJson.put("retryable", retryable);
responseJson.put("canRollback", canTaskRollback(taskInfo));
if (isTaskPaused(taskInfo.getUuid())) {
// Set this only if it is true. The thread is just parking. From the task state
// perspective, it is still running.
Expand Down Expand Up @@ -380,6 +391,11 @@ public boolean isTaskRetryable(TaskInfo taskInfo, Predicate<TaskInfo> moreCondit
return false;
}

/**
 * Returns true if the given task can be rolled back: its task type must support rollback and the
 * task must currently be in one of the {@link TaskInfo#ERROR_STATES}.
 *
 * @param taskInfo the task to inspect.
 * @return true if a rollback can be requested for this task.
 */
public boolean canTaskRollback(TaskInfo taskInfo) {
  // The type check comes first so the task state is only consulted for rollback-capable types.
  if (!canTaskTypeRollback(taskInfo.getTaskType())) {
    return false;
  }
  return TaskInfo.ERROR_STATES.contains(taskInfo.getTaskState());
}

public ObjectNode getVersionInfo(CustomerTask task, TaskInfo taskInfo) {
ObjectNode versionNumbers = Json.newObject();
JsonNode taskParams = taskInfo.getTaskParams();
Expand Down
7 changes: 7 additions & 0 deletions managed/src/main/java/com/yugabyte/yw/commissioner/ITask.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ public interface ITask extends Runnable {
boolean enabled() default true;
}

/**
 * Annotation for a ITask class to enable/disable rollback on a Task Type.
 *
 * <p>It is retained at runtime and may only be placed on types. TaskExecutor#canTaskRollback reads
 * {@link #enabled()} from it and treats a task class without this annotation as not supporting
 * rollback.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface CanRollback {
/** Whether rollback is enabled for the annotated task class; defaults to true when omitted. */
boolean enabled() default true;
}

/** Annotation for a ITask class to enable/disable abortable. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.Sets;
import com.google.inject.Provider;
import com.yugabyte.yw.commissioner.ITask.Abortable;
import com.yugabyte.yw.commissioner.ITask.CanRollback;
import com.yugabyte.yw.commissioner.ITask.Retryable;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
import com.yugabyte.yw.common.DrainableMap;
Expand Down Expand Up @@ -240,6 +241,17 @@ static boolean isTaskRetryable(Class<? extends ITask> taskClass) {
return optional.map(Retryable::enabled).orElse(false);
}

/**
 * Reports whether the given task class can be rolled back, based on its {@link CanRollback}
 * annotation. A class without the annotation is reported as not supporting rollback.
 *
 * <p>See {@link TaskExecutor#isTaskRetryable(Class)}
 *
 * @param taskClass the task class to inspect; must be non-null.
 * @return the annotation's {@code enabled} value, or false when the annotation is absent.
 */
static boolean canTaskRollback(Class<? extends ITask> taskClass) {
  checkNotNull(taskClass, "Task class must be non-null");
  Optional<CanRollback> rollbackAnnotation =
      CommonUtils.isAnnotatedWith(taskClass, CanRollback.class);
  return rollbackAnnotation.map(annotation -> annotation.enabled()).orElse(false);
}

/**
* Returns the task type for the given task class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.ITask.Abortable;
import com.yugabyte.yw.commissioner.ITask.CanRollback;
import com.yugabyte.yw.commissioner.ITask.Retryable;
import com.yugabyte.yw.commissioner.UserTaskDetails;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
Expand All @@ -23,6 +24,7 @@
@Slf4j
@Retryable
@Abortable
@CanRollback
public class SwitchoverDrConfig extends EditDrConfig {

@Inject
Expand Down Expand Up @@ -50,7 +52,7 @@ public void run() {
// which means the old xCluster config can potentially be deleted.
if (isFirstTry() && Objects.isNull(currentXClusterConfig)) {
throw new IllegalStateException(
"The old xCluster config does not exist and cannot do a failover");
"The old xCluster config does not exist and cannot do a switchover");
} else if (!isFirstTry() && Objects.isNull(currentXClusterConfig)) {
log.warn("The old xCluster config got deleted in the previous run");
}
Expand Down Expand Up @@ -130,7 +132,7 @@ public void run() {
createDeleteXClusterConfigSubtasks(
currentXClusterConfig,
false /* keepEntry */,
false /*forceDelete*/,
false /* forceDelete */,
false /* deleteSourcePitrConfigs */,
false /* deleteTargetPitrConfigs */);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2024 YugaByte, Inc. and Contributors
*
* Licensed under the Polyform Free Trial License 1.0.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://github.com/YugaByte/yugabyte-db/blob/master/licenses/POLYFORM-FREE-TRIAL-LICENSE-1.0.0.txt
*/

package com.yugabyte.yw.commissioner.tasks;

import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.ITask.Abortable;
import com.yugabyte.yw.commissioner.ITask.Retryable;
import com.yugabyte.yw.commissioner.UserTaskDetails;
import com.yugabyte.yw.common.DrConfigStates.SourceUniverseState;
import com.yugabyte.yw.common.DrConfigStates.State;
import com.yugabyte.yw.common.DrConfigStates.TargetUniverseState;
import com.yugabyte.yw.common.XClusterUniverseService;
import com.yugabyte.yw.models.Restore;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.XClusterConfig;
import com.yugabyte.yw.models.XClusterConfig.ConfigType;
import com.yugabyte.yw.models.XClusterTableConfig;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.yb.cdc.CdcConsumer.XClusterRole;

/**
* Task to perform a roll back a switchover task of the DR config that has failed or aborted. This
* task will make sure the old xCluster config is in Running statue while the new xCluster config is
* deleted.
*/
@Slf4j
@Retryable
@Abortable
public class SwitchoverDrConfigRollback extends SwitchoverDrConfig {

@Inject
protected SwitchoverDrConfigRollback(
BaseTaskDependencies baseTaskDependencies, XClusterUniverseService xClusterUniverseService) {
super(baseTaskDependencies, xClusterUniverseService);
}

@Override
public void run() {
log.info("Running {}", getName());

XClusterConfig currentXClusterConfig = taskParams().getOldXClusterConfig();
XClusterConfig switchoverXClusterConfig = getXClusterConfigFromTaskParams();
UUID sourceUniverseUUID;
UUID targetUniverseUUID;
if (Objects.nonNull(currentXClusterConfig)) {
sourceUniverseUUID = currentXClusterConfig.getSourceUniverseUUID();
targetUniverseUUID = currentXClusterConfig.getTargetUniverseUUID();
} else if (Objects.nonNull(switchoverXClusterConfig)) {
// In switchoverXClusterConfig, the source and target universes are swapped.
sourceUniverseUUID = switchoverXClusterConfig.getTargetUniverseUUID();
targetUniverseUUID = switchoverXClusterConfig.getSourceUniverseUUID();
} else {
throw new IllegalStateException("Both old and new xCluster configs are null");
}
Universe sourceUniverse = Universe.getOrBadRequest(sourceUniverseUUID);
Universe targetUniverse = Universe.getOrBadRequest(targetUniverseUUID);
try {
// Lock the source universe.
lockAndFreezeUniverseForUpdate(
sourceUniverse.getUniverseUUID(), sourceUniverse.getVersion(), null /* Txn callback */);
try {
// Lock the target universe.
lockAndFreezeUniverseForUpdate(
targetUniverse.getUniverseUUID(), targetUniverse.getVersion(), null /* Txn callback */);

// The previous xCluster config is already in Running state, we only need to delete the new
// switchover xCluster config.
if (Objects.nonNull(switchoverXClusterConfig)) {
createDeleteXClusterConfigSubtasks(
switchoverXClusterConfig,
false /* keepEntry */,
false /* forceDelete */,
false /* deleteSourcePitrConfigs */,
false /* deleteTargetPitrConfigs */);
}

if (currentXClusterConfig.getType() == ConfigType.Txn) {
// Set the target universe role to Standby.
createChangeXClusterRoleTask(
currentXClusterConfig,
null /* sourceRole */,
XClusterRole.STANDBY /* targetRole */,
false /* ignoreErrors */)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);
}

createPromoteSecondaryConfigToMainConfigTask(currentXClusterConfig);

createSetDrStatesTask(
currentXClusterConfig,
State.Replicating,
SourceUniverseState.ReplicatingData,
TargetUniverseState.ReceivingData,
null /* keyspacePending */)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);

createMarkUniverseUpdateSuccessTasks(targetUniverse.getUniverseUUID())
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);

createMarkUniverseUpdateSuccessTasks(sourceUniverse.getUniverseUUID())
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);

getRunnableTask().runSubTasks();
} catch (Exception e) {
log.error("{} hit error : {}", getName(), e.getMessage());
// Set tables in updating status to failed.
Set<String> tablesInPendingStatus =
switchoverXClusterConfig.getTableIdsInStatus(
getTableIds(taskParams().getTableInfoList()),
X_CLUSTER_TABLE_CONFIG_PENDING_STATUS_LIST);
switchoverXClusterConfig.updateStatusForTables(
tablesInPendingStatus, XClusterTableConfig.Status.Failed);
// Set backup and restore status to failed and alter load balanced.
boolean isLoadBalancerAltered = false;
for (Restore restore : restoreList) {
isLoadBalancerAltered = isLoadBalancerAltered || restore.isAlterLoadBalancer();
}
handleFailedBackupAndRestore(
backupList, restoreList, false /* isAbort */, isLoadBalancerAltered);
throw new RuntimeException(e);
} finally {
// Unlock the target universe.
unlockUniverseForUpdate(targetUniverse.getUniverseUUID());
}
} finally {
// Unlock the source universe.
unlockUniverseForUpdate(sourceUniverse.getUniverseUUID());
}

log.info("Completed {}", getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.tasks.XClusterConfigTaskBase;
import com.yugabyte.yw.common.XClusterUniverseService;
import com.yugabyte.yw.models.DrConfig;
import com.yugabyte.yw.models.XClusterConfig;
import io.ebean.DB;
import io.ebean.Transaction;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -29,8 +32,35 @@ public void run() {

XClusterConfig xClusterConfig = getXClusterConfigFromTaskParams();

// Delete the config.
xClusterConfig.delete();
try (Transaction transaction = DB.beginTransaction()) {
// Promote a secondary xCluster config to primary if required.
if (xClusterConfig.isUsedForDr() && !xClusterConfig.isSecondary()) {
DrConfig drConfig = xClusterConfig.getDrConfig();
drConfig.refresh();
log.info(
"DR config {} has {} xCluster configs",
drConfig.getUuid(),
drConfig.getXClusterConfigs().size());
if (drConfig.getXClusterConfigs().size() > 1) {
XClusterConfig secondaryXClusterConfig =
drConfig.getXClusterConfigs().stream()
.filter(config -> !config.equals(xClusterConfig))
.findFirst()
.orElseThrow(() -> new IllegalStateException("No other xCluster config found"));
secondaryXClusterConfig.setSecondary(false);
secondaryXClusterConfig.update(); // Mark as primary (no longer secondary).
}
}

// Delete the xClusterConfig.
xClusterConfig.delete();

// Commit the transaction.
transaction.commit();
} catch (Exception e) {
log.error("{} hit error : {}", getName(), e.getMessage());
throw new RuntimeException(e);
}

log.info("Completed {}", getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.tasks.XClusterConfigTaskBase;
import com.yugabyte.yw.common.XClusterUniverseService;
import com.yugabyte.yw.models.DrConfig;
import com.yugabyte.yw.models.XClusterConfig;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,8 +29,22 @@ public void run() {

XClusterConfig xClusterConfig = getXClusterConfigFromTaskParams();

xClusterConfig.setSecondary(false);
xClusterConfig.update();
if (!xClusterConfig.isUsedForDr()) {
throw new IllegalArgumentException("The xCluster config is not used for DR");
}

// The following update happens in a transaction which ensure a dr config has exactly one
// primary xCluster config.
DrConfig drConfig = xClusterConfig.getDrConfig();
drConfig.refresh();
log.debug(
"DR config {} has {} xCluster configs",
drConfig.getUuid(),
drConfig.getXClusterConfigs().size());
drConfig
.getXClusterConfigs()
.forEach(config -> config.setSecondary(!config.equals(xClusterConfig)));
drConfig.update();

log.info("Completed {}", getName());
}
Expand Down
Loading

0 comments on commit 3c52b9e

Please sign in to comment.