diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index 80aec7bfe9..026f33f4fc 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig { private boolean isGTIDMode = true; + private boolean isMariaDB = true; + // skip sink process exception private Boolean skipException = false; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index 707f102901..8331d32cb7 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig { private String serverUUID; + private boolean isMariaDB = true; + private boolean isGTIDMode = true; private Integer batchSize = 10000; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java new file mode 100644 index 0000000000..90265fba4a --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.enums; + +public enum ConnectorStage { + SOURCE, + SINK +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java new file mode 100644 index 0000000000..87f4581eb5 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class ReportVerifyRequest extends BaseRemoteRequest { + + private String taskID; + + private String recordID; + + private String recordSig; + + private String connectorName; + + private String connectorStage; + + private String position; +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 5f3c0a2bca..8ecda8e125 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -476,9 +476,11 @@ private Exception doCall() { } JdbcTemplate template = dbDialect.getJdbcTemplate(); String sourceGtid = context.getGtid(); - if (StringUtils.isNotEmpty(sourceGtid)) { - String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';"; - template.execute(setGtid); + if (StringUtils.isNotEmpty(sourceGtid) && !sinkConfig.isMariaDB()) { + String setMySQLGtid = "SET @@session.gtid_next = '" + sourceGtid + "';"; + template.execute(setMySQLGtid); + } else if (StringUtils.isNotEmpty(sourceGtid) && sinkConfig.isMariaDB()) { + throw new RuntimeException("unsupport gtid mode for mariaDB"); } else { log.error("gtid is empty in gtid mode"); throw new RuntimeException("gtid is empty in gtid mode"); @@ -510,8 +512,13 @@ public void setValues(PreparedStatement ps) throws SQLException { }); // reset gtid - String resetGtid = "SET @@session.gtid_next = AUTOMATIC;"; - dbDialect.getJdbcTemplate().execute(resetGtid); + if (sinkConfig.isMariaDB()) { + throw new RuntimeException("unsupport gtid mode for mariaDB"); + } else { + String resetMySQLGtid = "SET @@session.gtid_next = 'AUTOMATIC';"; + dbDialect.getJdbcTemplate().execute(resetMySQLGtid); + } + error = null; exeResult = ExecuteResult.SUCCESS; } catch (DeadlockLoserDataAccessException ex) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 708d5d120c..5c4303588d 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -49,7 +49,7 @@ public class EntryParser { public static Map> parse(CanalSourceConfig sourceConfig, List datas, - RdbTableMgr tables) { + RdbTableMgr tables) { List recordList = new ArrayList<>(); List transactionDataBuffer = new ArrayList<>(); // need check weather the entry is loopback @@ -60,9 +60,9 @@ public static Map> parse(CanalSourceConfig source switch (entry.getEntryType()) { case ROWDATA: RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); - if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) { - String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); - if (currentGtid.contains(sourceConfig.getServerUUID())) { + // don't support gtid for mariadb + if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) { + if (checkGtidForEntry(entry, sourceConfig)) { transactionDataBuffer.add(entry); } } else { @@ -90,9 +90,14 @@ public static Map> parse(CanalSourceConfig source return recordMap; } + private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig sourceConfig) { + String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); + return currentGtid.contains(sourceConfig.getServerUUID()); + } + private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, - List recordList, - List transactionDataBuffer, RdbTableMgr tables) { + List recordList, + List transactionDataBuffer, RdbTableMgr tables) { for (Entry bufferEntry : transactionDataBuffer) { List recordParsedList = internParse(sourceConfig, bufferEntry, tables); if (CollectionUtils.isEmpty(recordParsedList)) { @@ -130,7 +135,7 @@ private static Column getColumnIgnoreCase(List columns, String columName } private static List internParse(CanalSourceConfig sourceConfig, Entry entry, - RdbTableMgr tableMgr) { + RdbTableMgr tableMgr) { String schemaName = entry.getHeader().getSchemaName(); String tableName = entry.getHeader().getTableName(); if (tableMgr.getTable(schemaName, tableName) == null) { @@ -169,7 +174,7 @@ private static List internParse(CanalSourceConfig sourceConf } private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, - RowChange rowChange, RowData rowData) { + RowChange rowChange, RowData rowData) { CanalConnectRecord canalConnectRecord = new CanalConnectRecord(); canalConnectRecord.setTableName(entry.getHeader().getTableName()); canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName()); @@ -179,10 +184,16 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset()); // if enabled gtid mode, gtid not null if (canalSourceConfig.isGTIDMode()) { - String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); - String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID()); - canalConnectRecord.setGtid(gtidRange); - canalConnectRecord.setCurrentGtid(currentGtid); + if (canalSourceConfig.isMariaDB()) { + String currentGtid = entry.getHeader().getGtid(); + canalConnectRecord.setGtid(currentGtid); + canalConnectRecord.setCurrentGtid(currentGtid); + } else { + String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); + String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID()); + canalConnectRecord.setGtid(gtidRange); + canalConnectRecord.setCurrentGtid(currentGtid); + } } EventType eventType = canalConnectRecord.getEventType(); @@ -276,7 +287,7 @@ public static String replaceGtidRange(String gtid, String currentGtid, String se } private static void checkUpdateKeyColumns(Map oldKeyColumns, - Map keyColumns) { + Map keyColumns) { if (oldKeyColumns.isEmpty()) { return; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index 6cd575cb77..f3f8b2e160 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -198,13 +198,16 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) { recordPositionMap.put("journalName", canalRecordPartition.getJournalName()); recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp()); recordPositionMap.put("position", canalRecordOffset.getOffset()); - String gtidRange = canalRecordOffset.getGtid(); - if (gtidRange != null) { - if (canalRecordOffset.getCurrentGtid() != null) { - gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(), - sourceConfig.getServerUUID()); + // for mariaDB not support gtid mode + if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) { + String gtidRange = canalRecordOffset.getGtid(); + if (gtidRange != null) { + if (canalRecordOffset.getCurrentGtid() != null) { + gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(), + sourceConfig.getServerUUID()); + } + recordPositionMap.put("gtid", gtidRange); } - recordPositionMap.put("gtid", gtidRange); } positions.add(JsonUtils.toJSONString(recordPositionMap)); }); diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 65676903dd..1605319862 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -25,6 +25,7 @@ import org.apache.eventmesh.common.config.connector.SinkConfig; import org.apache.eventmesh.common.config.connector.SourceConfig; import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig; +import org.apache.eventmesh.common.enums.ConnectorStage; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; @@ -32,6 +33,7 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; import org.apache.eventmesh.common.remote.request.FetchJobRequest; import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; @@ -55,10 +57,13 @@ import org.apache.commons.collections4.CollectionUtils; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -281,8 +286,9 @@ public void start() throws Exception { try { this.stop(); } catch (Exception ex) { - throw new RuntimeException(ex); + log.error("Failed to stop after exception", ex); } + throw new RuntimeException(e); } }); // start @@ -294,8 +300,9 @@ public void start() throws Exception { try { this.stop(); } catch (Exception ex) { - throw new RuntimeException(ex); + log.error("Failed to stop after exception", ex); } + throw new RuntimeException(e); } }); } @@ -304,6 +311,8 @@ public void start() throws Exception { public void stop() throws Exception { sourceConnector.stop(); sinkConnector.stop(); + sourceService.shutdown(); + sinkService.shutdown(); heartBeatExecutor.shutdown(); requestObserver.onCompleted(); if (channel != null && !channel.isShutdown()) { @@ -318,6 +327,11 @@ private void startSourceConnector() throws Exception { // TODO: use producer pub record to storage replace below if (connectorRecordList != null && !connectorRecordList.isEmpty()) { for (ConnectRecord record : connectorRecordList) { + // if enabled incremental data reporting consistency check + if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) { + reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE); + } + queue.put(record); Optional submittedRecordPosition = prepareToUpdateRecordOffset(record); Optional callback = @@ -336,6 +350,43 @@ private void startSourceConnector() throws Exception { } } + private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) { + UUID uuid = UUID.randomUUID(); + String recordId = uuid.toString(); + String md5Str = md5(record.toString()); + ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); + reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); + reportVerifyRequest.setRecordID(recordId); + reportVerifyRequest.setRecordSig(md5Str); + reportVerifyRequest.setConnectorName( + IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); + reportVerifyRequest.setConnectorStage(connectorStage.name()); + reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition())); + + Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest)))) + .build()) + .build(); + + requestObserver.onNext(request); + } + + private String md5(String input) { + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] messageDigest = md.digest(input.getBytes()); + StringBuilder sb = new StringBuilder(); + for (byte b : messageDigest) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + public Optional prepareToUpdateRecordOffset(ConnectRecord record) { return Optional.of(this.offsetManagement.submitRecord(record.getPosition())); } @@ -426,6 +477,10 @@ private void startSinkConnector() throws Exception { List connectRecordList = new ArrayList<>(); connectRecordList.add(connectRecord); sinkConnector.put(connectRecordList); + // if enabled incremental data reporting consistency check + if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) { + reportVerifyRequest(connectRecord, connectorRuntimeConfig, ConnectorStage.SINK); + } } } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java index 901defc47d..5a58cce08e 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java @@ -31,8 +31,12 @@ public class ConnectorRuntimeConfig { private String connectorRuntimeInstanceId; + private String taskID; + private String jobID; + private String region; + private String sourceConnectorType; private String sourceConnectorDesc; @@ -45,4 +49,6 @@ public class ConnectorRuntimeConfig { private Map sinkConnectorConfig; + public boolean enableIncrementalDataConsistencyCheck = true; + } diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml b/eventmesh-runtime-v2/src/main/resources/connector.yaml index bc7bc20756..bf7f58028b 100644 --- a/eventmesh-runtime-v2/src/main/resources/connector.yaml +++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml @@ -15,4 +15,6 @@ # limitations under the License. # +taskID: 1 jobID: 1 +region: region1