From 632aa88e448de9acb4cd575e6bc57c10615aef4d Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 13 May 2024 10:18:40 +0800 Subject: [PATCH 01/15] [INLONG-10187][Agent] Handle situations where time offset is empty (#10188) --- .../java/org/apache/inlong/agent/pojo/TaskProfileDto.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 374b7331d4e..d0b378dd47d 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -97,6 +97,8 @@ public class TaskProfileDto { private static final Gson GSON = new Gson(); + public static final String deafult_time_offset = "0"; + private Task task; private Proxy proxy; @@ -157,9 +159,10 @@ private static FileTask getFileTask(DataConfig dataConfig) { fileTask.setCycleUnit(taskConfig.getCycleUnit()); fileTask.setStartTime(taskConfig.getStartTime()); fileTask.setEndTime(taskConfig.getEndTime()); - fileTask.setProperties(GSON.toJson(taskConfig.getProperties())); if (taskConfig.getTimeOffset() != null) { fileTask.setTimeOffset(taskConfig.getTimeOffset()); + } else { + fileTask.setTimeOffset(deafult_time_offset + fileTask.getCycleUnit()); } if (taskConfig.getAdditionalAttr() != null) { From 808cf799c8ac90737de8a924ad59f842ce048925 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 13 May 2024 12:22:19 +0800 Subject: [PATCH 02/15] [INLONG-10091][Agent] Delete the useless code related to 8008 listen port (#10192) * [INLONG-10091][Agent] Delete useless code * [INLONG-10091][Agent] Delete useless code * [INLONG-10091][Agent] Delete useless code * [INLONG-10091][Agent] Delete useless code * [INLONG-10091][Agent] Delete useless code --- docker/docker-compose/docker-compose.yml | 2 -- 
.../org/apache/inlong/agent/constant/AgentConstants.java | 2 -- .../java/org/apache/inlong/agent/core/HeartbeatManager.java | 5 ----- inlong-agent/agent-docker/Dockerfile | 1 - inlong-agent/agent-docker/README.md | 2 +- inlong-agent/conf/agent.properties | 4 ---- 6 files changed, 1 insertion(+), 15 deletions(-) diff --git a/docker/docker-compose/docker-compose.yml b/docker/docker-compose/docker-compose.yml index 2c941cdcd08..9434fdb6aaf 100644 --- a/docker/docker-compose/docker-compose.yml +++ b/docker/docker-compose/docker-compose.yml @@ -99,8 +99,6 @@ services: - DATAPROXY_IP=dataproxy - DATAPROXY_PORT=46801 - AUDIT_PROXY_URL=audit:10081 - ports: - - "8008:8008" volumes: - ./collect-data:/data/collect-data diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 36c5423ca3c..0d67e55f820 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -39,8 +39,6 @@ public class AgentConstants { public static final String AGENT_FETCHER_CLASSNAME = "agent.fetcher.classname"; public static final String AGENT_CONF_PARENT = "agent.conf.parent"; public static final String DEFAULT_AGENT_CONF_PARENT = "conf"; - public static final String AGENT_HTTP_PORT = "agent.http.port"; - public static final int DEFAULT_AGENT_HTTP_PORT = 8008; public static final String CHANNEL_MEMORY_CAPACITY = "channel.memory.capacity"; public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 2000; public static final String JOB_NUMBER_LIMIT = "job.number.limit"; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index f37a13fa933..d9879586f89 100644 --- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -39,9 +39,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_CHARGES; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH; @@ -148,7 +146,6 @@ public void reportHeartbeat(HeartbeatMsg heartbeat) { */ private HeartbeatMsg buildHeartbeatMsg() { final String agentIp = AgentUtils.fetchLocalIp(); - final int agentPort = conf.getInt(AGENT_HTTP_PORT, DEFAULT_AGENT_HTTP_PORT); final String clusterName = conf.get(AGENT_CLUSTER_NAME); final String clusterTag = conf.get(AGENT_CLUSTER_TAG); final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES); @@ -156,7 +153,6 @@ private HeartbeatMsg buildHeartbeatMsg() { HeartbeatMsg heartbeatMsg = new HeartbeatMsg(); heartbeatMsg.setIp(agentIp); - heartbeatMsg.setPort(String.valueOf(agentPort)); heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getType()); heartbeatMsg.setReportTime(System.currentTimeMillis()); if (StringUtils.isNotBlank(clusterName)) { @@ -183,7 +179,6 @@ private HeartbeatMsg buildDeadHeartbeatMsg() { heartbeatMsg.setNodeSrvStatus(NodeSrvStatus.SERVICE_UNINSTALL); heartbeatMsg.setInCharges(conf.get(AGENT_CLUSTER_IN_CHARGES)); heartbeatMsg.setIp(AgentUtils.fetchLocalIp()); - heartbeatMsg.setPort(String.valueOf(conf.getInt(AGENT_HTTP_PORT, 
DEFAULT_AGENT_HTTP_PORT))); heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getType()); heartbeatMsg.setClusterName(conf.get(AGENT_CLUSTER_NAME)); heartbeatMsg.setClusterTag(conf.get(AGENT_CLUSTER_TAG)); diff --git a/inlong-agent/agent-docker/Dockerfile b/inlong-agent/agent-docker/Dockerfile index 9080b660ff4..edb9700581c 100644 --- a/inlong-agent/agent-docker/Dockerfile +++ b/inlong-agent/agent-docker/Dockerfile @@ -27,7 +27,6 @@ ADD ${AGENT_TARBALL} /opt/inlong-agent RUN cp /usr/share/java/snappy-java.jar lib/snappy-java-*.jar # add mysql connector RUN wget -P lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar -EXPOSE 8008 ENV MANAGER_OPENAPI_IP=127.0.0.1 ENV MANAGER_OPENAPI_PORT=8082 ENV DATAPROXY_IP=127.0.0.1 diff --git a/inlong-agent/agent-docker/README.md b/inlong-agent/agent-docker/README.md index 5cf1d9d5356..8ae753cb5b5 100644 --- a/inlong-agent/agent-docker/README.md +++ b/inlong-agent/agent-docker/README.md @@ -8,7 +8,7 @@ docker pull inlong/agent:latest ##### Start Container ``` -docker run -d --name agent -p 8008:8008 \ +docker run -d --name agent \ -e MANAGER_OPENAPI_IP=manager_opeapi_ip -e DATAPROXY_IP=dataproxy_ip \ -e MANAGER_OPENAPI_AUTH_ID=auth_id -e MANAGER_OPENAPI_AUTH_KEY=auth_key \ -e MANAGER_OPENAPI_PORT=8082 -e DATAPROXY_PORT=46801 inlong/agent diff --git a/inlong-agent/conf/agent.properties b/inlong-agent/conf/agent.properties index 3a2c4631b21..86c2e3165ac 100755 --- a/inlong-agent/conf/agent.properties +++ b/inlong-agent/conf/agent.properties @@ -22,10 +22,6 @@ ######################## # bdb data readonly agent.localStore.readonly=false -# whether enable http service -agent.http.enable=true -# http default port -agent.http.port=8008 ###################### # fetch center From bbd29d961e556813c31560a8641ce423ef5420f3 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Mon, 13 May 2024 12:33:47 +0800 Subject: [PATCH 03/15] [INLONG-10197][Manager] 
Support OpenAPI for querying audit data (#10199) --- .../controller/openapi/AuditController.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java index 4b44b1f73b9..c874c37bbe3 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java @@ -19,8 +19,11 @@ import org.apache.inlong.common.pojo.audit.AuditConfig; import org.apache.inlong.common.pojo.audit.AuditConfigRequest; +import org.apache.inlong.manager.pojo.audit.AuditRequest; +import org.apache.inlong.manager.pojo.audit.AuditVO; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.service.cluster.InlongClusterService; +import org.apache.inlong.manager.service.core.AuditService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -31,6 +34,10 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import javax.validation.Valid; + +import java.util.List; + /** * Audit controller. 
*/ @@ -41,6 +48,8 @@ public class AuditController { @Autowired private InlongClusterService clusterService; + @Autowired + private AuditService auditService; @PostMapping("/audit/getConfig") @ApiOperation(value = "Get mq config list") @@ -51,4 +60,16 @@ public Response getConfig(@RequestBody AuditConfigRequest request) } return Response.success(auditConfig); } + + @PostMapping(value = "/audit/list") + @ApiOperation(value = "Query audit list according to conditions") + public Response> listByCondition(@Valid @RequestBody AuditRequest request) throws Exception { + return Response.success(auditService.listByCondition(request)); + } + + @PostMapping(value = "/audit/listAll") + @ApiOperation(value = "Query all audit list according to conditions") + public Response> listAll(@Valid @RequestBody AuditRequest request) throws Exception { + return Response.success(auditService.listAll(request)); + } } From cdf81cd7915a2b60cc4882dd7b487c10c7d1b199 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Mon, 13 May 2024 14:03:30 +0800 Subject: [PATCH 04/15] [INLONG-10200][Manager] Define module type mapping relationships (#10203) --- .../manager/common/enums/ModuleType.java | 26 +++++++++++++++++-- .../service/core/impl/AgentServiceImpl.java | 6 +---- .../main/resources/application-dev.properties | 2 -- .../resources/application-prod.properties | 2 -- .../resources/application-test.properties | 2 -- 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ModuleType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ModuleType.java index 8001b20dca6..debe72217a4 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ModuleType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ModuleType.java @@ -17,12 +17,34 @@ package 
org.apache.inlong.manager.common.enums; +import java.util.Objects; + /** * Constant of module type. */ public enum ModuleType { - AGENT, - INSTALLER + AGENT(1), + INSTALLER(2), + UNKNOWN(3); + + final int moduleId; + + ModuleType(int moduleId) { + this.moduleId = moduleId; + } + + public int getModuleId() { + return moduleId; + } + + public static ModuleType forType(String type) { + for (ModuleType moduleType : ModuleType.values()) { + if (Objects.equals(moduleType.name(), type)) { + return moduleType; + } + } + return UNKNOWN; + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 09729662468..0deade816e7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -103,7 +103,6 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -163,9 +162,6 @@ public class AgentServiceImpl implements AgentService { @Value("${add.task.retention.days:7}") private Integer retentionDays; - @Value("#{${module.name.map:{'agent':1}}}") - private Map moduleNameIdMap = new HashMap<>(); - @Autowired private StreamSourceEntityMapper sourceMapper; @Autowired @@ -860,7 +856,7 @@ private List getModuleConfigs(AgentClusterNodeDTO dto) { for (Integer moduleId : moduleIdList) { ModuleConfigEntity moduleConfigEntity = moduleConfigEntityMapper.selectByPrimaryKey(moduleId); ModuleConfig moduleConfig = CommonBeanUtils.copyProperties(moduleConfigEntity, ModuleConfig::new); - moduleConfig.setId(moduleNameIdMap.getOrDefault(moduleConfigEntity.getName(), 1)); + 
moduleConfig.setId(ModuleType.forType(moduleConfigEntity.getType()).getModuleId()); PackageConfigEntity packageConfigEntity = packageConfigEntityMapper.selectByPrimaryKey(moduleConfigEntity.getPackageId()); moduleConfig diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index aef5a4ad37c..7e1be40a037 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -100,6 +100,4 @@ metrics.audit.proxy.hosts=127.0.0.1:10081 # tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 -# The mapping relationship between module name and module id -module.name.map={'inlong-agent':1,'agent-installer':2,'temp-cmd':3} diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 2bcf25316bf..8ff532fdf86 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -99,5 +99,3 @@ metrics.audit.proxy.hosts=127.0.0.1:10081 # tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 -# The mapping relationship between module name and module id -module.name.map={'inlong-agent':1,'agent-installer':2,'temp-cmd':3} diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 4b12742c735..29c18d2826b 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -100,5 +100,3 @@ metrics.audit.proxy.hosts=127.0.0.1:10081 # tencent cloud log service endpoint, The Operator 
cls resource by it cls.manager.endpoint=127.0.0.1 -# The mapping relationship between module name and module id -module.name.map={'inlong-agent':1,'agent-installer':2,'temp-cmd':3} From 50fc3024b6867e67c376fa0d42cd20cf09922720 Mon Sep 17 00:00:00 2001 From: Charles Zhang Date: Mon, 13 May 2024 14:04:41 +0800 Subject: [PATCH 05/15] [INLONG-10205][Script] Add the metrics.audit.proxy.hosts for standalone deployment (#10206) --- bin/init-config.sh | 2 ++ .../src/test/resources/application-test.properties | 4 ++-- .../src/main/resources/application-dev.properties | 7 ++++--- .../src/main/resources/application-prod.properties | 7 ++++--- .../src/main/resources/application-test.properties | 7 ++++--- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/bin/init-config.sh b/bin/init-config.sh index 018b22a79b8..057b0cd6889 100644 --- a/bin/init-config.sh +++ b/bin/init-config.sh @@ -96,12 +96,14 @@ init_inlong_manager() { $SED_COMMAND 's#jdbc:mysql://.*apache_inlong_manager#'''jdbc:mysql://${spring_datasource_hostname}:${spring_datasource_port}/apache_inlong_manager'''#g' application-dev.properties $SED_COMMAND 's/spring.datasource.druid.username=.*/'''spring.datasource.druid.username=${spring_datasource_username}'''/g' application-dev.properties $SED_COMMAND 's/spring.datasource.druid.password=.*/'''spring.datasource.druid.password=${spring_datasource_password}'''/g' application-dev.properties + $SED_COMMAND 's/metrics.audit.proxy.hosts=.*/'''metrics.audit.proxy.hosts=${audit_proxy_ip}:${audit_proxy_port}'''/g' application-dev.properties $SED_COMMAND 's/audit.query.url=.*/'''audit.query.url=${audit_service_ip}:${audit_service_port}'''/g' application-dev.properties fi if [ $spring_profiles_active == "prod" ]; then $SED_COMMAND 's#jdbc:mysql://.*apache_inlong_manager#'''jdbc:mysql://${spring_datasource_hostname}:${spring_datasource_port}/apache_inlong_manager'''#g' application-prod.properties $SED_COMMAND 
's/spring.datasource.druid.username=.*/'''spring.datasource.druid.username=${spring_datasource_username}'''/g' application-prod.properties $SED_COMMAND 's/spring.datasource.druid.password=.*/'''spring.datasource.druid.password=${spring_datasource_password}'''/g' application-prod.properties + $SED_COMMAND 's/metrics.audit.proxy.hosts=.*/'''metrics.audit.proxy.hosts=${audit_proxy_ip}:${audit_proxy_port}'''/g' application-prod.properties $SED_COMMAND 's/audit.query.url=.*/'''audit.query.url=${audit_service_ip}:${audit_service_port}'''/g' application-prod.properties fi echo "Init inlong manager flink plugin configuration" diff --git a/inlong-audit/audit-store/src/test/resources/application-test.properties b/inlong-audit/audit-store/src/test/resources/application-test.properties index 2848b93c631..c04553bd827 100644 --- a/inlong-audit/audit-store/src/test/resources/application-test.properties +++ b/inlong-audit/audit-store/src/test/resources/application-test.properties @@ -47,8 +47,8 @@ mybatis.type-aliases-package=org.apache.inlong.audit.db.entities audit.config.file.check.enable=false audit.config.manager.server.url=http://127.0.0.1:8000 -# store.server: elasticsearch / mysql -audit.config.store.mode=elasticsearch +# Supports common JDBC protocol +audit.config.store.mode=jdbc # proxy.type: pulsar / tube / kafka audit.config.proxy.type=pulsar diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 7e1be40a037..d5d7c047da7 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -20,11 +20,11 @@ # Log level logging.level.root=info logging.level.org.apache.inlong.manager=debug - +# Database configuration 
spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8 spring.datasource.druid.username=root spring.datasource.druid.password=inlong -# datasource config, set org.postgresql.Driver if using PostgreSQL +# Database config, set org.postgresql.Driver if using PostgreSQL spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.druid.validationQuery=SELECT 'x' # Initialization size, minimum, maximum @@ -95,9 +95,10 @@ group.deleted.batch.size=100 # If turned on, the groups could be deleted periodically. group.deleted.enabled=false +# Audit Proxy Address metrics.audit.proxy.hosts=127.0.0.1:10081 -# tencent cloud log service endpoint, The Operator cls resource by it +# Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 8ff532fdf86..2f8f0c4aa72 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -19,11 +19,11 @@ # Log level logging.level.root=info - +# Database configuration spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8 spring.datasource.druid.username=root spring.datasource.druid.password=inlong -# datasource config, set org.postgresql.Driver if using PostgreSQL +# Database config, set org.postgresql.Driver if using PostgreSQL spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.druid.validationQuery=SELECT 'x' # Initialization size, minimum, maximum @@ -94,8 +94,9 @@ group.deleted.batch.size=100 # 
If turned on, the groups could be deleted periodically. group.deleted.enabled=false +# Audit Proxy Address metrics.audit.proxy.hosts=127.0.0.1:10081 -# tencent cloud log service endpoint, The Operator cls resource by it +# Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 29c18d2826b..a8aedfa67d3 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -20,11 +20,11 @@ # Log level logging.level.root=info logging.level.org.apache.inlong.manager=debug - +# Database configuration spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8 spring.datasource.druid.username=root spring.datasource.druid.password=inlong -# datasource config, set org.postgresql.Driver if using PostgreSQL +# Database config, set org.postgresql.Driver if using PostgreSQL spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.druid.validationQuery=SELECT 'x' # Initialization size, minimum, maximum @@ -95,8 +95,9 @@ group.deleted.batch.size=100 # If turned on, the groups could be deleted periodically. 
group.deleted.enabled=false +# Audit Proxy Address metrics.audit.proxy.hosts=127.0.0.1:10081 -# tencent cloud log service endpoint, The Operator cls resource by it +# Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 From f91542b75a2480fbe3377ef2affa1e4359262c0a Mon Sep 17 00:00:00 2001 From: wohainilaodou <165994047+wohainilaodou@users.noreply.github.com> Date: Mon, 13 May 2024 16:55:15 +0800 Subject: [PATCH 06/15] [INLONG-10179][DashBoard] Remove the All types In cluster management (#10209) Co-authored-by: v_shuomqiu --- .../src/plugins/clusters/common/ClusterDefaultInfo.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-dashboard/src/plugins/clusters/common/ClusterDefaultInfo.ts b/inlong-dashboard/src/plugins/clusters/common/ClusterDefaultInfo.ts index e8be5de31bd..a3e3b678c86 100644 --- a/inlong-dashboard/src/plugins/clusters/common/ClusterDefaultInfo.ts +++ b/inlong-dashboard/src/plugins/clusters/common/ClusterDefaultInfo.ts @@ -52,7 +52,7 @@ export class ClusterDefaultInfo implements DataWithBackend, RenderRow, RenderLis props: values => ({ disabled: Boolean(values.id), options: clusters - .filter(item => item.value !== 'DATAPROXY') + .filter(item => item.value !== 'DATAPROXY' && item.value !== '') .map(item => ({ label: item.label, value: item.value, From 1fa64267e34e8404cce467e7a7a754b3e9c3cd5f Mon Sep 17 00:00:00 2001 From: castor <58140421+castorqin@users.noreply.github.com> Date: Mon, 13 May 2024 16:58:34 +0800 Subject: [PATCH 07/15] [INLONG-10204][Manager] Kafka sink supports automatic allocation of sort standalone cluster (#10207) Co-authored-by: castorqin --- .../service/resource/sink/kafka/KafkaResourceOperator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java index 0e89b9867ba..dac4a22efeb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/kafka/KafkaResourceOperator.java @@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO; -import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; +import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; import org.apache.kafka.clients.admin.Admin; @@ -47,7 +47,7 @@ * Kafka resource operator for creating Kafka topic */ @Service -public class KafkaResourceOperator implements SinkResourceOperator { +public class KafkaResourceOperator extends AbstractStandaloneSinkResourceOperator { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaResourceOperator.class); @@ -76,7 +76,6 @@ public void createSinkResource(SinkInfo sinkInfo) { new NewTopic(topicName, Optional.of(partitionNum), Optional.empty()))); result.values().get(topicName).get(); } - sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), "create kafka topic success"); LOGGER.info("success to create kafka topic [{}] for sinkInfo={}", topicName, sinkInfo); @@ -85,6 +84,7 @@ public void createSinkResource(SinkInfo sinkInfo) { sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), e.getMessage()); throw new WorkflowException("create kafka topic failed, reason: " + e.getMessage()); } + this.assignCluster(sinkInfo); } /** From 06d5e370ec454909517b5f42787c716c668cf7c5 Mon Sep 17 00:00:00 2001 From: haifxu 
Date: Mon, 13 May 2024 17:01:55 +0800 Subject: [PATCH 08/15] [INLONG-10201][Audit] Renamed configuration variables for clarity (#10202) --- .../java/org/apache/inlong/audit/file/ConfigManager.java | 2 +- inlong-audit/audit-docker/audit-docker.sh | 2 +- .../apache/inlong/audit/service/AuditMsgConsumerServer.java | 2 +- inlong-audit/conf/application.properties | 6 ++++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java index 2f19dc2754f..2847836da26 100644 --- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java @@ -247,7 +247,7 @@ private void checkRemoteConfig() { try { String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager.hosts"); String proxyClusterTag = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES) - .get("proxy.cluster.tag"); + .get("default.mq.cluster.tag"); LOG.info("manager url: {}", managerHosts); String[] hostList = StringUtils.split(managerHosts, ","); for (String host : hostList) { diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index cdc6c103cfa..f8c19973e17 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -32,7 +32,7 @@ service_conf_file=${file_path}/conf/audit-service.properties # replace the configuration for audit-proxy sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${store_conf_file}" -sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${store_conf_file}" +sed -i "s/default.mq.cluster.tag=.*$/default.mq.cluster.tag=${CLUSTER_TAG}/g" "${store_conf_file}" if [ "${MQ_TYPE}" = "pulsar" ]; then sed -i 
"s/audit.config.proxy.type=.*$/audit.config.proxy.type=pulsar"/g "${store_conf_file}" sed -i "s/audit.pulsar.topic = .*$/audit.pulsar.topic = ${PULSAR_AUDIT_TOPIC}/g" "${store_conf_file}" diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java index bf32271a690..acd43a7ed6d 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java @@ -121,7 +121,7 @@ private List getClusterFromManager() { try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) { properties.load(inputStream); String managerHosts = properties.getProperty("manager.hosts"); - String clusterTag = properties.getProperty("proxy.cluster.tag"); + String clusterTag = properties.getProperty("default.mq.cluster.tag"); String[] hostList = StringUtils.split(managerHosts, ","); for (String host : hostList) { while (true) { diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties index 17344fc6b79..e3843b1161b 100644 --- a/inlong-audit/conf/application.properties +++ b/inlong-audit/conf/application.properties @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. 
# -# proxy.type: pulsar / tube / kafka +# the MQ type for audit proxy: pulsar / kafka audit.config.proxy.type=pulsar # Supports common JDBC protocol @@ -24,7 +24,9 @@ audit.config.store.mode=jdbc # manger config manager.hosts=127.0.0.1:8083 -proxy.cluster.tag=default_cluster + +# Kafka or Pulsar cluster tag +default.mq.cluster.tag=default_cluster # pulsar config audit.pulsar.topic=persistent://public/default/inlong-audit From 61d2b74ce1c31e5b00ac8abeb80d47e6e7ee6da5 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 13 May 2024 17:08:37 +0800 Subject: [PATCH 09/15] [INLONG-10189][Agent] Handling SDK initialization exceptions (#10190) Co-authored-by: AloysZhang --- .../apache/inlong/agent/plugin/Instance.java | 2 + .../agent/core/instance/InstanceManager.java | 11 ++++++ .../inlong/agent/core/task/TaskManager.java | 2 +- .../agent/core/instance/MockInstance.java | 6 +++ .../agent/plugin/instance/CommonInstance.java | 38 +++++++++++-------- 5 files changed, 43 insertions(+), 16 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java index 990d7e60b26..0d43587f6eb 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java @@ -53,4 +53,6 @@ public abstract class Instance extends AbstractStateWrapper { * get instance id */ public abstract String getInstanceId(); + + public abstract long getLastHeartbeatTime(); } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index d388f2293e8..25ce136ddf1 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -54,6 +54,7 @@ public class InstanceManager extends AbstractDaemon { private static final int ACTION_QUEUE_CAPACITY = 100; public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; public static final int INSTANCE_PRINT_INTERVAL_MS = 10000; + public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000; private long lastPrintTime = 0; // instance in db private final InstanceDb instanceDb; @@ -240,6 +241,11 @@ private void traverseMemoryTasksToDb() { if (stateFromDb != InstanceStateEnum.DEFAULT) { deleteFromMemory(instance.getInstanceId()); } + if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_KEEP_ALIVE_MS) { + LOGGER.error("instance heartbeat timeout, id: {}, will be deleted from memory", + instance.getInstanceId()); + deleteFromMemory(instance.getInstanceId()); + } }); } @@ -391,6 +397,11 @@ private void addToMemory(InstanceProfile instanceProfile) { } LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr()); try { + if (instanceMap.size() > instanceLimit) { + LOGGER.info("add instance to memory refused because instanceMap size over limit {}", + instanceProfile.getInstanceId()); + return; + } Class taskClass = Class.forName(instanceProfile.getInstanceClass()); Instance instance = (Instance) taskClass.newInstance(); boolean initSuc = instance.init(this, instanceProfile); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java index eca7f6b25f4..7fded1fcbd1 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java @@ -58,7 +58,7 @@ public class TaskManager extends AbstractDaemon { public static final int CONFIG_QUEUE_CAPACITY = 1; public static final int 
CORE_THREAD_SLEEP_TIME = 1000; public static final int CORE_THREAD_PRINT_TIME = 10000; - private static final int ACTION_QUEUE_CAPACITY = 100000; + private static final int ACTION_QUEUE_CAPACITY = 1000; private long lastPrintTime = 0; // task basic db private final Db taskBasicDb; diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java index dc4e16bebcc..ea5a260b421 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java @@ -19,6 +19,7 @@ import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.plugin.Instance; +import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,11 @@ public String getInstanceId() { return profile.getInstanceId(); } + @Override + public long getLastHeartbeatTime() { + return AgentUtils.getCurrentTime(); + } + @Override public void addCallbacks() { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index e8d848b36b4..566fb7be444 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -57,7 +57,7 @@ public abstract class CommonInstance extends Instance { private volatile boolean running = false; private volatile boolean inited = false; private volatile int checkFinishCount = 0; - private int heartbeatcheckCount = 0; + private int heartbeatCheckCount = 0; private long heartBeatStartTime = AgentUtils.getCurrentTime(); protected long 
auditVersion; @@ -72,15 +72,15 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) { profile.getInstanceId(), profile.toJsonStr()); source = (Source) Class.forName(profile.getSourceClass()).newInstance(); source.init(profile); - source.start(); sink = (Sink) Class.forName(profile.getSinkClass()).newInstance(); sink.init(profile); inited = true; return true; } catch (Throwable e) { - handleSourceDeleted(); + handleDeleted(); doChangeState(State.FATAL); - LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e); + LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), + e); ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); return false; } @@ -117,10 +117,17 @@ public void run() { } private void doRun() { + source.start(); while (!isFinished()) { if (!source.sourceExist()) { - handleSourceDeleted(); - break; + if (handleDeleted()) { + break; + } else { + LOGGER.error("instance manager action queue is full: taskId {}", + instanceManager.getTaskId()); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + continue; + } } Message msg = source.read(); if (msg == null) { @@ -144,8 +151,8 @@ private void doRun() { AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS); } } - heartbeatcheckCount++; - if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) { + heartbeatCheckCount++; + if (heartbeatCheckCount > HEARTBEAT_CHECK_GAP) { heartbeatStatic(); } } @@ -156,7 +163,7 @@ private void heartbeatStatic() { if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(), profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion); - heartbeatcheckCount = 0; + heartbeatCheckCount = 0; heartBeatStartTime = AgentUtils.getCurrentTime(); } } @@ -169,16 +176,17 @@ private void handleReadEnd() { } } - private void handleSourceDeleted() { + 
private boolean handleDeleted() { OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId()); profile.setState(InstanceStateEnum.DELETE); profile.setModifyTime(AgentUtils.getCurrentTime()); InstanceAction action = new InstanceAction(ActionType.DELETE, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", - instanceManager.getTaskId()); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - } + return instanceManager.submitAction(action); + } + + @Override + public long getLastHeartbeatTime() { + return heartBeatStartTime; } @Override From 065392d3a2d0ef907659ff03293968e5b2d3a0e4 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 13 May 2024 19:09:48 +0800 Subject: [PATCH 10/15] [INLONG-10210][Agent] Add a script for environment initialization (#10211) Co-authored-by: Charles Zhang --- inlong-agent/agent-installer/assembly.xml | 10 ++++++++++ .../agent-installer/environment/init.sh | 20 +++++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 inlong-agent/agent-installer/environment/init.sh diff --git a/inlong-agent/agent-installer/assembly.xml b/inlong-agent/agent-installer/assembly.xml index f95a452d097..eb4905c8175 100644 --- a/inlong-agent/agent-installer/assembly.xml +++ b/inlong-agent/agent-installer/assembly.xml @@ -29,6 +29,16 @@ false + + environment + + *.* + + 0755 + environment + unix + + bin diff --git a/inlong-agent/agent-installer/environment/init.sh b/inlong-agent/agent-installer/environment/init.sh new file mode 100644 index 00000000000..4f373137606 --- /dev/null +++ b/inlong-agent/agent-installer/environment/init.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script is used for environment initialization and can be modified according to different environments +echo "Begin initializing the environment" \ No newline at end of file From 3b6600baf28dd22ed52e6f313c0beae263bf7ad4 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Tue, 14 May 2024 14:51:37 +0800 Subject: [PATCH 11/15] [INLONG-10096][Manager] Support installing agents by SSH (#10098) --- .../dao/entity/InlongClusterNodeEntity.java | 3 + .../mappers/InlongClusterNodeEntityMapper.xml | 39 ++++-- .../pojo/cluster/ClusterNodeRequest.java | 9 ++ .../pojo/cluster/ClusterNodeResponse.java | 9 ++ .../node/AgentClusterNodeInstallOperator.java | 96 ++++++++++++- .../manager/service/cmd/CommandExecutor.java | 40 ++++++ .../service/cmd/CommandExecutorImpl.java | 120 ++++++++++++++++ .../manager/service/cmd/CommandResult.java | 61 ++++++++ .../service/cmd/shell/ShellExecutor.java | 30 ++++ .../service/cmd/shell/ShellExecutorImpl.java | 131 ++++++++++++++++++ .../service/cmd/shell/ShellTracker.java | 36 +++++ .../resources/h2/apache_inlong_manager.sql | 3 + .../manager-web/sql/apache_inlong_manager.sql | 3 + .../manager-web/sql/changes-1.12.0.sql | 4 +- .../manager-web/sql/changes-1.13.0.sql | 28 ++++ .../main/resources/application-dev.properties | 2 + .../src/main/resources/exec_cmd.exp | 41 ++++++ 17 files changed, 639 insertions(+), 16 deletions(-) create 
mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandResult.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutor.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutorImpl.java create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellTracker.java create mode 100644 inlong-manager/manager-web/sql/changes-1.13.0.sql create mode 100644 inlong-manager/manager-web/src/main/resources/exec_cmd.exp diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java index b57aa6d268e..eddf41f641c 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java @@ -34,6 +34,9 @@ public class InlongClusterNodeEntity implements Serializable { private String type; private String ip; private Integer port; + private String username; + private String password; + private Integer sshPort; private String protocolType; private Integer nodeLoad; private String extParams; diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index b4b65da45b2..e3d54df45af 100644 --- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -26,6 +26,9 @@ + + + @@ -39,32 +42,34 @@ - id, parent_id, type, ip, port, protocol_type, node_load, ext_params, description, + id, parent_id, type, ip, port, username, password, ssh_port, protocol_type, node_load, ext_params, description, status, is_deleted, creator, modifier, create_time, modify_time, version insert into inlong_cluster_node (id, parent_id, type, - ip, port, protocol_type, - node_load, ext_params, - description, status, - creator, modifier) + ip, port, username, + password, ssh_port, protocol_type, + node_load, ext_params, description, + status, creator, modifier) values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR}, - #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, - #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, - #{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, - #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) + #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{username,jdbcType=VARCHAR}, + #{password,jdbcType=VARCHAR}, #{sshPort,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, + #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, #{description, jdbcType=VARCHAR}, + #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) insert into inlong_cluster_node (id, parent_id, type, - ip, port, protocol_type, + ip, port, username, + password, ssh_port, protocol_type, node_load, ext_params, status, creator, modifier) values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR}, - #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, + #{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{username,jdbcType=VARCHAR}, + #{password,jdbcType=VARCHAR}, 
#{sshPort,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR}, #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) ON DUPLICATE KEY UPDATE node_load = VALUES(node_load), @@ -157,6 +162,9 @@ type = #{type,jdbcType=VARCHAR}, ip = #{ip,jdbcType=VARCHAR}, port = #{port,jdbcType=INTEGER}, + username = #{username,jdbcType=VARCHAR}, + password = #{password,jdbcType=VARCHAR}, + ssh_port = #{sshPort,jdbcType=INTEGER}, protocol_type = #{protocolType,jdbcType=VARCHAR}, node_load = #{nodeLoad,jdbcType=INTEGER}, ext_params = #{extParams,jdbcType=LONGVARCHAR}, @@ -183,6 +191,15 @@ port = #{port,jdbcType=INTEGER}, + + username = #{username,jdbcType=VARCHAR}, + + + password = #{password,jdbcType=VARCHAR}, + + + ssh_port = #{sshPort,jdbcType=INTEGER}, + protocol_type = #{protocolType,jdbcType=VARCHAR}, diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java index 07960d23047..a7ea519fbe9 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java @@ -58,6 +58,15 @@ public class ClusterNodeRequest { @NotNull(message = "port cannot be null") private Integer port; + @ApiModelProperty(value = "Username") + private String username; + + @ApiModelProperty(value = "password") + private String password; + + @ApiModelProperty(value = "SSH port") + private Integer sshPort; + @ApiModelProperty(value = "Cluster protocol type") @Length(min = 1, max = 20, message = "length must be less than or equal to 20") private String protocolType; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java index f16d582f8e2..51291fafdce 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java @@ -52,6 +52,15 @@ public class ClusterNodeResponse { @ApiModelProperty(value = "Cluster port") private Integer port; + @ApiModelProperty(value = "Username") + private String username; + + @ApiModelProperty(value = "password") + private String password; + + @ApiModelProperty(value = "SSH port") + private Integer sshPort; + @ApiModelProperty(value = "Cluster protocol type") private String protocolType; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java index ae9615eedc4..22be7be9902 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java @@ -18,24 +18,66 @@ package org.apache.inlong.manager.service.cluster.node; import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.enums.ModuleType; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.AESUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; +import org.apache.inlong.manager.dao.entity.ModuleConfigEntity; +import 
org.apache.inlong.manager.dao.entity.PackageConfigEntity; +import org.apache.inlong.manager.dao.entity.UserEntity; import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; +import org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper; +import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper; +import org.apache.inlong.manager.dao.mapper.UserEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest; +import org.apache.inlong.manager.service.cmd.CommandExecutor; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + @Service public class AgentClusterNodeInstallOperator implements InlongClusterNodeInstallOperator { private static final Logger LOGGER = LoggerFactory.getLogger(AgentClusterNodeInstallOperator.class); + public static final String INSTALLER_CONF_PATH = "/conf/installer.properties"; + public static final String INSTALLER_START_CMD = "/bin/installer.sh start"; + public static final String AGENT_MANAGER_AUTH_SECRET_ID = "agent.manager.auth.secretId"; + public static final String AGENT_MANAGER_AUTH_SECRET_KEY = "agent.manager.auth.secretKey"; + public static final String AGENT_MANAGER_ADDR = "agent.manager.addr"; + public static final String AGENT_CLUSTER_NAME = "agent.cluster.name"; + public static final String AGENT_CLUSTER_TAG = "agent.cluster.tag"; + public static final String AUDIT_PROXYS_URL = "audit.proxys"; + public static final String AGENT_LOCAL_IP = "agent.local.ip"; + @Autowired private InlongClusterEntityMapper clusterEntityMapper; + @Autowired + private CommandExecutor commandExecutor; + @Autowired + private 
ModuleConfigEntityMapper moduleConfigEntityMapper; + @Autowired + private PackageConfigEntityMapper packageConfigEntityMapper; + @Autowired + private UserEntityMapper userEntityMapper; + + @Value("${metrics.audit.proxy.hosts:127.0.0.1:10081}") + private String auditProxyUrl; + @Value("${agent.install.path:inlong/inlong-installer/}") + private String agentInstallPath; + @Value("${manager.url:127.0.0.1:8083}") + private String managerUrl; @Override public Boolean accept(String clusterType) { @@ -49,9 +91,38 @@ public String getClusterNodeType() { @Override public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) { - // todo Provide agent installation capability - AgentClusterNodeRequest agentNodeRequest = (AgentClusterNodeRequest) clusterNodeRequest; - InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeRequest.getParentId()); + LOGGER.info("begin to insert agent inlong cluster node={}", clusterNodeRequest); + try { + InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeRequest.getParentId()); + AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest; + commandExecutor.mkdir(request, agentInstallPath); + String downLoadUrl = getInstallerDownLoadUrl(request); + String fileName = downLoadUrl.substring(downLoadUrl.lastIndexOf('/') + 1); + commandExecutor.downLoadPackage(request, agentInstallPath, downLoadUrl); + commandExecutor.tarPackage(request, fileName, agentInstallPath); + String confFile = agentInstallPath + INSTALLER_CONF_PATH; + Map configMap = new HashMap<>(); + configMap.put(AGENT_LOCAL_IP, request.getIp()); + configMap.put(AGENT_MANAGER_ADDR, managerUrl); + UserEntity userInfo = userEntityMapper.selectByName(operator); + Preconditions.expectNotNull(userInfo, "User doesn't exist"); + String secretKey = + new String(AESUtils.decryptAsString(userInfo.getSecretKey(), userInfo.getEncryptVersion())); + configMap.put(AGENT_MANAGER_AUTH_SECRET_ID, operator); + 
configMap.put(AGENT_MANAGER_AUTH_SECRET_KEY, secretKey); + configMap.put(AGENT_CLUSTER_TAG, clusterEntity.getClusterTags()); + configMap.put(AGENT_CLUSTER_NAME, clusterEntity.getName()); + configMap.put(AUDIT_PROXYS_URL, auditProxyUrl); + commandExecutor.modifyConfig(request, configMap, confFile); + String startCmd = agentInstallPath + INSTALLER_START_CMD; + commandExecutor.execRemote(request, startCmd); + + } catch (Exception e) { + String errMsg = String.format("install installer failed for ip=%s", clusterNodeRequest.getIp()); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + LOGGER.info("success to insert agent inlong cluster node={}", clusterNodeRequest); return true; } @@ -61,4 +132,23 @@ public boolean unload(InlongClusterNodeEntity clusterNodeEntity, String operator InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeEntity.getParentId()); return true; } + + private String getInstallerDownLoadUrl(AgentClusterNodeRequest request) { + if (CollectionUtils.isEmpty(request.getModuleIdList())) { + throw new BusinessException( + String.format("install failed when module id list is null for ip=%s, type=%s", request.getIp(), + request.getType())); + } + for (Integer moduleId : request.getModuleIdList()) { + ModuleConfigEntity moduleConfigEntity = moduleConfigEntityMapper.selectByPrimaryKey(moduleId); + if (Objects.equals(moduleConfigEntity.getType(), ModuleType.INSTALLER.name())) { + PackageConfigEntity packageConfigEntity = packageConfigEntityMapper.selectByPrimaryKey( + moduleConfigEntity.getPackageId()); + return packageConfigEntity.getDownloadUrl(); + } + } + throw new BusinessException( + String.format("can't get installer download url for ip=%s, type=%s", request.getIp(), + request.getType())); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java new file mode 100644 index 00000000000..e7f8eac0d83 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.service.cmd; + +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest; + +import java.util.Map; + +public interface CommandExecutor { + + CommandResult exec(String cmd) throws Exception; + + CommandResult execRemote(AgentClusterNodeRequest clusterNodeRequest, String cmd) throws Exception; + + CommandResult modifyConfig(AgentClusterNodeRequest clusterNodeRequest, Map configMap, + String confPath) throws Exception; + + CommandResult tarPackage(AgentClusterNodeRequest clusterNodeRequest, String fileName, String tarPath) + throws Exception; + + CommandResult downLoadPackage(AgentClusterNodeRequest clusterNodeRequest, String downLoadPath, String downLoadUrl) + throws Exception; + + CommandResult mkdir(AgentClusterNodeRequest clusterNodeRequest, String path) throws Exception; +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java new file mode 100644 index 00000000000..b1729b9b9ad --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cmd; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest; +import org.apache.inlong.manager.service.cmd.shell.ShellExecutorImpl; +import org.apache.inlong.manager.service.cmd.shell.ShellTracker; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Service +public class CommandExecutorImpl implements CommandExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(CommandExecutorImpl.class); + + @Override + public CommandResult exec(String cmd) throws Exception { + ShellTracker shellTracker = new ShellTracker(); + ShellExecutorImpl shellExecutor = new ShellExecutorImpl(shellTracker); + shellExecutor.syncExec("sh", "-c", cmd); + String cmdMsg = String.join(InlongConstants.BLANK, "sh", "-c", cmd); + LOG.debug("run command : {}", cmdMsg); + CommandResult commandResult = new CommandResult(); + commandResult.setCode(shellTracker.getCode()); + commandResult.setResult(String.join(InlongConstants.BLANK, shellTracker.getResult())); + if (commandResult.getCode() != 0) { + throw new Exception("command " + cmdMsg + " exec failed, code = " + + commandResult.getCode() + ", output = " + commandResult.getResult()); + } + LOG.debug(commandResult.getResult()); + return commandResult; + } + + @Override + public CommandResult execRemote(AgentClusterNodeRequest clusterNodeRequest, String cmd) throws Exception { + String cmdShell = "./conf/exec_cmd.exp"; + String ip = clusterNodeRequest.getIp(); + String port = String.valueOf(clusterNodeRequest.getSshPort()); + String user = clusterNodeRequest.getUsername(); + String password = 
clusterNodeRequest.getPassword(); + String remoteCommandTimeout = "20000"; + + cmd = "sh -c \"" + cmd + "\""; + String cmdMsg = + String.join(InlongConstants.BLANK, cmdShell, ip, user, password, remoteCommandTimeout, cmd, port); + LOG.info("run remote command : {}", cmdMsg); + + ShellTracker shellTracker = new ShellTracker(); + ShellExecutorImpl shellExecutor = new ShellExecutorImpl(shellTracker); + shellExecutor.syncExec(cmdShell, ip, user, password, remoteCommandTimeout, cmd, port); + + CommandResult commandResult = new CommandResult(); + commandResult.setCode(shellTracker.getCode()); + commandResult.setResult(String.join(InlongConstants.BLANK, shellTracker.getResult())); + + LOG.debug(commandResult.getResult()); + if (commandResult.getCode() != 0) { + throw new Exception( + "remote command " + cmdMsg + " exec failed, code = " + commandResult.getCode() + ", output = " + + commandResult.getResult()); + } + return commandResult; + } + + @Override + public CommandResult modifyConfig(AgentClusterNodeRequest clusterNodeRequest, Map configMap, + String confPath) + throws Exception { + List configList = configMap.entrySet().stream() + .map(entry -> "grep " + entry.getKey() + " " + confPath + " && sed -i 's%^" + entry.getKey() + ".*%" + + entry.getKey() + InlongConstants.EQUAL + entry.getValue() + "%' " + confPath + + " || echo " + entry.getKey() + InlongConstants.EQUAL + entry.getValue() + " >> " + + confPath) + .collect(Collectors.toList()); + String modifyCmd = StringUtils.join(configList, ";"); + return this.execRemote(clusterNodeRequest, modifyCmd); + } + + @Override + public CommandResult tarPackage(AgentClusterNodeRequest clusterNodeRequest, String fileName, + String tarPath) throws Exception { + String tarCmd = "tar -zxvf " + tarPath + fileName + " -C " + tarPath; + return execRemote(clusterNodeRequest, tarCmd); + } + + @Override + public CommandResult downLoadPackage(AgentClusterNodeRequest clusterNodeRequest, String downLoadPath, + String downLoadUrl) throws 
Exception { + return execRemote(clusterNodeRequest, "wget -P " + downLoadPath + InlongConstants.BLANK + downLoadUrl); + } + + @Override + public CommandResult mkdir(AgentClusterNodeRequest clusterNodeRequest, String path) throws Exception { + return execRemote(clusterNodeRequest, "mkdir " + path); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandResult.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandResult.java new file mode 100644 index 00000000000..6cbb594d1f2 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandResult.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.service.cmd; + +/** + * Command result + */ +public class CommandResult { + + private int code = 0; + private String result; + private String errMsg; + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + + public String getErrMsg() { + return errMsg; + } + + public void setErrMsg(String errMsg) { + this.errMsg = errMsg; + } + + @Override + public String toString() { + return "CommandResult{" + + "code=" + code + + ", stdout='" + result + '\'' + + ", stderr='" + errMsg + '\'' + + '}'; + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutor.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutor.java new file mode 100644 index 00000000000..0a34401bdf9 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.service.cmd.shell; + +public interface ShellExecutor { + + /** + * Execute shell commands + * + * @param shellPath shell path + * @param params params + */ + void syncExec(String shellPath, String... params); + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutorImpl.java new file mode 100644 index 00000000000..6b910684cae --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellExecutorImpl.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.service.cmd.shell; + +import org.apache.inlong.manager.common.consts.InlongConstants; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Slf4j +public class ShellExecutorImpl implements ShellExecutor { + + private static final String[] EXCEPTION_REG = new String[]{"(.*)Caused by: (.*)Exception(.*)", + "(.*)java.net.UnknownHostException(.*)", + "(.*)Copy failed: java.io.IOException: Job failed!(.*)"}; + private ShellTracker tracker; + + public ShellExecutorImpl(ShellTracker tracker) { + this.tracker = tracker; + } + + private static long getPid(Process process) { + try { + Field f = process.getClass().getDeclaredField("pid"); + f.setAccessible(true); + return f.getLong(process); + } catch (Exception e) { + log.error("get pid failed", e); + return -1; + } + } + + private static String[] merge(String shellPath, String[] paths) { + List cmds = new ArrayList(); + cmds.add(shellPath); + for (String path : paths) { + if (StringUtils.isBlank(path)) { + continue; + } + cmds.add(path); + } + String[] strings = new String[cmds.size()]; + cmds.toArray(strings); + return strings; + } + + private static String arrayToString(Object[] array, String split) { + if (array == null || array.length == 0) { + return InlongConstants.BLANK; + } + StringBuilder str = new StringBuilder(); + for (int i = 0, length = array.length; i < length; i++) { + if (i != 0) { + str.append(split); + } + str.append(array[i]); + } + return str.toString(); + } + + private static boolean HasException(String str) { + for (String reg : EXCEPTION_REG) { + Pattern pattern = Pattern.compile(reg); + Matcher matcher = pattern.matcher(str); + if (matcher.find()) { + return true; + } + } + return false; + } + + public void 
syncExec(String shellPath, String... params) { + List result = new ArrayList(); + String[] cmds = merge(shellPath, params); + try { + Process ps = Runtime.getRuntime().exec(cmds); + long pid = getPid(ps); + tracker.setProcessId(pid); + BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream())); + String line; + boolean hasException = false; + while ((line = br.readLine()) != null) { + if (HasException(line)) { + hasException = true; + } + result.add(line); + tracker.setRunResult(arrayToString(result.toArray(), InlongConstants.NEW_LINE)); + tracker.lineChange(line); + } + if (hasException) { + tracker.lineChange("Java exception exist in output"); + tracker.setCode(-1); + return; + } + ps.waitFor(); + int exitValue = ps.exitValue(); + if (exitValue != 0) { + tracker.setCode(exitValue); + } + } catch (Exception e) { + log.error("sync exec shell failed", e); + result.add(e.getMessage()); + tracker.setRunResult(arrayToString(result.toArray(), InlongConstants.NEW_LINE)); + tracker.lineChange(e.getMessage()); + tracker.setCode(-1); + } + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellTracker.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellTracker.java new file mode 100644 index 00000000000..e0635faa3cc --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/shell/ShellTracker.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cmd.shell; + +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; + +@Data +public class ShellTracker { + + private List result = new ArrayList<>(); + private int code; + private Long processId; + private String runResult; + + public void lineChange(String line) { + result.add(line); + } +} diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index c9f9a424b79..8f201963527 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -132,6 +132,9 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc', `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081', `port` int(6) NULL COMMENT 'Cluster port', + `username` varchar(256) DEFAULT NULL COMMENT 'Username for ssh', + `password` varchar(256) DEFAULT NULL COMMENT 'Password for ssh', + `ssh_port` int(11) DEFAULT NULL COMMENT 'Ssh port', `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node', `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index cff675f287c..41e55c52068 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -143,6 +143,9 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc', `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081', `port` int(6) NULL COMMENT 'Cluster port', + `username` varchar(256) DEFAULT NULL COMMENT 'Username for ssh', + `password` varchar(256) DEFAULT NULL COMMENT 'Password for ssh', + `ssh_port` int(11) DEFAULT NULL COMMENT 'Ssh port', `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node', `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql b/inlong-manager/manager-web/sql/changes-1.12.0.sql index 36cbaefe9a6..18c5979da90 100644 --- a/inlong-manager/manager-web/sql/changes-1.12.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.12.0.sql @@ -15,8 +15,8 @@ * limitations under the License. */ --- This is the SQL change file from version 1.9.0 to the current version 1.10.0. --- When upgrading to version 1.10.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. +-- This is the SQL change file from version 1.11.0 to the current version 1.12.0. +-- When upgrading to version 1.12.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. 
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql b/inlong-manager/manager-web/sql/changes-1.13.0.sql new file mode 100644 index 00000000000..7013c2990f3 --- /dev/null +++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This is the SQL change file from version 1.12.0 to the current version 1.13.0. +-- When upgrading to version 1.13.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. 
+ +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +USE `apache_inlong_manager`; + +ALTER TABLE `inlong_cluster_node` ADD COLUMN `username` varchar(256) DEFAULT NULL COMMENT 'username for ssh'; +ALTER TABLE `inlong_cluster_node` ADD COLUMN `password` varchar(256) DEFAULT NULL COMMENT 'password for ssh'; +ALTER TABLE `inlong_cluster_node` ADD COLUMN `ssh_port` int(11) DEFAULT NULL COMMENT 'ssh port'; diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index d5d7c047da7..ab8b5e2c4aa 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -102,3 +102,5 @@ metrics.audit.proxy.hosts=127.0.0.1:10081 cls.manager.endpoint=127.0.0.1 +manager.url=127.0.0.1:8083 +agent.install.path= \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/exec_cmd.exp b/inlong-manager/manager-web/src/main/resources/exec_cmd.exp new file mode 100644 index 00000000000..a56079f96dd --- /dev/null +++ b/inlong-manager/manager-web/src/main/resources/exec_cmd.exp @@ -0,0 +1,41 @@ +#!/usr/bin/expect +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +set remoteHost [lindex $argv 0] +set remoteUser [lindex $argv 1] +set password [lindex $argv 2] +set cmdTimeout [lindex $argv 3] +set runCommand [lindex $argv 4] +set remotePort [lindex $argv 5] + +spawn ssh -p ${remotePort} ${remoteHost} -l ${remoteUser} "${runCommand} && echo \\#SUCCESS\\# || echo \\#fail\\#" + +set timeout ${cmdTimeout} +expect { + "*yes/no)?" {send "yes\n"; exp_continue} + "*assword:" {send "${password}\n"; exp_continue } + "Last login:" {} + "#SUCCESS#" {} + "#fail#" {exit 1 } + "*No route to host" {exit 2} + "Permission denied" {exit 3} + "*Host key verification failed*" {exit 4} + timeout { exit 5 } + eof { exit 6 } +} \ No newline at end of file From 974ef70d7ff6b4d7f5d1484fcf6808d331f19ddb Mon Sep 17 00:00:00 2001 From: wohainilaodou <165994047+wohainilaodou@users.noreply.github.com> Date: Tue, 14 May 2024 15:40:50 +0800 Subject: [PATCH 12/15] [INLONG-10215][DashBoard] Add default value for file data source in data access module (#10216) Co-authored-by: v_shuomqiu --- inlong-dashboard/src/plugins/sources/defaults/File.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/inlong-dashboard/src/plugins/sources/defaults/File.ts b/inlong-dashboard/src/plugins/sources/defaults/File.ts index 8f59cb76746..9e5107fe24b 100644 --- a/inlong-dashboard/src/plugins/sources/defaults/File.ts +++ b/inlong-dashboard/src/plugins/sources/defaults/File.ts @@ -139,6 +139,7 @@ export default class PulsarSource @FieldDecorator({ type: 'inputnumber', rules: [{ required: true }], + initialValue: 20, props: values => ({ min: 1, max: 100, @@ -152,6 +153,7 @@ export default class PulsarSource @FieldDecorator({ type: 'radio', + initialValue: 'H', props: values => ({ disabled: Boolean(values.id), options: [ @@ -177,9 +179,11 @@ export default class PulsarSource @FieldDecorator({ type: 'input', tooltip: 
i18n.t('meta.Sources.File.TimeOffsetHelp'), + initialValue: '0h', rules: [ { - pattern: /[0-9][mhd]$/, + pattern: /-?[0-9][mhd]$/, + required: true, message: i18n.t('meta.Sources.File.TimeOffsetRules'), }, ], From e8dbf282bf1a984972f7224f1dab31fec1c1e098 Mon Sep 17 00:00:00 2001 From: haifxu Date: Tue, 14 May 2024 19:40:18 +0800 Subject: [PATCH 13/15] [INLONG-10076][Manager] Doris data node supports test connections (#10217) --- .../service/node/DataNodeServiceImpl.java | 1 - .../node/doris/DorisDataNodeOperator.java | 22 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java index 4535f14f617..9022049acf5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java @@ -288,7 +288,6 @@ public Boolean deleteByKey(String name, String type, String operator) { @Override public Boolean testConnection(DataNodeRequest request) { LOGGER.info("begin test connection for: {}", request); - String type = request.getType(); // according to the data node type, test connection DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java index e29e2483eff..e3c14e8f9b2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java +++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/doris/DorisDataNodeOperator.java @@ -21,13 +21,16 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodeRequest; import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeDTO; import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeInfo; import org.apache.inlong.manager.pojo.node.doris.DorisDataNodeRequest; +import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; @@ -36,6 +39,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.sql.Connection; + @Service public class DorisDataNodeOperator extends AbstractDataNodeOperator { @@ -81,4 +86,21 @@ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEnt } } + @Override + public Boolean testConnection(DataNodeRequest request) { + String jdbcUrl = MySQLDataNodeDTO.convertToJdbcurl(request.getUrl()); + String username = request.getUsername(); + String password = request.getToken(); + Preconditions.expectNotBlank(jdbcUrl, ErrorCodeEnum.INVALID_PARAMETER, "connection jdbcUrl cannot be empty"); + try (Connection ignored = MySQLJdbcUtils.getConnection(jdbcUrl, username, password)) { + LOGGER.info("doris connection not null - connection success for jdbcUrl={}, username={}, password={}", + jdbcUrl, username, password); + 
return true; + } catch (Exception e) { + String errMsg = String.format("doris connection failed for jdbcUrl=%s, username=%s, password=%s", jdbcUrl, + username, password); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + } } From e68c69f986df57f04d0ef0ba067db07ae2912f4e Mon Sep 17 00:00:00 2001 From: vernedeng Date: Wed, 15 May 2024 15:54:21 +0800 Subject: [PATCH 14/15] [INLONG-10208][Sort] ClsSink support unified configuration (#10220) * [INLONG-10208][Sort] ClsSink support unified configuration --- .../common/pojo/sort/SortClusterConfig.java | 4 + .../common/pojo/sort/SortTaskConfig.java | 4 + .../pojo/sort/dataflow/DataFlowConfig.java | 6 + .../sort/DefaultSortConfigOperator.java | 2 + .../sort/standalone/sink/SinkContext.java | 1 + .../sort/standalone/sink/cls/ClsIdConfig.java | 49 +++-- .../sort/standalone/sink/cls/ClsSink.java | 26 +-- .../standalone/sink/cls/ClsSinkContext.java | 167 +++----------- .../sort/standalone/sink/v2/SinkContext.java | 165 ++++++++++++++ .../source/sortsdk/v2/SortSdkSource.java | 205 ++++++++++++++++++ .../standalone/sink/cls/TestClsIdConfig.java | 4 +- .../cls/TestDefaultEvent2LogItemHandler.java | 3 +- 12 files changed, 452 insertions(+), 184 deletions(-) create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java index 13109d18716..ff3d337cb1d 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java @@ -20,14 +20,18 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.List; @Data @Builder +@AllArgsConstructor +@NoArgsConstructor public class SortClusterConfig implements Serializable { private String clusterTag; diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java index e8795fb2be1..b8f80ef26f2 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java @@ -19,14 +19,18 @@ import org.apache.inlong.common.pojo.sort.node.NodeConfig; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.List; @Data @Builder +@AllArgsConstructor +@NoArgsConstructor public class SortTaskConfig implements Serializable { private String sortTaskName; diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java index 7429a78be35..a7bb1c36a35 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java @@ -19,18 +19,24 @@ import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Map; @Data @Builder +@AllArgsConstructor +@NoArgsConstructor public class DataFlowConfig implements Serializable { 
private String dataflowId; private Integer version; + private String inlongGroupId; + private String inlongStreamId; private SourceConfig sourceConfig; private SinkConfig sinkConfig; private Map properties; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index b712ff11ee4..5c51c1abcf9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -161,6 +161,8 @@ private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, InlongStream .dataflowId(String.valueOf(sink.getId())) .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink)) .sinkConfig(getSinkConfig(sink)) + .inlongGroupId(groupInfo.getInlongGroupId()) + .inlongStreamId(streamInfo.getInlongStreamId()) .build(); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java index 7e8b26e3cae..16b15ab8b94 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java @@ -41,6 +41,7 @@ * * SinkContext */ +@Deprecated public class SinkContext { public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java index faefaa6aa11..f0fd784acbc 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java @@ -17,16 +17,26 @@ package org.apache.inlong.sort.standalone.sink.cls; +import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; +import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig; +import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig; +import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig; + +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Cls config of each uid. */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class ClsIdConfig { private String inlongGroupId; @@ -36,23 +46,28 @@ public class ClsIdConfig { private String secretId; private String secretKey; private String topicId; - private String fieldNames; + private List fieldList; private int fieldOffset = 2; private int contentOffset = 0; - private List fieldList; - /** - * Parse fieldNames to list of fields. - * @return List of fields. 
- */ - public List getFieldList() { - if (fieldList == null) { - this.fieldList = new ArrayList<>(); - if (fieldNames != null) { - String[] fieldNameArray = fieldNames.split("\\s+"); - this.fieldList.addAll(Arrays.asList(fieldNameArray)); - } - } - return fieldList; + public static ClsIdConfig create(DataFlowConfig dataFlowConfig, ClsNodeConfig nodeConfig) { + ClsSinkConfig sinkConfig = (ClsSinkConfig) dataFlowConfig.getSinkConfig(); + List fields = sinkConfig.getFieldConfigs() + .stream() + .map(FieldConfig::getName) + .collect(Collectors.toList()); + return ClsIdConfig.builder() + .inlongGroupId(dataFlowConfig.getInlongGroupId()) + .inlongStreamId(dataFlowConfig.getInlongStreamId()) + .contentOffset(sinkConfig.getContentOffset()) + .fieldOffset(sinkConfig.getFieldOffset()) + .separator(sinkConfig.getSeparator()) + .fieldList(fields) + .topicId(sinkConfig.getTopicId()) + .endpoint(nodeConfig.getEndpoint()) + .secretId(nodeConfig.getSendSecretId()) + .secretKey(nodeConfig.getSendSecretKey()) + .build(); } + } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java index 30289489e40..e959b249fe8 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java @@ -27,13 +27,6 @@ import java.util.ArrayList; import java.util.List; -/** - * Cls Sink implementation. - * - *

- * Response for initialization of {@link ClsChannelWorker}. - *

- */ public class ClsSink extends AbstractSink implements Configurable { private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class); @@ -42,9 +35,6 @@ public class ClsSink extends AbstractSink implements Configurable { private ClsSinkContext context; private List workers = new ArrayList<>(); - /** - * Start {@link ClsChannelWorker}. - */ @Override public void start() { super.start(); @@ -57,13 +47,10 @@ public void start() { worker.start(); } } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("failed to start cls sink, ex={}", e.getMessage(), e); } } - /** - * Stop {@link ClsChannelWorker}. - */ @Override public void stop() { super.stop(); @@ -74,24 +61,15 @@ public void stop() { } this.workers.clear(); } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("failed to stop cls sink, ex={}", e.getMessage(), e); } } - /** - * Process. - * @return Status - * @throws EventDeliveryException - */ @Override public Status process() throws EventDeliveryException { return Status.BACKOFF; } - /** - * Config parent context. - * @param context Parent context. 
- */ @Override public void configure(Context context) { LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString()); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java index 6d05717243c..fef43cadfc9 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java @@ -17,18 +17,18 @@ package org.apache.inlong.sort.standalone.sink.cls; -import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; +import org.apache.inlong.common.pojo.sort.SortClusterConfig; +import org.apache.inlong.common.pojo.sort.SortTaskConfig; +import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; -import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; import org.apache.inlong.sort.standalone.config.pojo.InlongId; import org.apache.inlong.sort.standalone.metrics.SortMetricItem; import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils; -import org.apache.inlong.sort.standalone.sink.SinkContext; -import org.apache.inlong.sort.standalone.utils.Constants; +import org.apache.inlong.sort.standalone.sink.v2.SinkContext; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import 
com.tencentcloudapi.cls.producer.AsyncProducerClient; @@ -41,16 +41,13 @@ import org.slf4j.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -/** - * Cls sink context. - */ public class ClsSinkContext extends SinkContext { private static final Logger LOG = InlongLoggerFactory.getLogger(ClsSinkContext.class); @@ -74,20 +71,16 @@ public class ClsSinkContext extends SinkContext { private final Map clientMap; private List deletingClients = new ArrayList<>(); - private Context sinkContext; private Map idConfigMap = new ConcurrentHashMap<>(); private IEvent2LogItemHandler event2LogItemHandler; + private ClsNodeConfig clsNodeConfig; + private ObjectMapper objectMapper; - /** - * Constructor - * - * @param sinkName Name of sink. - * @param context Basic context. - * @param channel Channel which worker acquire profile event from. 
- */ public ClsSinkContext(String sinkName, Context context, Channel channel) { super(sinkName, context, channel); this.clientMap = new ConcurrentHashMap<>(); + this.objectMapper = new ObjectMapper(); + this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @Override @@ -104,26 +97,25 @@ public void reload() { } }); - SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName); + SortTaskConfig newSortTaskConfig = SortConfigHolder.getTaskConfig(taskName); if (newSortTaskConfig == null || newSortTaskConfig.equals(sortTaskConfig)) { return; } LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName, - new ObjectMapper().writeValueAsString(newSortTaskConfig)); + objectMapper.writeValueAsString(newSortTaskConfig)); this.sortTaskConfig = newSortTaskConfig; - this.sinkContext = new Context(this.sortTaskConfig.getSinkParams()); + ClsNodeConfig requestNodeConfig = (ClsNodeConfig) sortTaskConfig.getNodeConfig(); + this.clsNodeConfig = + requestNodeConfig.getVersion() >= clsNodeConfig.getVersion() ? requestNodeConfig : clsNodeConfig; + this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH; this.reloadIdParams(); this.reloadClients(); this.reloadHandler(); - this.keywordMaxLength = sinkContext.getInteger(KEY_MAX_KEYWORD_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH); } catch (Exception e) { LOG.error(e.getMessage(), e); } } - /** - * Reload LogItemHandler. - */ private void reloadHandler() { String logItemHandlerClass = CommonPropertiesHolder.getString(KEY_EVENT_LOG_ITEM_HANDLER, DefaultEvent2LogItemHandler.class.getName()); @@ -136,47 +128,22 @@ private void reloadHandler() { LOG.error("{} is not the instance of IEvent2LogItemHandler", logItemHandlerClass); } } catch (Throwable t) { - LOG.error("Fail to init IEvent2LogItemHandler, handlerClass:{}, error:{}", + LOG.error("fail to init IEvent2LogItemHandler, handlerClass:{}, error:{}", logItemHandlerClass, t.getMessage()); } } - /** - * Reload id params. 
- * - * @throws JsonProcessingException - */ - private void reloadIdParams() throws JsonProcessingException { - List> idList = this.sortTaskConfig.getIdParams(); - Map newIdConfigMap = new ConcurrentHashMap<>(); - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - for (Map idParam : idList) { - String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID); - String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID); - String uid = InlongId.generateUid(inlongGroupId, inlongStreamId); - String jsonIdConfig = objectMapper.writeValueAsString(idParam); - ClsIdConfig idConfig = objectMapper.readValue(jsonIdConfig, ClsIdConfig.class); - idConfig.getFieldList(); - newIdConfigMap.put(uid, idConfig); - } - this.idConfigMap = newIdConfigMap; + private void reloadIdParams() { + this.idConfigMap = this.sortTaskConfig.getClusters() + .stream() + .map(SortClusterConfig::getDataFlowConfigs) + .flatMap(Collection::stream) + .map(dataFlowConfig -> ClsIdConfig.create(dataFlowConfig, clsNodeConfig)) + .collect(Collectors.toMap( + config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()), + v -> v)); } - /** - * Close expire clients and start new clients. - * - *

- * Each client response for data of one secretId. - *

- *

- * First, find all secretId that are in the active clientMap but not in the updated id config (or to say EXPIRE - * secretId), and put those clients into deletingClientsMap. The real close process will be done at the beginning of - * next period of reloading. Second, find all secretIds that in the updated id config but not in the active - * clientMap(or to say NEW secretId), and start new clients for these secretId and put them into the active - * clientMap. - *

- */ private void reloadClients() { // get update secretIds Map updateConfigMap = idConfigMap.values() @@ -196,76 +163,25 @@ private void reloadClients() { .forEach(this::startNewClient); } - /** - * Start new cls client and put it to the active clientMap. - * - * @param idConfig idConfig of new client. - */ private void startNewClient(ClsIdConfig idConfig) { AsyncProducerConfig producerConfig = new AsyncProducerConfig( idConfig.getEndpoint(), idConfig.getSecretId(), idConfig.getSecretKey(), NetworkUtils.getLocalMachineIP()); - this.setCommonClientConfig(producerConfig); AsyncProducerClient client = new AsyncProducerClient(producerConfig); clientMap.put(idConfig.getSecretId(), client); } - /** - * Get common client config from context and set them. - * - * @param config Config to be set. - */ - private void setCommonClientConfig(AsyncProducerConfig config) { - Optional.ofNullable(sinkContext.getInteger(KEY_TOTAL_SIZE_IN_BYTES)) - .ifPresent(config::setTotalSizeInBytes); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_SEND_THREAD_COUNT)) - .ifPresent(config::setSendThreadCount); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BLOCK_SEC)) - .ifPresent(config::setMaxBlockMs); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_SIZE)) - .ifPresent(config::setBatchSizeThresholdInBytes); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_COUNT)) - .ifPresent(config::setBatchCountThreshold); - Optional.ofNullable(sinkContext.getInteger(KEY_LINGER_MS)) - .ifPresent(config::setLingerMs); - Optional.ofNullable(sinkContext.getInteger(KEY_RETRIES)) - .ifPresent(config::setRetries); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RESERVED_ATTEMPTS)) - .ifPresent(config::setMaxReservedAttempts); - Optional.ofNullable(sinkContext.getInteger(KEY_BASE_RETRY_BACKOFF_MS)) - .ifPresent(config::setBaseRetryBackoffMs); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RETRY_BACKOFF_MS)) - .ifPresent(config::setMaxRetryBackoffMs); - } - - /** - * Remove 
expire client from active clientMap and into the deleting client list. - *

- * The reason why not close client when it remove from clientMap is to avoid Race Condition. Which will - * happen when worker thread get the client and ready to send msg, while the reload thread try to close it. - *

- * - * @param secretId SecretId of expire client. - */ private void removeExpireClient(String secretId) { AsyncProducerClient client = clientMap.get(secretId); if (client == null) { - LOG.error("Remove client failed, there is not client of {}", secretId); + LOG.error("remove client failed, there is not client of {}", secretId); return; } deletingClients.add(clientMap.remove(secretId)); } - /** - * Add send result. - * - * @param currentRecord Event to be sent. - * @param bid Topic or dest ip of event. - * @param result Result of send. - * @param sendTime Time of sending. - */ public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) { Map dimensions = this.getDimensions(currentRecord, bid); SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions); @@ -288,13 +204,6 @@ public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean } } - /** - * Get report dimensions. - * - * @param currentRecord Event. - * @param bid Topic or dest ip. - * @return Prepared dimensions map. - */ private Map getDimensions(ProfileEvent currentRecord, String bid) { Map dimensions = new HashMap<>(); dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId()); @@ -309,40 +218,18 @@ private Map getDimensions(ProfileEvent currentRecord, String bid return dimensions; } - /** - * Get {@link ClsIdConfig} by uid. - * - * @param uid Uid of event. - * @return Corresponding cls id config. - */ public ClsIdConfig getIdConfig(String uid) { return idConfigMap.get(uid); } - /** - * Get max length of single value. - * - * @return Max length of single value. - */ public int getKeywordMaxLength() { return keywordMaxLength; } - /** - * Get LogItem handler. - * - * @return Handler. - */ public IEvent2LogItemHandler getLogItemHandler() { return event2LogItemHandler; } - /** - * Get cls client. - * - * @param secretId ID of client. - * @return Client instance. 
- */ public AsyncProducerClient getClient(String secretId) { return clientMap.get(secretId); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java new file mode 100644 index 00000000000..251a6d56af8 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.standalone.sink.v2; + +import org.apache.inlong.common.metric.MetricRegister; +import org.apache.inlong.common.pojo.sort.SortTaskConfig; +import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; +import org.apache.inlong.sort.standalone.metrics.SortMetricItem; +import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet; +import org.apache.inlong.sort.standalone.utils.BufferQueue; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.slf4j.Logger; + +import java.util.Date; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +public class SinkContext { + + public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class); + public static final String KEY_MAX_THREADS = "maxThreads"; + public static final String KEY_PROCESSINTERVAL = "processInterval"; + public static final String KEY_RELOADINTERVAL = "reloadInterval"; + public static final String KEY_TASK_NAME = "taskName"; + public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; + public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; + protected final String clusterId; + protected final String taskName; + protected final String sinkName; + protected final Context sinkContext; + protected SortTaskConfig sortTaskConfig; + protected final Channel channel; + protected final int maxThreads; + protected final long processInterval; + protected final long reloadInterval; + protected final SortMetricItemSet metricItemSet; + protected Timer reloadTimer; + + public SinkContext(String sinkName, Context context, Channel channel) { + this.sinkName = sinkName; + this.sinkContext = context; + 
this.channel = channel; + this.clusterId = sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID); + this.taskName = sinkContext.getString(KEY_TASK_NAME); + this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10); + this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100); + this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L); + this.metricItemSet = new SortMetricItemSet(sinkName); + MetricRegister.register(this.metricItemSet); + } + + public void start() { + try { + this.reload(); + this.setReloadTimer(); + } catch (Exception e) { + LOG.error("failed to start sink context", e); + } + } + + public void close() { + try { + this.reloadTimer.cancel(); + } catch (Exception e) { + LOG.error("failed to close sink context", e); + } + } + + protected void setReloadTimer() { + reloadTimer = new Timer(true); + TimerTask task = new TimerTask() { + + public void run() { + reload(); + } + }; + reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval); + } + + public void reload() { + try { + this.sortTaskConfig = SortConfigHolder.getTaskConfig(taskName); + } catch (Throwable e) { + LOG.error("failed to stop sink context", e); + } + } + + public String getClusterId() { + return clusterId; + } + + public String getTaskName() { + return taskName; + } + + public String getSinkName() { + return sinkName; + } + + public Context getSinkContext() { + return sinkContext; + } + + public SortTaskConfig getSortTaskConfig() { + return sortTaskConfig; + } + + public Channel getChannel() { + return channel; + } + + public int getMaxThreads() { + return maxThreads; + } + + public long getProcessInterval() { + return processInterval; + } + + public long getReloadInterval() { + return reloadInterval; + } + + public SortMetricItemSet getMetricItemSet() { + return metricItemSet; + } + + public static void fillInlongId(ProfileEvent currentRecord, Map dimensions) { + String inlongGroupId = 
currentRecord.getInlongGroupId(); + inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : inlongGroupId; + String inlongStreamId = currentRecord.getInlongStreamId(); + inlongStreamId = (StringUtils.isBlank(inlongStreamId)) ? "-" : inlongStreamId; + dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId); + dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId); + } + + public static BufferQueue createBufferQueue() { + int maxBufferQueueSizeKb = CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB, + DEFAULT_MAX_BUFFERQUEUE_SIZE_KB); + BufferQueue dispatchQueue = new BufferQueue<>(maxBufferQueueSizeKb); + return dispatchQueue; + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java new file mode 100644 index 00000000000..43245da9d0d --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.standalone.source.sortsdk.v2; + +import org.apache.inlong.sdk.commons.admin.AdminServiceRegister; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.api.SortClient; +import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.api.SortClientFactory; +import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType; +import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; +import org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig; +import org.apache.inlong.sort.standalone.source.sortsdk.DefaultTopicChangeListener; +import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback; +import org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceContext; +import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.flume.Context; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.conf.Configurable; +import org.apache.flume.source.AbstractSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Default Source implementation of InLong. + * + *

+ * SortSdkSource acquires messages from different upstream data stores by registering a {@link SortClient} for each sort task. The + * only thing SortSdkSource should do is to get one client by the sort task id, or remove one client when the task is + * finished or scheduled to another source instance. + *

+ * + *

+ * The Default Manager of InLong will schedule the partition and topic automatically. + *

+ * + *

+ * Because all sources should implement {@link Configurable}, the SortSdkSource should have a default constructor + * WITHOUT any arguments, and parameters will be configured by {@link Configurable#configure(Context)}. + *

+ */ +public final class SortSdkSource extends AbstractSource + implements + Configurable, + Runnable, + EventDrivenSource, + ConsumerServiceMBean { + + private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class); + public static final String SORT_SDK_PREFIX = "sortsdk."; + private static final int CORE_POOL_SIZE = 1; + private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest; + private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum"; + private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1; + private String taskName; + private SortSdkSourceContext context; + private String sortClusterName; + private long reloadInterval; + private ScheduledExecutorService pool; + + private List sortClients = new ArrayList<>(); + + @Override + public synchronized void start() { + int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM); + LOG.info("start SortSdkSource:{}, client num is {}", taskName, sortSdkClientNum); + for (int i = 0; i < sortSdkClientNum; i++) { + SortClient client = this.newClient(taskName); + if (client != null) { + this.sortClients.add(client); + } + } + } + + @Override + public void stop() { + pool.shutdownNow(); + LOG.info("close sort client {}.", taskName); + for (SortClient sortClient : sortClients) { + sortClient.getConfig().setStopConsume(true); + sortClient.close(); + } + } + + @Override + public void run() { + LOG.info("start to reload SortSdkSource:{}", taskName); + for (SortClient sortClient : sortClients) { + sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl()); + } + } + + @Override + public void configure(Context context) { + this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME); + this.context = new SortSdkSourceContext(getName(), context); + this.sortClusterName = SortConfigHolder.getSortConfig().getSortClusterName(); + this.reloadInterval = 
this.context.getReloadInterval(); + this.initReloadExecutor(); + // register + AdminServiceRegister.register(ConsumerServiceMBean.MBEAN_TYPE, taskName, this); + } + + private void initReloadExecutor() { + this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE); + pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.SECONDS); + } + + private SortClient newClient(final String sortTaskName) { + LOG.info("start a new sort client for task: {}", sortTaskName); + try { + final SortClientConfig clientConfig = new SortClientConfig(sortTaskName, this.sortClusterName, + new DefaultTopicChangeListener(), + SortSdkSource.defaultStrategy, InetAddress.getLocalHost().getHostAddress()); + final FetchCallback callback = FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context); + clientConfig.setCallback(callback); + Map sortSdkParams = this.getSortClientConfigParameters(); + clientConfig.setParameters(sortSdkParams); + + // create SortClient + String configType = CommonPropertiesHolder + .getString(SortSourceConfigType.KEY_TYPE, SortSourceConfigType.MANAGER.name()); + SortClient client = null; + if (SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) { + LOG.info("create sort sdk client in file way:{}", configType); + ClassResourceQueryConsumeConfig queryConfig = new ClassResourceQueryConsumeConfig(); + client = SortClientFactory.createSortClient(clientConfig, queryConfig); + } else if (SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) { + LOG.info("create sort sdk client in manager way:{}", configType); + clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl()); + client = SortClientFactory.createSortClient(clientConfig); + } else { + LOG.info("create sort sdk client in custom way:{}", configType); + Class loaderClass = ClassUtils.getClass(configType); + Object loaderObject = loaderClass.getDeclaredConstructor().newInstance(); + if (loaderObject instanceof Configurable) { + ((Configurable) 
loaderObject).configure(new Context(CommonPropertiesHolder.get())); + } + if (!(loaderObject instanceof QueryConsumeConfig)) { + LOG.error("got exception when create QueryConsumeConfig instance, config key:{},config class:{}", + SortSourceConfigType.KEY_TYPE, configType); + return null; + } + // if it specifies the type of QueryConsumeConfig. + client = SortClientFactory.createSortClient(clientConfig, (QueryConsumeConfig) loaderObject); + } + + client.init(); + callback.setClient(client); + return client; + } catch (Throwable th) { + LOG.error("got one throwable when init client of id:{}", sortTaskName, th); + } + return null; + } + + private Map getSortClientConfigParameters() { + Map commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX); + return new HashMap<>(commonParams); + } + + @Override + public void stopConsumer() { + for (SortClient sortClient : sortClients) { + sortClient.getConfig().setStopConsume(true); + } + } + + @Override + public void recoverConsumer() { + for (SortClient sortClient : sortClients) { + sortClient.getConfig().setStopConsume(false); + } + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java index 270760f317f..bfeb0adde6c 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java @@ -23,6 +23,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.Arrays; import java.util.List; @RunWith(PowerMockRunner.class) @@ -32,8 +33,7 @@ public class TestClsIdConfig { @Test public void testGetFieldList() { 
ClsIdConfig idConfig = new ClsIdConfig(); - String testFieldName = "1 2 3 4 5 6 7"; - idConfig.setFieldNames(testFieldName); + idConfig.setFieldList(Arrays.asList("1", "2", "3", "4", "5", "6", "7")); List fieldList = idConfig.getFieldList(); Assert.assertEquals(7, fieldList.size()); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java index 512a349190b..efdf3cb52cc 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java @@ -32,6 +32,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -69,7 +70,7 @@ public void testNormal() { private ClsIdConfig prepareIdConfig() { ClsIdConfig config = new ClsIdConfig(); - config.setFieldNames("f1 f2 f3 f4 f5 f6 f7 f8"); + config.setFieldList(Arrays.asList("f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8")); config.setInlongGroupId("testGroup"); config.setInlongStreamId("testStream"); config.setSecretId("testSecretId"); From c5dbfabf224bee26373562e3e2987dd029b7eba9 Mon Sep 17 00:00:00 2001 From: XiaoYou201 <58425449+XiaoYou201@users.noreply.github.com> Date: Wed, 15 May 2024 16:02:14 +0800 Subject: [PATCH 15/15] [INLONG-10194][Sort] Sqlserver connector support audit ID (#10212) --- .../RowDataDebeziumDeserializeSchema.java | 674 ++++++++++++++++++ .../sort/sqlserver/SqlServerTableSource.java | 254 +++++++ .../sort/sqlserver/SqlserverTableFactory.java | 20 +- licenses/inlong-sort-connectors/LICENSE | 5 + 4 files 
changed, 951 insertions(+), 2 deletions(-) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java new file mode 100644 index 00000000000..210a55f7c50 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java @@ -0,0 +1,674 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.sqlserver; + +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceMetricData; + +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.AppendMetadataCollector; +import com.ververica.cdc.debezium.table.DebeziumChangelogMode; +import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter; +import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory; +import com.ververica.cdc.debezium.table.MetadataConverter; +import com.ververica.cdc.debezium.utils.TemporalConversions; +import io.debezium.data.Envelope; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.data.VariableScaleDecimal; +import io.debezium.time.MicroTime; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTime; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Timestamp; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Optional; + +import static 
org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link + * RowData}. + *

+ * Copy from com.ververica:flink-connector-debezium:2.3.0 + */ +public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 2L; + + /** Custom validator to validate the row value. */ + public interface ValueValidator extends Serializable { + + void validate(RowData rowData, RowKind rowKind) throws Exception; + } + + /** TypeInformation of the produced {@link RowData}. * */ + private final TypeInformation resultTypeInfo; + + /** + * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisted of + * physical column values. + */ + private final DeserializationRuntimeConverter physicalConverter; + + /** Whether the deserializer needs to handle metadata columns. */ + private final boolean hasMetadata; + + /** + * A wrapped output collector which is used to append metadata columns after physical columns. + */ + private final AppendMetadataCollector appendMetadataCollector; + + /** Validator to validate the row value. */ + private final ValueValidator validator; + + /** Changelog Mode to use for encoding changes in Flink internal data structure. */ + private final DebeziumChangelogMode changelogMode; + private final SourceMetricData sourceMetricData; + + /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. 
*/ + public static Builder newBuilder() { + return new Builder(); + } + + RowDataDebeziumDeserializeSchema( + RowType physicalDataType, + MetadataConverter[] metadataConverters, + TypeInformation resultTypeInfo, + ValueValidator validator, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory, + DebeziumChangelogMode changelogMode, + SourceMetricData sourceMetricData) { + this.hasMetadata = checkNotNull(metadataConverters).length > 0; + this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters); + this.physicalConverter = + createConverter( + checkNotNull(physicalDataType), + serverTimeZone, + userDefinedConverterFactory); + this.resultTypeInfo = checkNotNull(resultTypeInfo); + this.validator = checkNotNull(validator); + this.changelogMode = checkNotNull(changelogMode); + this.sourceMetricData = sourceMetricData; + } + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + Envelope.Operation op = Envelope.operationFor(record); + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + GenericRowData insert = extractAfterRow(value, valueSchema); + validator.validate(insert, RowKind.INSERT); + insert.setRowKind(RowKind.INSERT); + if (sourceMetricData != null) { + out = new MetricsCollector<>(out, sourceMetricData); + } + emit(record, insert, out); + } else if (op == Envelope.Operation.DELETE) { + GenericRowData delete = extractBeforeRow(value, valueSchema); + validator.validate(delete, RowKind.DELETE); + delete.setRowKind(RowKind.DELETE); + emit(record, delete, out); + } else { + if (changelogMode == DebeziumChangelogMode.ALL) { + GenericRowData before = extractBeforeRow(value, valueSchema); + validator.validate(before, RowKind.UPDATE_BEFORE); + before.setRowKind(RowKind.UPDATE_BEFORE); + emit(record, before, out); + } + + GenericRowData after = 
extractAfterRow(value, valueSchema); + validator.validate(after, RowKind.UPDATE_AFTER); + after.setRowKind(RowKind.UPDATE_AFTER); + if (sourceMetricData != null) { + out = new MetricsCollector<>(out, sourceMetricData); + } + emit(record, after, out); + } + } + + private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception { + Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema(); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + return (GenericRowData) physicalConverter.convert(after, afterSchema); + } + + private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception { + Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema(); + Struct before = value.getStruct(Envelope.FieldName.BEFORE); + return (GenericRowData) physicalConverter.convert(before, beforeSchema); + } + + private void emit(SourceRecord inRecord, RowData physicalRow, Collector collector) { + if (!hasMetadata) { + collector.collect(physicalRow); + return; + } + appendMetadataCollector.inputRecord = inRecord; + appendMetadataCollector.outputCollector = collector; + appendMetadataCollector.collect(physicalRow); + } + + @Override + public TypeInformation getProducedType() { + return resultTypeInfo; + } + + // ------------------------------------------------------------------------------------- + // Builder + // ------------------------------------------------------------------------------------- + + /** Builder of {@link RowDataDebeziumDeserializeSchema}. 
*/ + public static class Builder { + + private RowType physicalRowType; + private TypeInformation resultTypeInfo; + private MetadataConverter[] metadataConverters = new MetadataConverter[0]; + private final ValueValidator validator = (rowData, rowKind) -> { + }; + private ZoneId serverTimeZone = ZoneId.of("UTC"); + private DeserializationRuntimeConverterFactory userDefinedConverterFactory = + DeserializationRuntimeConverterFactory.DEFAULT; + private final DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL; + private SourceMetricData sourceMetricData; + + public Builder setPhysicalRowType(RowType physicalRowType) { + this.physicalRowType = physicalRowType; + return this; + } + + public Builder setMetadataConverters(MetadataConverter[] metadataConverters) { + this.metadataConverters = metadataConverters; + return this; + } + + public Builder setResultTypeInfo(TypeInformation resultTypeInfo) { + this.resultTypeInfo = resultTypeInfo; + return this; + } + + public Builder setServerTimeZone(ZoneId serverTimeZone) { + this.serverTimeZone = serverTimeZone; + return this; + } + + public Builder setUserDefinedConverterFactory( + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + this.userDefinedConverterFactory = userDefinedConverterFactory; + return this; + } + + public Builder setSourceMetricData(SourceMetricData sourceMetricData) { + this.sourceMetricData = sourceMetricData; + return this; + } + + public RowDataDebeziumDeserializeSchema build() { + return new RowDataDebeziumDeserializeSchema( + physicalRowType, + metadataConverters, + resultTypeInfo, + validator, + serverTimeZone, + userDefinedConverterFactory, + changelogMode, + sourceMetricData); + } + } + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + /** Creates a runtime converter which is null safe. 
*/ + private static DeserializationRuntimeConverter createConverter( + LogicalType type, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + return wrapIntoNullableConverter( + createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory)); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). + // -------------------------------------------------------------------------------- + + /** Creates a runtime converter which assuming input object is not null. */ + public static DeserializationRuntimeConverter createNotNullConverter( + LogicalType type, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + // user defined converter has a higher resolve order + Optional converter = + userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone); + if (converter.isPresent()) { + return converter.get(); + } + + // if no matched user defined converter, fallback to the default converter + switch (type.getTypeRoot()) { + case NULL: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return null; + } + }; + case BOOLEAN: + return convertToBoolean(); + case TINYINT: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return Byte.parseByte(dbzObj.toString()); + } + }; + case SMALLINT: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return 
Short.parseShort(dbzObj.toString()); + } + }; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return convertToInt(); + case BIGINT: + case INTERVAL_DAY_TIME: + return convertToLong(); + case DATE: + return convertToDate(); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestamp(serverTimeZone); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToLocalTimeZoneTimestamp(serverTimeZone); + case FLOAT: + return convertToFloat(); + case DOUBLE: + return convertToDouble(); + case CHAR: + case VARCHAR: + return convertToString(); + case BINARY: + case VARBINARY: + return convertToBinary(); + case DECIMAL: + return createDecimalConverter((DecimalType) type); + case ROW: + return createRowConverter( + (RowType) type, serverTimeZone, userDefinedConverterFactory); + case ARRAY: + case MAP: + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private static DeserializationRuntimeConverter convertToBoolean() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Boolean) { + return dbzObj; + } else if (dbzObj instanceof Byte) { + return (byte) dbzObj == 1; + } else if (dbzObj instanceof Short) { + return (short) dbzObj == 1; + } else { + return Boolean.parseBoolean(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToInt() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return dbzObj; + } else if (dbzObj instanceof Long) { + return ((Long) dbzObj).intValue(); + } else { + return Integer.parseInt(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToLong() 
{ + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return ((Integer) dbzObj).longValue(); + } else if (dbzObj instanceof Long) { + return dbzObj; + } else { + return Long.parseLong(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToDouble() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return ((Float) dbzObj).doubleValue(); + } else if (dbzObj instanceof Double) { + return dbzObj; + } else { + return Double.parseDouble(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToFloat() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return dbzObj; + } else if (dbzObj instanceof Double) { + return ((Double) dbzObj).floatValue(); + } else { + return Float.parseFloat(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToDate() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay(); + } + }; + } + + private static DeserializationRuntimeConverter convertToTime() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case MicroTime.SCHEMA_NAME: + return (int) ((long) dbzObj / 1000); + case NanoTime.SCHEMA_NAME: + 
return (int) ((long) dbzObj / 1000_000); + } + } else if (dbzObj instanceof Integer) { + return dbzObj; + } + // get number of milliseconds of the day + return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000; + } + }; + } + + private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case Timestamp.SCHEMA_NAME: + return TimestampData.fromEpochMillis((Long) dbzObj); + case MicroTimestamp.SCHEMA_NAME: + long micro = (long) dbzObj; + return TimestampData.fromEpochMillis( + micro / 1000, (int) (micro % 1000 * 1000)); + case NanoTimestamp.SCHEMA_NAME: + long nano = (long) dbzObj; + return TimestampData.fromEpochMillis( + nano / 1000_000, (int) (nano % 1000_000)); + } + } + LocalDateTime localDateTime = + TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); + return TimestampData.fromLocalDateTime(localDateTime); + } + }; + } + + private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp( + ZoneId serverTimeZone) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof String) { + String str = (String) dbzObj; + // TIMESTAMP_LTZ type is encoded in string type + Instant instant = Instant.parse(str); + return TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant(instant, serverTimeZone)); + } + throw new IllegalArgumentException( + "Unable to convert to TimestampData from unexpected value '" + + dbzObj + + "' of type " + + dbzObj.getClass().getName()); + } + }; + } + + private static DeserializationRuntimeConverter convertToString() { + return new DeserializationRuntimeConverter() { + + private static final long 
serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return StringData.fromString(dbzObj.toString()); + } + }; + } + + private static DeserializationRuntimeConverter convertToBinary() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof byte[]) { + return dbzObj; + } else if (dbzObj instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) dbzObj; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else { + throw new UnsupportedOperationException( + "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName()); + } + } + }; + } + + private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + BigDecimal bigDecimal; + if (dbzObj instanceof byte[]) { + // decimal.handling.mode=precise + bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj); + } else if (dbzObj instanceof String) { + // decimal.handling.mode=string + bigDecimal = new BigDecimal((String) dbzObj); + } else if (dbzObj instanceof Double) { + // decimal.handling.mode=double + bigDecimal = BigDecimal.valueOf((Double) dbzObj); + } else { + if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + SpecialValueDecimal decimal = + VariableScaleDecimal.toLogical((Struct) dbzObj); + bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); + } else { + // fallback to string + bigDecimal = new BigDecimal(dbzObj.toString()); + } + } + return DecimalData.fromBigDecimal(bigDecimal, precision, scale); + } + }; + } + + private static DeserializationRuntimeConverter 
createRowConverter( + RowType rowType, + ZoneId serverTimeZone, + DeserializationRuntimeConverterFactory userDefinedConverterFactory) { + final DeserializationRuntimeConverter[] fieldConverters = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .map( + logicType -> createConverter( + logicType, + serverTimeZone, + userDefinedConverterFactory)) + .toArray(DeserializationRuntimeConverter[]::new); + final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + Struct struct = (Struct) dbzObj; + int arity = fieldNames.length; + GenericRowData row = new GenericRowData(arity); + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + Field field = schema.field(fieldName); + if (field == null) { + row.setField(i, null); + } else { + Object fieldValue = struct.getWithoutDefault(fieldName); + Schema fieldSchema = schema.field(fieldName).schema(); + Object convertedField = + convertField(fieldConverters[i], fieldValue, fieldSchema); + row.setField(i, convertedField); + } + } + return row; + } + }; + } + + private static Object convertField( + DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema) + throws Exception { + if (fieldValue == null) { + return null; + } else { + return fieldConverter.convert(fieldValue, fieldSchema); + } + } + + private static DeserializationRuntimeConverter wrapIntoNullableConverter( + DeserializationRuntimeConverter converter) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + if (dbzObj == null) { + return null; + } + return converter.convert(dbzObj, schema); + } + }; + } +} diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java new file mode 100644 index 00000000000..635ea465292 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.sqlserver; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; + +import com.ververica.cdc.connectors.sqlserver.SqlServerSource; +import com.ververica.cdc.connectors.sqlserver.table.SqlServerDeserializationConverterFactory; +import com.ververica.cdc.connectors.sqlserver.table.SqlServerReadableMetadata; +import com.ververica.cdc.connectors.sqlserver.table.StartupOptions; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.table.MetadataConverter; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import java.time.ZoneId; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link DynamicTableSource} that describes how to create a SqlServer source from a logical + * description. + *

+ * Copy from com.ververica:flink-connector-sqlserver-cdc:2.3.0 + */ +public class SqlServerTableSource implements ScanTableSource, SupportsReadingMetadata { + + private final ResolvedSchema physicalSchema; + private final int port; + private final String hostname; + private final String database; + private final String schemaName; + private final String tableName; + private final ZoneId serverTimeZone; + private final String username; + private final String password; + private final Properties dbzProperties; + private final StartupOptions startupOptions; + + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + /** Data type that describes the final output of the source. */ + protected DataType producedDataType; + + /** Metadata that is appended at the end of a physical source row. */ + protected List metadataKeys; + private final MetricOption metricOption; + + public SqlServerTableSource( + ResolvedSchema physicalSchema, + int port, + String hostname, + String database, + String schemaName, + String tableName, + ZoneId serverTimeZone, + String username, + String password, + Properties dbzProperties, + StartupOptions startupOptions, + MetricOption metricOption) { + this.physicalSchema = physicalSchema; + this.port = port; + this.hostname = checkNotNull(hostname); + this.database = checkNotNull(database); + this.schemaName = checkNotNull(schemaName); + this.tableName = checkNotNull(tableName); + this.serverTimeZone = serverTimeZone; + this.username = checkNotNull(username); + this.password = checkNotNull(password); + this.dbzProperties = dbzProperties; + this.producedDataType = physicalSchema.toPhysicalRowDataType(); + this.metadataKeys = Collections.emptyList(); + this.startupOptions = startupOptions; + this.metricOption = metricOption; + } + + @Override + public ChangelogMode getChangelogMode() { + 
return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + RowType physicalDataType = + (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); + MetadataConverter[] metadataConverters = getMetadataConverters(); + TypeInformation typeInfo = scanContext.createTypeInformation(producedDataType); + + DebeziumDeserializationSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType(physicalDataType) + .setMetadataConverters(metadataConverters) + .setResultTypeInfo(typeInfo) + .setServerTimeZone(serverTimeZone) + .setUserDefinedConverterFactory( + SqlServerDeserializationConverterFactory.instance()) + .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption)) + .build(); + DebeziumSourceFunction sourceFunction = + SqlServerSource.builder() + .hostname(hostname) + .port(port) + .database(database) + .tableList(schemaName + "." 
+ tableName) + .username(username) + .password(password) + .debeziumProperties(dbzProperties) + .startupOptions(startupOptions) + .deserializer(deserializer) + .build(); + return SourceFunctionProvider.of(sourceFunction, false); + } + + private MetadataConverter[] getMetadataConverters() { + if (metadataKeys.isEmpty()) { + return new MetadataConverter[0]; + } + + return metadataKeys.stream() + .map( + key -> Stream.of(SqlServerReadableMetadata.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(SqlServerReadableMetadata::getConverter) + .toArray(MetadataConverter[]::new); + } + + @Override + public DynamicTableSource copy() { + SqlServerTableSource source = + new SqlServerTableSource( + physicalSchema, + port, + hostname, + database, + schemaName, + tableName, + serverTimeZone, + username, + password, + dbzProperties, + startupOptions, + metricOption); + source.metadataKeys = metadataKeys; + source.producedDataType = producedDataType; + return source; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqlServerTableSource that = (SqlServerTableSource) o; + return port == that.port + && Objects.equals(physicalSchema, that.physicalSchema) + && Objects.equals(hostname, that.hostname) + && Objects.equals(database, that.database) + && Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(serverTimeZone, that.serverTimeZone) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password) + && Objects.equals(dbzProperties, that.dbzProperties) + && Objects.equals(startupOptions, that.startupOptions) + && Objects.equals(producedDataType, that.producedDataType) + && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(metricOption, that.metricOption); + } + + @Override + public int hashCode() { + return 
Objects.hash( + physicalSchema, + port, + hostname, + database, + schemaName, + tableName, + serverTimeZone, + username, + password, + dbzProperties, + startupOptions, + producedDataType, + metadataKeys, + metricOption); + } + + @Override + public String asSummaryString() { + return "SqlServer-CDC"; + } + + @Override + public Map listReadableMetadata() { + return Stream.of(SqlServerReadableMetadata.values()) + .collect( + Collectors.toMap( + SqlServerReadableMetadata::getKey, + SqlServerReadableMetadata::getDataType)); + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + this.metadataKeys = metadataKeys; + this.producedDataType = producedDataType; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java index bab85117095..dfa28d63760 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java @@ -17,7 +17,8 @@ package org.apache.inlong.sort.sqlserver; -import com.ververica.cdc.connectors.sqlserver.table.SqlServerTableSource; +import org.apache.inlong.sort.base.metric.MetricOption; + import com.ververica.cdc.connectors.sqlserver.table.StartupOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -35,6 +36,7 @@ import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; 
+import static org.apache.inlong.sort.base.Constants.*; /** Factory for creating configured instance of {@link com.ververica.cdc.connectors.sqlserver.SqlServerSource}. */ public class SqlserverTableFactory implements DynamicTableSourceFactory { @@ -122,6 +124,9 @@ public Set> optionalOptions() { options.add(PORT); options.add(SERVER_TIME_ZONE); options.add(SCAN_STARTUP_MODE); + options.add(INLONG_METRIC); + options.add(INLONG_AUDIT); + options.add(AUDIT_KEYS); return options; } @@ -144,6 +149,16 @@ public DynamicTableSource createDynamicTableSource(Context context) { ResolvedSchema physicalSchema = getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = config.get(INLONG_AUDIT); + String auditKeys = config.get(AUDIT_KEYS); + + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + return new SqlServerTableSource( physicalSchema, port, @@ -155,7 +170,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { username, password, getDebeziumProperties(context.getCatalogTable().getOptions()), - getStartupOptions(config)); + getStartupOptions(config), + metricOption); } private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 4c8eca64913..970ef97ce6b 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -850,6 +850,11 @@ Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.) 
License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE +1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java +Source : com.ververica:flink-connector-sqlserver-cdc:2.3.0 (Please note that the software have been modified.) +License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE + ======================================================================= Apache InLong Subcomponents: