From 58f896f662acc129bdf81426b096aa42b8bced6c Mon Sep 17 00:00:00 2001 From: vinnerzhang Date: Wed, 8 May 2024 18:42:15 +0800 Subject: [PATCH 1/6] [INLONG-10152][Manager] Refactor MetricOption code structure. --- .../inlong/sort/base/metric/MetricOption.java | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index a649c963ba8..79d5098cb48 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -57,7 +57,7 @@ public class MetricOption implements Serializable { private List inlongAuditKeys; private MetricOption( - String inlongLabels, + Map labels, @Nullable String inlongAudit, RegisteredMetric registeredMetric, long initRecords, @@ -65,46 +65,18 @@ private MetricOption( Long initDirtyRecords, Long initDirtyBytes, Long readPhase, - String inlongAuditKeys) { - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels), - "Inlong labels must be set for register metric."); - + List inlongAuditKeys, + HashSet ipPortList) { this.initRecords = initRecords; this.initBytes = initBytes; this.initDirtyRecords = initDirtyRecords; this.initDirtyBytes = initDirtyBytes; this.readPhase = readPhase; - this.labels = new LinkedHashMap<>(); - String[] inLongLabelArray = inlongLabels.split(DELIMITER); - Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")), - "InLong metric label format must be xxx=xxx"); - Stream.of(inLongLabelArray).forEach(label -> { - String key = label.substring(0, label.indexOf('=')); - String value = label.substring(label.indexOf('=') + 1); - labels.put(key, value); - }); - + this.labels = labels; this.ipPorts = inlongAudit; - - if (ipPorts != null) { - - Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID), - "groupId and streamId must be set when enable inlong audit collect."); - - if (inlongAuditKeys == null) { - LOG.warn("should set inlongAuditKeys when enable inlong audit collect, " - + "fallback to use id {} as audit key", AUDIT_SORT_INPUT); - inlongAuditKeys = AUDIT_SORT_INPUT; - } - - this.inlongAuditKeys = AuditUtils.extractAuditKeys(inlongAuditKeys); - this.ipPortList = AuditUtils.extractAuditIpPorts(ipPorts); - - } - - if (registeredMetric != null) { - this.registeredMetric = registeredMetric; - } + this.inlongAuditKeys = inlongAuditKeys; + this.ipPortList = ipPortList; + this.registeredMetric = registeredMetric; } public Map getLabels() { @@ -238,11 +210,43 @@ public MetricOption.Builder withInitReadPhase(Long initReadPhase) { } public MetricOption build() { - if (inlongLabels == null && inlongAudit == null) { + if (inlongAudit == null) { + LOG.warn("The property 'metrics.audit.proxy.hosts' has not been set," + + " the program will not open audit function"); return null; } - return new MetricOption(inlongLabels, inlongAudit, registeredMetric, initRecords, initBytes, - initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeys); + + Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels), + "Inlong labels must be set for register metric."); + String[] inLongLabelArray = inlongLabels.split(DELIMITER); + Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")), + "InLong metric label format must be xxx=xxx"); + Map labels = new LinkedHashMap<>(); + Stream.of(inLongLabelArray).forEach(label -> { + String key = label.substring(0, label.indexOf('=')); + String value = label.substring(label.indexOf('=') + 1); + labels.put(key, value); + }); + + List inlongAuditKeysList = null; + HashSet ipPortList = null; + + if (inlongAudit != null) { + Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID), + "The groupId and streamId must be set when enable inlong audit collect."); + + if (inlongAuditKeys == null) { + LOG.warn("The inlongAuditKeys should be set when enable inlong audit collect, " + + "fallback to use id {} as audit key", AUDIT_SORT_INPUT); + inlongAuditKeys = AUDIT_SORT_INPUT; + } + + inlongAuditKeysList = AuditUtils.extractAuditKeys(inlongAuditKeys); + ipPortList = AuditUtils.extractAuditIpPorts(inlongAudit); + } + + return new MetricOption(labels, inlongAudit, registeredMetric, initRecords, initBytes, + initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeysList, ipPortList); } } } From 1520293103f51d56d7237afefa0330df9962d193 Mon Sep 17 00:00:00 2001 From: vinnerzhang Date: Wed, 8 May 2024 19:36:32 +0800 Subject: [PATCH 2/6] [INLONG-10152][Manager] Refactor MetricOption code structure. --- .../inlong/sort/base/metric/MetricOption.java | 21 ++++++++++++------- .../sort/base/metric/SinkMetricData.java | 2 +- .../sort/base/metric/SourceMetricData.java | 4 ++-- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index 79d5098cb48..ed30578dff9 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -28,6 +28,7 @@ import java.io.Serializable; import java.util.HashSet; +import java.util.Set; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -46,7 +47,7 @@ public class MetricOption implements Serializable { private static final long serialVersionUID = 1L; private Map labels; - private HashSet ipPortList; + private Set ipPortSet; private String ipPorts; private RegisteredMetric registeredMetric; private long initRecords; @@ -66,7 +67,7 @@ private MetricOption( Long initDirtyBytes, Long readPhase, List inlongAuditKeys, - HashSet ipPortList) { + Set ipPortSet) { this.initRecords = initRecords; this.initBytes = initBytes; this.initDirtyRecords = initDirtyRecords; @@ -75,7 +76,7 @@ private MetricOption( this.labels = labels; this.ipPorts = inlongAudit; this.inlongAuditKeys = inlongAuditKeys; - this.ipPortList = ipPortList; + this.ipPortSet = ipPortSet; this.registeredMetric = registeredMetric; } @@ -83,8 +84,12 @@ public Map getLabels() { return labels; } - public HashSet getIpPortList() { - return ipPortList; + public Set getIpPortSet() { + return ipPortSet; + } + + public HashSet getIpPortSetAsHashSet() { + return new HashSet<>(ipPortSet); } public Optional getIpPorts() { @@ -229,7 +234,7 @@ public MetricOption build() { }); List inlongAuditKeysList = null; - HashSet ipPortList = null; + Set ipPortSet = null; if (inlongAudit != null) { Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID), @@ -242,11 +247,11 @@ public MetricOption build() { } inlongAuditKeysList = AuditUtils.extractAuditKeys(inlongAuditKeys); - ipPortList = AuditUtils.extractAuditIpPorts(inlongAudit); + ipPortSet = AuditUtils.extractAuditIpPorts(inlongAudit); } return new MetricOption(labels, inlongAudit, registeredMetric, initRecords, initBytes, - initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeysList, ipPortList); + initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeysList, ipPortSet); } } } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 7270becdb7a..8dba63be787 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -101,7 +101,7 @@ public SinkMetricData(MetricOption option, MetricGroup metricGroup) { } if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSetAsHashSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 0568efd7c35..76996c7fadf 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -104,7 +104,7 @@ public SourceMetricData(MetricOption option, MetricGroup metricGroup) { } if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSetAsHashSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } @@ -114,7 +114,7 @@ public SourceMetricData(MetricOption option) { this.labels = option.getLabels(); if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortList()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSetAsHashSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } From ffb28fd943081707e957c82a923db3a534b58bdf Mon Sep 17 00:00:00 2001 From: vinnerzhang Date: Wed, 8 May 2024 19:52:04 +0800 Subject: [PATCH 3/6] [INLONG-10152][Manager] fix defect. --- .../java/org/apache/inlong/sort/base/metric/MetricOption.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index ed30578dff9..45cbc5faebb 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -215,8 +215,8 @@ public MetricOption.Builder withInitReadPhase(Long initReadPhase) { } public MetricOption build() { - if (inlongAudit == null) { - LOG.warn("The property 'metrics.audit.proxy.hosts' has not been set," + + if (inlongAudit == null && inlongLabels == null) { + LOG.warn("The property 'metrics.audit.proxy.hosts and inlong.metric.labels' has not been set," + " the program will not open audit function"); return null; } From 7dc2d8c9c3814a7aeb3cb43922598dbc40559791 Mon Sep 17 00:00:00 2001 From: vinnerzhang Date: Wed, 8 May 2024 20:18:51 +0800 Subject: [PATCH 4/6] [INLONG-10152][Sort] fix defect. --- .../java/org/apache/inlong/sort/base/metric/MetricOption.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index 45cbc5faebb..dd54854e96a 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -28,11 +28,11 @@ import java.io.Serializable; import java.util.HashSet; -import java.util.Set; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Stream; import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT; From 93b09dd46e68638bf7cda5bba9853b5f8aadf5bb Mon Sep 17 00:00:00 2001 From: vinnerzhang Date: Thu, 9 May 2024 15:48:25 +0800 Subject: [PATCH 5/6] [INLONG-10152][Sort] fix defect. --- .../org/apache/inlong/sort/base/metric/MetricOption.java | 8 ++------ .../apache/inlong/sort/base/metric/SinkMetricData.java | 2 +- .../apache/inlong/sort/base/metric/SourceMetricData.java | 4 ++-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index dd54854e96a..1fb01ffac80 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -83,12 +83,8 @@ private MetricOption( public Map getLabels() { return labels; } - - public Set getIpPortSet() { - return ipPortSet; - } - - public HashSet getIpPortSetAsHashSet() { + + public HashSet getIpPortSet() { return new HashSet<>(ipPortSet); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 8dba63be787..1eedde502e1 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -101,7 +101,7 @@ public SinkMetricData(MetricOption option, MetricGroup metricGroup) { } if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortSetAsHashSet()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 76996c7fadf..0d2035e71cc 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -104,7 +104,7 @@ public SourceMetricData(MetricOption option, MetricGroup metricGroup) { } if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortSetAsHashSet()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } @@ -114,7 +114,7 @@ public SourceMetricData(MetricOption option) { this.labels = option.getLabels(); if (option.getIpPorts().isPresent()) { - AuditOperator.getInstance().setAuditProxy(option.getIpPortSetAsHashSet()); + AuditOperator.getInstance().setAuditProxy(option.getIpPortSet()); this.auditOperator = AuditOperator.getInstance(); this.auditKeys = option.getInlongAuditKeys(); } From 3cdda656fc70622b49dbdcbbb008e1915598ba18 Mon Sep 17 00:00:00 2001 From: vinnerzhang Date: Thu, 9 May 2024 15:55:26 +0800 Subject: [PATCH 6/6] [INLONG-10152][Sort] fix defect. --- .../java/org/apache/inlong/sort/base/metric/MetricOption.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index 1fb01ffac80..eb01d9f1658 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -83,7 +83,7 @@ private MetricOption( public Map getLabels() { return labels; } - + public HashSet getIpPortSet() { return new HashSet<>(ipPortSet); }