Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] integrate with Apache Arrow #2860

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hertzbeat-alerter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@
<version>${easy-poi.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import static org.apache.hertzbeat.common.constants.CommonConstants.TAG_MONITOR_APP;
import static org.apache.hertzbeat.common.constants.CommonConstants.TAG_MONITOR_ID;
import static org.apache.hertzbeat.common.constants.CommonConstants.TAG_MONITOR_NAME;

import com.google.common.collect.Maps;
import jakarta.persistence.criteria.Predicate;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -45,8 +48,12 @@
import org.apache.hertzbeat.alert.service.AlertService;
import org.apache.hertzbeat.alert.util.AlertTemplateUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.constants.MetricDataFieldConstants;
import org.apache.hertzbeat.common.entity.alerter.Alert;
import org.apache.hertzbeat.common.entity.alerter.AlertDefine;
import org.apache.hertzbeat.common.entity.arrow.ArrowVectorReader;
import org.apache.hertzbeat.common.entity.arrow.ArrowVectorReaderImpl;
import org.apache.hertzbeat.common.entity.arrow.RowWrapper;
import org.apache.hertzbeat.common.entity.manager.Monitor;
import org.apache.hertzbeat.common.entity.manager.TagItem;
import org.apache.hertzbeat.common.entity.message.CollectRep;
Expand Down Expand Up @@ -155,94 +162,100 @@ private void calculate(CollectRep.MetricsData metricsData) {
if (defineMap.isEmpty()) {
return;
}
List<CollectRep.Field> fields = metricsData.getFieldsList();
Map<String, Object> fieldValueMap = new HashMap<>(8);
int valueRowCount = metricsData.getValuesCount();
for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
List<AlertDefine> defines = entry.getValue();
for (AlertDefine define : defines) {
final String expr = define.getExpr();
if (StringUtils.isBlank(expr)) {
continue;
}
if (expr.contains(SYSTEM_VALUE_ROW_COUNT) && metricsData.getValuesCount() == 0) {
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, "", fieldValueMap, define);
// if the threshold is triggered, ignore other data rows
continue;
} else {
String alarmKey = String.valueOf(monitorId) + define.getId();
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} catch (Exception ignored) {}
}
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Map<String, Object> fieldValueMap = Maps.newHashMapWithExpectedSize(8);

if (CollectionUtils.isEmpty(valueRow.getColumnsList())) {
try (ArrowVectorReader arrowVectorReader = new ArrowVectorReaderImpl(metricsData.getData().toByteArray())) {
for (Map.Entry<String, List<AlertDefine>> entry : defineMap.entrySet()) {
List<AlertDefine> defines = entry.getValue();
for (AlertDefine define : defines) {
final String expr = define.getExpr();
if (StringUtils.isBlank(expr)) {
continue;
}
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, valueRowCount);
StringBuilder tagBuilder = new StringBuilder();
for (int index = 0; index < valueRow.getColumnsList().size(); index++) {
String valueStr = valueRow.getColumns(index);
if (CommonConstants.NULL_VALUE.equals(valueStr)) {
continue;
}

final CollectRep.Field field = fields.get(index);
final int fieldType = field.getType();

if (fieldType == CommonConstants.TYPE_NUMBER) {
final Double doubleValue;
if ((doubleValue = CommonUtil.parseStrDouble(valueStr)) != null) {
fieldValueMap.put(field.getName(), doubleValue);
}
} else if (fieldType == CommonConstants.TYPE_TIME) {
final Integer integerValue;
if ((integerValue = CommonUtil.parseStrInteger(valueStr)) != null) {
fieldValueMap.put(field.getName(), integerValue);

if (expr.contains(SYSTEM_VALUE_ROW_COUNT) && arrowVectorReader.getRowCount() == 0L) {
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, arrowVectorReader.getRowCount());
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, "", fieldValueMap, define);
// if the threshold is triggered, ignore other data rows
continue;
} else {
String alarmKey = String.valueOf(monitorId) + define.getId();
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} else {
if (StringUtils.isNotEmpty(valueStr)) {
fieldValueMap.put(field.getName(), valueStr);
} catch (Exception ignored) {}
}

RowWrapper rowWrapper = arrowVectorReader.readRow();
while (rowWrapper.hasNextRow()) {
rowWrapper = rowWrapper.nextRow();

String finalApp = app;
rowWrapper.foreach(cell -> {
fieldValueMap.clear();
fieldValueMap.put(SYSTEM_VALUE_ROW_COUNT, arrowVectorReader.getRowCount());
StringBuilder tagBuilder = new StringBuilder();

String valueStr = cell.getValue();
if (CommonConstants.NULL_VALUE.equals(valueStr)) {
return;
}
}

if (field.getLabel()) {
tagBuilder.append("-").append(valueStr);
}
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, app, metrics, tagBuilder.toString(), fieldValueMap, define);
final String fieldName = cell.getField().getName();
final int fieldType = Integer.parseInt(cell.getMetadata().get(MetricDataFieldConstants.TYPE));

if (fieldType == CommonConstants.TYPE_NUMBER) {
final Double doubleValue;
if ((doubleValue = CommonUtil.parseStrDouble(valueStr)) != null) {
fieldValueMap.put(fieldName, doubleValue);
}
} else if (fieldType == CommonConstants.TYPE_TIME) {
final Integer integerValue;
if ((integerValue = CommonUtil.parseStrInteger(valueStr)) != null) {
fieldValueMap.put(fieldName, integerValue);
}
} else {
String alarmKey = String.valueOf(monitorId) + define.getId() + tagBuilder;
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
if (StringUtils.isNotEmpty(valueStr)) {
fieldValueMap.put(fieldName, valueStr);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} catch (Exception ignored) {}

if (Boolean.parseBoolean(cell.getMetadata().get(MetricDataFieldConstants.LABEL))) {
tagBuilder.append("-").append(valueStr);
}
try {
boolean match = execAlertExpression(fieldValueMap, expr);
try {
if (match) {
// If the threshold rule matches, the number of times the threshold has been triggered is determined and an alarm is triggered
afterThresholdRuleMatch(currentTimeMilli, monitorId, finalApp, metrics, tagBuilder.toString(), fieldValueMap, define);
} else {
String alarmKey = String.valueOf(monitorId) + define.getId() + tagBuilder;
triggeredAlertMap.remove(alarmKey);
if (define.isRecoverNotice()) {
handleRecoveredAlert(currentTimeMilli, define, expr, alarmKey);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} catch (Exception ignored) {}
});
}
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.collect.AbstractCollect;
import org.apache.hertzbeat.collector.collect.common.MetricsDataBuilder;
import org.apache.hertzbeat.collector.collect.common.cache.CacheIdentifier;
import org.apache.hertzbeat.collector.collect.common.cache.ConnectionCommonCache;
import org.apache.hertzbeat.collector.collect.common.cache.JdbcConnect;
import org.apache.hertzbeat.collector.constants.CollectorConstants;
import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
import org.apache.hertzbeat.collector.util.CollectUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
import org.apache.hertzbeat.common.entity.message.CollectRep;
Expand Down Expand Up @@ -78,7 +78,8 @@ public void preCheck(Metrics metrics) throws IllegalArgumentException {
}

@Override
public void collect(CollectRep.MetricsData.Builder builder, long monitorId, String app, Metrics metrics) {
public void collect(MetricsDataBuilder metricsDataBuilder, Metrics metrics) {
final CollectRep.MetricsData.Builder builder = metricsDataBuilder.getBuilder();
long startTime = System.currentTimeMillis();
JdbcProtocol jdbcProtocol = metrics.getJdbc();
String databaseUrl = constructDatabaseUrl(jdbcProtocol);
Expand All @@ -88,9 +89,9 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
statement = getConnection(jdbcProtocol.getUsername(),
jdbcProtocol.getPassword(), databaseUrl, timeout);
switch (jdbcProtocol.getQueryType()) {
case QUERY_TYPE_ONE_ROW -> queryOneRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
case QUERY_TYPE_MULTI_ROW -> queryMultiRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
case QUERY_TYPE_COLUMNS -> queryOneRowByMatchTwoColumns(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
case QUERY_TYPE_ONE_ROW -> queryOneRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), metricsDataBuilder, startTime);
case QUERY_TYPE_MULTI_ROW -> queryMultiRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), metricsDataBuilder, startTime);
case QUERY_TYPE_COLUMNS -> queryOneRowByMatchTwoColumns(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), metricsDataBuilder, startTime);
case RUN_SCRIPT -> {
Connection connection = statement.getConnection();
FileSystemResource rc = new FileSystemResource(jdbcProtocol.getSql());
Expand Down Expand Up @@ -193,22 +194,11 @@ private Statement getConnection(String username, String password, String url, In
* @throws Exception when error happen
*/
private void queryOneRow(Statement statement, String sql, List<String> columns,
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
MetricsDataBuilder metricsDataBuilder, long startTime) throws Exception {
statement.setMaxRows(1);
try (ResultSet resultSet = statement.executeQuery(sql)) {
if (resultSet.next()) {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String column : columns) {
if (CollectorConstants.RESPONSE_TIME.equals(column)) {
long time = System.currentTimeMillis() - startTime;
valueRowBuilder.addColumns(String.valueOf(time));
} else {
String value = resultSet.getString(column);
value = value == null ? CommonConstants.NULL_VALUE : value;
valueRowBuilder.addColumns(value);
}
}
builder.addValues(valueRowBuilder.build());
addMetricsDataByResultSet(columns, metricsDataBuilder, startTime, resultSet);
}
}
}
Expand All @@ -229,26 +219,35 @@ private void queryOneRow(Statement statement, String sql, List<String> columns,
* @throws Exception when error happen
*/
private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List<String> columns,
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
MetricsDataBuilder metricsDataBuilder, long startTime) throws Exception {
try (ResultSet resultSet = statement.executeQuery(sql)) {
HashMap<String, String> values = new HashMap<>(columns.size());
while (resultSet.next()) {
if (resultSet.getString(1) != null) {
values.put(resultSet.getString(1).toLowerCase().trim(), resultSet.getString(2));
}
}
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();

for (String column : columns) {
if (CollectorConstants.RESPONSE_TIME.equals(column)) {
long time = System.currentTimeMillis() - startTime;
valueRowBuilder.addColumns(String.valueOf(time));
metricsDataBuilder.getArrowVectorWriter().setValue(column, String.valueOf(time));
} else {
String value = values.get(column.toLowerCase());
value = value == null ? CommonConstants.NULL_VALUE : value;
valueRowBuilder.addColumns(value);
metricsDataBuilder.getArrowVectorWriter().setValue(column, value);
}
}
builder.addValues(valueRowBuilder.build());
}
}

private void addMetricsDataByResultSet(List<String> columns, MetricsDataBuilder metricsDataBuilder, long startTime, ResultSet resultSet) throws SQLException {
for (String column : columns) {
if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(column)) {
long time = System.currentTimeMillis() - startTime;
metricsDataBuilder.getArrowVectorWriter().setValue(column, String.valueOf(time));
} else {
metricsDataBuilder.getArrowVectorWriter().setValue(column, resultSet.getString(column));
}
}
}

Expand All @@ -264,21 +263,10 @@ private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List<
* @throws Exception when error happen
*/
private void queryMultiRow(Statement statement, String sql, List<String> columns,
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
MetricsDataBuilder metricsDataBuilder, long startTime) throws Exception {
try (ResultSet resultSet = statement.executeQuery(sql)) {
while (resultSet.next()) {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String column : columns) {
if (CollectorConstants.RESPONSE_TIME.equals(column)) {
long time = System.currentTimeMillis() - startTime;
valueRowBuilder.addColumns(String.valueOf(time));
} else {
String value = resultSet.getString(column);
value = value == null ? CommonConstants.NULL_VALUE : value;
valueRowBuilder.addColumns(value);
}
}
builder.addValues(valueRowBuilder.build());
addMetricsDataByResultSet(columns, metricsDataBuilder, startTime, resultSet);
}
}
}
Expand Down
Loading
Loading