Skip to content

Commit

Permalink
fixup! [AMORO-3418] Optimize ams configuration to support parsing of …
Browse files Browse the repository at this point in the history
…time interval and storage related configuration items when both values and units are specified
  • Loading branch information
jzjsnow committed Jan 23, 2025
1 parent d5a92ed commit 53ed46f
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ public class AmoroManagementConf {
ConfigOptions.key("thrift-server.max-message-size")
.stringType()
.defaultValue("100 MB")
.withDescription("Maximum message size that the Thrift server can accept. Default unit is bytes if not specified.");
.withDescription(
"Maximum message size that the Thrift server can accept. Default unit is bytes if not specified.");

public static final ConfigOption<Integer> THRIFT_WORKER_THREADS =
ConfigOptions.key("thrift-server.table-service.worker-thread-count")
Expand Down Expand Up @@ -400,7 +401,8 @@ public class AmoroManagementConf {
ConfigOptions.key("terminal.session.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(30))
.withDescription("Session timeout. Default unit is milliseconds if not specified (** Note: default units are minutes when version < 0.8).");
.withDescription(
"Session timeout. Default unit is milliseconds if not specified (** Note: default units are minutes when version < 0.8).");

public static final ConfigOption<String> TERMINAL_SENSITIVE_CONF_KEYS =
ConfigOptions.key("terminal.sensitive-conf-keys")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,9 @@ private void registerAmsServiceMetric() {

private void initThriftService() throws TTransportException {
LOG.info("Initializing thrift service...");
long maxMessageSize = MemorySize.parse(serviceConfig.getString(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE)).getBytes();
long maxMessageSize =
MemorySize.parse(serviceConfig.getString(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE))
.getBytes();
int selectorThreads = serviceConfig.getInteger(AmoroManagementConf.THRIFT_SELECTOR_THREADS);
int workerThreads = serviceConfig.getInteger(AmoroManagementConf.THRIFT_WORKER_THREADS);
int queueSizePerSelector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ public DefaultOptimizingService(
CatalogManager catalogManager,
MaintainedTableManager tableManager,
TableService tableService) {
this.optimizerTouchTimeout = serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
this.taskAckTimeout = serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
this.optimizerTouchTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
this.taskAckTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout = serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
this.pollingTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
this.tableService = tableService;
this.catalogManager = catalogManager;
this.tableManager = tableManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ private void startAms() throws Exception {
AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
optimizingServiceBindPort);
serviceConfig.set(
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL, Duration.ofMillis(1000L));
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
Duration.ofMillis(1000L));
serviceContainer.startService();
break;
} catch (TTransportException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server;

import static org.apache.amoro.server.AmoroServiceContainer.expandConfigMap;

import org.apache.amoro.config.ConfigOption;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
Expand All @@ -39,82 +41,94 @@
import java.time.Duration;
import java.util.Map;

import static org.apache.amoro.server.AmoroServiceContainer.expandConfigMap;

public class TestAmoroManagementConf {
private static final ConfigOption<Duration>[] TIME_RELATED_CONFIG_OPTIONS = new ConfigOption[] {
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL,
AmoroManagementConf.REFRESH_TABLES_INTERVAL,
AmoroManagementConf.BLOCKER_TIMEOUT,
AmoroManagementConf.OPTIMIZER_HB_TIMEOUT,
AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT,
AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT,
AmoroManagementConf.TERMINAL_SESSION_TIMEOUT
};
private static final ConfigOption<Duration>[] TIME_RELATED_CONFIG_OPTIONS =
new ConfigOption[] {
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL,
AmoroManagementConf.REFRESH_TABLES_INTERVAL,
AmoroManagementConf.BLOCKER_TIMEOUT,
AmoroManagementConf.OPTIMIZER_HB_TIMEOUT,
AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT,
AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT,
AmoroManagementConf.TERMINAL_SESSION_TIMEOUT
};

private static final ConfigOption<String>[] STORAGE_RELATED_CONFIG_OPTIONS = new ConfigOption[] {
AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE
};
private static final ConfigOption<String>[] STORAGE_RELATED_CONFIG_OPTIONS =
new ConfigOption[] {AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE};

@Test
void testParsingAmoroManagementConfWithTimeUnits() throws Exception {
Configurations serviceConfig = getConfigurationsWithUnits();
assertTimeRelatedConfigs(serviceConfig);
}
@Test
void testParsingDefaultTimeRelatedConfigs() {
Configurations serviceConfig = new Configurations();
assertTimeRelatedConfigs(serviceConfig);
}

@Test
void testParsingAmoroManagementConfWithStorageUnits() throws Exception {
Configurations serviceConfig = getConfigurationsWithUnits();
assertStorageRelatedConfigs(serviceConfig);
}
@Test
void testParsingDefaultStorageRelatedConfigs() {
Configurations serviceConfig = new Configurations();
assertStorageRelatedConfigs(serviceConfig);
}

private Configurations getConfigurationsWithUnits() throws URISyntaxException, IOException {
URL resource = Resources.getResource("config-with-units.yaml");
JsonNode yamlConfig =
JacksonUtil.fromObjects(
new Yaml().loadAs(Files.newInputStream(Paths.get(resource.toURI())), Map.class));
Map<String, Object> systemConfig =
JacksonUtil.getMap(
yamlConfig,
AmoroManagementConf.SYSTEM_CONFIG,
new TypeReference<Map<String, Object>>() {});
Map<String, Object> expandedConfigurationMap = Maps.newHashMap();
expandConfigMap(systemConfig, "", expandedConfigurationMap);
return Configurations.fromObjectMap(expandedConfigurationMap);
}
private void assertTimeRelatedConfigs(Configurations serviceConfig) {
Configurations timeRelatedConfigsWithoutTimeUnit = Configurations.fromObjectMap(timeRelatedConfigMapInMillisSecondsWithoutTimeUnits);
for (ConfigOption<Duration> configOption : TIME_RELATED_CONFIG_OPTIONS) {
Assertions.assertEquals(
timeRelatedConfigsWithoutTimeUnit.get(configOption),
serviceConfig.get(configOption));
}
@Test
void testParsingAmoroManagementConfWithTimeUnits() throws Exception {
Configurations serviceConfig = getConfigurationsWithUnits();
assertTimeRelatedConfigs(serviceConfig);
}

@Test
void testParsingAmoroManagementConfWithStorageUnits() throws Exception {
Configurations serviceConfig = getConfigurationsWithUnits();
assertStorageRelatedConfigs(serviceConfig);
}

private Configurations getConfigurationsWithUnits() throws URISyntaxException, IOException {
URL resource = Resources.getResource("config-with-units.yaml");
JsonNode yamlConfig =
JacksonUtil.fromObjects(
new Yaml().loadAs(Files.newInputStream(Paths.get(resource.toURI())), Map.class));
Map<String, Object> systemConfig =
JacksonUtil.getMap(
yamlConfig,
AmoroManagementConf.SYSTEM_CONFIG,
new TypeReference<Map<String, Object>>() {});
Map<String, Object> expandedConfigurationMap = Maps.newHashMap();
expandConfigMap(systemConfig, "", expandedConfigurationMap);
return Configurations.fromObjectMap(expandedConfigurationMap);
}

private void assertTimeRelatedConfigs(Configurations serviceConfig) {
Configurations timeRelatedConfigsWithoutTimeUnit =
Configurations.fromObjectMap(timeRelatedConfigMapInMillisSecondsWithoutTimeUnits);
for (ConfigOption<Duration> configOption : TIME_RELATED_CONFIG_OPTIONS) {
Assertions.assertEquals(
timeRelatedConfigsWithoutTimeUnit.get(configOption), serviceConfig.get(configOption));
}
}

private void assertStorageRelatedConfigs(Configurations serviceConfig) {
Configurations storageRelatedConfigsWithoutTimeUnit = Configurations.fromObjectMap(storageRelatedConfigMapWithoutTimeUnits);
for (ConfigOption<String> configOption : STORAGE_RELATED_CONFIG_OPTIONS) {
Assertions.assertEquals(
MemorySize.parse(storageRelatedConfigsWithoutTimeUnit.getString(configOption)),
MemorySize.parse(serviceConfig.getString(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE)));
}
private void assertStorageRelatedConfigs(Configurations serviceConfig) {
Configurations storageRelatedConfigsWithoutTimeUnit =
Configurations.fromObjectMap(storageRelatedConfigMapWithoutTimeUnits);
for (ConfigOption<String> configOption : STORAGE_RELATED_CONFIG_OPTIONS) {
Assertions.assertEquals(
MemorySize.parse(storageRelatedConfigsWithoutTimeUnit.getString(configOption)),
MemorySize.parse(serviceConfig.getString(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE)));
}
}

private final Map<String, Object> timeRelatedConfigMapInMillisSecondsWithoutTimeUnits =
ImmutableMap.<String, Object>builder()
.put("refresh-external-catalogs.interval", "180000")
.put("refresh-tables.interval", "60000")
.put("optimizer.heart-beat-timeout", "60000")
.put("optimizer.task-ack-timeout", "30000")
.put("optimizer.polling-timeout", "3000")
.put("blocker.timeout", "60000")
.put("auto-create-tags.interval", "60000")
.put("terminal.session", "180000")
.build();
private final Map<String, Object> timeRelatedConfigMapInMillisSecondsWithoutTimeUnits =
ImmutableMap.<String, Object>builder()
.put("refresh-external-catalogs.interval", "180000")
.put("refresh-tables.interval", "60000")
.put("optimizer.heart-beat-timeout", "60000")
.put("optimizer.task-ack-timeout", "30000")
.put("optimizer.polling-timeout", "3000")
.put("blocker.timeout", "60000")
.put("auto-create-tags.interval", "60000")
.put("terminal.session", "180000")
.build();

private final Map<String, Object> storageRelatedConfigMapWithoutTimeUnits =
ImmutableMap.<String, Object>builder()
.put("thrift-server.max-message-size", "104857600")
.build();
private final Map<String, Object> storageRelatedConfigMapWithoutTimeUnits =
ImmutableMap.<String, Object>builder()
.put("thrift-server.max-message-size", "104857600")
.build();
}

0 comments on commit 53ed46f

Please sign in to comment.