From e0f15aab71cda8fce3f06d2f53a4c7e7a4f3c523 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Sun, 1 Nov 2020 22:34:34 -0800 Subject: [PATCH] Enhance TableRebalancer to support no-downtime rebalance for strict replica-group routing tables (#6212) On top of #6208, this PR enhances the TableRebalancer to support the no-downtime rebalance for strict replica-group routing, while still holding the minimum available replicas requirement. --- .../helix/core/rebalance/TableRebalancer.java | 106 ++++- .../core/rebalance/TableRebalancerTest.java | 372 +++++++++++++++--- 2 files changed, 404 insertions(+), 74 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 644e990e9b51..081e52c700ff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -23,7 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.TimeoutException; import org.I0Itec.zkclient.exception.ZkBadVersionException; import org.apache.commons.configuration.Configuration; @@ -46,6 +48,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; 
@@ -127,12 +130,15 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc int minReplicasToKeepUpForNoDowntime = rebalanceConfig .getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME); + boolean enableStrictReplicaGroup = + tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE + .equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType()); boolean bestEfforts = rebalanceConfig .getBoolean(RebalanceConfigConstants.BEST_EFFORTS, RebalanceConfigConstants.DEFAULT_BEST_EFFORTS); LOGGER.info( - "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, bestEfforts: {}", + "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}", tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime, - minReplicasToKeepUpForNoDowntime, bestEfforts); + minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, bestEfforts); // Validate table config try { @@ -334,8 +340,8 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc minAvailableReplicas = Math.max(numReplicas + minReplicasToKeepUpForNoDowntime, 0); } - LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {}", tableNameWithType, - minAvailableReplicas, bestEfforts); + LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {}", + tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts); int expectedVersion = currentIdealState.getRecord().getVersion(); while (true) { // Wait for ExternalView to converge before updating the next IdealState @@ -373,8 
+379,10 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc } if (currentAssignment.equals(targetAssignment)) { - LOGGER.info("Finished rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {} in {}ms.", - tableNameWithType, minAvailableReplicas, bestEfforts, System.currentTimeMillis() - startTimeMs); + LOGGER.info( + "Finished rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {} in {}ms.", + tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts, + System.currentTimeMillis() - startTimeMs); return new RebalanceResult(RebalanceResult.Status.DONE, "Success with minAvailableReplicas: " + minAvailableReplicas + " (both IdealState and ExternalView should reach the target segment assignment)", @@ -382,7 +390,7 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc } Map> nextAssignment = - getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas); + getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup); LOGGER.info("Got the next assignment for table: {} with number of segments to be moved to each instance: {}", tableNameWithType, SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment)); @@ -546,22 +554,76 @@ static boolean isExternalViewConverged(String tableNameWithType, return true; } - private static Map> getNextAssignment(Map> currentAssignment, - Map> targetAssignment, int minAvailableReplicas) { + /** + * Returns the next assignment for the table based on the current assignment and the target assignment with regards to + * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all + * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement + * is met. 
If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup) { + return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, + minAvailableReplicas) + : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas); + } + + private static Map> getNextStrictReplicaGroupAssignment( + Map> currentAssignment, Map> targetAssignment, + int minAvailableReplicas) { Map> nextAssignment = new TreeMap<>(); + Map, Set> availableInstancesMap = new HashMap<>(); + for (Map.Entry> entry : currentAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map currentInstanceStateMap = entry.getValue(); + Map targetInstanceStateMap = targetAssignment.get(segmentName); + SingleSegmentAssignment assignment = + getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas); + Set assignedInstances = assignment._instanceStateMap.keySet(); + Set availableInstances = assignment._availableInstances; + availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { + if (currentAvailableInstances == null) { + // First segment assigned to these instances, use the new assignment and update the available instances + nextAssignment.put(segmentName, assignment._instanceStateMap); + return availableInstances; + } else { + // There are other segments assigned to the same instances, check the available instances to see if adding the + // new assignment can still hold the minimum available replicas requirement + availableInstances.retainAll(currentAvailableInstances); + if (availableInstances.size() >= minAvailableReplicas) { + // New assignment can be added + nextAssignment.put(segmentName, assignment._instanceStateMap); + return 
availableInstances; + } else { + // New assignment cannot be added, use the current instance state map + nextAssignment.put(segmentName, currentInstanceStateMap); + return currentAvailableInstances; + } + } + }); + } + return nextAssignment; + } + private static Map> getNextNonStrictReplicaGroupAssignment( + Map> currentAssignment, Map> targetAssignment, + int minAvailableReplicas) { + Map> nextAssignment = new TreeMap<>(); for (Map.Entry> entry : currentAssignment.entrySet()) { String segmentName = entry.getKey(); nextAssignment.put(segmentName, - getNextInstanceStateMap(entry.getValue(), targetAssignment.get(segmentName), minAvailableReplicas)); + getNextSingleSegmentAssignment(entry.getValue(), targetAssignment.get(segmentName), + minAvailableReplicas)._instanceStateMap); } - return nextAssignment; } + /** + * Returns the next assignment for a segment based on the current instance state map and the target instance state map + * with regards to the minimum available replicas requirement. + */ @VisibleForTesting - @SuppressWarnings("Duplicates") - static Map getNextInstanceStateMap(Map currentInstanceStateMap, + static SingleSegmentAssignment getNextSingleSegmentAssignment(Map currentInstanceStateMap, Map targetInstanceStateMap, int minAvailableReplicas) { Map nextInstanceStateMap = new TreeMap<>(); @@ -586,6 +648,7 @@ static Map getNextInstanceStateMap(Map currentIn } } } + Set availableInstances = new TreeSet<>(nextInstanceStateMap.keySet()); // Add target instances until the number of instances matched int instancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size(); @@ -601,6 +664,21 @@ static Map getNextInstanceStateMap(Map currentIn } } - return nextInstanceStateMap; + return new SingleSegmentAssignment(nextInstanceStateMap, availableInstances); + } + + /** + * Assignment result for a single segment. 
+ */ + @VisibleForTesting + static class SingleSegmentAssignment { + final Map _instanceStateMap; + // Instances that are common in both current instance state and next instance state of the segment + final Set _availableInstances; + + SingleSegmentAssignment(Map instanceStateMap, Set availableInstances) { + _instanceStateMap = instanceStateMap; + _availableInstances = availableInstances; + } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java index a4e41f977989..681ed2c00b5b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java @@ -19,8 +19,10 @@ package org.apache.pinot.controller.helix.core.rebalance; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.testng.annotations.Test; @@ -43,27 +45,31 @@ public void testDowntimeMode() { SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE); Map targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3"), ONLINE); - Map nextInstanceStateMap = - TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + TableRebalancer.SingleSegmentAssignment assignment = + TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, Collections.singleton("host1")); // Without common instance, next 
assignment should be the same as target assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4"), ONLINE); - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertTrue(assignment._availableInstances.isEmpty()); // With increasing number of replicas, next assignment should be the same as target assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4", "host5"), ONLINE); - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertTrue(assignment._availableInstances.isEmpty()); // With decreasing number of replicas, next assignment should be the same as target assignment currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE); targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE); - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertTrue(assignment._availableInstances.isEmpty()); } @Test @@ -73,35 +79,41 @@ public 
void testOneMinAvailableReplicas() { SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE); Map targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host4"), ONLINE); - Map nextInstanceStateMap = - TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + TableRebalancer.SingleSegmentAssignment assignment = + TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2"))); // With 1 common instance, next assignment should be the same as target assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE); - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, Collections.singleton("host1")); // Without common instance, next assignment should have 1 common instances with current assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE); // [host1, host4, host5] - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1); - assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1); + 
assertEquals(assignment._availableInstances, Collections.singleton("host1")); // Next round should make the assignment the same as target assignment - assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1), - targetInstanceStateMap); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5"))); // With increasing number of replicas, next assignment should have 1 common instances with current assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6", "host7"), ONLINE); // [host1, host4, host5, host6] - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1); - assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1); + assertEquals(assignment._availableInstances, Collections.singleton("host1")); // Next round should make the assignment the same as target assignment - assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1), - targetInstanceStateMap); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6"))); // With decreasing number of replicas, next assignment should have 1 common instances with current assignment currentInstanceStateMap = @@ -109,11 +121,13 @@ public void testOneMinAvailableReplicas() { targetInstanceStateMap = 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE); // [host1, host5, host6] - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1); - assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1); + assertEquals(assignment._availableInstances, Collections.singleton("host1")); // Next round should make the assignment the same as target assignment - assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1), - targetInstanceStateMap); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6"))); } @Test @@ -123,72 +137,310 @@ public void testTwoMinAvailableReplicas() { SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE); Map targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE); - Map nextInstanceStateMap = - TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + TableRebalancer.SingleSegmentAssignment assignment = + TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3"))); // With 2 common instances, next assignment should be the same as target assignment targetInstanceStateMap = 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE); - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2); - assertEquals(nextInstanceStateMap, targetInstanceStateMap); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2"))); // With 1 common instance, next assignment should have 2 common instances with current assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE); - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2); // [host1, host2, host5, host6] - assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2"))); // Next round should make the assignment the same as target assignment - assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2), - targetInstanceStateMap); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6"))); // Without common instance, next assignment should have 2 common instances with current assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE); // [host1, host2, host5, host6] 
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2); - assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2"))); // Next round should make the assignment the same as target assignment - assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2), - targetInstanceStateMap); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6"))); // With increasing number of replicas, next assignment should have 1 common instances with current assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE); // [host1, host2, host5, host6, host7] - nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2); - assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2"))); // Next round should make the assignment the same as target assignment - assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2), - targetInstanceStateMap); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._instanceStateMap, 
targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7"))); // With decreasing number of replicas, next assignment should have 2 common instances with current assignment targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE); // [host1, host2, host5] - Map firstRoundInstanceStateMap = - TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2); - assertEquals(getNumCommonInstances(currentInstanceStateMap, firstRoundInstanceStateMap), 2); + assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2"))); // Next round should have 2 common instances with first round assignment // [host1, host5, host6] - Map secondRoundInstanceStateMap = - TableRebalancer.getNextInstanceStateMap(firstRoundInstanceStateMap, targetInstanceStateMap, 2); - assertEquals(getNumCommonInstances(firstRoundInstanceStateMap, secondRoundInstanceStateMap), 2); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5"))); // Next round should make the assignment the same as target assignment - assertEquals(TableRebalancer.getNextInstanceStateMap(secondRoundInstanceStateMap, targetInstanceStateMap, 2), - targetInstanceStateMap); + assignment = + TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2); + assertEquals(assignment._instanceStateMap, targetInstanceStateMap); + assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6"))); } - private int getNumCommonInstances(Map currentInstanceStateMap, - Map nextInstanceStateMap) { - int numCommonInstances = 0; - for 
(String instanceId : currentInstanceStateMap.keySet()) { - if (nextInstanceStateMap.containsKey(instanceId)) { - numCommonInstances++; - } - } - return numCommonInstances; + @Test + public void testStrictReplicaGroup() { + // Current assignment: + // { + // "segment1": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host3": "ONLINE" + // }, + // "segment2": { + // "host2": "ONLINE", + // "host3": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment3": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host3": "ONLINE" + // }, + // "segment4": { + // "host2": "ONLINE", + // "host3": "ONLINE", + // "host4": "ONLINE" + // } + // } + Map> currentAssignment = new TreeMap<>(); + currentAssignment + .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE)); + currentAssignment + .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE)); + currentAssignment + .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE)); + currentAssignment + .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE)); + + // Target assignment: + // { + // "segment1": { + // "host1": "ONLINE", + // "host3": "ONLINE", + // "host5": "ONLINE" + // }, + // "segment2": { + // "host2": "ONLINE", + // "host4": "ONLINE", + // "host6": "ONLINE" + // }, + // "segment3": { + // "host1": "ONLINE", + // "host3": "ONLINE", + // "host5": "ONLINE" + // }, + // "segment4": { + // "host2": "ONLINE", + // "host4": "ONLINE", + // "host6": "ONLINE" + // } + // } + Map> targetAssignment = new TreeMap<>(); + targetAssignment + .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE)); + targetAssignment + .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE)); + targetAssignment + 
.put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE)); + targetAssignment + .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE)); + + // Next assignment with 2 minimum available replicas with or without strict replica-group should reach the target + // assignment + Map> nextAssignment = + TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false); + assertEquals(nextAssignment, targetAssignment); + nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true); + assertEquals(nextAssignment, targetAssignment); + + // Current assignment: + // { + // "segment1": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host3": "ONLINE" + // }, + // "segment2": { + // "host2": "ONLINE", + // "host3": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment3": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host3": "ONLINE" + // }, + // "segment4": { + // "host2": "ONLINE", + // "host3": "ONLINE", + // "host4": "ONLINE" + // } + // } + currentAssignment = new TreeMap<>(); + currentAssignment + .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE)); + currentAssignment + .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE)); + currentAssignment + .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE)); + currentAssignment + .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE)); + + // Target assignment: + // { + // "segment1": { + // "host2": "ONLINE", + // "host4": "ONLINE", + // "host6": "ONLINE" + // }, + // "segment2": { + // "host1": "ONLINE", + // "host4": "ONLINE", + // "host5": "ONLINE" + // }, + // "segment3": { + // "host2": "ONLINE", + // "host4": "ONLINE", + // 
"host6": "ONLINE" + // }, + // "segment4": { + // "host1": "ONLINE", + // "host4": "ONLINE", + // "host5": "ONLINE" + // } + // } + targetAssignment = new TreeMap<>(); + targetAssignment + .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE)); + targetAssignment + .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE)); + targetAssignment + .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE)); + targetAssignment + .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE)); + + // Next assignment with 2 minimum available replicas without strict replica-group: + // (This assignment will move "segment1" and "segment3" from "host3" to "host4", and move "segment2" and "segment4" + // from "host3" to "host1". "host1" and "host4" might be unavailable for strict replica-group routing, which breaks + // the minimum available replicas requirement.) 
+ // { + // "segment1": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment2": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment3": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment4": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // } + // } + nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false); + assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4"))); + assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4"))); + assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4"))); + assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4"))); + + // Next assignment with 2 minimum available replicas with strict replica-group: + // (This assignment will only move "segment1" and "segment3" from "host3" to "host4". Only "host4" can be + // unavailable for strict replica-group routing during the rebalance, which meets the minimum available replicas + // requirement.) 
+ // { + // "segment1": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment2": { + // "host2": "ONLINE", + // "host3": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment3": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment4": { + // "host2": "ONLINE", + // "host3": "ONLINE", + // "host4": "ONLINE" + // } + // } + nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true); + assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4"))); + assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4"))); + assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4"))); + assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4"))); + + // Next assignment with 2 minimum available replicas with strict replica-group: + // { + // "segment1": { + // "host2": "ONLINE", + // "host4": "ONLINE", + // "host6": "ONLINE" + // }, + // "segment2": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // }, + // "segment3": { + // "host2": "ONLINE", + // "host4": "ONLINE", + // "host6": "ONLINE" + // }, + // "segment4": { + // "host1": "ONLINE", + // "host2": "ONLINE", + // "host4": "ONLINE" + // } + // } + nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true); + assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host6"))); + assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4"))); + assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host6"))); + assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", 
"host4"))); + + // Next assignment with 2 minimum available replicas with strict replica-group should reach the target assignment + nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true); + assertEquals(nextAssignment, targetAssignment); } @Test