Skip to content

Commit

Permalink
Optimize the SchedulingPolicy logic, and implement a plugin-based fra…
Browse files Browse the repository at this point in the history
…mework for the sorter of the policy
  • Loading branch information
张文领 committed Jan 13, 2025
1 parent 0cc0529 commit e8ca237
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,35 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter;
import org.apache.amoro.server.optimizing.sorter.SorterFactory;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SchedulingPolicy {

public static final Logger LOG = LoggerFactory.getLogger(SchedulingPolicy.class);

private static final String SCHEDULING_POLICY_PROPERTY_NAME = "scheduling-policy";
private static final String QUOTA = "quota";
private static final String BALANCED = "balanced";

private final Map<ServerTableIdentifier, TableRuntime> tableRuntimeMap = new HashMap<>();
private volatile String policyName;
private final Lock tableLock = new ReentrantLock();
private static final Map<String, SorterFactory> sorterFactoryCache = new ConcurrentHashMap<>();

public SchedulingPolicy(ResourceGroup group) {
setTableSorterIfNeeded(group);
Expand All @@ -53,12 +61,26 @@ public void setTableSorterIfNeeded(ResourceGroup optimizerGroup) {
policyName =
Optional.ofNullable(optimizerGroup.getProperties())
.orElseGet(Maps::newHashMap)
.getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME, QUOTA);
.getOrDefault(SCHEDULING_POLICY_PROPERTY_NAME, QuotaOccupySorter.IDENTIFIER);
} finally {
tableLock.unlock();
}
}

static {
ServiceLoader<SorterFactory> sorterFactories = ServiceLoader.load(SorterFactory.class);
Iterator<SorterFactory> iterator = sorterFactories.iterator();
iterator.forEachRemaining(
sorterFactory -> {
String identifier = sorterFactory.getIdentifier();
sorterFactoryCache.put(identifier, sorterFactory);
LOG.info(
"Loaded scheduling policy {} and its corresponding sorter instance {}",
identifier,
sorterFactory.getClass().getName());
});
}

public String name() {
return policyName;
}
Expand All @@ -77,12 +99,15 @@ public TableRuntime scheduleTable(Set<ServerTableIdentifier> skipSet) {
}

private Comparator<TableRuntime> createSorterByPolicy() {
if (policyName.equalsIgnoreCase(QUOTA)) {
return new QuotaOccupySorter();
} else if (policyName.equalsIgnoreCase(BALANCED)) {
return new BalancedSorter();
if (sorterFactoryCache.get(policyName) != null) {
SorterFactory sorterFactory = sorterFactoryCache.get(policyName);
LOG.info(
"Using sorter instance {} corresponding to the scheduling policy {}",
sorterFactory.getClass().getName(),
policyName);
return sorterFactory.createComparator();
} else {
throw new IllegalArgumentException("Illegal scheduling policy: " + policyName);
throw new IllegalArgumentException("Unsupported scheduling policy: " + policyName);
}
}

Expand Down Expand Up @@ -136,30 +161,4 @@ public void removeTable(TableRuntime tableRuntime) {
Map<ServerTableIdentifier, TableRuntime> getTableRuntimeMap() {
return tableRuntimeMap;
}

private static class QuotaOccupySorter implements Comparator<TableRuntime> {

private final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap();

@Override
public int compare(TableRuntime one, TableRuntime another) {
return Double.compare(
tableWeightMap.computeIfAbsent(one, TableRuntime::calculateQuotaOccupy),
tableWeightMap.computeIfAbsent(another, TableRuntime::calculateQuotaOccupy));
}
}

private static class BalancedSorter implements Comparator<TableRuntime> {
@Override
public int compare(TableRuntime one, TableRuntime another) {
return Long.compare(
Math.max(
one.getLastFullOptimizingTime(),
Math.max(one.getLastMinorOptimizingTime(), one.getLastMajorOptimizingTime())),
Math.max(
another.getLastFullOptimizingTime(),
Math.max(
another.getLastMinorOptimizingTime(), another.getLastMajorOptimizingTime())));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.optimizing.sorter;

import org.apache.amoro.server.table.TableRuntime;

import java.util.Comparator;

public class BalancedSorter implements SorterFactory {

private static final String IDENTIFIER = "balanced";

@Override
public String getIdentifier() {
return IDENTIFIER;
}

@Override
public Comparator<TableRuntime> createComparator() {
return new Comparator<TableRuntime>() {
@Override
public int compare(TableRuntime one, TableRuntime another) {
return Long.compare(
Math.max(
one.getLastFullOptimizingTime(),
Math.max(one.getLastMinorOptimizingTime(), one.getLastMajorOptimizingTime())),
Math.max(
another.getLastFullOptimizingTime(),
Math.max(
another.getLastMinorOptimizingTime(), another.getLastMajorOptimizingTime())));
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.optimizing.sorter;

import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;

import java.util.Comparator;
import java.util.Map;

public class QuotaOccupySorter implements SorterFactory {

public static final String IDENTIFIER = "quota";

@Override
public String getIdentifier() {
return IDENTIFIER;
}

@Override
public Comparator<TableRuntime> createComparator() {
final Map<TableRuntime, Double> tableWeightMap = Maps.newHashMap();
return new Comparator<TableRuntime>() {
@Override
public int compare(TableRuntime one, TableRuntime another) {
return Double.compare(
tableWeightMap.computeIfAbsent(one, TableRuntime::calculateQuotaOccupy),
tableWeightMap.computeIfAbsent(another, TableRuntime::calculateQuotaOccupy));
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.optimizing.sorter;

import org.apache.amoro.server.optimizing.SchedulingPolicy;

import java.util.Comparator;

/**
* A factory for sorter. Sorter instantiates a comparator, which is automatically loaded by the
* {@link SchedulingPolicy} as a plugin, as long as the sorter is constructed and the {@link
* /resources/META-INF/services/org.apache.amoro.server.optimizing.sorter.SorterFactory} file
* contains the full qualified class name for the sorter. The comparator sorts the tableRuntimes
* based on the parameters of each tableRuntime in the input tableRuntimeList, and determines the
* scheduling priority for optimization of each tableRuntime.
*/
public interface SorterFactory {

/**
* Returns a globally unique identifier for the sorter instance, such as Balanced、balanced and
* BALANCED represent the different sorter instance. The {@link SchedulingPolicy} will check the
* values of {@link SchedulingPolicy#policyName} and {@link SorterFactory#getIdentifier} to decide
* which sorter instance to use. If sorter instances sorterA and sorterB have the same identifier
* id1, the {@link SchedulingPolicy} loads sorter instances in the order of their loading, first
* loading sorterA, then sorterB. Ultimately, sorterB will replace sorterA as the final sorter
* instance associated with id1, which is stored in {@link SchedulingPolicy#sorterFactoryCache}.
*/
String getIdentifier();

/** Create a comparator for sorter. */
Comparator createComparator();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# *
# http://www.apache.org/licenses/LICENSE-2.0
# *
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter
org.apache.amoro.server.optimizing.sorter.BalancedSorter

0 comments on commit e8ca237

Please sign in to comment.