Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] integrate with Apache Arrow #2864

Merged
merged 40 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
cafbac7
[feature] integrate with Apache Arrow (#2860)
Calvin979 Dec 8, 2024
2a3906a
[refactor] Delete Field and ValueRow in collect_rep.proto
Calvin979 Dec 8, 2024
e4371ad
[refactor] complete comment on class and method
Calvin979 Dec 8, 2024
b51ce8f
[improve] Add Apache Arrow JDK options in scripts
Calvin979 Dec 8, 2024
25ef745
[docs] update start backend docs
Calvin979 Dec 9, 2024
d20389b
[improve] optimize RowWrapper API
Calvin979 Dec 9, 2024
4e40411
[improve] add apache header
Calvin979 Dec 9, 2024
6dde4b5
[test] fix e2e test
Calvin979 Dec 9, 2024
3c01475
[test] fix e2e test
Calvin979 Dec 9, 2024
568993b
[test] fix e2e test
Calvin979 Dec 9, 2024
ecf8467
[test] fix e2e test
Calvin979 Dec 9, 2024
484dac8
Merge branch 'master' into feature/integrate_apache_arrow
yuluo-yx Dec 10, 2024
b0d3425
Merge branch 'master' into feature/integrate_apache_arrow
Calvin979 Dec 12, 2024
d2f246e
Merge branch 'master' into feature/integrate_apache_arrow
Calvin979 Dec 14, 2024
0245e79
[bugfix] fix calculate expr error
tomsun28 Dec 15, 2024
312fff2
[improve] update CollectRep
tomsun28 Dec 15, 2024
3469c82
Merge branch 'master' into feature/integrate_apache_arrow
Aias00 Dec 16, 2024
138e4ef
[refactor] use ArrowVector as in-memory structure
Calvin979 Dec 19, 2024
8fc6d17
Merge branch 'master' into feature/integrate_apache_arrow
yuluo-yx Dec 21, 2024
42145a1
[improve] update apache arrow integrate
tomsun28 Dec 22, 2024
1507a4d
[improve] update apache arrow integrate
tomsun28 Dec 23, 2024
870be62
[improve] update apache arrow integrate
tomsun28 Dec 23, 2024
a51e74a
[improve] update apache arrow integrate
tomsun28 Dec 23, 2024
622f597
[improve] update apache arrow integrate
tomsun28 Dec 23, 2024
f85d046
[improve] update apache arrow integrate
tomsun28 Dec 23, 2024
c2462bf
Merge branch 'master' into feature/integrate_apache_arrow
tomsun28 Dec 23, 2024
05312d8
[improve] update apache arrow integrate
tomsun28 Dec 23, 2024
a0f20f4
Add @Rancho-7 as a contributor
tomsun28 Dec 23, 2024
e971090
Add @doveLin0818 as a contributor
tomsun28 Dec 23, 2024
1bf376f
[improve] update apache arrow integrate
tomsun28 Dec 23, 2024
22a687c
[refactor] delete unused class of Arrow
Calvin979 Dec 23, 2024
2c621a1
Merge branch 'master' into feature/integrate_apache_arrow
Calvin979 Dec 23, 2024
8b40ba4
[improve] update apache arrow integrate
tomsun28 Dec 24, 2024
af5d2a5
[improve] update apache arrow integrate
tomsun28 Dec 24, 2024
70fb091
[improve] update apache arrow integrate
tomsun28 Dec 24, 2024
9abbe90
[improve] update apache arrow integrate
tomsun28 Dec 24, 2024
e2c4764
Add @yunfan24 as a contributor
tomsun28 Dec 24, 2024
df073d9
[improve] update apache arrow integrate
tomsun28 Dec 24, 2024
e4dda4f
[improve] update apache arrow integrate
tomsun28 Dec 24, 2024
2926573
Merge branch 'master' into feature/integrate_apache_arrow
tomsun28 Dec 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Detailed config refer to [Install HertzBeat via Package](https://hertzbeat.apach
##### 3:Start via source code

1. Local source code debugging needs to start the back-end project `manager` and the front-end project `web-app`.
2. Backend:need `maven3+`, `java17`, `lombok`, start the `manager` service.
2. Backend:need `maven3+`, `java17`, `lombok`, add VM options in IDE: ` --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED `, then start the `manager` service.
3. Web:need `nodejs npm angular-cli` environment, Run `ng serve --open` in `web-app` directory after backend startup.
4. Access `http://localhost:4200` to start, default account: `admin/hertzbeat`

Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
##### 方式三:本地代码启动

1. 此为前后端分离项目,本地代码调试需要分别启动后端工程 `manager` 和前端工程 `web-app`
2. 后端:需要 `maven3+`, `java17` 和 `lombok` 环境,修改 `YML` 配置信息并启动 `manager` 服务
2. 后端:需要 `maven3+`, `java17` 和 `lombok` 环境,修改 `YML` 配置信息,添加JVM参数`--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED`后启动 `manager` 服务即可。
3. 前端:需要 `nodejs npm angular-cli`环境,待本地后端启动后,在 `web-app` 目录下启动 `ng serve --open`
4. 浏览器访问 `http://localhost:4200` 即可开始,默认账号密码 `admin/hertzbeat`

Expand Down
8 changes: 8 additions & 0 deletions hertzbeat-alerter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@
<version>${easy-poi.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import static org.apache.hertzbeat.common.constants.CommonConstants.TAG_MONITOR_APP;
import static org.apache.hertzbeat.common.constants.CommonConstants.TAG_MONITOR_ID;
import static org.apache.hertzbeat.common.constants.CommonConstants.TAG_MONITOR_NAME;

import com.google.common.collect.Maps;
import jakarta.persistence.criteria.Predicate;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -45,8 +48,12 @@
import org.apache.hertzbeat.alert.service.AlertService;
import org.apache.hertzbeat.alert.util.AlertTemplateUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.constants.MetricDataFieldConstants;
import org.apache.hertzbeat.common.entity.alerter.Alert;
import org.apache.hertzbeat.common.entity.alerter.AlertDefine;
import org.apache.hertzbeat.common.entity.arrow.ArrowVectorReader;
import org.apache.hertzbeat.common.entity.arrow.ArrowVectorReaderImpl;
import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
import org.apache.hertzbeat.common.entity.manager.Monitor;
import org.apache.hertzbeat.common.entity.manager.TagItem;
import org.apache.hertzbeat.common.entity.message.CollectRep;
Expand Down Expand Up @@ -155,94 +162,100 @@ private void calculate(CollectRep.MetricsData metricsData) {
if (defineMap.isEmpty()) {
return;
}
List<CollectRep.Field> fields = metricsData.getFieldsList();
Map<String, Object> fieldValueMap = new HashMap<>(8);
int valueRowCount = metricsData.getValuesCount();
for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
List<AlertDefine> defines = entry.getValue();
for (AlertDefine define : defines) {
final String expr = define.getExpr();
if (StringUtils.isBlank(expr)) {
continue;
}
if (expr.contains(SYSTEM_VALUE_ROW_COUNT) && metricsData.getValuesCount() == 0) {
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, "", fieldValueMap, define);
// if the threshold is triggered, ignore other data rows
continue;
} else {
String alarmKey = String.valueOf(monitorId) + define.getId();
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} catch (Exception ignored) {}
}
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Map<String, Object> fieldValueMap = Maps.newHashMapWithExpectedSize(8);

if (CollectionUtils.isEmpty(valueRow.getColumnsList())) {
try (ArrowVectorReader arrowVectorReader = new ArrowVectorReaderImpl(metricsData.getData().toByteArray())) {
for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
List<AlertDefine> defines = entry.getValue();
for (AlertDefine define : defines) {
final String expr = define.getExpr();
if (StringUtils.isBlank(expr)) {
continue;
}
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
StringBuilder tagBuilder = new StringBuilder();
for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
String valueStr = valueRow.getColumns(index);
if (CommonConstants.NULL_VALUE.equals(valueStr)) {
continue;
}

final CollectRep.Field field = fields.get(index);
final int fieldType = field.getType();

if (fieldType == CommonConstants.TYPE_NUMBER) {
final Double doubleValue;
if ((doubleValue = CommonUtil.parseStrDouble(valueStr)) != null) {
fieldValueMap.put(field.getName(), doubleValue);
}
} else if (fieldType == CommonConstants.TYPE_TIME) {
final Integer integerValue;
if ((integerValue = CommonUtil.parseStrInteger(valueStr)) != null) {
fieldValueMap.put(field.getName(), integerValue);

if (expr.contains(SYSTEM_VALUE_ROW_COUNT) && arrowVectorReader.getRowCount() == 0L) {
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, arrowVectorReader.getRowCount());
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, "", fieldValueMap, define);
// if the threshold is triggered, ignore other data rows
continue;
} else {
String alarmKey = String.valueOf(monitorId) + define.getId();
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} else {
if (StringUtils.isNotEmpty(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
} catch (Exception ignored) {}
}

RowWrapper rowWrapper = arrowVectorReader.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();

String finalApp = app;
rowWrapper.foreachCell(cell -> {
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, arrowVectorReader.getRowCount());
StringBuilder tagBuilder = new StringBuilder();

String valueStr = cell.getValue();
if (CommonConstants.NULL_VALUE.equals(valueStr)) {
return;
}
}

if (field.getLabel()) {
tagBuilder.append("-").append(valueStr);
}
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, tagBuilder.toString(), fieldValueMap, define);
final String fieldName = cell.getField().getName();
final int fieldType = cell.getIntMetaData(MetricDataFieldConstants.TYPE);

if (fieldType == CommonConstants.TYPE_NUMBER) {
final Double doubleValue;
if ((doubleValue = CommonUtil.parseStrDouble(valueStr)) != null) {
fieldValueMap.put(fieldName, doubleValue);
}
} else if (fieldType == CommonConstants.TYPE_TIME) {
final Integer integerValue;
if ((integerValue = CommonUtil.parseStrInteger(valueStr)) != null) {
fieldValueMap.put(fieldName, integerValue);
}
} else {
String alarmKey = String.valueOf(monitorId) + define.getId() + tagBuilder;
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
if (StringUtils.isNotEmpty(valueStr)) {
fieldValueMap.put(fieldName, valueStr);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} catch (Exception ignored) {}

if (cell.getBooleanMetaData(MetricDataFieldConstants.LABEL)) {
tagBuilder.append("-").append(valueStr);
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, finalApp, metrics, tagBuilder.toString(), fieldValueMap, define);
} else {
String alarmKey = String.valueOf(monitorId) + define.getId() + tagBuilder;
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} catch (Exception ignored) {}
});
}
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

Expand Down
Loading
Loading