Skip to content

Commit

Permalink
[INLONG-10173][Sort] SortStandalone support request unified configura…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
vernedeng committed May 10, 2024
1 parent 0fd55f8 commit 8817ab5
Show file tree
Hide file tree
Showing 22 changed files with 897 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@

package org.apache.inlong.common.pojo.sort;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.List;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SortConfig implements Serializable {

private String sortClusterName;
private List<SortTaskConfig> clusters;
private List<SortTaskConfig> tasks;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sortstandalone;
package org.apache.inlong.common.pojo.sort;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Deprecated
public class SortClusterConfig {

String clusterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Deprecated
public class SortClusterResponse {

public static final int SUCC = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Deprecated
public class SortTaskConfig {

String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusRequest;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
import org.apache.inlong.common.pojo.sort.node.NodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.plugin.Plugin;
Expand Down Expand Up @@ -305,7 +305,7 @@ private void reloadDataFlowConfig() {
}
map.add(sortTaskConfig);
}
sortConfig.setClusters(temp.get(sortClusterName));
sortConfig.setTasks(temp.get(sortClusterName));
try {
String configStr = objectMapper.writeValueAsString(sortConfig);
sortConfigs.put(sortClusterName, configStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.manager.service.core.SortService;

import io.swagger.annotations.Api;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*
* SortClusterConfigHolder
*/
@Deprecated
public final class SortClusterConfigHolder {

public static final Logger LOG = InlongLoggerFactory.getLogger(SortClusterConfigHolder.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.config.holder.v2;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
import org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;

import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import static org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;

@Slf4j
public class SortClusterConfigHolder {

private static SortClusterConfigHolder instance;

private long reloadInterval;
private Timer reloadTimer;
private SortConfigLoader loader;
private SortConfig config;

private SortClusterConfigHolder() {

}

private static SortClusterConfigHolder get() {
if (instance != null) {
return instance;
}

synchronized (SortClusterConfigHolder.class) {
instance = new SortClusterConfigHolder();
instance.reloadInterval = CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
String loaderType = CommonPropertiesHolder
.getString(SortClusterConfigType.KEY_TYPE, SortClusterConfigType.MANAGER.name());

if (SortClusterConfigType.FILE.name().equalsIgnoreCase(loaderType)) {
instance.loader = new ClassResourceSortClusterConfigLoader();
} else if (SortClusterConfigType.MANAGER.name().equalsIgnoreCase(loaderType)) {
instance.loader = new ManagerSortClusterConfigLoader();
} else {
// user-defined
try {
Class<?> loaderClass = ClassUtils.getClass(loaderType);
Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
if (loaderObject instanceof SortConfigLoader) {
instance.loader = (SortConfigLoader) loaderObject;
}
} catch (Throwable t) {
log.error("fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
}
}
if (instance.loader == null) {
instance.loader = new ClassResourceSortClusterConfigLoader();
}
try {
instance.loader.configure(new Context(CommonPropertiesHolder.get()));
instance.reload();
instance.setReloadTimer();
} catch (Exception e) {
log.error("failed to reload instance", e);
}
}
return instance;

}

/**
* setReloadTimer
*/
private void setReloadTimer() {
reloadTimer = new Timer(true);
TimerTask task = new TimerTask() {

/**
* run
*/
public void run() {
reload();
}
};
reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
}

/**
* reload
*/
private void reload() {
try {
SortConfig newConfig = this.loader.load();
if (newConfig != null) {
this.config = newConfig;
}
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}

/**
* getClusterConfig
*
* @return
*/
public static SortConfig getSortConfig() {
return get().config;
}

/**
* getTaskConfig
*
* @param sortTaskName
* @return
*/
public static SortTaskConfig getTaskConfig(String sortTaskName) {
SortConfig config = get().config;
if (config != null && config.getTasks() != null) {
for (SortTaskConfig task : config.getTasks()) {
if (StringUtils.equals(sortTaskName, task.getSortTaskName())) {
return task;
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*
* ClassResourceCommonPropertiesLoader
*/
@Deprecated
public class ClassResourceSortClusterConfigLoader implements SortClusterConfigLoader {

public static final Logger LOG = InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
*
* ManagerSortClusterConfigLoader
*/
@Deprecated
public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader {

public static final Logger LOG = InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*
* SortClusterConfigLoader
*/
@Deprecated
public interface SortClusterConfigLoader extends Configurable {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.config.loader.v2;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

import java.nio.charset.Charset;

/**
*
* ClassResourceCommonPropertiesLoader
*/
public class ClassResourceSortClusterConfigLoader implements SortConfigLoader {

public static final Logger LOG = InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);

private Context context;
private ObjectMapper objectMapper = new ObjectMapper();

/**
* load
*
* @return
*/
@Override
public SortConfig load() {
String fileName = SortClusterConfigType.DEFAULT_FILE;
try {
if (context != null) {
fileName = context.getString(SortClusterConfigType.KEY_FILE, SortClusterConfigType.DEFAULT_FILE);
}
String confString = IOUtils.toString(getClass().getClassLoader().getResource(fileName),
Charset.defaultCharset());
int index = confString.indexOf('{');
confString = confString.substring(index);
return objectMapper.readValue(confString, SortConfig.class);
} catch (Exception e) {
LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
}
return SortConfig.builder().build();
}

/**
* configure
*
* @param context
*/
@Override
public void configure(Context context) {
this.context = context;
}
}
Loading

0 comments on commit 8817ab5

Please sign in to comment.