diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index a649c963ba8..eb01d9f1658 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Stream; import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT; @@ -46,7 +47,7 @@ public class MetricOption implements Serializable { private static final long serialVersionUID = 1L; private Map labels; - private HashSet ipPortList; + private Set ipPortSet; private String ipPorts; private RegisteredMetric registeredMetric; private long initRecords; @@ -57,7 +58,7 @@ public class MetricOption implements Serializable { private List inlongAuditKeys; private MetricOption( - String inlongLabels, + Map labels, @Nullable String inlongAudit, RegisteredMetric registeredMetric, long initRecords, @@ -65,54 +66,26 @@ private MetricOption( Long initDirtyRecords, Long initDirtyBytes, Long readPhase, - String inlongAuditKeys) { - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels), - "Inlong labels must be set for register metric."); - + List inlongAuditKeys, + Set ipPortSet) { this.initRecords = initRecords; this.initBytes = initBytes; this.initDirtyRecords = initDirtyRecords; this.initDirtyBytes = initDirtyBytes; this.readPhase = readPhase; - this.labels = new LinkedHashMap<>(); - String[] inLongLabelArray = inlongLabels.split(DELIMITER); - Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")), - "InLong metric label format must be xxx=xxx"); - Stream.of(inLongLabelArray).forEach(label -> { - String key = label.substring(0, label.indexOf('=')); - String value = label.substring(label.indexOf('=') + 1); - labels.put(key, value); - }); - + this.labels = labels; this.ipPorts = inlongAudit; - - if (ipPorts != null) { - - Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID), - "groupId and streamId must be set when enable inlong audit collect."); - - if (inlongAuditKeys == null) { - LOG.warn("should set inlongAuditKeys when enable inlong audit collect, " - + "fallback to use id {} as audit key", AUDIT_SORT_INPUT); - inlongAuditKeys = AUDIT_SORT_INPUT; - } - - this.inlongAuditKeys = AuditUtils.extractAuditKeys(inlongAuditKeys); - this.ipPortList = AuditUtils.extractAuditIpPorts(ipPorts); - - } - - if (registeredMetric != null) { - this.registeredMetric = registeredMetric; - } + this.inlongAuditKeys = inlongAuditKeys; + this.ipPortSet = ipPortSet; + this.registeredMetric = registeredMetric; } public Map getLabels() { return labels; } - public HashSet getIpPortList() { - return ipPortList; + public HashSet getIpPortSet() { + return new HashSet<>(ipPortSet); } public Optional getIpPorts() { @@ -238,11 +211,43 @@ public MetricOption.Builder withInitReadPhase(Long initReadPhase) { } public MetricOption build() { - if (inlongLabels == null && inlongAudit == null) { + if (inlongAudit == null && inlongLabels == null) { + LOG.warn("The property 'metrics.audit.proxy.hosts and inlong.metric.labels' has not been set," + + " the program will not open audit function"); return null; } - return new MetricOption(inlongLabels, inlongAudit, registeredMetric, initRecords, initBytes, - initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeys); + + Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels), + "Inlong labels must be set for register metric."); + String[] inLongLabelArray = inlongLabels.split(DELIMITER); + Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")), + "InLong metric label format must be xxx=xxx"); + Map labels = new LinkedHashMap<>(); + Stream.of(inLongLabelArray).forEach(label -> { + String key = label.substring(0, label.indexOf('=')); + String value = label.substring(label.indexOf('=') + 1); + labels.put(key, value); + }); + + List inlongAuditKeysList = null; + Set ipPortSet = null; + + if (inlongAudit != null) { + Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID), + "The groupId and streamId must be set when enable inlong audit collect."); + + if (inlongAuditKeys == null) { + LOG.warn("The inlongAuditKeys should be set when enable inlong audit collect, " + + "fallback to use id {} as audit key", AUDIT_SORT_INPUT); + inlongAuditKeys = AUDIT_SORT_INPUT; + } + + inlongAuditKeysList = AuditUtils.extractAuditKeys(inlongAuditKeys); + ipPortSet = AuditUtils.extractAuditIpPorts(inlongAudit); + } + + return new MetricOption(labels, inlongAudit, registeredMetric, initRecords, initBytes, + initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeysList, ipPortSet); } } } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 7270becdb7a..1eedde502e1 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -101,7 +101,7 @@ public SinkMetricData(MetricOption option, MetricGroup metricGroup) { } if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 0568efd7c35..0d2035e71cc 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -104,7 +104,7 @@ public SourceMetricData(MetricOption option, MetricGroup metricGroup) { } if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } @@ -114,7 +114,7 @@ public SourceMetricData(MetricOption option) { this.labels = option.getLabels(); if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); }