Skip to content

Commit

Permalink
[INLONG-10529][Sort] PulsarSink support switch metadata acquire mode
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Jul 2, 2024
1 parent 8de559e commit f6cad3a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
Expand All @@ -36,6 +38,7 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -53,36 +56,59 @@ public PulsarFederationSinkContext(String sinkName, Context context, Channel cha

public void reload() {
try {
TaskConfig newSortTaskConfig = SortConfigHolder.getTaskConfig(taskName);
if (newSortTaskConfig == null) {
TaskConfig newTaskConfig = SortConfigHolder.getTaskConfig(taskName);
SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
if (newTaskConfig == null && newSortTaskConfig == null) {
LOG.error("newSortTaskConfig is null.");
return;
}
if (this.taskConfig != null && this.taskConfig.equals(newSortTaskConfig)) {
if ((this.taskConfig != null && this.taskConfig.equals(newTaskConfig))
&& (this.sortTaskConfig != null && this.sortTaskConfig.equals(newSortTaskConfig))) {
LOG.info("Same sortTaskConfig, do nothing.");
return;
}
this.taskConfig = newSortTaskConfig;

PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newSortTaskConfig.getNodeConfig();
PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newTaskConfig.getNodeConfig();
if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) {
this.pulsarNodeConfig = requestNodeConfig;
}
this.taskConfig = newTaskConfig;
this.sortTaskConfig = newSortTaskConfig;

this.idConfigMap = this.taskConfig.getClusterTagConfigs()
.stream()
.map(ClusterTagConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.map(PulsarIdConfig::create)
.collect(Collectors.toMap(
config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
v -> v,
(v1, v2) -> v1));
Map<String, PulsarIdConfig> fromTaskConfig = fromTaskConfig(taskConfig);
Map<String, PulsarIdConfig> fromSortTaskConfig = fromSortTaskConfig(sortTaskConfig);
idConfigMap = unifiedConfiguration ? fromTaskConfig : fromSortTaskConfig;
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}

public Map<String, PulsarIdConfig> fromTaskConfig(TaskConfig taskConfig) {
return taskConfig.getClusterTagConfigs()
.stream()
.map(ClusterTagConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
.map(PulsarIdConfig::create)
.collect(Collectors.toMap(
config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
v -> v,
(v1, v2) -> v1));
}

public Map<String, PulsarIdConfig> fromSortTaskConfig(SortTaskConfig sortTaskConfig) {
Map<String, PulsarIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
List<Map<String, String>> idList = sortTaskConfig.getIdParams();
for (Map<String, String> idParam : idList) {
try {
PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
newIdConfigMap.put(idConfig.getUid(), idConfig);
} catch (Exception e) {
LOG.error("fail to parse pulsar id config", e);
}
}
return newIdConfigMap;
}

public String getTopic(String uid) {
PulsarIdConfig idConfig = this.idConfigMap.get(uid);
if (idConfig == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.Constants;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

@Data
@Builder
@NoArgsConstructor
Expand All @@ -46,6 +49,16 @@ public class PulsarIdConfig {
private String topic;
private DataTypeEnum dataType = DataTypeEnum.TEXT;

public PulsarIdConfig(Map<String, String> idParam) {
this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID, DEFAULT_INLONG_STREAM);
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR, PulsarIdConfig.DEFAULT_SEPARATOR);
this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
this.dataType = DataTypeEnum
.convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType()));
}

public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) {
PulsarSinkConfig sinkConfig = (PulsarSinkConfig) dataFlowConfig.getSinkConfig();

Expand Down

0 comments on commit f6cad3a

Please sign in to comment.