Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support customized JMX monitoring through the Factory Pattern. #2932

Merged
merged 25 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
13384fb
[collector]feature:add CustomizedJmx feature
doveLin0818 Dec 28, 2024
ef5d11d
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 1, 2025
1b57dea
Merge remote-tracking branch 'origin/feature_CustomizedJmx' into feat…
doveLin0818 Jan 1, 2025
fc4e624
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 1, 2025
6a0e3da
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 1, 2025
5b1e6e3
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 1, 2025
e149f24
Merge branch 'master' into feature_JmxCustomization
doveLin0818 Jan 1, 2025
4148249
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 1, 2025
37484ce
Merge remote-tracking branch 'origin/feature_JmxCustomization' into f…
doveLin0818 Jan 1, 2025
8abc76b
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 1, 2025
63b5346
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 2, 2025
3957a60
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 2, 2025
71e7c73
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 2, 2025
f529a82
Merge branch 'master' into feature_JmxCustomization
doveLin0818 Jan 2, 2025
401355a
Merge branch 'master' into feature_JmxCustomization
doveLin0818 Jan 2, 2025
5da4b6b
Merge branch 'master' into feature_JmxCustomization
Aias00 Jan 3, 2025
898923e
Merge branch 'master' into feature_JmxCustomization
doveLin0818 Jan 3, 2025
19db92f
Merge branch 'master' into feature_JmxCustomization
Aias00 Jan 3, 2025
9ef88a4
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 3, 2025
0d145d2
Merge remote-tracking branch 'origin/feature_JmxCustomization' into f…
doveLin0818 Jan 3, 2025
9baf61a
Merge branch 'master' into feature_JmxCustomization
doveLin0818 Jan 4, 2025
c3c918c
Merge branch 'master' into feature_JmxCustomization
doveLin0818 Jan 4, 2025
97e9e19
[collector]feature:add CustomizedJmx feature
doveLin0818 Jan 5, 2025
34d2eb1
Merge remote-tracking branch 'origin/feature_JmxCustomization' into f…
doveLin0818 Jan 5, 2025
2020226
Merge branch 'master' into feature_JmxCustomization
doveLin0818 Jan 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.jmx;


import java.util.Collections;
import java.util.HashMap;

import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.collector.collect.jmx.kafkajmx.KafkaJmxValidator;

/**
* Please register the components you need to customize here
*/
public class CustomizedJmxFactory {

// Map associating app names with their corresponding JmxValidator
private static final Map<String, JmxValidator> VALIDATOR_MAP;

static {
Map<String, JmxValidator> map = new HashMap<>();
// Register validator for "kafka"
map.put("kafka", new KafkaJmxValidator());

// Future validators for other apps can be added here
// Example for "zookeeper":
// map.put("zookeeper", new ZookeeperJmxValidator());

VALIDATOR_MAP = Collections.unmodifiableMap(map);
}

/**
* Validates the object name using the validator associated with the specified app.
*
* @param app The application name.
* @param objectName The JMX object name to validate.
* @return true if the validator deems the object name valid, false otherwise.
*/
public static boolean validate(String app, String objectName) {
if (StringUtils.isBlank(app) || StringUtils.isBlank(objectName)) {
return false;
}

JmxValidator validator = VALIDATOR_MAP.get(app.toLowerCase());
if (validator == null) {
return false;
}

return validator.isValid(objectName);
}

/**
* Retrieves the appropriate processor for the given object name using the validator associated with the specified app.
*
* @param app The application name.
* @param objectName The JMX object name for which to retrieve the processor.
* @return The corresponding MbeanProcessor instance, or null if none are applicable.
*/
public static MbeanProcessor getProcessor(String app, String objectName) {
if (StringUtils.isBlank(app) || StringUtils.isBlank(objectName)) {
return null;
}

JmxValidator validator = VALIDATOR_MAP.get(app.toLowerCase());
if (validator == null) {
return null;
}

return validator.getProcessor(objectName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class JmxCollectImpl extends AbstractCollect {
private final GlobalConnectionCache connectionCommonCache = GlobalConnectionCache.getInstance();

private final ClassLoader jmxClassLoader;


public JmxCollectImpl() {
jmxClassLoader = new JmxClassLoader(ClassLoader.getSystemClassLoader());
Expand All @@ -95,16 +96,25 @@ public void collect(CollectRep.MetricsData.Builder builder, Metrics metrics) {
Thread.currentThread().setContextClassLoader(jmxClassLoader);
try {
JmxProtocol jmxProtocol = metrics.getJmx();

// Whether to use customized JMX
MbeanProcessor processor = null;
if (CustomizedJmxFactory.validate(builder.getApp(), jmxProtocol.getObjectName())) {
processor = CustomizedJmxFactory.getProcessor(builder.getApp(), jmxProtocol.getObjectName());
if (processor != null) {
processor.preProcess(builder, metrics);
if (processor.isCollectionComplete()) {
return;
}
}
}
// Create a jndi remote connection
JMXConnector jmxConnector = getConnectSession(jmxProtocol);

MBeanServerConnection serverConnection = jmxConnector.getMBeanServerConnection();
ObjectName objectName = new ObjectName(jmxProtocol.getObjectName());

Set<ObjectInstance> objectInstanceSet = serverConnection.queryMBeans(objectName, null);
Set<String> attributeNameSet = metrics.getAliasFields().stream()
.map(field -> field.split(SUB_ATTRIBUTE)[0]).collect(Collectors.toSet());
Set<ObjectInstance> objectInstanceSet = serverConnection.queryMBeans(objectName, null);
for (ObjectInstance objectInstance : objectInstanceSet) {
ObjectName currentObjectName = objectInstance.getObjectName();
MBeanInfo beanInfo = serverConnection.getMBeanInfo(currentObjectName);
Expand All @@ -118,11 +128,18 @@ public void collect(CollectRep.MetricsData.Builder builder, Metrics metrics) {

Map<String, String> attributeValueMap = extractAttributeValue(attributeList);
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
if (processor != null) {
processor.process(serverConnection, objectInstance, objectInstanceSet,
currentObjectName, attributeValueMap, valueRowBuilder);
}
for (String aliasField : metrics.getAliasFields()) {
String fieldValue = attributeValueMap.get(aliasField);
valueRowBuilder.addColumn(fieldValue != null ? fieldValue : CommonConstants.NULL_VALUE);
}
builder.addValueRow(valueRowBuilder.build());
if (processor != null && processor.isCollectionComplete()) {
return;
}
}
} catch (IOException exception) {
String errorMsg = CommonUtil.getMessageFromThrowable(exception);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.jmx;

/**
* If you want to customize JMX, please implement this interface
*/

public interface JmxValidator {
/**
* Validates whether the given object name matches the validator's criteria.
*
* @param objectName The JMX object name to validate.
* @return true if valid, false otherwise.
*/
boolean isValid(String objectName);

/**
* Retrieves the appropriate MBeanProcessor for the given object name.
*
* @param objectName The JMX object name.
* @return The corresponding MBeanProcessor instance, or null if not applicable.
*/
MbeanProcessor getProcessor(String objectName);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.jmx;

import java.util.Map;
import java.util.Set;

import javax.management.MBeanServerConnection;
import javax.management.ObjectInstance;
import javax.management.ObjectName;

import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.message.CollectRep;

/**
* If you want to customize JMX, please implement this interface
*/

public interface MbeanProcessor {

/**
* Theoretically, any customized requirement can be handled in preProcess, bypassing the general JMX collection method.
*/
void preProcess(CollectRep.MetricsData.Builder builder, Metrics metrics);

/**
* Additional customized tasks, depending on the general JMX collection method.
*/
void process(MBeanServerConnection serverConnection, ObjectInstance objectInstance,
Set<ObjectInstance> objectInstanceSet, ObjectName objectName, Map<String, String> attributeValueMap, CollectRep.ValueRow.Builder valueRowBuilder);

/**
* Indicator of whether the collection is complete.
*/
Boolean isCollectionComplete();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.jmx.kafkajmx;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.collector.collect.jmx.JmxValidator;
import org.apache.hertzbeat.collector.collect.jmx.MbeanProcessor;
import org.apache.hertzbeat.collector.collect.jmx.kafkajmx.kafkaprocessor.KafkaBytesInAndOutPerSecProcessor;
import org.apache.hertzbeat.collector.collect.jmx.kafkajmx.kafkaprocessor.KafkaCommonProcessor;
import org.apache.hertzbeat.collector.collect.jmx.kafkajmx.kafkaprocessor.KafkaReplicaManageProcessor;



import java.util.function.Supplier;


/**
* KafkaJmxValidator
*/

public class KafkaJmxValidator implements JmxValidator {

// Map of object name patterns to their corresponding processor suppliers
private static final Map<String, Supplier<MbeanProcessor>> OBJECT_NAME_MAP;

static {
Map<String, Supplier<MbeanProcessor>> map = new HashMap<>();
map.put("kafka\\.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=\\*",
KafkaBytesInAndOutPerSecProcessor::new);
map.put("kafka\\.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=\\*",
KafkaBytesInAndOutPerSecProcessor::new);
map.put("kafka\\.server:type=ReplicaManager,name=\\*",
KafkaReplicaManageProcessor::new);
map.put("kafka\\.controller:type=KafkaController,name=\\*",
KafkaCommonProcessor::new);
map.put("kafka\\..*:type=GroupMetadataManager,name=\\*",
KafkaCommonProcessor::new);
OBJECT_NAME_MAP = Collections.unmodifiableMap(map);
}

@Override
public boolean isValid(String objectName) {
if (StringUtils.isBlank(objectName)) {
return false;
}
return OBJECT_NAME_MAP.keySet().stream()
.anyMatch(pattern -> objectName.matches(pattern));
}

@Override
public MbeanProcessor getProcessor(String objectName) {
if (StringUtils.isBlank(objectName)) {
return null;
}
return OBJECT_NAME_MAP.entrySet().stream()
.filter(entry -> objectName.matches(entry.getKey()))
.map(entry -> entry.getValue().get())
.findFirst()
.orElse(null);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.jmx.kafkajmx.kafkaprocessor;

import java.util.Map;
import java.util.Set;

import javax.management.MBeanServerConnection;
import javax.management.ObjectInstance;
import javax.management.ObjectName;

import org.apache.hertzbeat.collector.collect.jmx.MbeanProcessor;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.entity.message.CollectRep.MetricsData.Builder;

/**
* kafkaBytesInAndOutPerSecProcessor
*/
public class KafkaBytesInAndOutPerSecProcessor implements MbeanProcessor {

Boolean completeFlag = false;

@Override
public void preProcess(Builder builder, Metrics metrics) {

}

@Override
public void process(MBeanServerConnection serverConnection,
ObjectInstance objectInstance, Set<ObjectInstance> objectInstanceSet, ObjectName objectName,
Map<String, String> attributeValueMap, CollectRep.ValueRow.Builder valueRowBuilder) {
String topic = objectName.getKeyProperty("topic");
attributeValueMap.put("topic", topic);
}

@Override
public Boolean isCollectionComplete() {
return completeFlag;
}

}

Loading
Loading