Merge branch 'master' into INLONG-10193
XiaoYou201 authored May 15, 2024
2 parents 9f32ac5 + c5dbfab commit 667bc10
Showing 63 changed files with 2,224 additions and 270 deletions.
2 changes: 2 additions & 0 deletions bin/init-config.sh
@@ -96,12 +96,14 @@ init_inlong_manager() {
$SED_COMMAND 's#jdbc:mysql://.*apache_inlong_manager#'''jdbc:mysql://${spring_datasource_hostname}:${spring_datasource_port}/apache_inlong_manager'''#g' application-dev.properties
$SED_COMMAND 's/spring.datasource.druid.username=.*/'''spring.datasource.druid.username=${spring_datasource_username}'''/g' application-dev.properties
$SED_COMMAND 's/spring.datasource.druid.password=.*/'''spring.datasource.druid.password=${spring_datasource_password}'''/g' application-dev.properties
$SED_COMMAND 's/metrics.audit.proxy.hosts=.*/'''metrics.audit.proxy.hosts=${audit_proxy_ip}:${audit_proxy_port}'''/g' application-dev.properties
$SED_COMMAND 's/audit.query.url=.*/'''audit.query.url=${audit_service_ip}:${audit_service_port}'''/g' application-dev.properties
fi
if [ $spring_profiles_active == "prod" ]; then
$SED_COMMAND 's#jdbc:mysql://.*apache_inlong_manager#'''jdbc:mysql://${spring_datasource_hostname}:${spring_datasource_port}/apache_inlong_manager'''#g' application-prod.properties
$SED_COMMAND 's/spring.datasource.druid.username=.*/'''spring.datasource.druid.username=${spring_datasource_username}'''/g' application-prod.properties
$SED_COMMAND 's/spring.datasource.druid.password=.*/'''spring.datasource.druid.password=${spring_datasource_password}'''/g' application-prod.properties
$SED_COMMAND 's/metrics.audit.proxy.hosts=.*/'''metrics.audit.proxy.hosts=${audit_proxy_ip}:${audit_proxy_port}'''/g' application-prod.properties
$SED_COMMAND 's/audit.query.url=.*/'''audit.query.url=${audit_service_ip}:${audit_service_port}'''/g' application-prod.properties
fi
echo "Init inlong manager flink plugin configuration"
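These two added substitutions wire the Manager's audit settings at install time: after bin/init-config.sh runs, application-dev.properties (or application-prod.properties) has metrics.audit.proxy.hosts and audit.query.url rewritten to the audit proxy and audit service addresses configured for the deployment, for example metrics.audit.proxy.hosts=127.0.0.1:10081 when the audit proxy runs locally on the port used in the docker-compose file below.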
2 changes: 0 additions & 2 deletions docker/docker-compose/docker-compose.yml
@@ -99,8 +99,6 @@ services:
- DATAPROXY_IP=dataproxy
- DATAPROXY_PORT=46801
- AUDIT_PROXY_URL=audit:10081
ports:
- "8008:8008"
volumes:
- ./collect-data:/data/collect-data

@@ -39,8 +39,6 @@ public class AgentConstants {
public static final String AGENT_FETCHER_CLASSNAME = "agent.fetcher.classname";
public static final String AGENT_CONF_PARENT = "agent.conf.parent";
public static final String DEFAULT_AGENT_CONF_PARENT = "conf";
public static final String AGENT_HTTP_PORT = "agent.http.port";
public static final int DEFAULT_AGENT_HTTP_PORT = 8008;
public static final String CHANNEL_MEMORY_CAPACITY = "channel.memory.capacity";
public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 2000;
public static final String JOB_NUMBER_LIMIT = "job.number.limit";
@@ -53,4 +53,6 @@ public abstract class Instance extends AbstractStateWrapper {
* get instance id
*/
public abstract String getInstanceId();

public abstract long getLastHeartbeatTime();
}
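Every Instance implementation now has to report when it last made progress; InstanceManager uses this to evict stalled instances (see the INSTANCE_KEEP_ALIVE_MS check further down). The following is a minimal, hypothetical sketch of how a subclass could satisfy the new contract; it is not code from this commit, and everything in it other than getLastHeartbeatTime() is illustrative.

```java
// Hypothetical sketch: one straightforward way to back getLastHeartbeatTime().
// Real implementations in this commit reuse their existing heartbeat bookkeeping.
public abstract class HeartbeatAwareInstance /* extends Instance */ {

    // refreshed by the instance's own run loop
    private volatile long lastHeartbeatTime = System.currentTimeMillis();

    public long getLastHeartbeatTime() {
        return lastHeartbeatTime;
    }

    // intended to be called after each successful read/write cycle
    protected void touchHeartbeat() {
        lastHeartbeatTime = System.currentTimeMillis();
    }
}
```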
@@ -97,6 +97,8 @@ public class TaskProfileDto {

private static final Gson GSON = new Gson();

public static final String deafult_time_offset = "0";

private Task task;
private Proxy proxy;

@@ -157,9 +159,10 @@ private static FileTask getFileTask(DataConfig dataConfig) {
fileTask.setCycleUnit(taskConfig.getCycleUnit());
fileTask.setStartTime(taskConfig.getStartTime());
fileTask.setEndTime(taskConfig.getEndTime());
fileTask.setProperties(GSON.toJson(taskConfig.getProperties()));
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
} else {
fileTask.setTimeOffset(deafult_time_offset + fileTask.getCycleUnit());
}

if (taskConfig.getAdditionalAttr() != null) {
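With this change, a file task whose configuration carries no timeOffset falls back to the literal "0" concatenated with its cycle unit, so a task whose cycle unit is "D", for example, ends up with a time offset of "0D".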
@@ -39,9 +39,7 @@
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_CHARGES;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH;

@@ -148,15 +146,13 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) {
*/
private HeartbeatMsg buildHeartbeatMsg() {
final String agentIp = AgentUtils.fetchLocalIp();
final int agentPort = conf.getInt(AGENT_HTTP_PORT, DEFAULT_AGENT_HTTP_PORT);
final String clusterName = conf.get(AGENT_CLUSTER_NAME);
final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES);
final String nodeGroup = conf.get(AGENT_NODE_GROUP);

HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp(agentIp);
heartbeatMsg.setPort(String.valueOf(agentPort));
heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getType());
heartbeatMsg.setReportTime(System.currentTimeMillis());
if (StringUtils.isNotBlank(clusterName)) {
@@ -183,7 +179,6 @@ private HeartbeatMsg buildDeadHeartbeatMsg() {
heartbeatMsg.setNodeSrvStatus(NodeSrvStatus.SERVICE_UNINSTALL);
heartbeatMsg.setInCharges(conf.get(AGENT_CLUSTER_IN_CHARGES));
heartbeatMsg.setIp(AgentUtils.fetchLocalIp());
heartbeatMsg.setPort(String.valueOf(conf.getInt(AGENT_HTTP_PORT, DEFAULT_AGENT_HTTP_PORT)));
heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getType());
heartbeatMsg.setClusterName(conf.get(AGENT_CLUSTER_NAME));
heartbeatMsg.setClusterTag(conf.get(AGENT_CLUSTER_TAG));
@@ -54,6 +54,7 @@ public class InstanceManager extends AbstractDaemon {
private static final int ACTION_QUEUE_CAPACITY = 100;
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
private long lastPrintTime = 0;
// instance in db
private final InstanceDb instanceDb;
@@ -240,6 +241,11 @@ private void traverseMemoryTasksToDb() {
if (stateFromDb != InstanceStateEnum.DEFAULT) {
deleteFromMemory(instance.getInstanceId());
}
if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_KEEP_ALIVE_MS) {
LOGGER.error("instance heartbeat timeout, id: {}, will be deleted from memory",
instance.getInstanceId());
deleteFromMemory(instance.getInstanceId());
}
});
}

@@ -391,6 +397,11 @@ private void addToMemory(InstanceProfile instanceProfile) {
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
if (instanceMap.size() > instanceLimit) {
LOGGER.info("add instance to memory refused because instanceMap size over limit {}",
instanceProfile.getInstanceId());
return;
}
Class<?> taskClass = Class.forName(instanceProfile.getInstanceClass());
Instance instance = (Instance) taskClass.newInstance();
boolean initSuc = instance.init(this, instanceProfile);
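Taken together, the two new guards bound the in-memory instance set: addToMemory refuses new instances once instanceMap grows past instanceLimit, and traverseMemoryTasksToDb drops any instance whose heartbeat is older than INSTANCE_KEEP_ALIVE_MS (five minutes). The sketch below restates the eviction rule on its own, with the agent's Instance type reduced to a timestamp map; the class and method names are assumptions for illustration only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the keep-alive eviction introduced in this commit.
public class KeepAliveSketch {

    static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;

    // instance id -> last heartbeat timestamp (millis)
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // record a heartbeat for an instance
    void touch(String instanceId) {
        lastHeartbeat.put(instanceId, System.currentTimeMillis());
    }

    // drop every instance whose heartbeat is older than the keep-alive window,
    // mirroring the timeout check in traverseMemoryTasksToDb()
    void evictStale() {
        long now = System.currentTimeMillis();
        lastHeartbeat.entrySet().removeIf(e -> now - e.getValue() > INSTANCE_KEEP_ALIVE_MS);
    }
}
```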
@@ -58,7 +58,7 @@ public class TaskManager extends AbstractDaemon {
public static final int CONFIG_QUEUE_CAPACITY = 1;
public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_PRINT_TIME = 10000;
private static final int ACTION_QUEUE_CAPACITY = 100000;
private static final int ACTION_QUEUE_CAPACITY = 1000;
private long lastPrintTime = 0;
// task basic db
private final Db taskBasicDb;
@@ -19,6 +19,7 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,11 @@ public String getInstanceId() {
return profile.getInstanceId();
}

@Override
public long getLastHeartbeatTime() {
return AgentUtils.getCurrentTime();
}

@Override
public void addCallbacks() {

1 change: 0 additions & 1 deletion inlong-agent/agent-docker/Dockerfile
@@ -27,7 +27,6 @@ ADD ${AGENT_TARBALL} /opt/inlong-agent
RUN cp /usr/share/java/snappy-java.jar lib/snappy-java-*.jar
# add mysql connector
RUN wget -P lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
EXPOSE 8008
ENV MANAGER_OPENAPI_IP=127.0.0.1
ENV MANAGER_OPENAPI_PORT=8082
ENV DATAPROXY_IP=127.0.0.1
2 changes: 1 addition & 1 deletion inlong-agent/agent-docker/README.md
@@ -8,7 +8,7 @@ docker pull inlong/agent:latest

##### Start Container
```
docker run -d --name agent -p 8008:8008 \
docker run -d --name agent \
-e MANAGER_OPENAPI_IP=manager_opeapi_ip -e DATAPROXY_IP=dataproxy_ip \
-e MANAGER_OPENAPI_AUTH_ID=auth_id -e MANAGER_OPENAPI_AUTH_KEY=auth_key \
-e MANAGER_OPENAPI_PORT=8082 -e DATAPROXY_PORT=46801 inlong/agent
```
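The -p 8008:8008 mapping is no longer needed because this commit removes the agent's HTTP port configuration (agent.http.port, default 8008) together with the EXPOSE instruction in the Dockerfile and the port mapping in docker-compose.yml.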
10 changes: 10 additions & 0 deletions inlong-agent/agent-installer/assembly.xml
@@ -29,6 +29,16 @@
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<!-- for bin -->
<fileSet>
<directory>environment</directory>
<includes>
<include>*.*</include>
</includes>
<fileMode>0755</fileMode>
<outputDirectory>environment</outputDirectory>
<lineEnding>unix</lineEnding>
</fileSet>

<fileSet>
<directory>bin</directory>
<includes>
20 changes: 20 additions & 0 deletions inlong-agent/agent-installer/environment/init.sh
@@ -0,0 +1,20 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This script is used for environment initialization and can be modified according to different environments
echo "Begin initializing the environment"
@@ -57,7 +57,7 @@ public abstract class CommonInstance extends Instance {
private volatile boolean running = false;
private volatile boolean inited = false;
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private int heartbeatCheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();
protected long auditVersion;

@@ -72,15 +72,15 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) {
profile.getInstanceId(), profile.toJsonStr());
source = (Source) Class.forName(profile.getSourceClass()).newInstance();
source.init(profile);
source.start();
sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
sink.init(profile);
inited = true;
return true;
} catch (Throwable e) {
handleSourceDeleted();
handleDeleted();
doChangeState(State.FATAL);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(),
e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
return false;
}
@@ -117,10 +117,17 @@ public void run() {
}

private void doRun() {
source.start();
while (!isFinished()) {
if (!source.sourceExist()) {
handleSourceDeleted();
break;
if (handleDeleted()) {
break;
} else {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
continue;
}
}
Message msg = source.read();
if (msg == null) {
@@ -144,8 +151,8 @@ private void doRun() {
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
}
}
heartbeatcheckCount++;
if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatCheckCount++;
if (heartbeatCheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatStatic();
}
}
@@ -156,7 +163,7 @@ private void heartbeatStatic() {
if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(),
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion);
heartbeatcheckCount = 0;
heartbeatCheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
}
@@ -169,16 +176,17 @@ private void handleReadEnd() {
}
}

private void handleSourceDeleted() {
private boolean handleDeleted() {
OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
profile.setState(InstanceStateEnum.DELETE);
profile.setModifyTime(AgentUtils.getCurrentTime());
InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
return instanceManager.submitAction(action);
}

@Override
public long getLastHeartbeatTime() {
return heartBeatStartTime;
}

@Override
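The renamed handleDeleted() no longer blocks inside a retry loop when the instance manager's action queue is full; it offers the DELETE action once and returns whether it was accepted, and the caller in doRun() either breaks out of the read loop or sleeps and retries on the next iteration. The sketch below restates that submit-and-retry pattern with a plain bounded queue standing in for the instance manager; all names in it are illustrative, not taken from this commit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the non-blocking submit plus caller-side retry.
public class SubmitRetrySketch {

    private static final int CORE_THREAD_SLEEP_TIME_MS = 1000;

    // bounded queue standing in for the instance manager's action queue
    private final BlockingQueue<String> actionQueue = new ArrayBlockingQueue<>(100);

    // analogous to handleDeleted(): offer once, report whether it was queued
    boolean submitDelete(String instanceId) {
        return actionQueue.offer("DELETE:" + instanceId);
    }

    // analogous to the new branch in doRun(): back off while the queue is full
    void onSourceDeleted(String instanceId) throws InterruptedException {
        while (!submitDelete(instanceId)) {
            // queue full: wait and try again on the next cycle
            Thread.sleep(CORE_THREAD_SLEEP_TIME_MS);
        }
    }
}
```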
4 changes: 0 additions & 4 deletions inlong-agent/conf/agent.properties
@@ -22,10 +22,6 @@
########################
# bdb data readonly
agent.localStore.readonly=false
# whether enable http service
agent.http.enable=true
# http default port
agent.http.port=8008

######################
# fetch center
@@ -247,7 +247,7 @@ private void checkRemoteConfig() {
try {
String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager.hosts");
String proxyClusterTag = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES)
.get("proxy.cluster.tag");
.get("default.mq.cluster.tag");
LOG.info("manager url: {}", managerHosts);
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
2 changes: 1 addition & 1 deletion inlong-audit/audit-docker/audit-docker.sh
@@ -32,7 +32,7 @@ service_conf_file=${file_path}/conf/audit-service.properties

# replace the configuration for audit-proxy
sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${store_conf_file}"
sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${store_conf_file}"
sed -i "s/default.mq.cluster.tag=.*$/default.mq.cluster.tag=${CLUSTER_TAG}/g" "${store_conf_file}"
if [ "${MQ_TYPE}" = "pulsar" ]; then
sed -i "s/audit.config.proxy.type=.*$/audit.config.proxy.type=pulsar"/g "${store_conf_file}"
sed -i "s/audit.pulsar.topic = .*$/audit.pulsar.topic = ${PULSAR_AUDIT_TOPIC}/g" "${store_conf_file}"
@@ -121,7 +121,7 @@ private List<MQInfo> getClusterFromManager() {
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) {
properties.load(inputStream);
String managerHosts = properties.getProperty("manager.hosts");
String clusterTag = properties.getProperty("proxy.cluster.tag");
String clusterTag = properties.getProperty("default.mq.cluster.tag");
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
while (true) {
@@ -47,8 +47,8 @@ mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
audit.config.file.check.enable=false
audit.config.manager.server.url=http://127.0.0.1:8000

# store.server: elasticsearch / mysql
audit.config.store.mode=elasticsearch
# Supports common JDBC protocol
audit.config.store.mode=jdbc

# proxy.type: pulsar / tube / kafka
audit.config.proxy.type=pulsar
6 changes: 4 additions & 2 deletions inlong-audit/conf/application.properties
@@ -16,15 +16,17 @@
# specific language governing permissions and limitations
# under the License.
#
# proxy.type: pulsar / tube / kafka
# the MQ type for audit proxy: pulsar / kafka
audit.config.proxy.type=pulsar

# Supports common JDBC protocol
audit.config.store.mode=jdbc

# manger config
manager.hosts=127.0.0.1:8083
proxy.cluster.tag=default_cluster

# Kafka or Pulsar cluster tag
default.mq.cluster.tag=default_cluster

# pulsar config
audit.pulsar.topic=persistent://public/default/inlong-audit
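Note that the audit module's cluster-tag key is renamed consistently in this commit: proxy.cluster.tag becomes default.mq.cluster.tag in application.properties, in audit-docker.sh, and in the two configuration loaders shown above, so deployments that set the old key need to switch to the new name.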
@@ -20,14 +20,18 @@
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.List;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SortClusterConfig implements Serializable {

private String clusterTag;
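With @Builder already on the class, adding @NoArgsConstructor by itself would suppress the implicit all-args constructor that the generated builder relies on, so @AllArgsConstructor is added alongside it; the no-args constructor in turn is what JSON frameworks such as Jackson typically need for deserialization. A hedged usage sketch follows: the field name clusterTag comes from the visible part of the class, while the package in the import and the Jackson call are assumptions for illustration.

```java
import org.apache.inlong.common.pojo.sort.SortClusterConfig; // assumed package, inferred from nearby imports
import com.fasterxml.jackson.databind.ObjectMapper;

public class SortClusterConfigUsage {

    public static void main(String[] args) throws Exception {
        // builder path, backed by @Builder + @AllArgsConstructor
        SortClusterConfig built = SortClusterConfig.builder()
                .clusterTag("default_cluster")
                .build();

        // deserialization path, backed by @NoArgsConstructor and the setters from @Data
        SortClusterConfig parsed = new ObjectMapper()
                .readValue("{\"clusterTag\":\"default_cluster\"}", SortClusterConfig.class);

        System.out.println(built.getClusterTag().equals(parsed.getClusterTag()));
    }
}
```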
(The remaining changed files in this commit are not shown here.)