Skip to content

Commit

Permalink
Merge branch 'apache:master' into INLONG-11349
Browse files Browse the repository at this point in the history
  • Loading branch information
qy-liuhuo authored Oct 30, 2024
2 parents ed4ea5e + 43f690b commit a279878
Show file tree
Hide file tree
Showing 394 changed files with 8,041 additions and 7,043 deletions.
8 changes: 0 additions & 8 deletions docker/kubernetes/templates/tubemq-broker-ini-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,4 @@ data:
loadMessageStoresInParallel=true
consumerRegTimeoutMs=35000
[zookeeper]
zkNodeRoot=/tubemq
zkServerAddr={{ template "inlong.zookeeper.hostname" . }}:{{ .Values.zookeeper.ports.client }}
zkSessionTimeoutMs=30000
zkConnectionTimeoutMs=30000
zkSyncTimeMs=5000
zkCommitPeriodMs=5000
zkCommitFailRetries=10
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -141,30 +141,24 @@ public String getFieldsString() {
private String systemStartupTime = ExcuteLinux.exeCmd("uptime -s").replaceAll("\r|\n", "");

private AgentStatusManager(AgentManager agentManager) {
this.agentManager = agentManager;
this.conf = AgentConfiguration.getAgentConf();
threadBean = ManagementFactory.getThreadMXBean();
this.agentManager = agentManager;
}

public static AgentStatusManager getInstance(AgentManager agentManager) {
if (manager == null) {
synchronized (AgentStatusManager.class) {
if (manager == null) {
manager = new AgentStatusManager(agentManager);
}
public static void init(AgentManager agentManager) {
synchronized (AgentStatusManager.class) {
if (manager == null) {
manager = new AgentStatusManager(agentManager);
}
}
return manager;
}

public static AgentStatusManager getInstance() {
if (manager == null) {
throw new RuntimeException("HeartbeatManager has not been initialized by agentManager");
}
private static AgentStatusManager getInstance() {
return manager;
}

public void sendStatusMsg(DefaultMessageSender sender) {
private void doSendStatusMsg(DefaultMessageSender sender) {
AgentStatus data = AgentStatusManager.getInstance().getStatus();
LOGGER.info("status detail: {}", data);
if (sender == null) {
Expand All @@ -180,6 +174,12 @@ public void sendStatusMsg(DefaultMessageSender sender) {
}
}

public static void sendStatusMsg(DefaultMessageSender sender) {
if (AgentStatusManager.getInstance() != null) {
AgentStatusManager.getInstance().doSendStatusMsg(sender);
}
}

private double getProcessCpu() {
double cpu = tryGetProcessCpu();
int tryTimes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,24 @@ public String getFieldsString() {
private final AgentConfiguration conf;
protected BlockingQueue<FileStatic> queue;

private FileStaticManager(AgentManager agentManager) {
private FileStaticManager() {
this.conf = AgentConfiguration.getAgentConf();
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
}

public static FileStaticManager getInstance(AgentManager agentManager) {
if (manager == null) {
synchronized (FileStaticManager.class) {
if (manager == null) {
manager = new FileStaticManager(agentManager);
}
public static void init() {
synchronized (FileStaticManager.class) {
if (manager == null) {
manager = new FileStaticManager();
}
}
return manager;
}

public static FileStaticManager getInstance() {
private static FileStaticManager getInstance() {
return manager;
}

public void putStaticMsg(FileStatic data) {
private void doPutStaticMsg(FileStatic data) {
data.setAgentIp(AgentUtils.fetchLocalIp());
data.setTag(conf.get(AGENT_CLUSTER_TAG));
data.setCluster(conf.get(AGENT_CLUSTER_NAME));
Expand All @@ -121,7 +118,13 @@ public void putStaticMsg(FileStatic data) {
}
}

public void sendStaticMsg(DefaultMessageSender sender) {
public static void putStaticMsg(FileStatic data) {
if (FileStaticManager.getInstance() != null) {
FileStaticManager.getInstance().doPutStaticMsg(data);
}
}

private void doSendStaticMsg(DefaultMessageSender sender) {
while (!queue.isEmpty()) {
FileStatic data = queue.poll();
LOGGER.info("file static detail: {}", data);
Expand All @@ -138,4 +141,10 @@ public void sendStaticMsg(DefaultMessageSender sender) {
}
}
}

public static void sendStaticMsg(DefaultMessageSender sender) {
if (FileStaticManager.getInstance() != null) {
FileStaticManager.getInstance().doSendStaticMsg(sender);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ private HeartbeatManager(AgentManager agentManager) {
baseManagerUrl = httpManager.getBaseUrl();
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
createMessageSender();
AgentStatusManager.getInstance(agentManager);
FileStaticManager.getInstance(agentManager);
}

public static HeartbeatManager getInstance(AgentManager agentManager) {
Expand Down Expand Up @@ -126,8 +124,8 @@ private Runnable heartbeatReportThread() {
if (sender == null) {
createMessageSender();
}
AgentStatusManager.getInstance().sendStatusMsg(sender);
FileStaticManager.getInstance().sendStaticMsg(sender);
AgentStatusManager.sendStatusMsg(sender);
FileStaticManager.sendStaticMsg(sender);
} catch (Throwable e) {
LOGGER.error("interrupted while report heartbeat", e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ public boolean sourceExist() {
protected void releaseSource() {
if (randomAccessFile != null) {
try {
if (FileStaticManager.getInstance() == null) {
return;
}
FileStatic data = new FileStatic();
data.setTaskId(taskId);
data.setRetry(String.valueOf(profile.isRetry()));
Expand All @@ -342,7 +339,7 @@ protected void releaseSource() {
return;
}
data.setSendLines(offsetProfile.getOffset());
FileStaticManager.getInstance().putStaticMsg(data);
FileStaticManager.putStaticMsg(data);
randomAccessFile.close();
} catch (IOException e) {
LOGGER.error("close randomAccessFile error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ConfigManager {

Expand Down Expand Up @@ -94,12 +95,28 @@ public Map<String, String> getProperties(String fileName) {
return null;
}

public String getValue(String key) {
public <T> T getValue(String key, T defaultValue, Function<String, T> parser) {
ConfigHolder holder = holderMap.get(DEFAULT_CONFIG_PROPERTIES);
if (holder != null) {
return holder.getHolder().get(key);
if (holder == null) {
return defaultValue;
}
return null;
Object value = holder.getHolder().get(key);
if (value == null) {
return defaultValue;
}
try {
return parser.apply((String) value);
} catch (Exception e) {
return defaultValue;
}
}

public String getValue(String key, String defaultValue) {
return getValue(key, defaultValue, Function.identity());
}

public int getValue(String key, int defaultValue) {
return getValue(key, defaultValue, Integer::parseInt);
}

private boolean updatePropertiesHolder(Map<String, String> result,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric;

public interface AbstractMetric {

public void report();
public void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.utils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

private final String baseName;
private final AtomicInteger counter = new AtomicInteger(0);

public NamedThreadFactory(String baseName) {
this.baseName = baseName;
}

@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, baseName + "-Thread-" + counter.getAndIncrement());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.config;

/**
* Config constants
*/
public class ConfigConstants {

public static final String KEY_PROMETHEUS_PORT = "audit.proxy.prometheus.port";
public static final int DEFAULT_PROMETHEUS_PORT = 10082;
public static final String KEY_PROXY_METRIC_CLASSNAME = "audit.proxy.metric.classname";
public static final String DEFAULT_PROXY_METRIC_CLASSNAME =
"org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric;

public enum MetricDimension {

RECEIVE_COUNT_SUCCESS("receiveCountSuccess"),
RECEIVE_PACK_SUCCESS("receivePackSuccess"),
RECEIVE_SIZE_SUCCESS("receiveSizeSuccess"),
RECEIVE_COUNT_INVALID("receiveCountInvalid"),
RECEIVE_COUNT_EXPIRED("receiveCountExpired"),
SEND_COUNT_SUCCESS("sendCountSuccess"),
SEND_COUNT_FAILED("sendCountFailed"),
SEND_PACK_SUCCESS("sendPackSuccess"),
SEND_DURATION("sendDuration");

private final String key;

MetricDimension(String key) {
this.key = key;
}

public String getKey() {
return key;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric;

import lombok.Data;

import java.util.concurrent.atomic.AtomicLong;

@Data
public class MetricItem {

public static final String K_DIMENSION_KEY = "dimensionName";
private AtomicLong receiveCountSuccess = new AtomicLong(0);
private AtomicLong receivePackSuccess = new AtomicLong(0);
private AtomicLong receiveSizeSuccess = new AtomicLong(0);
private AtomicLong receiveCountInvalid = new AtomicLong(0);
private AtomicLong receiveCountExpired = new AtomicLong(0);
private AtomicLong sendCountSuccess = new AtomicLong(0);
private AtomicLong sendCountFailed = new AtomicLong(0);
public void resetAllMetrics() {
receiveCountSuccess.set(0);
receivePackSuccess.set(0);
receiveSizeSuccess.set(0);
receiveCountInvalid.set(0);
receiveCountExpired.set(0);
sendCountSuccess.set(0);
sendCountFailed.set(0);
}
}
Loading

0 comments on commit a279878

Please sign in to comment.