diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java index 39dde5d934..307c79f1da 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java @@ -141,30 +141,24 @@ public String getFieldsString() { private String systemStartupTime = ExcuteLinux.exeCmd("uptime -s").replaceAll("\r|\n", ""); private AgentStatusManager(AgentManager agentManager) { - this.agentManager = agentManager; this.conf = AgentConfiguration.getAgentConf(); threadBean = ManagementFactory.getThreadMXBean(); + this.agentManager = agentManager; } - public static AgentStatusManager getInstance(AgentManager agentManager) { - if (manager == null) { - synchronized (AgentStatusManager.class) { - if (manager == null) { - manager = new AgentStatusManager(agentManager); - } + public static void init(AgentManager agentManager) { + synchronized (AgentStatusManager.class) { + if (manager == null) { + manager = new AgentStatusManager(agentManager); } } - return manager; } - public static AgentStatusManager getInstance() { - if (manager == null) { - throw new RuntimeException("HeartbeatManager has not been initialized by agentManager"); - } + private static AgentStatusManager getInstance() { return manager; } - public void sendStatusMsg(DefaultMessageSender sender) { + private void doSendStatusMsg(DefaultMessageSender sender) { AgentStatus data = AgentStatusManager.getInstance().getStatus(); LOGGER.info("status detail: {}", data); if (sender == null) { @@ -180,6 +174,12 @@ public void sendStatusMsg(DefaultMessageSender sender) { } } + public static void sendStatusMsg(DefaultMessageSender sender) { + if (AgentStatusManager.getInstance() != null) { + AgentStatusManager.getInstance().doSendStatusMsg(sender); + } + } + private double getProcessCpu() { double cpu = tryGetProcessCpu(); int tryTimes = 0; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java index 7acd32941b..abda6a2eab 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java @@ -92,27 +92,24 @@ public String getFieldsString() { private final AgentConfiguration conf; protected BlockingQueue queue; - private FileStaticManager(AgentManager agentManager) { + private FileStaticManager() { this.conf = AgentConfiguration.getAgentConf(); queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); } - public static FileStaticManager getInstance(AgentManager agentManager) { - if (manager == null) { - synchronized (FileStaticManager.class) { - if (manager == null) { - manager = new FileStaticManager(agentManager); - } + public static void init() { + synchronized (FileStaticManager.class) { + if (manager == null) { + manager = new FileStaticManager(); } } - return manager; } - public static FileStaticManager getInstance() { + private static FileStaticManager getInstance() { return manager; } - public void putStaticMsg(FileStatic data) { + private void doPutStaticMsg(FileStatic data) { data.setAgentIp(AgentUtils.fetchLocalIp()); data.setTag(conf.get(AGENT_CLUSTER_TAG)); data.setCluster(conf.get(AGENT_CLUSTER_NAME)); @@ -121,7 +118,13 @@ public void putStaticMsg(FileStatic data) { } } - public void sendStaticMsg(DefaultMessageSender sender) { + public static void putStaticMsg(FileStatic data) { + if (FileStaticManager.getInstance() != null) { + FileStaticManager.getInstance().doPutStaticMsg(data); + } + } + + private void doSendStaticMsg(DefaultMessageSender sender) { while (!queue.isEmpty()) { FileStatic data = queue.poll(); LOGGER.info("file static detail: {}", data); @@ -138,4 +141,10 @@ public void sendStaticMsg(DefaultMessageSender sender) { } } } + + public static void sendStaticMsg(DefaultMessageSender sender) { + if (FileStaticManager.getInstance() != null) { + FileStaticManager.getInstance().doSendStaticMsg(sender); + } + } } \ No newline at end of file diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index 1f833275eb..2a16bffb0a 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -75,8 +75,6 @@ private HeartbeatManager(AgentManager agentManager) { baseManagerUrl = httpManager.getBaseUrl(); reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl); createMessageSender(); - AgentStatusManager.getInstance(agentManager); - FileStaticManager.getInstance(agentManager); } public static HeartbeatManager getInstance(AgentManager agentManager) { @@ -126,8 +124,8 @@ private Runnable heartbeatReportThread() { if (sender == null) { createMessageSender(); } - AgentStatusManager.getInstance().sendStatusMsg(sender); - FileStaticManager.getInstance().sendStaticMsg(sender); + AgentStatusManager.sendStatusMsg(sender); + FileStaticManager.sendStaticMsg(sender); } catch (Throwable e) { LOGGER.error("interrupted while report heartbeat", e); ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 4adf59c97b..9ce20f6daa 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -323,9 +323,6 @@ public boolean sourceExist() { protected void releaseSource() { if (randomAccessFile != null) { try { - if (FileStaticManager.getInstance() == null) { - return; - } FileStatic data = new FileStatic(); data.setTaskId(taskId); data.setRetry(String.valueOf(profile.isRetry())); @@ -342,7 +339,7 @@ protected void releaseSource() { return; } data.setSendLines(offsetProfile.getOffset()); - FileStaticManager.getInstance().putStaticMsg(data); + FileStaticManager.putStaticMsg(data); randomAccessFile.close(); } catch (IOException e) { LOGGER.error("close randomAccessFile error", e);