Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement]: Controller dosenot depend on TableService #3425

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,7 @@ private void startThriftServer(TServer server, String threadName) {
private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(
serviceConfig,
catalogManager,
tableManager,
optimizingService,
terminalManager,
tableService);
serviceConfig, catalogManager, tableManager, optimizingService, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);

httpServer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
Expand All @@ -55,6 +58,7 @@
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
import org.apache.amoro.table.TableProperties;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -269,6 +273,26 @@ public String authenticate(OptimizerRegisterInfo registerInfo) {
return optimizer.getToken();
}

@Override
public boolean cancelProcess(long processId) throws TException {
OptimizingProcessMeta processMeta =
getAs(OptimizingMapper.class, m -> m.getOptimizingProcess(processId));
if (processMeta == null) {
return false;
}
long tableId = processMeta.getTableId();
TableRuntime tableRuntime = tableService.getRuntime(tableId);
if (tableRuntime == null) {
return false;
}
OptimizingProcess process = tableRuntime.getOptimizingProcess();
if (process == null || process.getProcessId() != processId) {
return false;
}
process.close();
return true;
}

/**
* Get optimizing queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -101,24 +100,21 @@ public DashboardServer(
CatalogManager catalogManager,
TableManager tableManager,
DefaultOptimizingService optimizerManager,
TerminalManager terminalManager,
TableService tableService) {
TerminalManager terminalManager) {
PlatformFileManager platformFileManager = new PlatformFileManager();
this.catalogController = new CatalogController(catalogManager, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
// TODO: remove table service from OptimizerGroupController
this.optimizerGroupController =
new OptimizerGroupController(tableManager, tableService, optimizerManager);
this.optimizerGroupController = new OptimizerGroupController(tableManager, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor =
new ServerTableDescriptor(catalogManager, tableManager, serviceConfig);
// TODO: remove table service from TableController
this.tableController =
new TableController(
catalogManager, tableManager, tableService, tableDescriptor, serviceConfig);
new TableController(catalogManager, tableManager, tableDescriptor, serviceConfig);
this.terminalController = new TerminalController(terminalManager);
this.versionController = new VersionController();
this.overviewController = new OverviewController();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,11 @@
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -60,15 +56,11 @@ public class OptimizerGroupController {

private static final String ALL_GROUP = "all";
private final TableManager tableManager;
private final TableService tableService;
private final DefaultOptimizingService optimizerManager;

public OptimizerGroupController(
TableManager tableManager,
TableService tableService,
DefaultOptimizingService optimizerManager) {
TableManager tableManager, DefaultOptimizingService optimizerManager) {
this.tableManager = tableManager;
this.tableService = tableService;
this.optimizerManager = optimizerManager;
}

Expand Down Expand Up @@ -106,26 +98,16 @@ public void getOptimizerTables(Context ctx) {
if (statusCodes.isEmpty()) {
statusCodes = null;
}
Pair<List<TableRuntimeMeta>, Integer> tableRuntimeBeans =
tableManager.queryTableRuntimeMetas(
Pair<List<TableOptimizingInfo>, Integer> tableRuntimeBeans =
tableManager.queryTableOptimizingInfo(
optimizerGroupUsedInDbFilter,
dbFilterStr,
tableFilterStr,
statusCodes,
pageSize,
offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.getLeft().stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(
tableRuntimes.stream()
.map(OptimizingUtil::buildTableOptimizeInfo)
.collect(Collectors.toList()),
tableRuntimeBeans.getRight());
PageResult.of(tableRuntimeBeans.getLeft(), tableRuntimeBeans.getRight());
ctx.json(OkResponse.of(amsPageResult));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.client.OptimizingClientPools;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.hive.CachedHiveClientPool;
import org.apache.amoro.hive.HMSClientPool;
Expand All @@ -51,12 +53,12 @@
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Function;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.table.TableProperties;
Expand Down Expand Up @@ -103,7 +105,6 @@ public class TableController {

private final CatalogManager catalogManager;
private final TableManager tableManager;
private final TableService tableService;
private final ServerTableDescriptor tableDescriptor;
private final Configurations serviceConfig;
private final ConcurrentHashMap<TableIdentifier, UpgradeRunningInfo> upgradeRunningInfo =
Expand All @@ -113,12 +114,10 @@ public class TableController {
public TableController(
CatalogManager catalogManager,
TableManager tableManager,
TableService tableService,
ServerTableDescriptor tableDescriptor,
Configurations serviceConfig) {
this.catalogManager = catalogManager;
this.tableManager = tableManager;
this.tableService = tableService;
this.tableDescriptor = tableDescriptor;
this.serviceConfig = serviceConfig;
this.tableUpgradeExecutor =
Expand Down Expand Up @@ -157,11 +156,12 @@ public void getTableDetail(Context ctx) {
tableManager.getServerTableIdentifier(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier()));
if (serverTableIdentifier.isPresent()) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId());
if (tableRuntime != null) {
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
TableRuntimeMeta tableRuntimeMeta =
tableManager.getTableRuntimeMata(serverTableIdentifier.get());
if (tableRuntimeMeta != null) {
tableSummary.setOptimizingStatus(tableRuntimeMeta.getTableStatus().name());
AbstractOptimizingEvaluator.PendingInput tableRuntimeSummary =
tableRuntime.getTableSummary();
tableRuntimeMeta.getTableSummary();
if (tableRuntimeSummary != null) {
tableSummary.setHealthScore(tableRuntimeSummary.getHealthScore());
}
Expand Down Expand Up @@ -674,31 +674,31 @@ public void cancelOptimizingProcess(Context ctx) {
String catalog = ctx.pathParam("catalog");
String db = ctx.pathParam("db");
String table = ctx.pathParam("table");
String processId = ctx.pathParam("processId");
String processIds = ctx.pathParam("processId");
Preconditions.checkArgument(
StringUtils.isNotBlank(catalog)
&& StringUtils.isNotBlank(db)
&& StringUtils.isNotBlank(table),
"catalog.database.tableName can not be empty in any element");
Preconditions.checkState(catalogManager.catalogExist(catalog), "invalid catalog!");

long processId = Long.parseLong(processIds);
ServerTableIdentifier serverTableIdentifier =
tableManager.getServerTableIdentifier(
TableIdentifier.of(catalog, db, table).buildTableIdentifier());
TableRuntime tableRuntime =
serverTableIdentifier != null
? tableService.getRuntime(serverTableIdentifier.getId())
: null;
TableRuntimeMeta meta = tableManager.getTableRuntimeMata(serverTableIdentifier);
if (meta == null || meta.getOptimizingProcessId() != processId) {
throw new IllegalArgumentException(
String.format("Can't cancel optimizing process %s", processId));
}

Preconditions.checkArgument(
tableRuntime != null
&& tableRuntime.getOptimizingProcess() != null
&& Objects.equals(
tableRuntime.getOptimizingProcess().getProcessId(), Long.parseLong(processId)),
"Can't cancel optimizing process %s",
processId);

tableRuntime.getOptimizingProcess().close();
OptimizingService.Iface client =
OptimizingClientPools.getClient(
AmsUtil.getAMSThriftAddress(serviceConfig, Constants.THRIFT_OPTIMIZING_SERVICE_NAME));
try {
client.cancelProcess(processId);
} catch (TException e) {
throw new IllegalStateException("Failed to cancel optimizing process:" + e.getMessage());
}
ctx.json(OkResponse.ok());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,66 @@

package org.apache.amoro.server.dashboard.utils;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.table.descriptor.FilesStatistics;
import org.apache.iceberg.ContentFile;

import java.util.List;
import java.util.stream.Collectors;

public class OptimizingUtil {

/**
* Build current table optimize info.
*
* @return TableOptimizeInfo
*/
public static TableOptimizingInfo buildTableOptimizeInfo(TableRuntime optimizingTableRuntime) {
OptimizingProcess process = optimizingTableRuntime.getOptimizingProcess();
TableOptimizingInfo tableOptimizeInfo =
new TableOptimizingInfo(optimizingTableRuntime.getTableIdentifier());
tableOptimizeInfo.setOptimizeStatus(
optimizingTableRuntime.getOptimizingStatus().displayValue());
public static TableOptimizingInfo buildTableOptimizeInfo(
TableRuntimeMeta optimizingTableRuntime,
List<OptimizingTaskMeta> processTasks,
List<TaskRuntime.TaskQuota> quotas) {
ServerTableIdentifier identifier =
ServerTableIdentifier.of(
optimizingTableRuntime.getTableId(),
optimizingTableRuntime.getCatalogName(),
optimizingTableRuntime.getDbName(),
optimizingTableRuntime.getTableName(),
optimizingTableRuntime.getFormat());
TableOptimizingInfo tableOptimizeInfo = new TableOptimizingInfo(identifier);
OptimizingStatus optimizingStatus = optimizingTableRuntime.getTableStatus();
tableOptimizeInfo.setOptimizeStatus(optimizingStatus.displayValue());
tableOptimizeInfo.setDuration(
System.currentTimeMillis() - optimizingTableRuntime.getCurrentStatusStartTime());
tableOptimizeInfo.setQuota(optimizingTableRuntime.getTargetQuota());
tableOptimizeInfo.setQuotaOccupation(optimizingTableRuntime.calculateQuotaOccupy());
OptimizingConfig optimizingConfig =
optimizingTableRuntime.getTableConfig().getOptimizingConfig();
tableOptimizeInfo.setQuota(optimizingConfig.getTargetQuota());
double quotaOccupy =
calculateQuotaOccupy(
processTasks,
quotas,
optimizingTableRuntime.getCurrentStatusStartTime(),
System.currentTimeMillis());
tableOptimizeInfo.setQuotaOccupation(quotaOccupy);
FilesStatistics optimizeFileInfo;
if (optimizingTableRuntime.getOptimizingStatus().isProcessing()) {
optimizeFileInfo = collectOptimizingFileInfo(process == null ? null : process.getSummary());
} else if (optimizingTableRuntime.getOptimizingStatus() == OptimizingStatus.PENDING) {
if (optimizingStatus.isProcessing()) {
MetricsSummary summary = null;
if (processTasks != null && !processTasks.isEmpty()) {
List<MetricsSummary> taskSummary =
processTasks.stream()
.map(OptimizingTaskMeta::getMetricsSummary)
.collect(Collectors.toList());
summary = new MetricsSummary(taskSummary);
}
optimizeFileInfo = collectOptimizingFileInfo(summary);
} else if (optimizingStatus == OptimizingStatus.PENDING) {
optimizeFileInfo = collectPendingFileInfo(optimizingTableRuntime.getPendingInput());
} else {
optimizeFileInfo = null;
Expand All @@ -60,6 +90,28 @@ public static TableOptimizingInfo buildTableOptimizeInfo(TableRuntime optimizing
return tableOptimizeInfo;
}

private static double calculateQuotaOccupy(
List<OptimizingTaskMeta> processTasks,
List<TaskRuntime.TaskQuota> quotas,
long startTime,
long endTime) {
double finishedOccupy = 0;
if (quotas != null) {
finishedOccupy = quotas.stream().mapToDouble(q -> q.getQuotaTime(startTime)).sum();
}
double runningOccupy = 0;
if (processTasks != null) {
runningOccupy =
processTasks.stream()
.mapToDouble(
t ->
TaskRuntime.taskRunningQuotaTime(
startTime, endTime, t.getStartTime(), t.getCostTime()))
.sum();
}
return finishedOccupy + runningOccupy;
}

private static FilesStatistics collectPendingFileInfo(
AbstractOptimizingEvaluator.PendingInput pendingInput) {
if (pendingInput == null) {
Expand Down
Loading
Loading