diff --git a/build.gradle b/build.gradle index 63e9301a43..3d2f8604a3 100644 --- a/build.gradle +++ b/build.gradle @@ -210,6 +210,48 @@ tasks.register('dist') { } } +tasks.register('dist-admin') { + subprojects.forEach { subProject -> + dependsOn("${subProject.path}:jar") + } + def includedProjects = + [ + "eventmesh-admin-server", + "eventmesh-common", + "eventmesh-spi", + "eventmesh-registry:eventmesh-registry-api", + "eventmesh-registry:eventmesh-registry-nacos", + "eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api" + ] + doLast { + includedProjects.each { + def subProject = findProject(it) + copy { + from subProject.jar.archivePath + into rootProject.file('dist/apps') + } + copy { + from subProject.configurations.runtimeClasspath + into rootProject.file('dist/lib') + exclude 'eventmesh-*' + } + copy { + from subProject.file('bin') + into rootProject.file('dist/bin') + } + copy { + from subProject.file('conf') + from subProject.sourceSets.main.resources.srcDirs + into rootProject.file('dist/conf') + duplicatesStrategy = DuplicatesStrategy.EXCLUDE + exclude 'META-INF' + } + + } + } + +} + tasks.register('installPlugin') { var pluginProjects = subprojects.findAll { it.file('gradle.properties').exists() @@ -754,11 +796,12 @@ subprojects { dependency "software.amazon.awssdk:s3:2.26.3" dependency "com.github.rholder:guava-retrying:2.0.0" - dependency "org.mybatis.spring.boot:mybatis-spring-boot-starter:2.3.2" dependency "com.alibaba:druid-spring-boot-starter:1.2.23" + dependency "com.baomidou:mybatis-plus-boot-starter:3.5.5" dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.18" - dependency "com.baomidou:mybatis-plus:3.5.7" dependency "com.mysql:mysql-connector-j:8.4.0" + dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.10" + dependency "org.locationtech.jts:jts-core:1.19.0" } } } diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle index 6f91f48001..6de881725a 100644 --- a/eventmesh-admin-server/build.gradle +++ b/eventmesh-admin-server/build.gradle @@ -31,18 +31,25 @@ dependencies { implementation "io.grpc:grpc-stub" implementation "io.grpc:grpc-netty-shaded" - // https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter - implementation 'com.baomidou:mybatis-plus-boot-starter:3.5.7' - implementation "org.reflections:reflections:0.10.2" + // https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter + implementation "com.baomidou:mybatis-plus-boot-starter" - // https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter - implementation "com.alibaba:druid-spring-boot-starter" - compileOnly 'com.mysql:mysql-connector-j' - compileOnly 'org.projectlombok:lombok' - annotationProcessor 'org.projectlombok:lombok' + // https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter + implementation "com.alibaba:druid-spring-boot-starter" + implementation 'com.mysql:mysql-connector-j' + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' } configurations.implementation { exclude group: "org.springframework.boot", module: "spring-boot-starter-logging" } +sourceSets { + main { + resources { + srcDirs = ['src/main/resources', 'conf'] + } + } +} + diff --git a/eventmesh-admin-server/src/main/resources/application.yaml b/eventmesh-admin-server/conf/application.yaml similarity index 100% rename from eventmesh-admin-server/src/main/resources/application.yaml rename to eventmesh-admin-server/conf/application.yaml diff --git a/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties b/eventmesh-admin-server/conf/eventmesh-admin.properties similarity index 100% rename from eventmesh-admin-server/src/main/resources/eventmesh-admin.properties rename to eventmesh-admin-server/conf/eventmesh-admin.properties diff --git a/eventmesh-admin-server/src/main/resources/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql similarity index 100% rename from eventmesh-admin-server/src/main/resources/eventmesh.sql rename to eventmesh-admin-server/conf/eventmesh.sql diff --git a/eventmesh-admin-server/conf/log4j2.xml b/eventmesh-admin-server/conf/log4j2.xml new file mode 100644 index 0000000000..6341a0e629 --- /dev/null +++ b/eventmesh-admin-server/conf/log4j2.xml @@ -0,0 +1,108 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml similarity index 100% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml rename to eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml similarity index 100% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml rename to eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml similarity index 100% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml rename to eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml similarity index 100% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml rename to eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml similarity index 100% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml rename to eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml similarity index 100% rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml rename to eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java index 71c6d67be2..9ee25fadb2 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.admin.server; +import org.apache.eventmesh.common.ComponentLifeCycle; import org.apache.eventmesh.common.remote.Task; import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; import org.apache.eventmesh.common.utils.PagedList; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java index 98247d19b6..a2e4cc7063 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java @@ -102,7 +102,7 @@ public void start() { } @Override - public void destroy() { + public void stop() { if (configuration.isEventMeshRegistryPluginEnabled()) { registryService.unRegister(adminServeInfo); try { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java index c6a6e16504..7f5fa22dda 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java @@ -23,7 +23,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -@SpringBootApplication +@SpringBootApplication() public class ExampleAdminServer { public static void main(String[] args) throws Exception { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java index 24085dd89e..9bbe4ce305 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.admin.server.web; -import org.apache.eventmesh.admin.server.ComponentLifeCycle; +import org.apache.eventmesh.common.ComponentLifeCycle; import org.apache.eventmesh.common.remote.payload.PayloadFactory; import javax.annotation.PostConstruct; @@ -40,9 +40,9 @@ public void init() throws Exception { } @PreDestroy - public void shutdown() { + public void shutdown() throws Exception { log.info("[{}] server will destroy", this.getClass().getSimpleName()); - destroy(); + stop(); log.info("[{}] server has be destroy", this.getClass().getSimpleName()); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java index 5fbb34f489..572e07a21d 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java @@ -52,7 +52,7 @@ public void start() throws Exception { } @Override - public void destroy() { + public void stop() { try { if (server != null) { server.shutdown(); diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle index 70244d2299..c95e9f6c29 100644 --- a/eventmesh-common/build.gradle +++ b/eventmesh-common/build.gradle @@ -48,6 +48,7 @@ dependencies { implementation "org.apache.httpcomponents:httpclient" implementation "io.netty:netty-all" + compileOnly 'com.mysql:mysql-connector-j' implementation "io.grpc:grpc-protobuf:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java new file mode 100644 index 0000000000..375b6cb1d3 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common; + +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractComponent implements ComponentLifeCycle { + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); + + @Override + public void start() throws Exception { + if (!started.compareAndSet(false, true)) { + log.info("component [{}] has started", this.getClass()); + return; + } + log.info("component [{}] will start", this.getClass()); + run(); + log.info("component [{}] started successfully", this.getClass()); + } + + @Override + public void stop() throws Exception { + if (!stopped.compareAndSet(false, true)) { + log.info("component [{}] has stopped", this.getClass()); + return; + } + log.info("component [{}] will stop", this.getClass()); + shutdown(); + log.info("component [{}] stopped successfully", this.getClass()); + } + + protected abstract void run() throws Exception; + + protected abstract void shutdown() throws Exception; +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ComponentLifeCycle.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ComponentLifeCycle.java similarity index 89% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ComponentLifeCycle.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/ComponentLifeCycle.java index 392eebfbba..76fdd548d0 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ComponentLifeCycle.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ComponentLifeCycle.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.eventmesh.admin.server; +package org.apache.eventmesh.common; /** - * adminServer ComponentLifeCycle + * LifeCycle of EventMesh Component */ public interface ComponentLifeCycle { void start() throws Exception; - void destroy(); + void stop() throws Exception; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/JdbcConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/JdbcConfig.java new file mode 100644 index 0000000000..fc784fc187 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/JdbcConfig.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb; + +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; + +import java.util.Set; + +import lombok.Data; + +@Data +public class JdbcConfig { + private String url; + + private String dbAddress; + + private int dbPort; + + private String userName; + + private String passWord; + + private Set databases; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java new file mode 100644 index 0000000000..b5107ccbf3 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import java.util.HashMap; +import java.util.Map; + +import com.mysql.cj.MysqlType; + +public enum CanalMySQLType { + BIT("BIT"), + TINYINT("TINYINT"), + SMALLINT("SMALLINT"), + MEDIUMINT("MEDIUMINT"), + INT("INT"), + BIGINT("BIGINT"), + DECIMAL("DECIMAL"), + FLOAT("FLOAT"), + DOUBLE("DOUBLE"), + DATE("DATE"), + DATETIME("DATETIME"), + TIMESTAMP("TIMESTAMP"), + TIME("TIME"), + YEAR("YEAR"), + CHAR("CHAR"), + VARCHAR("VARCHAR"), + BINARY("BINARY"), + VARBINARY("VARBINARY"), + TINYBLOB("TINYBLOB"), + BLOB("BLOB"), + MEDIUMBLOB("MEDIUMBLOB"), + LONGBLOB("LONGBLOB"), + TINYTEXT("TINYTEXT"), + TEXT("TEXT"), + MEDIUMTEXT("MEDIUMTEXT"), + LONGTEXT("LONGTEXT"), + ENUM("ENUM"), + SET("SET"), + JSON("JSON"), + GEOMETRY("GEOMETRY"), + // MysqlType not include the following type + POINT("POINT"), + LINESTRING("LINESTRING"), + POLYGON("POLYGON"), + MULTIPOINT("MULTIPOINT"), + GEOMETRY_COLLECTION("GEOMETRYCOLLECTION"), + GEOM_COLLECTION("GEOMCOLLECTION"), + MULTILINESTRING("MULTILINESTRING"), + MULTIPOLYGON("MULTIPOLYGON"); + + private final String codeKey; + private final MysqlType mysqlType; + + CanalMySQLType(String codeKey) { + this.codeKey = codeKey; + this.mysqlType = MysqlType.getByName(codeKey); + } + + private static final Map TYPES = new HashMap<>(); + + static { + CanalMySQLType[] values = values(); + for (CanalMySQLType tableType : values) { + TYPES.put(tableType.codeKey, tableType); + } + } + + public String genPrepareStatement4Insert() { + switch (this) { + case GEOMETRY: + case GEOM_COLLECTION: + case GEOMETRY_COLLECTION: + return "ST_GEOMFROMTEXT(?)"; + case POINT: + return "ST_PointFromText(?)"; + case LINESTRING: + return "ST_LineStringFromText(?)"; + case POLYGON: + return "ST_PolygonFromText(?)"; + case MULTIPOINT: + return "ST_MultiPointFromText(?)"; + case MULTILINESTRING: + return "ST_MultiLineStringFromText(?)"; + case MULTIPOLYGON: + return "ST_MultiPolygonFromText(?)"; + default: + return "?"; + } + } + + public static CanalMySQLType valueOfCode(String code) { + CanalMySQLType type = TYPES.get(code.toUpperCase()); + if (type != null) { + return type; + } + switch (MysqlType.getByName(code)) { + case BOOLEAN: + case TINYINT: + case TINYINT_UNSIGNED: + return TINYINT; + case SMALLINT: + case SMALLINT_UNSIGNED: + return SMALLINT; + case INT: + case INT_UNSIGNED: + return INT; + case BIGINT: + case BIGINT_UNSIGNED: + return BIGINT; + case MEDIUMINT: + case MEDIUMINT_UNSIGNED: + return MEDIUMINT; + case DECIMAL: + case DECIMAL_UNSIGNED: + return DECIMAL; + case FLOAT: + case FLOAT_UNSIGNED: + return FLOAT; + case DOUBLE: + case DOUBLE_UNSIGNED: + return DOUBLE; + case BIT: + return BIT; + case BINARY: + return BINARY; + case VARBINARY: + return VARBINARY; + case TINYBLOB: + return TINYBLOB; + case MEDIUMBLOB: + return MEDIUMBLOB; + case LONGBLOB: + return LONGBLOB; + case BLOB: + return BLOB; + case CHAR: + return CHAR; + case VARCHAR: + return VARCHAR; + case TINYTEXT: + return TINYTEXT; + case MEDIUMTEXT: + return MEDIUMTEXT; + case LONGTEXT: + return LONGTEXT; + case TEXT: + return TEXT; + case DATE: + return DATE; + case TIME: + return TIME; + case TIMESTAMP: + return TIMESTAMP; + case DATETIME: + return DATETIME; + case YEAR: + return YEAR; + case JSON: + return JSON; + case ENUM: + return ENUM; + case SET: + return SET; + case GEOMETRY: + return GEOMETRY; + case NULL: + case UNKNOWN: + default: + throw new UnsupportedOperationException("Unsupported mysql columnType " + code); + } + } + + public MysqlType getMysqlType() { + return mysqlType; + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java new file mode 100644 index 0000000000..c2b881df6c --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import org.apache.eventmesh.common.config.connector.SinkConfig; + +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@EqualsAndHashCode(callSuper = true) +public class CanalSinkFullConfig extends SinkConfig { + private SinkConnectorConfig sinkConfig; + private String zeroDate; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java new file mode 100644 index 0000000000..a2ab8ba31d --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.List; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class CanalSourceFullConfig extends SourceConfig { + private SourceConnectorConfig connectorConfig; + private List startPosition; + private int parallel; + private int flushSize; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java new file mode 100644 index 0000000000..08f88e1d24 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import java.math.BigDecimal; + +import lombok.Data; +import lombok.ToString; + +@Data +@ToString +public class JobRdbFullPosition { + private String jobId; + private String schema; + private String tableName; + private String primaryKeyRecords; + private long maxCount; + private boolean finished; + private BigDecimal percent; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbColumnDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbColumnDefinition.java new file mode 100644 index 0000000000..94c0135c3e --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbColumnDefinition.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import java.sql.JDBCType; + +import lombok.Data; + +@Data +public class RdbColumnDefinition { + protected String name; + protected JDBCType jdbcType; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java new file mode 100644 index 0000000000..ab3ed336f8 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import java.util.Set; + +import lombok.Data; + +/** + * Description: as class name + */ +@Data +public class RdbDBDefinition { + private String schemaName; + private Set tables; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java new file mode 100644 index 0000000000..c281035578 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal; + +import lombok.Data; + +/** + * Description: as class name + */ +@Data +public class RdbTableDefinition { + protected String schemaName; + protected String tableName; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java index 1124bb1425..761cdba4bb 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java @@ -17,27 +17,16 @@ package org.apache.eventmesh.common.config.connector.rdb.canal; +import org.apache.eventmesh.common.config.connector.rdb.JdbcConfig; + import lombok.Data; +import lombok.EqualsAndHashCode; /** * Configuration parameters for a sink connector. */ @Data -public class SinkConnectorConfig { - +@EqualsAndHashCode(callSuper = true) +public class SinkConnectorConfig extends JdbcConfig { private String connectorName; - - private String url; - - private String dbAddress; - - private int dbPort; - - private String userName; - - private String passWord; - - private String schemaName; - - private String tableName; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java index e9ae466079..9a95696a0d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java @@ -17,28 +17,16 @@ package org.apache.eventmesh.common.config.connector.rdb.canal; +import org.apache.eventmesh.common.config.connector.rdb.JdbcConfig; + import lombok.Data; +import lombok.EqualsAndHashCode; /** * Represents the configuration for a database connector. */ @Data -public class SourceConnectorConfig { - +@EqualsAndHashCode(callSuper = true) +public class SourceConnectorConfig extends JdbcConfig { private String connectorName; - - private String url; - - private String dbAddress; - - private int dbPort; - - private String userName; - - private String passWord; - - private String schemaName; - - private String tableName; - } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/Constants.java new file mode 100644 index 0000000000..8c51c7255b --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/Constants.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal.mysql; + +public class Constants { + public static final String MySQLQuot = "`"; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLColumnDef.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLColumnDef.java new file mode 100644 index 0000000000..cdc9adf33f --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLColumnDef.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal.mysql; + +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalMySQLType; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class MySQLColumnDef extends RdbColumnDefinition { + private CanalMySQLType type; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java new file mode 100644 index 0000000000..cdd3652378 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector.rdb.canal.mysql; + +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; + +import java.util.Map; +import java.util.Set; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * Description: + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class MySQLTableDef extends RdbTableDefinition { + private Set primaryKeys; + private Map columnDefinitions; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java index c3c2d5dd7a..5f45390b73 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/RecordPosition.java @@ -19,6 +19,8 @@ import org.apache.eventmesh.common.remote.offset.S3.S3RecordOffset; import org.apache.eventmesh.common.remote.offset.S3.S3RecordPartition; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordPartition; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; import org.apache.eventmesh.common.remote.offset.file.FileRecordOffset; @@ -40,6 +42,7 @@ public class RecordPosition { @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) @JsonSubTypes({ @JsonSubTypes.Type(value = CanalRecordPartition.class, name = "CanalRecordPartition"), + @JsonSubTypes.Type(value = CanalFullRecordPartition.class, name = "CanalFullRecordPartition"), @JsonSubTypes.Type(value = FileRecordPartition.class, name = "FileRecordPartition"), @JsonSubTypes.Type(value = S3RecordPartition.class, name = "S3RecordPartition"), @JsonSubTypes.Type(value = KafkaRecordPartition.class, name = "KafkaRecordPartition"), @@ -53,6 +56,7 @@ public class RecordPosition { @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) @JsonSubTypes({ @JsonSubTypes.Type(value = CanalRecordOffset.class, name = "CanalRecordOffset"), + @JsonSubTypes.Type(value = CanalFullRecordOffset.class, name = "CanalFullRecordOffset"), @JsonSubTypes.Type(value = FileRecordOffset.class, name = "FileRecordOffset"), @JsonSubTypes.Type(value = S3RecordOffset.class, name = "S3RecordOffset"), @JsonSubTypes.Type(value = KafkaRecordOffset.class, name = "KafkaRecordOffset"), diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java new file mode 100644 index 0000000000..a0a077b7f5 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.offset.canal; + +import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; +import org.apache.eventmesh.common.remote.offset.RecordOffset; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@EqualsAndHashCode(callSuper = true) +@ToString +public class CanalFullRecordOffset extends RecordOffset { + private JobRdbFullPosition position; + + @Override + public Class getRecordOffsetClass() { + return CanalFullRecordOffset.class; + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java new file mode 100644 index 0000000000..73626fa78f --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.offset.canal; + +import org.apache.eventmesh.common.remote.offset.RecordPartition; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + + +@Data +@ToString +@EqualsAndHashCode(callSuper = true) +public class CanalFullRecordPartition extends RecordPartition { + + @Override + public Class getRecordPartitionClass() { + return CanalFullRecordPartition.class; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle b/eventmesh-connectors/eventmesh-connector-canal/build.gradle index ccc5acf0ca..134af8ed3e 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle @@ -23,9 +23,11 @@ List canal = [ dependencies { api project(":eventmesh-openconnect:eventmesh-openconnect-java") + implementation "org.locationtech.jts:jts-core" implementation project(":eventmesh-common") implementation canal - implementation "com.alibaba:druid:1.2.23" + implementation "com.alibaba:druid" + implementation 'com.mysql:mysql-connector-j' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation "org.mockito:mockito-core" diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java index 0d9da7f8be..0310e5434c 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java @@ -18,8 +18,8 @@ package org.apache.eventmesh.connector.canal; -import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; -import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.SinkConnectorConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig; import java.sql.Connection; import java.sql.SQLException; @@ -32,46 +32,40 @@ public class DatabaseConnection { public static DruidDataSource sinkDataSource; - public static CanalSourceConfig sourceConfig; + public static SourceConnectorConfig sourceConfig; - public static CanalSinkConfig sinkConfig; + public static SinkConnectorConfig sinkConfig; + + public static DruidDataSource createDruidDataSource(String url, String userName, String passWord) { + DruidDataSource dataSource = new DruidDataSource(); + dataSource.setUrl(url); + dataSource.setUsername(userName); + dataSource.setPassword(passWord); + dataSource.setInitialSize(5); + dataSource.setMinIdle(5); + dataSource.setMaxActive(20); + dataSource.setMaxWait(60000); + dataSource.setTimeBetweenEvictionRunsMillis(60000); + dataSource.setMinEvictableIdleTimeMillis(300000); + dataSource.setValidationQuery("SELECT 1"); + dataSource.setTestWhileIdle(true); + dataSource.setTestOnBorrow(false); + dataSource.setTestOnReturn(false); + dataSource.setPoolPreparedStatements(true); + dataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + return dataSource; + } public static void initSourceConnection() { - sourceDataSource = new DruidDataSource(); - sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl()); - sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName()); - sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord()); - sourceDataSource.setInitialSize(5); - sourceDataSource.setMinIdle(5); - sourceDataSource.setMaxActive(20); - sourceDataSource.setMaxWait(60000); - sourceDataSource.setTimeBetweenEvictionRunsMillis(60000); - sourceDataSource.setMinEvictableIdleTimeMillis(300000); - sourceDataSource.setValidationQuery("SELECT 1"); - sourceDataSource.setTestWhileIdle(true); - sourceDataSource.setTestOnBorrow(false); - sourceDataSource.setTestOnReturn(false); - sourceDataSource.setPoolPreparedStatements(true); - sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + sourceDataSource = createDruidDataSource(sourceConfig.getUrl(), + sourceConfig.getUserName(), + sourceConfig.getPassWord()); } public static void initSinkConnection() { - sinkDataSource = new DruidDataSource(); - sinkDataSource.setUrl(sinkConfig.getSinkConnectorConfig().getUrl()); - sinkDataSource.setUsername(sinkConfig.getSinkConnectorConfig().getUserName()); - sinkDataSource.setPassword(sinkConfig.getSinkConnectorConfig().getPassWord()); - sinkDataSource.setInitialSize(5); - sinkDataSource.setMinIdle(5); - sinkDataSource.setMaxActive(20); - sinkDataSource.setMaxWait(60000); - sinkDataSource.setTimeBetweenEvictionRunsMillis(60000); - sinkDataSource.setMinEvictableIdleTimeMillis(300000); - sinkDataSource.setValidationQuery("SELECT 1"); - sinkDataSource.setTestWhileIdle(true); - sinkDataSource.setTestOnBorrow(false); - sinkDataSource.setTestOnReturn(false); - sinkDataSource.setPoolPreparedStatements(true); - sinkDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + sinkDataSource = createDruidDataSource(sinkConfig.getUrl(), + sinkConfig.getUserName(), + sinkConfig.getPassWord()); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java index f6c4329e23..1008ad1cf3 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java @@ -30,25 +30,45 @@ import java.sql.Blob; import java.sql.Clob; import java.sql.Date; +import java.sql.JDBCType; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; +import java.time.DateTimeException; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.temporal.Temporal; import java.util.HashMap; +import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKTReader; + +import com.mysql.cj.Constants; +import com.mysql.cj.MysqlType; +import com.taobao.tddl.dbsync.binlog.LogBuffer; public class SqlUtils { public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = " "; - public static final String SQLDATE_FORMAT = "yyyy-MM-dd"; - public static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"; private static final Map> sqlTypeToJavaTypeMap = new HashMap>(); private static final ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean(); - private static final Logger log = LoggerFactory.getLogger(SqlUtils.class); + private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(); + private static final WKBReader WKB_READER = new WKBReader(GEOMETRY_FACTORY); + private static final BigDecimal NANO_SEC = new BigDecimal(LogBuffer.DIG_BASE); + private static final LocalDateTime BASE = LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0); + private static final long ONE_HOUR = 3600; + private static final long ONE_MINUTE = 60; static { // regist Converter @@ -109,6 +129,29 @@ public class SqlUtils { sqlTypeToJavaTypeMap.put(Types.CLOB, String.class); } + public static String genPrepareSqlOfInClause(int size) { + StringBuilder sql = new StringBuilder(); + sql.append("("); + for (int i = 0; i < size; i++) { + sql.append("?"); + if (i < size - 1) { + sql.append(","); + } + } + sql.append(")"); + return sql.toString(); + } + + public static void setInClauseParameters(PreparedStatement preparedStatement, List params) throws SQLException { + setInClauseParameters(preparedStatement, 0, params); + } + + public static void setInClauseParameters(PreparedStatement preparedStatement, int paramIndexStart, List params) throws SQLException { + for (int i = 0; i < params.size(); i++) { + preparedStatement.setString(paramIndexStart + i, params.get(i)); + } + } + public static String sqlValueToString(ResultSet rs, int index, int sqlType) throws SQLException { Class requiredType = sqlTypeToJavaTypeMap.get(sqlType); if (requiredType == null) { @@ -217,8 +260,7 @@ private static String getResultSetValue(ResultSet rs, int index, Class requir } else if (float.class.equals(requiredType) || Float.class.equals(requiredType)) { value = rs.getFloat(index); wasNullCheck = true; - } else if (double.class.equals(requiredType) || Double.class.equals(requiredType) - || Number.class.equals(requiredType)) { + } else if (double.class.equals(requiredType) || Double.class.equals(requiredType) || Number.class.equals(requiredType)) { value = rs.getDouble(index); wasNullCheck = true; } else if (Time.class.equals(requiredType)) { @@ -282,15 +324,598 @@ private static String getResultSetValue(ResultSet rs, int index) throws SQLExcep * Check whether the given SQL type is numeric. */ public static boolean isNumeric(int sqlType) { - return (Types.BIT == sqlType) || (Types.BIGINT == sqlType) || (Types.DECIMAL == sqlType) - || (Types.DOUBLE == sqlType) || (Types.FLOAT == sqlType) || (Types.INTEGER == sqlType) - || (Types.NUMERIC == sqlType) || (Types.REAL == sqlType) || (Types.SMALLINT == sqlType) - || (Types.TINYINT == sqlType); + return (Types.BIT == sqlType) || (Types.BIGINT == sqlType) || (Types.DECIMAL == sqlType) || (Types.DOUBLE == sqlType) + || (Types.FLOAT == sqlType) || (Types.INTEGER == sqlType) || (Types.NUMERIC == sqlType) || (Types.REAL == sqlType) + || (Types.SMALLINT == sqlType) || (Types.TINYINT == sqlType); } public static boolean isTextType(int sqlType) { - return sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.CLOB || sqlType == Types.LONGVARCHAR - || sqlType == Types.NCHAR || sqlType == Types.NVARCHAR || sqlType == Types.NCLOB - || sqlType == Types.LONGNVARCHAR; + return sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.CLOB || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR + || sqlType == Types.NVARCHAR || sqlType == Types.NCLOB || sqlType == Types.LONGNVARCHAR; + } + + public static JDBCType toJDBCType(String connectorDataType) { + MysqlType mysqlType = MysqlType.getByName(connectorDataType); + return JDBCType.valueOf(mysqlType.getJdbcType()); + } + + public static BigDecimal toBigDecimal(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + String strValue = (String) value; + if (!org.apache.commons.lang3.StringUtils.isNotBlank(strValue)) { + return null; + } + try { + return new BigDecimal(strValue); + } catch (Exception e) { + if ("true".equals(strValue)) { + return BigDecimal.ONE; + } + if ("false".equals(strValue)) { + return BigDecimal.ZERO; + } + return new BigDecimal(strValue); + } + } else if (value instanceof Number) { + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + if (value instanceof Integer) { + return BigDecimal.valueOf(((Integer) value).longValue()); + } + if (value instanceof Long) { + return BigDecimal.valueOf(((Long) value)); + } + if (value instanceof Double) { + return BigDecimal.valueOf(((Double) value)); + } + if (value instanceof Float) { + return BigDecimal.valueOf(((Float) value).doubleValue()); + } + if (value instanceof BigInteger) { + return new BigDecimal((BigInteger) value); + } + if (value instanceof Byte) { + return BigDecimal.valueOf(((Byte) value).longValue()); + } + if (value instanceof Short) { + return BigDecimal.valueOf(((Short) value).longValue()); + } + return null; + } else if (value instanceof Boolean) { + return Boolean.TRUE.equals(value) ? BigDecimal.ONE : BigDecimal.ZERO; + } else { + throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , parse to big decimal failed."); + } + } + + public static Double toDouble(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + String strValue = (String) value; + if (org.apache.commons.lang3.StringUtils.isBlank(strValue)) { + return null; + } + try { + return Double.parseDouble(strValue); + } catch (Exception e) { + if ("true".equals(strValue)) { + return 1.0d; + } + if ("false".equals(strValue)) { + return 0.0d; + } + return new BigDecimal(strValue).doubleValue(); + } + } else if (value instanceof Number) { + return ((Number) value).doubleValue(); + } else { + if (value instanceof Boolean) { + return Boolean.TRUE.equals(value) ? 1.0d : 0.0d; + } + throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , parse to double failed."); + } + } + + public static Long toLong(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + String strValue = (String) value; + if (org.apache.commons.lang3.StringUtils.isBlank(strValue)) { + return null; + } + try { + return Long.parseLong(strValue); + } catch (Exception e) { + try { + return Long.decode(strValue); + } catch (Exception e2) { + if ("true".equals(strValue)) { + return 1L; + } + if ("false".equals(strValue)) { + return 0L; + } + return new BigDecimal(strValue).longValue(); + } + } + } else if (value instanceof Number) { + return ((Number) value).longValue(); + } else { + if (value instanceof Boolean) { + return Boolean.TRUE.equals(value) ? 1L : 0L; + } + throw new UnsupportedOperationException(value.getClass() + ", value '" + value + "' , parse to long failed."); + } + } + + public static boolean isZeroTime(Object value) { + if (value == null || org.apache.commons.lang3.StringUtils.isBlank(value.toString())) { + return false; + } + return value.toString().startsWith("0000-00-00"); + } + + public static String removeZone(String datetime) { + if (datetime == null || datetime.length() == 0) { + return datetime; + } + int len = datetime.length(); + if (datetime.charAt(len - 1) == 'Z' || datetime.charAt(len - 1) == 'z') { + return datetime.substring(0, len - 1).trim(); + } + if (len >= 7) { + char checkCharAt1 = datetime.charAt(len - 2); + if ((checkCharAt1 == '+' || checkCharAt1 == '-') && len >= 10) { + return datetime.substring(0, len - 2).trim(); + } + char checkCharAt2 = datetime.charAt(len - 3); + if ((checkCharAt2 == '+' || checkCharAt2 == '-') && len >= 11) { + return datetime.substring(0, len - 3).trim(); + } + char checkCharAt3 = datetime.charAt(len - 6); + if ((checkCharAt3 == '+' || checkCharAt3 == '-') && checkCharAt2 == ':') { + return datetime.substring(0, len - 6).trim(); + } + char checkCharAt4 = datetime.charAt(len - 5); + if ((checkCharAt4 == '+' || checkCharAt4 == '-') && checkCharAt2 == ':') { + return datetime.substring(0, len - 5).trim(); + } + char checkCharAt5 = len >= 9 ? datetime.charAt(len - 9) : ' '; + if ((checkCharAt5 == '+' || checkCharAt5 == '-') && checkCharAt2 == ':' && checkCharAt3 == ':') { + return datetime.substring(0, len - 9).trim(); + } + char checkCharAt6 = datetime.charAt(len - 7); + if (checkCharAt6 == '+' || checkCharAt6 == '-') { + return datetime.substring(0, len - 7).trim(); + } + if (checkCharAt4 == '+' || checkCharAt4 == '-') { + return datetime.substring(0, len - 5).trim(); + } + } + return datetime; + } + + + + public static String bytes2hex(byte[] b) { + if (b == null) { + return null; + } + if (b.length == 0) { + return ""; + } + StringBuilder hs = new StringBuilder(); + for (byte element : b) { + String stmp = Integer.toHexString(element & 255).toUpperCase(); + if (stmp.length() == 1) { + hs.append(Constants.CJ_MINOR_VERSION); + hs.append(stmp); + } else { + hs.append(stmp); + } + } + return hs.toString(); + } + + public static String convertToString(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + return (String) value; + } + if (value instanceof BigInteger) { + return value.toString(); + } + if (value instanceof BigDecimal) { + return ((BigDecimal) value).toPlainString(); + } + if (value instanceof Number) { + return new BigDecimal(value.toString()).toPlainString(); + } + if (value instanceof Boolean) { + return Boolean.TRUE.equals(value) ? "1" : "0"; + } + if (value instanceof byte[]) { + return "0x" + bytes2hex((byte[]) value); + } + if (value instanceof Timestamp) { + long nanos = ((Timestamp) value).getNanos(); + value = Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (value instanceof Date) { + value = ((Date) value).toLocalDate().atTime(0, 0); + } else if (value instanceof Time) { + value = LocalDateTime.of(LocalDate.of(1970, 1, 1), + Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime()); + } else if (value instanceof java.util.Date) { + value = ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); + } + if (value instanceof LocalDateTime) { + return coverLocalDateTime2String((LocalDateTime) value); + } else if (value instanceof OffsetDateTime) { + OffsetDateTime zone = (OffsetDateTime) value; + String datetimeStr = coverLocalDateTime2String(zone.toLocalDateTime()); + String zonedStr = zone.getOffset().toString(); + if ("Z".equals(zonedStr)) { + return datetimeStr + "+00:00"; + } + return datetimeStr + zonedStr; + } else if (!(value instanceof LocalTime)) { + return value.toString(); + } else { + LocalTime local3 = (LocalTime) value; + return String.format("%02d:%02d:%02d", local3.getHour(), local3.getMinute(), local3.getSecond()); + } + } + + + private static String coverLocalDateTime2String(LocalDateTime localDateTime) { + LocalDate localDate = localDateTime.toLocalDate(); + LocalTime localTime = localDateTime.toLocalTime(); + int year = localDate.getYear(); + int month = localDate.getMonthValue(); + int day = localDate.getDayOfMonth(); + int hour = localTime.getHour(); + int minute = localTime.getMinute(); + int second = localTime.getSecond(); + int nano = localTime.getNano(); + return nano == 0 ? String.format("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second) : + String.format("%04d-%02d-%02d %02d:%02d:%02d.%s", year, month, day, hour, minute, second, + new BigDecimal(nano).divide(NANO_SEC).toPlainString().substring(2)); + } + + public static String toMySqlTime(Object value) { + if (value == null || StringUtils.isBlank(value.toString())) { + return null; + } + if (value instanceof String) { + return value.toString(); + } + LocalDateTime localTime = toLocalDateTime(value); + if (BASE.isBefore(localTime) || BASE.isEqual(localTime)) { + long diffHours = Duration.between(BASE, localTime).toHours(); + if (localTime.getNano() == 0) { + return String.format("%02d:%02d:%02d", diffHours, localTime.getMinute(), localTime.getSecond()); + } + return String.format("%02d:%02d:%02d.%s", diffHours, localTime.getMinute(), localTime.getSecond(), + Integer.parseInt(trimEnd(String.valueOf(localTime.getNano()), '0'))); + } + Duration duration = Duration.between(localTime, BASE); + long totalSecond = duration.getSeconds(); + long hours = totalSecond / ONE_HOUR; + long remaining = totalSecond - (hours * ONE_HOUR); + long minutes = remaining / ONE_MINUTE; + remaining = remaining - (minutes * ONE_MINUTE); + if (duration.getNano() == 0) { + return String.format("-%02d:%02d:%02d", hours, minutes, remaining); + } + return String.format("-%02d:%02d:%02d.%s", hours, minutes, remaining, Integer.parseInt(trimEnd(String.valueOf(duration.getNano()), '0'))); + } + + public static String trimEnd(String str, char trimChar) { + if (str == null || str.isEmpty()) { + return str; + } + char[] val = str.toCharArray(); + int len = val.length; + while (0 < len && val[len - 1] == trimChar) { + len--; + } + return len < val.length ? str.substring(0, len) : str; + } + + public static byte[] numberToBinaryArray(Number number) { + BigInteger bigInt = BigInteger.valueOf(number.longValue()); + int size = (bigInt.bitLength() + 7) / 8; + byte[] result = new byte[size]; + byte[] bigIntBytes = bigInt.toByteArray(); + int start = bigInt.bitLength() % 8 == 0 ? 1 : 0; + int length = Math.min(bigIntBytes.length - start, size); + System.arraycopy(bigIntBytes, start, result, size - length, length); + return result; + } + + public static Integer toInt(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + String strValue = ((String) value).toLowerCase(); + if (StringUtils.isBlank(strValue)) { + return null; + } + try { + return Integer.parseInt(strValue); + } catch (Exception e) { + try { + return Integer.decode(strValue); + } catch (Exception e2) { + if ("true".equals(strValue)) { + return 1; + } + if ("false".equals(strValue)) { + return 0; + } + return new BigDecimal(strValue).intValue(); + } + } + } else if (value instanceof Number) { + return ((Number) value).intValue(); + } else { + if (value instanceof Boolean) { + return Boolean.TRUE.equals(value) ? 1 : 0; + } + throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , parse to int failed."); + } + } + + private static LocalDateTime toLocalDateTime(String value) { + if (value.trim().length() >= 4) { + String dateStr2 = removeZone(value); + int len = dateStr2.length(); + if (len == 4) { + return LocalDateTime.of(Integer.parseInt(dateStr2), 1, 1, 0, 0, 0, 0); + } + if (dateStr2.charAt(4) == '-') { + switch (len) { + case 7: + String[] dataParts = dateStr2.split("-"); + return LocalDateTime.of(Integer.parseInt(dataParts[0]), Integer.parseInt(dataParts[1]), 1, 0, 0, 0, 0); + case 8: + case 9: + case 11: + case 12: + case 14: + case 15: + case 17: + case 18: + default: + String[] dataTime = dateStr2.split(" "); + String[] dataParts2 = dataTime[0].split("-"); + String[] timeParts = dataTime[1].split(":"); + String[] secondParts = timeParts[2].split("\\."); + secondParts[1] = StringUtils.rightPad(secondParts[1], 9, Constants.CJ_MINOR_VERSION); + return LocalDateTime.of(Integer.parseInt(dataParts2[0]), Integer.parseInt(dataParts2[1]), Integer.parseInt(dataParts2[2]), + Integer.parseInt(timeParts[0]), Integer.parseInt(timeParts[1]), Integer.parseInt(secondParts[0]), + Integer.parseInt(secondParts[1])); + case 10: + String[] dataParts3 = dateStr2.split("-"); + return LocalDateTime.of(Integer.parseInt(dataParts3[0]), Integer.parseInt(dataParts3[1]), Integer.parseInt(dataParts3[2]), 0, + 0, 0, 0); + case 13: + String[] dataTime2 = dateStr2.split(" "); + String[] dataParts4 = dataTime2[0].split("-"); + return LocalDateTime.of(Integer.parseInt(dataParts4[0]), Integer.parseInt(dataParts4[1]), Integer.parseInt(dataParts4[2]), + Integer.parseInt(dataTime2[1]), 0, 0, 0); + case 16: + String[] dataTime3 = dateStr2.split(" "); + String[] dataParts5 = dataTime3[0].split("-"); + String[] timeParts2 = dataTime3[1].split(":"); + return LocalDateTime.of(Integer.parseInt(dataParts5[0]), Integer.parseInt(dataParts5[1]), Integer.parseInt(dataParts5[2]), + Integer.parseInt(timeParts2[0]), Integer.parseInt(timeParts2[1]), 0, 0); + case 19: + String[] dataTime4 = dateStr2.split(" "); + String[] dataParts6 = dataTime4[0].split("-"); + String[] timeParts3 = dataTime4[1].split(":"); + return LocalDateTime.of(Integer.parseInt(dataParts6[0]), Integer.parseInt(dataParts6[1]), Integer.parseInt(dataParts6[2]), + Integer.parseInt(timeParts3[0]), Integer.parseInt(timeParts3[1]), Integer.parseInt(timeParts3[2]), 0); + } + } else if (dateStr2.charAt(2) == ':') { + switch (len) { + case 5: + String[] timeParts4 = dateStr2.split(":"); + return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts4[0]), Integer.parseInt(timeParts4[1]), 0, 0); + case 8: + String[] timeParts5 = dateStr2.split(":"); + return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts5[0]), Integer.parseInt(timeParts5[1]), + Integer.parseInt(timeParts5[2]), 0); + default: + String[] timeParts6 = dateStr2.split(":"); + String[] secondParts2 = timeParts6[2].split("\\."); + secondParts2[1] = StringUtils.rightPad(secondParts2[1], 9, Constants.CJ_MINOR_VERSION); + return LocalDateTime.of(0, 1, 1, Integer.parseInt(timeParts6[0]), Integer.parseInt(timeParts6[1]), + Integer.parseInt(secondParts2[0]), Integer.parseInt(secondParts2[1])); + } + } else { + throw new UnsupportedOperationException(value.getClass() + ", value '" + value + "' , parse to local date time failed."); + } + } else if (StringUtils.isNumeric(value)) { + return LocalDateTime.of(Integer.parseInt(value), 1, 1, 0, 0, 0, 0); + } else { + throw new DateTimeException(value + " format error."); + } + } + + public static LocalDateTime toLocalDateTime(Object value) { + if (value == null || StringUtils.isBlank(value.toString())) { + return null; + } + if (value instanceof Temporal) { + if (value instanceof LocalDateTime) { + return (LocalDateTime) value; + } + if (value instanceof OffsetDateTime) { + return ((OffsetDateTime) value).toLocalDateTime(); + } + if (value instanceof LocalTime) { + return LocalDateTime.of(LocalDate.of(1970, 1, 1), (LocalTime) value); + } else if (value instanceof LocalDate) { + return LocalDateTime.of((LocalDate) value, LocalTime.of(0, 0)); + } else { + throw new UnsupportedOperationException(value.getClass() + ", value '" + value + "' , parse local date time failed."); + } + } else if (!(value instanceof java.util.Date)) { + return toLocalDateTime(value.toString()); + } else { + if (value instanceof Timestamp) { + long nanos = ((Timestamp) value).getNanos(); + return Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (value instanceof java.sql.Date) { + return ((java.sql.Date) value).toLocalDate().atTime(0, 0); + } else { + if (!(value instanceof Time)) { + return ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); + } + return LocalDateTime.of(LocalDate.of(1970, 1, 1), + Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalTime()); + } + } + } + + public static boolean isHexNumber(String str) { + boolean flag = true; + if (str.startsWith("0x") || str.startsWith("0X")) { + str = str.substring(2); + } + int i = 0; + while (true) { + if (i < str.length()) { + char cc = str.charAt(i); + if (cc != '0' && cc != '1' && cc != '2' && cc != '3' && cc != '4' && cc != '5' && cc != '6' && cc != '7' && cc != '8' && cc != '9' + && cc != 'A' && cc != 'B' && cc != 'C' && cc != 'D' && cc != 'E' && cc != 'F' && cc != 'a' && cc != 'b' && cc != 'c' && cc != 'd' + && cc != 'e' && cc != 'f') { + flag = false; + break; + } + i++; + } else { + break; + } + } + return flag; + } + + public static byte[] toBytes(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + String strVal = (String) value; + if ((strVal.startsWith("0x") || strVal.startsWith("0X")) && isHexNumber(strVal)) { + return hex2bytes(strVal.substring(2)); + } + return ((String) value).getBytes(StandardCharsets.ISO_8859_1); + } else if (value instanceof byte[]) { + return (byte[]) value; + } else { + throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , parse to bytes failed."); + } + } + + public static String toGeometry(Object value) throws Exception { + if (value == null) { + return null; + } + if (value instanceof String) { + String strVal = (String) value; + if (!strVal.startsWith("0x") && !strVal.startsWith("0X")) { + return (String) value; + } + return new WKTReader().read((String) value).toText(); + } else if (value instanceof byte[]) { + // mysql add 4 byte in header of geometry + byte[] bytes = (byte[]) value; + if (bytes.length > 4) { + byte[] dst = new byte[bytes.length - 4]; + System.arraycopy(bytes, 4, dst, 0, bytes.length - 4); + return new WKBReader().read(dst).toText(); + } + return new WKBReader().read(bytes).toText(); + } else { + throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , " + "parse to geometry failed."); + } + } + + public static byte[] hex2bytes(String hexStr) { + if (hexStr == null) { + return null; + } + if (org.apache.commons.lang3.StringUtils.isBlank(hexStr)) { + return new byte[0]; + } + + if (hexStr.length() % 2 == 1) { + hexStr = "0" + hexStr; + } + + int count = hexStr.length() / 2; + byte[] ret = new byte[count]; + for (int i = 0; i < count; i++) { + int index = i * 2; + char c1 = hexStr.charAt(index); + char c2 = hexStr.charAt(index + 1); + ret[i] = (byte) (toByte(c1) << 4); + ret[i] = (byte) (ret[i] | toByte(c2)); + } + return ret; + } + + private static byte toByte(char src) { + switch (Character.toUpperCase(src)) { + case '0': + return 0; + case '1': + return 1; + case '2': + return 2; + case '3': + return 3; + case '4': + return 4; + case '5': + return 5; + case '6': + return 6; + case '7': + return 7; + case '8': + return 8; + case '9': + return 9; + case 'A': + return 10; + case 'B': + return 11; + case 'C': + return 12; + case 'D': + return 13; + case 'E': + return 14; + case 'F': + return 15; + default: + throw new IllegalStateException("0-F"); + } } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 1888e204ac..8f9df7595b 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -31,6 +31,7 @@ import org.apache.eventmesh.connector.canal.sink.DbLoadData; import org.apache.eventmesh.connector.canal.sink.DbLoadData.TableLoadData; import org.apache.eventmesh.connector.canal.sink.DbLoadMerger; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; @@ -88,6 +89,7 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService { private int batchSize = 50; private boolean useBatch = true; + private RdbTableMgr tableMgr; @Override public Class configClass() { @@ -107,12 +109,13 @@ public void init(ConnectorContext connectorContext) throws Exception { this.sinkConfig = (CanalSinkConfig) sinkConnectorContext.getSinkConfig(); this.batchSize = sinkConfig.getBatchSize(); this.useBatch = sinkConfig.getUseBatch(); - DatabaseConnection.sinkConfig = this.sinkConfig; + DatabaseConnection.sinkConfig = this.sinkConfig.getSinkConnectorConfig(); DatabaseConnection.initSinkConnection(); jdbcTemplate = new JdbcTemplate(DatabaseConnection.sinkDataSource); dbDialect = new MysqlDialect(jdbcTemplate, new DefaultLobHandler()); interceptor = new SqlBuilderLoadInterceptor(); interceptor.setDbDialect(dbDialect); + tableMgr = new RdbTableMgr(sinkConfig.getSinkConnectorConfig(), DatabaseConnection.sinkDataSource); executor = new ThreadPoolExecutor(sinkConfig.getPoolSize(), sinkConfig.getPoolSize(), 0L, @@ -124,7 +127,7 @@ public void init(ConnectorContext connectorContext) throws Exception { @Override public void start() throws Exception { - + tableMgr.start(); } @Override @@ -147,7 +150,7 @@ public void put(List sinkRecords) { DbLoadContext context = new DbLoadContext(); for (ConnectRecord connectRecord : sinkRecords) { List canalConnectRecordList = (List) connectRecord.getData(); - canalConnectRecordList = filterRecord(canalConnectRecordList, sinkConfig); + canalConnectRecordList = filterRecord(canalConnectRecordList); if (isDdlDatas(canalConnectRecordList)) { doDdl(context, canalConnectRecordList); } else { @@ -179,10 +182,9 @@ private boolean isDdlDatas(List canalConnectRecordList) { return result; } - private List filterRecord(List canalConnectRecordList, CanalSinkConfig sinkConfig) { + private List filterRecord(List canalConnectRecordList) { return canalConnectRecordList.stream() - .filter(record -> sinkConfig.getSinkConnectorConfig().getSchemaName().equalsIgnoreCase(record.getSchemaName()) - && sinkConfig.getSinkConnectorConfig().getTableName().equalsIgnoreCase(record.getTableName())) + .filter(record -> tableMgr.getTable(record.getSchemaName(), record.getTableName()) != null) .collect(Collectors.toList()); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java new file mode 100644 index 0000000000..36c03b156c --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.sink.connector; + +import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; +import org.apache.eventmesh.connector.canal.DatabaseConnection; +import org.apache.eventmesh.connector.canal.SqlUtils; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; +import org.apache.eventmesh.openconnect.api.ConnectorCreateService; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.LockSupport; + +import com.alibaba.druid.pool.DruidPooledConnection; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class CanalSinkFullConnector implements Sink, ConnectorCreateService { + private CanalSinkFullConfig config; + private RdbTableMgr tableMgr; + private final DateTimeFormatter dataTimePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); + + @Override + public void start() throws Exception { + tableMgr.start(); + } + + @Override + public void stop() throws Exception { + + } + + @Override + public Sink create() { + return new CanalSinkFullConnector(); + } + + @Override + public Class configClass() { + return CanalSinkFullConfig.class; + } + + @Override + public void init(Config config) throws Exception { + this.config = (CanalSinkFullConfig) config; + init(); + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + this.config = (CanalSinkFullConfig) ((SinkConnectorContext) connectorContext).getSinkConfig(); + init(); + } + + private void init() { + if (config.getSinkConfig() == null) { + throw new EventMeshException(String.format("[%s] sink config is null", this.getClass())); + } + DatabaseConnection.sinkConfig = this.config.getSinkConfig(); + DatabaseConnection.initSinkConnection(); + DatabaseConnection.sinkDataSource.setDefaultAutoCommit(false); + + tableMgr = new RdbTableMgr(this.config.getSinkConfig(), DatabaseConnection.sinkDataSource); + } + + @Override + public void commit(ConnectRecord record) { + + } + + @Override + public String name() { + return null; + } + + @Override + public void put(List sinkRecords) { + if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] got sink records are none", this.getClass()); + } + return; + } + ConnectRecord record = sinkRecords.get(0); + List> data = (List>) record.getData(); + if (data == null || data.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("[{}] got rows data is none", this.getClass()); + } + return; + } + CanalFullRecordOffset offset = (CanalFullRecordOffset) record.getPosition().getRecordOffset(); + if (offset == null || offset.getPosition() == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] got canal full offset is none", this.getClass()); + } + return; + } + + MySQLTableDef tableDefinition = (MySQLTableDef) tableMgr.getTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); + if (tableDefinition == null) { + log.warn("target schema [{}] table [{}] is not exists", offset.getPosition().getSchema(), offset.getPosition().getTableName()); + return; + } + List cols = new ArrayList<>(tableDefinition.getColumnDefinitions().values()); + String sql = generateInsertPrepareSql(offset.getPosition().getSchema(), offset.getPosition().getTableName(), + cols); + DruidPooledConnection connection = null; + PreparedStatement statement = null; + try { + connection = DatabaseConnection.sinkDataSource.getConnection(); + statement = + connection.prepareStatement(sql); + for (Map col : data) { + setPrepareParams(statement, col, cols); + log.info("insert sql {}", statement.toString()); + statement.addBatch(); + } + statement.executeBatch(); + connection.commit(); + } catch (SQLException e) { + log.warn("full sink process schema [{}] table [{}] connector write fail", tableDefinition.getSchemaName(), tableDefinition.getTableName(), + e); + LockSupport.parkNanos(3000 * 1000L); + } catch (Exception e) { + log.error("full sink process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + try { + if (connection != null && !connection.isClosed()) { + connection.rollback(); + } + } catch (SQLException rollback) { + log.warn("full sink process schema [{}] table [{}] rollback fail", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + } + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + log.info("close prepare statement fail", e); + } + } + + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + log.info("close db connection fail", e); + } + } + } + } + + private void setPrepareParams(PreparedStatement preparedStatement, Map col, List columnDefs) throws Exception { + for (int i = 0; i < columnDefs.size(); i++) { + writeColumn(preparedStatement, i + 1, columnDefs.get(i), col.get(columnDefs.get(i).getName())); + } + } + + public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, Object value) throws Exception { + if (colType == null) { + String colVal = null; + if (value != null) { + colVal = value.toString(); + } + if (colVal == null) { + ps.setNull(index, Types.VARCHAR); + } else { + ps.setString(index, colVal); + } + } else if (value == null) { + ps.setNull(index, colType.getJdbcType().getVendorTypeNumber()); + } else { + switch (colType.getType()) { + case TINYINT: + case SMALLINT: + case MEDIUMINT: + case INT: + Long longValue = SqlUtils.toLong(value); + if (longValue == null) { + ps.setNull(index, 4); + return; + } else { + ps.setLong(index, longValue); + return; + } + case BIGINT: + case DECIMAL: + BigDecimal bigDecimalValue = SqlUtils.toBigDecimal(value); + if (bigDecimalValue == null) { + ps.setNull(index, 3); + return; + } else { + ps.setBigDecimal(index, bigDecimalValue); + return; + } + case FLOAT: + case DOUBLE: + Double doubleValue = SqlUtils.toDouble(value); + if (doubleValue == null) { + ps.setNull(index, 8); + } else { + ps.setDouble(index, doubleValue); + } + return; + case DATE: + case DATETIME: + case TIMESTAMP: + LocalDateTime dateValue = null; + if (!SqlUtils.isZeroTime(value)) { + try { + dateValue = SqlUtils.toLocalDateTime(value); + } catch (Exception e) { + ps.setString(index, SqlUtils.convertToString(value)); + return; + } + } else if (StringUtils.isNotBlank(config.getZeroDate())) { + dateValue = SqlUtils.toLocalDateTime(config.getZeroDate()); + } else { + ps.setObject(index, value); + return; + } + if (dateValue == null) { + ps.setNull(index, Types.TIMESTAMP); + } else { + ps.setString(index, dataTimePattern.format(dateValue)); + } + return; + case TIME: + String timeValue = SqlUtils.toMySqlTime(value); + if (StringUtils.isBlank(timeValue)) { + ps.setNull(index, 12); + return; + } else { + ps.setString(index, timeValue); + return; + } + case YEAR: + LocalDateTime yearValue = null; + if (!SqlUtils.isZeroTime(value)) { + yearValue = SqlUtils.toLocalDateTime(value); + } else if (StringUtils.isNotBlank(config.getZeroDate())) { + yearValue = SqlUtils.toLocalDateTime(config.getZeroDate()); + } else { + ps.setInt(index, 0); + return; + } + if (yearValue == null) { + ps.setNull(index, 4); + } else { + ps.setInt(index, yearValue.getYear()); + } + return; + case CHAR: + case VARCHAR: + case TINYTEXT: + case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case SET: + String strValue = value.toString(); + if (strValue == null) { + ps.setNull(index, Types.VARCHAR); + return; + } else { + ps.setString(index, strValue); + return; + } + case JSON: + String jsonValue = value.toString(); + if (jsonValue == null) { + ps.setNull(index, Types.VARCHAR); + } else { + ps.setString(index, jsonValue); + } + return; + case BIT: + if (value instanceof Boolean) { + byte[] arrayBoolean = new byte[1]; + arrayBoolean[0] = (byte) (Boolean.TRUE.equals(value) ? 1 : 0); + ps.setBytes(index, arrayBoolean); + return; + } else if (value instanceof Number) { + ps.setBytes(index, SqlUtils.numberToBinaryArray((Number) value)); + return; + } else if ((value instanceof byte[]) || value.toString().startsWith("0x") || value.toString().startsWith("0X")) { + byte[] arrayBoolean = SqlUtils.toBytes(value); + if (arrayBoolean == null || arrayBoolean.length == 0) { + ps.setNull(index, Types.BIT); + return; + } else { + ps.setBytes(index, arrayBoolean); + return; + } + } else { + ps.setBytes(index, SqlUtils.numberToBinaryArray(SqlUtils.toInt(value))); + return; + } + case BINARY: + case VARBINARY: + case TINYBLOB: + case BLOB: + case MEDIUMBLOB: + case LONGBLOB: + byte[] binaryValue = SqlUtils.toBytes(value); + if (binaryValue == null) { + ps.setNull(index, Types.BINARY); + return; + } else { + ps.setBytes(index, binaryValue); + return; + } + case GEOMETRY: + case GEOMETRY_COLLECTION: + case GEOM_COLLECTION: + case POINT: + case LINESTRING: + case POLYGON: + case MULTIPOINT: + case MULTILINESTRING: + case MULTIPOLYGON: + String geoValue = SqlUtils.toGeometry(value); + if (geoValue == null) { + ps.setNull(index, Types.VARCHAR); + return; + } + ps.setString(index, geoValue); + return; + default: + throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported."); + } + } + } + + private String generateInsertPrepareSql(String schema, String table, List cols) { + StringBuilder builder = new StringBuilder(); + builder.append("INSERT IGNORE INTO "); + builder.append(Constants.MySQLQuot); + builder.append(schema); + builder.append(Constants.MySQLQuot); + builder.append("."); + builder.append(Constants.MySQLQuot); + builder.append(table); + builder.append(Constants.MySQLQuot); + StringBuilder columns = new StringBuilder(); + StringBuilder values = new StringBuilder(); + for (MySQLColumnDef colInfo : cols) { + if (columns.length() > 0) { + columns.append(", "); + values.append(", "); + } + String wrapName = Constants.MySQLQuot + colInfo.getName() + Constants.MySQLQuot; + columns.append(wrapName); + values.append(colInfo.getType() == null ? "?" : colInfo.getType().genPrepareStatement4Insert()); + } + builder.append("(").append(columns).append(")"); + builder.append(" VALUES "); + builder.append("(").append(values).append(")"); + return builder.toString(); + } + + +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 32c55ec42c..8ef60ff04d 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.connector.canal.model.EventColumn; import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable; import org.apache.eventmesh.connector.canal.model.EventType; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.commons.lang3.StringUtils; @@ -47,7 +48,8 @@ @Slf4j public class EntryParser { - public Map> parse(CanalSourceConfig sourceConfig, List datas) { + public static Map> parse(CanalSourceConfig sourceConfig, List datas, + RdbTableMgr tables) { List recordList = new ArrayList<>(); List transactionDataBuffer = new ArrayList<>(); // need check weather the entry is loopback @@ -64,7 +66,7 @@ public Map> parse(CanalSourceConfig sourceConfig, } break; case TRANSACTIONEND: - parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer, tables); if (!recordList.isEmpty()) { recordMap.put(entry.getHeader().getLogfileOffset(), recordList); } @@ -80,10 +82,11 @@ public Map> parse(CanalSourceConfig sourceConfig, return recordMap; } - private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List recordList, - List transactionDataBuffer) { + private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, + List recordList, + List transactionDataBuffer, RdbTableMgr tables) { for (Entry bufferEntry : transactionDataBuffer) { - List recordParsedList = internParse(sourceConfig, bufferEntry); + List recordParsedList = internParse(sourceConfig, bufferEntry, tables); if (CollectionUtils.isEmpty(recordParsedList)) { continue; } @@ -99,15 +102,17 @@ private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List } } - private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) { - Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), sourceConfig.getNeedSyncMarkTableColumnName()); + private static boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) { + Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), + sourceConfig.getNeedSyncMarkTableColumnName()); if (markedColumn != null) { - return StringUtils.equalsIgnoreCase(markedColumn.getValue(), sourceConfig.getNeedSyncMarkTableColumnValue()); + return StringUtils.equalsIgnoreCase(markedColumn.getValue(), + sourceConfig.getNeedSyncMarkTableColumnValue()); } return false; } - private Column getColumnIgnoreCase(List columns, String columName) { + private static Column getColumnIgnoreCase(List columns, String columName) { for (Column column : columns) { if (column.getName().equalsIgnoreCase(columName)) { return column; @@ -116,11 +121,11 @@ private Column getColumnIgnoreCase(List columns, String columName) { return null; } - private List internParse(CanalSourceConfig sourceConfig, Entry entry) { + private static List internParse(CanalSourceConfig sourceConfig, Entry entry, + RdbTableMgr tableMgr) { String schemaName = entry.getHeader().getSchemaName(); String tableName = entry.getHeader().getTableName(); - if (!schemaName.equalsIgnoreCase(sourceConfig.getSourceConnectorConfig().getSchemaName()) - || !tableName.equalsIgnoreCase(sourceConfig.getSourceConnectorConfig().getTableName())) { + if (tableMgr.getTable(schemaName, tableName) == null) { return null; } @@ -155,7 +160,8 @@ private List internParse(CanalSourceConfig sourceConfig, Ent return recordList; } - private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, RowChange rowChange, RowData rowData) { + private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, + RowChange rowChange, RowData rowData) { CanalConnectRecord canalConnectRecord = new CanalConnectRecord(); canalConnectRecord.setTableName(entry.getHeader().getTableName()); canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName()); @@ -242,7 +248,8 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr return canalConnectRecord; } - private void checkUpdateKeyColumns(Map oldKeyColumns, Map keyColumns) { + private static void checkUpdateKeyColumns(Map oldKeyColumns, + Map keyColumns) { if (oldKeyColumns.isEmpty()) { return; } @@ -264,7 +271,7 @@ private void checkUpdateKeyColumns(Map oldKeyColumns, Map> queue; + private final DataSource dataSource; + private final MySQLTableDef tableDefinition; + private final TableFullPosition position; + private static final int LIMIT = 2048; + private final int flushSize; + private final AtomicReference choosePrimaryKey = new AtomicReference<>(null); + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private static final DateTimeFormatter DATE_STAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + + public CanalFullProducer(BlockingQueue> queue, DataSource dataSource, + MySQLTableDef tableDefinition, TableFullPosition position, int flushSize) { + this.queue = queue; + this.dataSource = dataSource; + this.tableDefinition = tableDefinition; + this.position = position; + this.flushSize = flushSize; + } + + public void choosePrimaryKey() { + for (RdbColumnDefinition col : tableDefinition.getColumnDefinitions().values()) { + if (position.getCurPrimaryKeyCols().get(col.getName()) != null) { + choosePrimaryKey.set(col.getName()); + log.info("schema [{}] table [{}] choose primary key [{}]", tableDefinition.getSchemaName(), tableDefinition.getTableName(), + col.getName()); + return; + } + } + throw new EventMeshException("illegal: can't pick any primary key"); + } + + + public void start(AtomicBoolean flag) { + choosePrimaryKey(); + boolean isFirstSelect = true; + List> rows = new LinkedList<>(); + while (flag.get()) { + String scanSql = generateScanSql(isFirstSelect); + log.info("scan sql is [{}] , cur position [{}]", scanSql, JsonUtils.toJSONString(position.getCurPrimaryKeyCols())); + + try (Connection connection = dataSource.getConnection(); PreparedStatement statement = + connection.prepareStatement(scanSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + statement.setFetchSize(Integer.MIN_VALUE); + setPrepareStatementValue(statement); + try (ResultSet resultSet = statement.executeQuery()) { + Map lastCol = null; + while (flag.get() && resultSet.next()) { + Map columnValues = new LinkedHashMap<>(); + for (Map.Entry col : + tableDefinition.getColumnDefinitions().entrySet()) { + columnValues.put(col.getKey(), readColumn(resultSet, col.getKey(), + col.getValue().getType())); + } + lastCol = columnValues; + rows.add(lastCol); + if (rows.size() < flushSize) { + continue; + } + refreshPosition(lastCol); + commitConnectRecord(rows); + rows = new LinkedList<>(); + } + + if (lastCol == null || checkIsScanFinish(lastCol)) { + log.info("full scan db [{}] table [{}] finish", tableDefinition.getSchemaName(), + tableDefinition.getTableName()); + commitConnectRecord(rows); + return; + } + refreshPosition(lastCol); + } catch (InterruptedException ignore) { + log.info("full scan db [{}] table [{}] interrupted", tableDefinition.getSchemaName(), + tableDefinition.getTableName()); + Thread.currentThread().interrupt(); + return; + } + } catch (SQLException e) { + log.error("full source process schema [{}] table [{}] catch SQLException fail", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + LockSupport.parkNanos(3000 * 1000L); + } catch (Exception e) { + log.error("full source process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); + return; + } + if (isFirstSelect) { + isFirstSelect = false; + } + } + } + + private void commitConnectRecord(List> rows) throws InterruptedException { + if (rows == null || rows.isEmpty()) { + return; + } + JobRdbFullPosition jobRdbFullPosition = new JobRdbFullPosition(); + jobRdbFullPosition.setPrimaryKeyRecords(JsonUtils.toJSONString(position)); + jobRdbFullPosition.setTableName(tableDefinition.getTableName()); + jobRdbFullPosition.setSchema(tableDefinition.getSchemaName()); + CanalFullRecordOffset offset = new CanalFullRecordOffset(); + offset.setPosition(jobRdbFullPosition); + CanalFullRecordPartition partition = new CanalFullRecordPartition(); + ArrayList records = new ArrayList<>(); + records.add(new ConnectRecord(partition, offset, System.currentTimeMillis(), rows)); + queue.put(records); + } + + private boolean checkIsScanFinish(Map lastCol) { + Object lastPrimaryValue = lastCol.get(choosePrimaryKey.get()); + Object maxPrimaryValue = position.getMaxPrimaryKeyCols().get(choosePrimaryKey.get()); + if (lastPrimaryValue instanceof Number) { + BigDecimal last = new BigDecimal(String.valueOf(lastPrimaryValue)); + BigDecimal max = + new BigDecimal(String.valueOf(maxPrimaryValue)); + return last.compareTo(max) > 0; + } + if (lastPrimaryValue instanceof Comparable) { + return ((Comparable) lastPrimaryValue).compareTo(maxPrimaryValue) > 0; + } + return false; + } + + public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throws Exception { + if (col == null || rs.wasNull()) { + return null; + } + switch (colType) { + case TINYINT: + case SMALLINT: + case MEDIUMINT: + case INT: + Long valueLong = rs.getLong(col); + if (valueLong.compareTo((long) Integer.MAX_VALUE) > 0) { + return valueLong; + } + return valueLong.intValue(); + case BIGINT: + String v = rs.getString(col); + if (v == null) { + return null; + } + BigDecimal valueBigInt = new BigDecimal(v); + if (valueBigInt.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) { + return valueBigInt; + } + return valueBigInt.longValue(); + case FLOAT: + case DOUBLE: + case DECIMAL: + return rs.getBigDecimal(col); + case DATE: + return rs.getObject(col, LocalDate.class); + case TIME: + return rs.getObject(col, LocalTime.class); + case DATETIME: + case TIMESTAMP: + return rs.getObject(col, LocalDateTime.class); + case YEAR: + return rs.getInt(col); + case CHAR: + case VARCHAR: + case TINYTEXT: + case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case SET: + case JSON: + return rs.getString(col); + case BIT: + case BINARY: + case VARBINARY: + case TINYBLOB: + case BLOB: + case MEDIUMBLOB: + case LONGBLOB: + return rs.getBytes(col); + case GEOMETRY: + case GEOMETRY_COLLECTION: + case GEOM_COLLECTION: + case POINT: + case LINESTRING: + case POLYGON: + case MULTIPOINT: + case MULTILINESTRING: + case MULTIPOLYGON: + byte[] geo = rs.getBytes(col); + if (geo == null) { + return null; + } + return SqlUtils.toGeometry(geo); + default: + return rs.getObject(col); + } + } + + + private void refreshPosition(Map lastCol) { + Map nextPosition = new LinkedHashMap<>(); + for (Map.Entry entry : position.getCurPrimaryKeyCols().entrySet()) { + nextPosition.put(entry.getKey(), lastCol.get(entry.getKey())); + } + position.setCurPrimaryKeyCols(nextPosition); + } + + private void setPrepareStatementValue(PreparedStatement statement) throws SQLException { + String colName = choosePrimaryKey.get(); + if (colName == null) { + return; + } + RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(colName); + Object value = position.getCurPrimaryKeyCols().get(colName); + String str; + switch (columnDefinition.getJdbcType()) { + case BIT: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + statement.setBigDecimal(1, new BigDecimal(String.valueOf(value))); + break; + case DECIMAL: + case FLOAT: + case DOUBLE: + case NUMERIC: + statement.setDouble(1, new BigDecimal(String.valueOf(value)).doubleValue()); + break; + case CHAR: + case VARCHAR: + case LONGNVARCHAR: + case NCHAR: + case NVARCHAR: + case LONGVARCHAR: + case CLOB: + case NCLOB: + statement.setString(1, String.valueOf(value)); + break; + case BLOB: + case VARBINARY: + case BINARY: + str = String.valueOf(value); + String hexStr = str; + if (str.startsWith("0x")) { + hexStr = str.substring(str.indexOf("0x")); + } + byte[] bytes = SqlUtils.hex2bytes(hexStr); + statement.setBytes(1, bytes); + break; + case DATE: + Instant d; + if (value instanceof Long) { + Long val = (Long) value; + d = Instant.ofEpochMilli(val); + str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER); + } else if (value instanceof Integer) { + Integer val = (Integer) value; + d = Instant.ofEpochMilli((long) val); + str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER); + } else if (value instanceof String) { + str = (String) value; + } else { + if (!(value instanceof LocalDate)) { + throw new IllegalArgumentException("unsupported date class type:" + value.getClass().getSimpleName()); + } + str = ((LocalDate) value).format(DATE_FORMATTER); + } + statement.setString(1, str); + break; + case TIMESTAMP: + if (value instanceof String) { + str = (String) value; + } else { + if (!(value instanceof LocalDateTime)) { + throw new IllegalArgumentException("unsupported timestamp class type:" + value.getClass().getSimpleName()); + } + str = ((LocalDateTime) value).format(DATE_STAMP_FORMATTER); + } + statement.setString(1, str); + break; + default: + throw new EventMeshException(String.format("not support the primary key type [%s]", value.getClass())); + } + } + + + private void generateQueryColumnsSql(StringBuilder builder, Collection rdbColDefs) { + if (rdbColDefs == null || rdbColDefs.isEmpty()) { + builder.append("*"); + return; + } + boolean first = true; + for (RdbColumnDefinition colDef : rdbColDefs) { + if (first) { + first = false; + } else { + builder.append(","); + } + builder.append(Constants.MySQLQuot); + builder.append(colDef.getName()); + builder.append(Constants.MySQLQuot); + } + } + + private String generateScanSql(boolean isFirst) { + StringBuilder builder = new StringBuilder(); + builder.append("select "); + generateQueryColumnsSql(builder, tableDefinition.getColumnDefinitions().values()); + builder.append(" from "); + builder.append(Constants.MySQLQuot); + builder.append(tableDefinition.getSchemaName()); + builder.append(Constants.MySQLQuot); + builder.append("."); + builder.append(Constants.MySQLQuot); + builder.append(tableDefinition.getTableName()); + builder.append(Constants.MySQLQuot); + buildWhereSql(builder, isFirst); + builder.append(" limit " + LIMIT); + return builder.toString(); + } + + private void buildWhereSql(StringBuilder builder, boolean isEquals) { + builder.append(" where ") + .append(Constants.MySQLQuot) + .append(choosePrimaryKey.get()) + .append(Constants.MySQLQuot); + if (isEquals) { + builder.append(" >= ? "); + } else { + builder.append(" > ? "); + } + builder.append(" order by ").append(Constants.MySQLQuot).append(choosePrimaryKey.get()).append(Constants.MySQLQuot) + .append(" asc "); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index 577142e00c..4b96177319 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -24,7 +24,9 @@ import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.CanalConnectRecord; +import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.connector.canal.source.EntryParser; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; @@ -84,6 +86,8 @@ public class CanalSourceConnector implements Source, ConnectorCreateService configClass() { return CanalSourceConfig.class; @@ -146,6 +150,7 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup return instance; } }); + tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); } private Canal buildCanal(CanalSourceConfig sourceConfig) { @@ -218,6 +223,7 @@ public void start() throws Exception { if (running) { return; } + tableMgr.start(); canalServer.start(); canalServer.start(sourceConfig.getDestination()); @@ -288,11 +294,9 @@ public List poll() { entries = message.getEntries(); } - EntryParser entryParser = new EntryParser(); - List result = new ArrayList<>(); // key: Xid offset - Map> connectorRecordMap = entryParser.parse(sourceConfig, entries); + Map> connectorRecordMap = EntryParser.parse(sourceConfig, entries, tableMgr); if (!connectorRecordMap.isEmpty()) { Set>> entrySet = connectorRecordMap.entrySet(); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java new file mode 100644 index 0000000000..df3c7571c2 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.source.connector; + +import org.apache.eventmesh.common.AbstractComponent; +import org.apache.eventmesh.common.EventMeshThreadFactory; +import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.canal.DatabaseConnection; +import org.apache.eventmesh.connector.canal.source.position.CanalFullPositionMgr; +import org.apache.eventmesh.connector.canal.source.position.TableFullPosition; +import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; +import org.apache.eventmesh.openconnect.api.ConnectorCreateService; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; +import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class CanalSourceFullConnector extends AbstractComponent implements Source, ConnectorCreateService { + private CanalSourceFullConfig config; + private CanalFullPositionMgr positionMgr; + private RdbTableMgr tableMgr; + private ThreadPoolExecutor executor; + private final BlockingQueue> queue = new LinkedBlockingQueue<>(); + private final AtomicBoolean flag = new AtomicBoolean(true); + + @Override + protected void run() throws Exception { + this.tableMgr.start(); + this.positionMgr.start(); + if (positionMgr.isFinished()) { + log.info("connector [{}] has finished the job", config.getConnectorConfig().getConnectorName()); + return; + } + executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(), 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new EventMeshThreadFactory("canal-source-full")); + List producers = new LinkedList<>(); + if (config.getConnectorConfig().getDatabases() != null) { + for (RdbDBDefinition db : config.getConnectorConfig().getDatabases()) { + for (RdbTableDefinition table : db.getTables()) { + try { + log.info("it will create producer of db [{}] table [{}]", db.getSchemaName(), table.getTableName()); + RdbSimpleTable simpleTable = new RdbSimpleTable(db.getSchemaName(), table.getTableName()); + JobRdbFullPosition position = positionMgr.getPosition(simpleTable); + if (position == null) { + throw new EventMeshException(String.format("db [%s] table [%s] have none position info", + db.getSchemaName(), table.getTableName())); + } + RdbTableDefinition tableDefinition = tableMgr.getTable(simpleTable); + if (tableDefinition == null) { + throw new EventMeshException(String.format("db [%s] table [%s] have none table definition info", + db.getSchemaName(), table.getTableName())); + } + + producers.add(new CanalFullProducer(queue, DatabaseConnection.sourceDataSource, (MySQLTableDef) tableDefinition, + JsonUtils.parseObject(position.getPrimaryKeyRecords(), TableFullPosition.class), + config.getFlushSize())); + } catch (Exception e) { + log.error("create schema [{}] table [{}] producers fail", db.getSchemaName(), + table.getTableName(), e); + } + } + } + } + producers.forEach(p -> executor.execute(() -> p.start(flag))); + } + + @Override + protected void shutdown() throws Exception { + flag.set(false); + if (!executor.isShutdown()) { + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("wait thread pool shutdown timeout, it will shutdown now"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("shutdown thread pool fail"); + } + } + if (DatabaseConnection.sourceDataSource != null) { + DatabaseConnection.sourceDataSource.close(); + log.info("data source has been closed"); + } + } + + @Override + public Source create() { + return new CanalSourceFullConnector(); + } + + @Override + public Class configClass() { + return CanalSourceFullConfig.class; + } + + @Override + public void init(Config config) throws Exception { + this.config = (CanalSourceFullConfig) config; + init(); + } + + private void init() { + DatabaseConnection.sourceConfig = this.config.getConnectorConfig(); + DatabaseConnection.initSourceConnection(); + this.tableMgr = new RdbTableMgr(config.getConnectorConfig(), DatabaseConnection.sourceDataSource); + this.positionMgr = new CanalFullPositionMgr(config, tableMgr); + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; + this.config = (CanalSourceFullConfig) sourceConnectorContext.getSourceConfig(); + init(); + } + + @Override + public void commit(ConnectRecord record) { + // nothing + } + + @Override + public String name() { + return this.config.getConnectorConfig().getConnectorName(); + } + + @Override + public List poll() { + while (flag.get()) { + try { + List records = queue.poll(5, TimeUnit.SECONDS); + if (records == null || records.isEmpty()) { + continue; + } + return records; + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + log.info("[{}] thread interrupted", this.getClass()); + return null; + } + } + log.info("[{}] life flag is stop, so return null", this.getClass()); + return null; + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java new file mode 100644 index 0000000000..a9d47b4604 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.source.position; + +import org.apache.eventmesh.common.AbstractComponent; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.canal.DatabaseConnection; +import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class CanalFullPositionMgr extends AbstractComponent { + + private final CanalSourceFullConfig config; + private final Map positions = new LinkedHashMap<>(); + private final RdbTableMgr tableMgr; + + public CanalFullPositionMgr(CanalSourceFullConfig config, RdbTableMgr tableMgr) { + this.config = config; + this.tableMgr = tableMgr; + } + + @Override + protected void run() throws Exception { + if (config == null || config.getConnectorConfig() == null || config.getConnectorConfig().getDatabases() == null) { + log.info("config or database is null"); + return; + } + prepareRecordPosition(); + initPositions(); + } + + public void prepareRecordPosition() { + if (config.getStartPosition() != null && !config.getStartPosition().isEmpty()) { + for (RecordPosition record : config.getStartPosition()) { + CanalFullRecordOffset offset = (CanalFullRecordOffset) record.getRecordOffset(); + RdbSimpleTable table = new RdbSimpleTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); + positions.put(table, offset.getPosition()); + } + } + } + + public JobRdbFullPosition getPosition(RdbSimpleTable table) { + return positions.get(table); + } + + public boolean isFinished() { + for (JobRdbFullPosition position : positions.values()) { + if (!position.isFinished()) { + log.info("schema [{}] table [{}] is not finish", position.getSchema(), position.getTableName()); + return false; + } + } + return true; + } + + private void initPositions() { + for (RdbDBDefinition database : config.getConnectorConfig().getDatabases()) { + for (RdbTableDefinition table : database.getTables()) { + try { + RdbSimpleTable simpleTable = new RdbSimpleTable(database.getSchemaName(), table.getTableName()); + RdbTableDefinition tableDefinition; + if ((tableDefinition = tableMgr.getTable(simpleTable)) == null) { + log.error("db [{}] table [{}] definition is null", database.getSchemaName(), table.getTableName()); + continue; + } + log.info("init position of data [{}] table [{}]", database.getSchemaName(), table.getTableName()); + + JobRdbFullPosition recordPosition = positions.get(simpleTable); + if (recordPosition == null || !recordPosition.isFinished()) { + positions.put(simpleTable, + fetchTableInfo(DatabaseConnection.sourceDataSource, (MySQLTableDef) tableDefinition, recordPosition)); + } + } catch (Exception e) { + log.error("process schema [{}] table [{}] position fail", database.getSchemaName(), table.getTableName(), e); + } + + } + } + } + + private JobRdbFullPosition fetchTableInfo(DataSource dataSource, MySQLTableDef tableDefinition, JobRdbFullPosition recordPosition) + throws SQLException { + TableFullPosition position = new TableFullPosition(); + Map preMinPrimaryKeys = new LinkedHashMap<>(); + Map preMaxPrimaryKeys = new LinkedHashMap<>(); + for (String pk : tableDefinition.getPrimaryKeys()) { + Object min = fetchMinPrimaryKey(dataSource, tableDefinition, preMinPrimaryKeys, pk); + Object max = fetchMaxPrimaryKey(dataSource, tableDefinition, preMaxPrimaryKeys, pk); + preMinPrimaryKeys.put(pk, min); + preMaxPrimaryKeys.put(pk, max); + position.getCurPrimaryKeyCols().put(pk, min); + position.getMinPrimaryKeyCols().put(pk, min); + position.getMaxPrimaryKeyCols().put(pk, max); + } + JobRdbFullPosition jobRdbFullPosition = new JobRdbFullPosition(); + if (recordPosition != null) { + if (StringUtils.isNotBlank(recordPosition.getPrimaryKeyRecords())) { + TableFullPosition record = JsonUtils.parseObject(recordPosition.getPrimaryKeyRecords(), TableFullPosition.class); + if (record != null && record.getCurPrimaryKeyCols() != null && !record.getCurPrimaryKeyCols().isEmpty()) { + position.setCurPrimaryKeyCols(record.getCurPrimaryKeyCols()); + } + } + jobRdbFullPosition.setPercent(recordPosition.getPercent()); + } + long rowCount = queryCurTableRowCount(dataSource, tableDefinition); + jobRdbFullPosition.setSchema(tableDefinition.getSchemaName()); + jobRdbFullPosition.setTableName(tableDefinition.getTableName()); + jobRdbFullPosition.setMaxCount(rowCount); + jobRdbFullPosition.setPrimaryKeyRecords(JsonUtils.toJSONString(position)); + return jobRdbFullPosition; + } + + + private long queryCurTableRowCount(DataSource datasource, MySQLTableDef tableDefinition) throws SQLException { + String sql = "select `AVG_ROW_LENGTH`,`DATA_LENGTH` from information_schema.TABLES where `TABLE_SCHEMA`='" + tableDefinition.getSchemaName() + + "' and `TABLE_NAME`='" + tableDefinition.getTableName() + "'"; + try (Statement statement = datasource.getConnection().createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { + long result = 0L; + if (resultSet.next()) { + long avgRowLength = resultSet.getLong("AVG_ROW_LENGTH"); + long dataLength = resultSet.getLong("DATA_LENGTH"); + if (avgRowLength != 0L) { + result = dataLength / avgRowLength; + } + } + return result; + } + } + + private void appendPrePrimaryKey(Map preMap, StringBuilder sql) { + if (preMap != null && !preMap.isEmpty()) { + sql.append(" WHERE "); + boolean first = true; + for (Map.Entry entry : preMap.entrySet()) { + if (first) { + first = false; + } else { + sql.append(" AND "); + } + sql.append(Constants.MySQLQuot).append(entry.getKey()).append(Constants.MySQLQuot).append("=?"); + } + } + } + + private void setValue2Statement(PreparedStatement ps, Map preMap, MySQLTableDef tableDefinition) throws SQLException { + if (preMap != null && !preMap.isEmpty()) { + int index = 1; + for (Map.Entry entry : preMap.entrySet()) { + RdbColumnDefinition def = tableDefinition.getColumnDefinitions().get(entry.getKey()); + ps.setObject(index, entry.getValue(), def.getJdbcType().getVendorTypeNumber()); + ++index; + } + } + } + + private Object fetchMinPrimaryKey(DataSource dataSource, MySQLTableDef tableDefinition, Map prePrimary, String curPrimaryKeyCol) + throws SQLException { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT MIN(").append(Constants.MySQLQuot).append(curPrimaryKeyCol).append(Constants.MySQLQuot) + .append(") min_primary_key FROM").append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot) + .append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot); + appendPrePrimaryKey(prePrimary, builder); + String sql = builder.toString(); + log.info("fetch min primary sql [{}]", sql); + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + setValue2Statement(statement, prePrimary, tableDefinition); + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(curPrimaryKeyCol); + if (columnDefinition.getJdbcType() == JDBCType.TIMESTAMP) { + return resultSet.getString("min_primary_key"); + } else { + return resultSet.getObject("min_primary_key"); + } + } + } + } + return null; + } + + private Object fetchMaxPrimaryKey(DataSource dataSource, MySQLTableDef tableDefinition, Map prePrimary, String curPrimaryKeyCol) + throws SQLException { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT MAX(").append(Constants.MySQLQuot).append(curPrimaryKeyCol).append(Constants.MySQLQuot) + .append(") max_primary_key FROM").append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot) + .append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot); + appendPrePrimaryKey(prePrimary, builder); + String sql = builder.toString(); + log.info("fetch max primary sql [{}]", sql); + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + setValue2Statement(statement, prePrimary, tableDefinition); + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(curPrimaryKeyCol); + if (columnDefinition.getJdbcType() == JDBCType.TIMESTAMP) { + return resultSet.getString("max_primary_key"); + } else { + return resultSet.getObject("max_primary_key"); + } + } + } + } + return null; + } + + + @Override + protected void shutdown() throws Exception { + + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/TableFullPosition.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/TableFullPosition.java new file mode 100644 index 0000000000..b1a8024ec5 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/TableFullPosition.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.source.position; + +import java.util.LinkedHashMap; +import java.util.Map; + +import lombok.Data; + +@Data +public class TableFullPosition { + private Map curPrimaryKeyCols = new LinkedHashMap<>(); + private Map minPrimaryKeyCols = new LinkedHashMap<>(); + private Map maxPrimaryKeyCols = new LinkedHashMap<>(); +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbSimpleTable.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbSimpleTable.java new file mode 100644 index 0000000000..5b9c35fff3 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbSimpleTable.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.source.table; + +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; + +import java.util.Objects; + +import lombok.Data; + +@Data +public class RdbSimpleTable extends RdbTableDefinition { + public RdbSimpleTable(String database, String schema, String tableName) { + this.schemaName = schema; + this.tableName = tableName; + this.database = database; + } + + public RdbSimpleTable(String schema, String tableName) { + this(null, schema, tableName); + } + + private final String database; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + RdbSimpleTable that = (RdbSimpleTable) o; + return Objects.equals(database, that.database); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java new file mode 100644 index 0000000000..1aebcf6364 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.source.table; + +import org.apache.eventmesh.common.AbstractComponent; +import org.apache.eventmesh.common.config.connector.rdb.JdbcConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalMySQLType; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; +import org.apache.eventmesh.connector.canal.SqlUtils; + +import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; + +/** + * Description: + */ +@Slf4j + +public class RdbTableMgr extends AbstractComponent { + private final JdbcConfig config; + private final Map tables = new HashMap<>(); + private final DataSource dataSource; + + public RdbTableMgr(JdbcConfig config, DataSource dataSource) { + this.config = config; + this.dataSource = dataSource; + } + + public RdbTableDefinition getTable(String schema, String tableName) { + return getTable(new RdbSimpleTable(schema, tableName)); + } + + public RdbTableDefinition getTable(RdbSimpleTable table) { + return tables.get(table); + } + + @Override + protected void run() { + if (config != null && config.getDatabases() != null) { + for (RdbDBDefinition db : config.getDatabases()) { + if (db.getTables() == null) { + log.warn("init db [{}] position, but it's tables are null", db.getSchemaName()); + continue; + } + for (RdbTableDefinition table : db.getTables()) { + try { + MySQLTableDef mysqlTable = new MySQLTableDef(); + mysqlTable.setSchemaName(db.getSchemaName()); + mysqlTable.setTableName(table.getTableName()); + List tables = Collections.singletonList(table.getTableName()); + Map> primaryKeys = queryTablePrimaryKey(db.getSchemaName(), tables); + Map> columns = queryColumns(db.getSchemaName(), tables); + if (primaryKeys == null || primaryKeys.isEmpty() || primaryKeys.get(table.getTableName()) == null) { + log.warn("init db [{}] table [{}] info, and primary keys are empty", db.getSchemaName(), table.getTableName()); + } else { + mysqlTable.setPrimaryKeys(new HashSet<>(primaryKeys.get(table.getTableName()))); + } + if (columns == null || columns.isEmpty() || columns.get(table.getTableName()) == null) { + log.warn("init db [{}] table [{}] info, and columns are empty", db.getSchemaName(), table.getTableName()); + } else { + LinkedHashMap cols = new LinkedHashMap<>(); + columns.get(table.getTableName()).forEach(x -> cols.put(x.getName(), x)); + mysqlTable.setColumnDefinitions(cols); + } + + this.tables.put(new RdbSimpleTable(db.getSchemaName(), table.getTableName()), mysqlTable); + } catch (Exception e) { + log.error("init rdb table schema [{}] table [{}] fail", db.getSchemaName(), table.getTableName(), e); + } + } + + } + } + } + + private Map> queryTablePrimaryKey(String schema, List tables) throws SQLException { + Map> primaryKeys = new LinkedHashMap<>(); + String prepareTables = SqlUtils.genPrepareSqlOfInClause(tables.size()); + String sql = "select L.TABLE_NAME,L.COLUMN_NAME,R.CONSTRAINT_TYPE from " + + "INFORMATION_SCHEMA.KEY_COLUMN_USAGE L left join INFORMATION_SCHEMA.TABLE_CONSTRAINTS R on L" + + ".TABLE_SCHEMA = R.TABLE_SCHEMA and L.TABLE_NAME = R.TABLE_NAME and L.CONSTRAINT_CATALOG = R" + + ".CONSTRAINT_CATALOG and L.CONSTRAINT_SCHEMA = R.CONSTRAINT_SCHEMA and L.CONSTRAINT_NAME = R" + + ".CONSTRAINT_NAME where L.TABLE_SCHEMA = ? and L.TABLE_NAME in " + prepareTables + " and R" + + ".CONSTRAINT_TYPE IN ('PRIMARY KEY') order by L.ORDINAL_POSITION asc"; + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + statement.setString(1, schema); + SqlUtils.setInClauseParameters(statement, 2, tables); + ResultSet resultSet = statement.executeQuery(); + if (resultSet == null) { + return null; + } + while (resultSet.next()) { + String tableName = resultSet.getString("TABLE_NAME"); + String colName = resultSet.getString("COLUMN_NAME"); + primaryKeys.compute(tableName, (k, v) -> { + if (v == null) { + v = new LinkedList<>(); + } + v.add(colName); + return v; + }); + } + resultSet.close(); + } + return primaryKeys; + } + + private Map> queryColumns(String schema, List tables) throws SQLException { + String prepareTables = SqlUtils.genPrepareSqlOfInClause(tables.size()); + String sql = "select TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH," + + "CHARACTER_OCTET_LENGTH,NUMERIC_SCALE,NUMERIC_PRECISION,DATETIME_PRECISION,CHARACTER_SET_NAME," + + "COLLATION_NAME,COLUMN_TYPE,COLUMN_DEFAULT,COLUMN_COMMENT,ORDINAL_POSITION,EXTRA from " + + "INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA = ? and TABLE_NAME in " + prepareTables + " order by " + "ORDINAL_POSITION asc"; + Map> cols = new LinkedHashMap<>(); + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { + statement.setString(1, schema); + SqlUtils.setInClauseParameters(statement, 2, tables); + ResultSet resultSet = statement.executeQuery(); + if (resultSet == null) { + return null; + } + while (resultSet.next()) { + String dataType = resultSet.getString("DATA_TYPE"); + JDBCType jdbcType = SqlUtils.toJDBCType(dataType); + MySQLColumnDef col = new MySQLColumnDef(); + col.setJdbcType(jdbcType); + col.setType(CanalMySQLType.valueOfCode(dataType)); + String colName = resultSet.getString("COLUMN_NAME"); + col.setName(colName); + String tableName = resultSet.getString("TABLE_NAME"); + cols.compute(tableName, (k, v) -> { + if (v == null) { + v = new LinkedList<>(); + } + v.add(col); + return v; + }); + } + resultSet.close(); + } + return cols; + } + + @Override + protected void shutdown() throws Exception { + + } +} diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java index 11c2b77454..8ac09eac38 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java @@ -17,13 +17,14 @@ package org.apache.eventmesh.openconnect.api.connector; +import org.apache.eventmesh.common.ComponentLifeCycle; import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; /** * Connector */ -public interface Connector { +public interface Connector extends ComponentLifeCycle { /** * Returns the class type of the configuration for this Connector. @@ -52,13 +53,6 @@ public interface Connector { */ void init(ConnectorContext connectorContext) throws Exception; - /** - * Starts the Connector. - * - * @throws Exception if the start operation fails - */ - void start() throws Exception; - /** * Commits the specified ConnectRecord object. * @@ -73,11 +67,4 @@ public interface Connector { */ String name(); - /** - * Stops the Connector. - * - * @throws Exception if stopping fails - */ - void stop() throws Exception; - }