diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql
index 3b6fc9b777..226101661c 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -71,8 +71,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` int unsigned NOT NULL,
+ `serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`position` bigint DEFAULT NULL,
+ `gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`timestamp` bigint DEFAULT NULL,
`journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
index bc3a3292a2..cbb7c094d8 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
@@ -16,24 +16,28 @@
limitations under the License.
-->
+ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
- id,jobID,address,
- position,timestamp,journalName,
+ id
+ ,jobID,serverUUID,address,
+ position,gtid,currentGtid,timestamp,journalName,
createTime,updateTime
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
index ffe3e446d4..65a38b54b5 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
@@ -38,10 +38,16 @@ public class EventMeshMysqlPosition implements Serializable {
private Integer jobID;
+ private String serverUUID;
+
private String address;
private Long position;
+ private String gtid;
+
+ private String currentGtid;
+
private Long timestamp;
private String journalName;
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
index 525fe02c0d..f2c174c3b7 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
@@ -115,9 +115,12 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset();
if (offset != null) {
position.setPosition(offset.getOffset());
+ position.setGtid(offset.getGtid());
+ position.setCurrentGtid(offset.getCurrentGtid());
}
CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition();
if (partition != null) {
+ position.setServerUUID(partition.getServerUUID());
position.setTimestamp(partition.getTimeStamp());
position.setJournalName(partition.getJournalName());
}
@@ -148,13 +151,16 @@ public List<RecordPosition> handler(FetchPositionRequest request, Metadata metad
request.getJobID()));
List<RecordPosition> recordPositionList = new ArrayList<>();
for (EventMeshMysqlPosition position : positionList) {
- RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
+ partition.setServerUUID(position.getServerUUID());
+ RecordPosition recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
+ offset.setGtid(position.getGtid());
+ offset.setCurrentGtid(position.getCurrentGtid());
recordPosition.setRecordOffset(offset);
recordPositionList.add(recordPosition);
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index 85484b2ce9..80aec7bfe9 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -39,6 +39,8 @@ public class CanalSinkConfig extends SinkConfig {
// sync mode: field/row
private SyncMode syncMode;
+ private boolean isGTIDMode = true;
+
// skip sink process exception
private Boolean skipException = false;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index d75ceb6b58..707f102901 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -45,6 +45,10 @@ public class CanalSourceConfig extends SourceConfig {
private Short clientId;
+ private String serverUUID;
+
+ private boolean isGTIDMode = true;
+
private Integer batchSize = 10000;
private Long batchTimeout = -1L;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
index 90c94c99bd..d0f2053f4d 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
@@ -30,6 +30,11 @@ public class CanalRecordOffset extends RecordOffset {
private Long offset;
+ // mysql instance gtid range
+ private String gtid;
+
+ private String currentGtid;
+
@Override
public Class<? extends RecordOffset> getRecordOffsetClass() {
return CanalRecordOffset.class;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
index 72d404bab9..ded82306e3 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
@@ -29,6 +29,8 @@
@ToString
public class CanalRecordPartition extends RecordPartition {
+ private String serverUUID;
+
private String journalName;
private Long timeStamp;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
index a723b24dc3..36ecd158f6 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
@@ -31,8 +31,14 @@
public class CanalConnectRecord {
private String schemaName;
+
private String tableName;
+ // mysql instance gtid range
+ private String gtid;
+
+ private String currentGtid;
+
/**
* The business type of the changed data (I/U/D/C/A/E), consistent with the EventType defined in EntryProtocol in canal.
*/
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
index f5c2245b9f..4cf0f82ec9 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
@@ -97,10 +97,6 @@ public SqlTemplate getSqlTemplate() {
return sqlTemplate;
}
- public boolean isDRDS() {
- return false;
- }
-
public String getShardColumns(String schema, String table) {
return null;
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
index a18edfd5b2..781c2fe954 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
@@ -48,8 +48,6 @@ public interface DbDialect {
public boolean isSupportMergeSql();
- public boolean isDRDS();
-
public LobHandler getLobHandler();
public JdbcTemplate getJdbcTemplate();
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
index acd491ba64..bfe5628716 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
@@ -50,10 +50,6 @@ public String getDefaultSchema() {
return null;
}
- public boolean isDRDS() {
- return false;
- }
-
public String getDefaultCatalog() {
return jdbcTemplate.queryForObject("select database()", String.class);
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
index 24d6b42f8b..0ad07577f9 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
@@ -51,35 +51,21 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
String shardColumns = null;
if (type.isInsert()) {
- if (CollectionUtils.isEmpty(record.getColumns())
- && (dbDialect.isDRDS())) {
- // sql
- sql = sqlTemplate.getInsertSql(schemaName,
- record.getTableName(),
- buildColumnNames(record.getKeys()),
- buildColumnNames(record.getColumns()));
- } else {
- sql = sqlTemplate.getMergeSql(schemaName,
+ sql = sqlTemplate.getMergeSql(schemaName,
record.getTableName(),
buildColumnNames(record.getKeys()),
buildColumnNames(record.getColumns()),
new String[] {},
- !dbDialect.isDRDS(),
+ true,
shardColumns);
- }
} else if (type.isUpdate()) {
-
boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
boolean rowMode = sinkConfig.getSyncMode().isRow();
String[] keyColumns = null;
String[] otherColumns = null;
if (existOldKeys) {
keyColumns = buildColumnNames(record.getOldKeys());
- if (dbDialect.isDRDS()) {
- otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys());
- } else {
- otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
- }
+ otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
} else {
keyColumns = buildColumnNames(record.getKeys());
otherColumns = buildColumnNames(record.getUpdatedColumns());
@@ -91,10 +77,10 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
keyColumns,
otherColumns,
new String[] {},
- !dbDialect.isDRDS(),
+ true,
shardColumns);
} else {
- sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns);
+ sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns);
}
} else if (type.isDelete()) {
sql = sqlTemplate.getDeleteSql(schemaName,
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
index 561d894870..3498e87e7b 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
@@ -28,6 +28,8 @@
@Data
public class DbLoadContext {
+ private String gtid;
+
private List<CanalConnectRecord> lastProcessedRecords;
private List<CanalConnectRecord> prepareRecords;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
new file mode 100644
index 0000000000..dd6559b832
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.canal.sink;
+
+import org.apache.eventmesh.connector.canal.CanalConnectRecord;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class GtidBatch {
+ private int totalBatches;
+ private List<List<CanalConnectRecord>> batches;
+ private int receivedBatchCount;
+
+ public GtidBatch(int totalBatches) {
+ this.totalBatches = totalBatches;
+ this.batches = new CopyOnWriteArrayList<>(new List[totalBatches]);
+ this.receivedBatchCount = 0;
+ }
+
+ public void addBatch(int batchIndex, List<CanalConnectRecord> batchRecords) {
+ batches.set(batchIndex, batchRecords);
+ receivedBatchCount++;
+ }
+
+ public List<List<CanalConnectRecord>> getBatches() {
+ return batches;
+ }
+
+ public boolean isComplete() {
+ return receivedBatchCount == totalBatches;
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
new file mode 100644
index 0000000000..30060aa8f5
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.canal.sink;
+
+import org.apache.eventmesh.connector.canal.CanalConnectRecord;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class GtidBatchManager {
+
+ private static ConcurrentHashMap<String, GtidBatch> gtidBatchMap = new ConcurrentHashMap<>();
+
+ public static void addBatch(String gtid, int batchIndex, int totalBatches, List<CanalConnectRecord> batchRecords) {
+ gtidBatchMap.computeIfAbsent(gtid, k -> new GtidBatch(totalBatches)).addBatch(batchIndex, batchRecords);
+ }
+
+ public static GtidBatch getGtidBatch(String gtid) {
+ return gtidBatchMap.get(gtid);
+ }
+
+ public static boolean isComplete(String gtid) {
+ GtidBatch batch = gtidBatchMap.get(gtid);
+ return batch != null && batch.isComplete();
+ }
+
+ public static void removeGtidBatch(String gtid) {
+ gtidBatchMap.remove(gtid);
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 8f9df7595b..5f3c0a2bca 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -31,6 +31,8 @@
import org.apache.eventmesh.connector.canal.sink.DbLoadData;
import org.apache.eventmesh.connector.canal.sink.DbLoadData.TableLoadData;
import org.apache.eventmesh.connector.canal.sink.DbLoadMerger;
+import org.apache.eventmesh.connector.canal.sink.GtidBatch;
+import org.apache.eventmesh.connector.canal.sink.GtidBatchManager;
import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
@@ -38,7 +40,6 @@
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -52,6 +53,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -86,9 +88,12 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService {
private ExecutorService executor;
+ private ExecutorService gtidSingleExecutor;
+
private int batchSize = 50;
private boolean useBatch = true;
+
private RdbTableMgr tableMgr;
@Override
@@ -123,6 +128,7 @@ public void init(ConnectorContext connectorContext) throws Exception {
new ArrayBlockingQueue<>(sinkConfig.getPoolSize() * 4),
new NamedThreadFactory("canalSink"),
new ThreadPoolExecutor.CallerRunsPolicy());
+ gtidSingleExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "gtidSingleExecutor"));
}
@Override
@@ -143,6 +149,7 @@ public String name() {
@Override
public void stop() {
executor.shutdown();
+ gtidSingleExecutor.shutdown();
}
@Override
@@ -153,6 +160,8 @@ public void put(List<ConnectRecord> sinkRecords) {
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
doDdl(context, canalConnectRecordList);
+ } else if (sinkConfig.isGTIDMode()) {
+ doLoadWithGtid(context, sinkConfig, connectRecord);
} else {
canalConnectRecordList = DbLoadMerger.merge(canalConnectRecordList);
@@ -257,6 +266,57 @@ private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, DbLoadDat
batchDatas.clear();
}
+ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, ConnectRecord connectRecord) {
+ int batchIndex = connectRecord.getExtension("batchIndex", Integer.class);
+ int totalBatches = connectRecord.getExtension("totalBatches", Integer.class);
+ List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>) connectRecord.getData();
+ String gtid = canalConnectRecordList.get(0).getCurrentGtid();
+ GtidBatchManager.addBatch(gtid, batchIndex, totalBatches, canalConnectRecordList);
+ // check whether the batch is complete
+ if (GtidBatchManager.isComplete(gtid)) {
+ GtidBatch batch = GtidBatchManager.getGtidBatch(gtid);
+ List<List<CanalConnectRecord>> totalRows = batch.getBatches();
+ List<CanalConnectRecord> filteredRows = new ArrayList<>();
+ for (List<CanalConnectRecord> canalConnectRecords : totalRows) {
+ canalConnectRecords = filterRecord(canalConnectRecords);
+ if (!CollectionUtils.isEmpty(canalConnectRecords)) {
+ for (final CanalConnectRecord record : canalConnectRecords) {
+ boolean filter = interceptor.before(sinkConfig, record);
+ filteredRows.add(record);
+ }
+ }
+ }
+ context.setGtid(gtid);
+ Future<Exception> result = gtidSingleExecutor.submit(new DbLoadWorker(context, filteredRows, dbDialect, false, sinkConfig));
+ Exception ex = null;
+ try {
+ ex = result.get();
+ } catch (Exception e) {
+ ex = e;
+ }
+ Boolean skipException = sinkConfig.getSkipException();
+ if (skipException != null && skipException) {
+ if (ex != null) {
+ // do skip
+ log.warn("skip exception for data : {} , caused by {}",
+ filteredRows,
+ ExceptionUtils.getFullStackTrace(ex));
+ GtidBatchManager.removeGtidBatch(gtid);
+ }
+ } else {
+ if (ex != null) {
+ log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
+ gtidSingleExecutor.shutdown();
+ System.exit(1);
+ } else {
+ GtidBatchManager.removeGtidBatch(gtid);
+ }
+ }
+ } else {
+ log.info("Batch received, waiting for other batches.");
+ }
+ }
+
private List<List<CanalConnectRecord>> split(List<CanalConnectRecord> records) {
List<List<CanalConnectRecord>> result = new ArrayList<>();
if (records == null || records.isEmpty()) {
@@ -296,12 +356,12 @@ private boolean canBatch(CanalConnectRecord source, CanalConnectRecord target) {
}
private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List<List<CanalConnectRecord>> totalRows, boolean canBatch) {
- List<Future<Exception>> results = new ArrayList<Future<Exception>>();
+ List<Future<Exception>> results = new ArrayList<>();
for (List rows : totalRows) {
if (CollectionUtils.isEmpty(rows)) {
continue;
}
- results.add(executor.submit(new DbLoadWorker(context, rows, dbDialect, canBatch)));
+ results.add(executor.submit(new DbLoadWorker(context, rows, dbDialect, canBatch, sinkConfig)));
}
boolean partFailed = false;
@@ -330,7 +390,7 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List<
Boolean skipException = sinkConfig.getSkipException();
if (skipException != null && skipException) {
for (CanalConnectRecord retryRecord : retryRecords) {
- DbLoadWorker worker = new DbLoadWorker(context, Arrays.asList(retryRecord), dbDialect, false);
+ DbLoadWorker worker = new DbLoadWorker(context, Arrays.asList(retryRecord), dbDialect, false, sinkConfig);
try {
Exception ex = worker.call();
if (ex != null) {
@@ -347,7 +407,7 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List<
}
}
} else {
- DbLoadWorker worker = new DbLoadWorker(context, retryRecords, dbDialect, false);
+ DbLoadWorker worker = new DbLoadWorker(context, retryRecords, dbDialect, false, sinkConfig);
try {
Exception ex = worker.call();
if (ex != null) {
@@ -355,7 +415,9 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List<
}
} catch (Exception ex) {
log.error("##load phase two failed!", ex);
- throw new RuntimeException(ex);
+ log.error("sink connector will shutdown by " + ex.getMessage(), ex);
+ executor.shutdown();
+ System.exit(1);
}
}
}
@@ -371,16 +433,21 @@ class DbLoadWorker implements Callable<Exception> {
private final DbDialect dbDialect;
private final List<CanalConnectRecord> records;
private final boolean canBatch;
+
+ private final CanalSinkConfig sinkConfig;
+
private final List<CanalConnectRecord> allFailedRecords = new ArrayList<>();
private final List<CanalConnectRecord> allProcessedRecords = new ArrayList<>();
private final List<CanalConnectRecord> processedRecords = new ArrayList<>();
private final List<CanalConnectRecord> failedRecords = new ArrayList<>();
- public DbLoadWorker(DbLoadContext context, List<CanalConnectRecord> records, DbDialect dbDialect, boolean canBatch) {
+ public DbLoadWorker(DbLoadContext context, List<CanalConnectRecord> records, DbDialect dbDialect, boolean canBatch,
+ CanalSinkConfig sinkConfig) {
this.context = context;
this.records = records;
this.canBatch = canBatch;
this.dbDialect = dbDialect;
+ this.sinkConfig = sinkConfig;
}
public Exception call() throws Exception {
@@ -394,132 +461,239 @@ public Exception call() throws Exception {
private Exception doCall() {
RuntimeException error = null;
ExecuteResult exeResult = null;
- int index = 0;
- while (index < records.size()) {
- final List splitDatas = new ArrayList<>();
- if (useBatch && canBatch) {
- int end = Math.min(index + batchSize, records.size());
- splitDatas.addAll(records.subList(index, end));
- index = end;
- } else {
- splitDatas.add(records.get(index));
- index = index + 1;
- }
+ if (sinkConfig.isGTIDMode()) {
int retryCount = 0;
- while (true) {
- try {
- if (!CollectionUtils.isEmpty(failedRecords)) {
- splitDatas.clear();
- splitDatas.addAll(failedRecords);
- } else {
- failedRecords.addAll(splitDatas);
+ final List<CanalConnectRecord> toExecuteRecords = new ArrayList<>();
+ try {
+ if (!CollectionUtils.isEmpty(failedRecords)) {
+ // if failedRecords not empty, make it retry
+ toExecuteRecords.addAll(failedRecords);
+ } else {
+ toExecuteRecords.addAll(records);
+ // add to failed record first, maybe get lob or datasource error
+ failedRecords.addAll(toExecuteRecords);
+ }
+ JdbcTemplate template = dbDialect.getJdbcTemplate();
+ String sourceGtid = context.getGtid();
+ if (StringUtils.isNotEmpty(sourceGtid)) {
+ String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
+ template.execute(setGtid);
+ } else {
+ log.error("gtid is empty in gtid mode");
+ throw new RuntimeException("gtid is empty in gtid mode");
+ }
+
+ final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
+ int affect = (Integer) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+ try {
+ failedRecords.clear();
+ processedRecords.clear();
+ int affect1 = 0;
+ for (CanalConnectRecord record : toExecuteRecords) {
+ int affects = template.update(record.getSql(), new PreparedStatementSetter() {
+ public void setValues(PreparedStatement ps) throws SQLException {
+ doPreparedStatement(ps, dbDialect, lobCreator, record);
+ }
+ });
+ affect1 = affect1 + affects;
+ processStat(record, affects, false);
+ }
+ return affect1;
+ } catch (Exception e) {
+ // rollback
+ status.setRollbackOnly();
+ throw new RuntimeException("Failed to executed", e);
+ } finally {
+ lobCreator.close();
}
+ });
+
+ // reset gtid
+ String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
+ dbDialect.getJdbcTemplate().execute(resetGtid);
+ error = null;
+ exeResult = ExecuteResult.SUCCESS;
+ } catch (DeadlockLoserDataAccessException ex) {
+ error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.RETRY;
+ } catch (Throwable ex) {
+ error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.ERROR;
+ }
- final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
- if (useBatch && canBatch) {
- final String sql = splitDatas.get(0).getSql();
- int[] affects = new int[splitDatas.size()];
- affects = (int[]) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
- try {
- failedRecords.clear();
- processedRecords.clear();
- JdbcTemplate template = dbDialect.getJdbcTemplate();
- int[] affects1 = template.batchUpdate(sql, new BatchPreparedStatementSetter() {
-
- public void setValues(PreparedStatement ps, int idx) throws SQLException {
- doPreparedStatement(ps, dbDialect, lobCreator, splitDatas.get(idx));
- }
-
- public int getBatchSize() {
- return splitDatas.size();
- }
- });
- return affects1;
- } finally {
- lobCreator.close();
- }
- });
+ if (ExecuteResult.SUCCESS == exeResult) {
+ allFailedRecords.addAll(failedRecords);
+ allProcessedRecords.addAll(processedRecords);
+ failedRecords.clear();
+ processedRecords.clear();
+ } else if (ExecuteResult.RETRY == exeResult) {
+ retryCount = retryCount + 1;
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ int retry = 3;
+ if (retryCount >= retry) {
+ processFailedDatas(toExecuteRecords.size());
+ throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error);
+ } else {
+ try {
+ int retryWait = 3000;
+ int wait = retryCount * retryWait;
+ wait = Math.max(wait, retryWait);
+ Thread.sleep(wait);
+ } catch (InterruptedException ex) {
+ Thread.interrupted();
+ processFailedDatas(toExecuteRecords.size());
+ throw new RuntimeException(ex);
+ }
+ }
+ } else {
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ processFailedDatas(toExecuteRecords.size());
+ throw error;
+ }
+ } else {
+ int index = 0;
+ while (index < records.size()) {
+ final List<CanalConnectRecord> toExecuteRecords = new ArrayList<>();
+ if (useBatch && canBatch) {
+ int end = Math.min(index + batchSize, records.size());
+ toExecuteRecords.addAll(records.subList(index, end));
+ index = end;
+ } else {
+ toExecuteRecords.add(records.get(index));
+ index = index + 1;
+ }
- for (int i = 0; i < splitDatas.size(); i++) {
- assert affects != null;
- processStat(splitDatas.get(i), affects[i], true);
+ int retryCount = 0;
+ while (true) {
+ try {
+ if (!CollectionUtils.isEmpty(failedRecords)) {
+ toExecuteRecords.clear();
+ toExecuteRecords.addAll(failedRecords);
+ } else {
+ failedRecords.addAll(toExecuteRecords);
}
- } else {
- final CanalConnectRecord record = splitDatas.get(0);
- int affect = 0;
- affect = (Integer) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
- try {
- failedRecords.clear();
- processedRecords.clear();
- JdbcTemplate template = dbDialect.getJdbcTemplate();
- int affect1 = template.update(record.getSql(), new PreparedStatementSetter() {
-
- public void setValues(PreparedStatement ps) throws SQLException {
- doPreparedStatement(ps, dbDialect, lobCreator, record);
- }
- });
- return affect1;
- } finally {
- lobCreator.close();
+
+ final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
+ if (useBatch && canBatch) {
+ JdbcTemplate template = dbDialect.getJdbcTemplate();
+ final String sql = toExecuteRecords.get(0).getSql();
+
+ int[] affects = new int[toExecuteRecords.size()];
+
+ affects = (int[]) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+ try {
+ failedRecords.clear();
+ processedRecords.clear();
+ int[] affects1 = template.batchUpdate(sql, new BatchPreparedStatementSetter() {
+
+ public void setValues(PreparedStatement ps, int idx) throws SQLException {
+ doPreparedStatement(ps, dbDialect, lobCreator, toExecuteRecords.get(idx));
+ }
+
+ public int getBatchSize() {
+ return toExecuteRecords.size();
+ }
+ });
+ return affects1;
+ } catch (Exception e) {
+ // rollback
+ status.setRollbackOnly();
+ throw new RuntimeException("Failed to execute batch with GTID", e);
+ } finally {
+ lobCreator.close();
+ }
+ });
+
+ for (int i = 0; i < toExecuteRecords.size(); i++) {
+ assert affects != null;
+ processStat(toExecuteRecords.get(i), affects[i], true);
}
- });
- processStat(record, affect, false);
- }
+ } else {
+ final CanalConnectRecord record = toExecuteRecords.get(0);
+ JdbcTemplate template = dbDialect.getJdbcTemplate();
+ int affect = 0;
+ affect = (Integer) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+ try {
+ failedRecords.clear();
+ processedRecords.clear();
+ int affect1 = template.update(record.getSql(), new PreparedStatementSetter() {
+
+ public void setValues(PreparedStatement ps) throws SQLException {
+ doPreparedStatement(ps, dbDialect, lobCreator, record);
+ }
+ });
+ return affect1;
+ } catch (Exception e) {
+ // rollback
+ status.setRollbackOnly();
+ throw new RuntimeException("Failed to executed", e);
+ } finally {
+ lobCreator.close();
+ }
+ });
+ processStat(record, affect, false);
+ }
- error = null;
- exeResult = ExecuteResult.SUCCESS;
- } catch (DeadlockLoserDataAccessException ex) {
- error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex));
- exeResult = ExecuteResult.RETRY;
- } catch (Throwable ex) {
- error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex));
- exeResult = ExecuteResult.ERROR;
- }
+ error = null;
+ exeResult = ExecuteResult.SUCCESS;
+ } catch (DeadlockLoserDataAccessException ex) {
+ error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.RETRY;
+ } catch (Throwable ex) {
+ error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.ERROR;
+ }
- if (ExecuteResult.SUCCESS == exeResult) {
- allFailedRecords.addAll(failedRecords);
- allProcessedRecords.addAll(processedRecords);
- failedRecords.clear();
- processedRecords.clear();
- break; // do next eventData
- } else if (ExecuteResult.RETRY == exeResult) {
- retryCount = retryCount + 1;
- processedRecords.clear();
- failedRecords.clear();
- failedRecords.addAll(splitDatas);
- int retry = 3;
- if (retryCount >= retry) {
- processFailedDatas(index);
- throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error);
- } else {
- try {
- int retryWait = 3000;
- int wait = retryCount * retryWait;
- wait = Math.max(wait, retryWait);
- Thread.sleep(wait);
- } catch (InterruptedException ex) {
- Thread.interrupted();
+ if (ExecuteResult.SUCCESS == exeResult) {
+ allFailedRecords.addAll(failedRecords);
+ allProcessedRecords.addAll(processedRecords);
+ failedRecords.clear();
+ processedRecords.clear();
+ break; // do next eventData
+ } else if (ExecuteResult.RETRY == exeResult) {
+ retryCount = retryCount + 1;
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ int retry = 3;
+ if (retryCount >= retry) {
processFailedDatas(index);
- throw new RuntimeException(ex);
+ throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error);
+ } else {
+ try {
+ int retryWait = 3000;
+ int wait = retryCount * retryWait;
+ wait = Math.max(wait, retryWait);
+ Thread.sleep(wait);
+ } catch (InterruptedException ex) {
+ Thread.interrupted();
+ processFailedDatas(index);
+ throw new RuntimeException(ex);
+ }
}
+ } else {
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ processFailedDatas(index);
+ throw error;
}
- } else {
- processedRecords.clear();
- failedRecords.clear();
- failedRecords.addAll(splitDatas);
- processFailedDatas(index);
- throw error;
}
}
}
+
context.getFailedRecords().addAll(allFailedRecords);
context.getProcessedRecords().addAll(allProcessedRecords);
return null;
}
private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobCreator lobCreator,
- CanalConnectRecord record) throws SQLException {
+ CanalConnectRecord record) throws SQLException {
EventType type = record.getEventType();
List columns = new ArrayList();
if (type.isInsert()) {
@@ -530,11 +704,7 @@ private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobC
} else if (type.isUpdate()) {
boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
columns.addAll(record.getUpdatedColumns());
- if (existOldKeys && dbDialect.isDRDS()) {
- columns.addAll(record.getUpdatedKeys());
- } else {
- columns.addAll(record.getKeys());
- }
+ columns.addAll(record.getKeys());
if (existOldKeys) {
columns.addAll(record.getOldKeys());
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 8ef60ff04d..708d5d120c 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -60,9 +60,17 @@ public static Map> parse(CanalSourceConfig source
switch (entry.getEntryType()) {
case ROWDATA:
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
- needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0));
- if (needSync) {
- transactionDataBuffer.add(entry);
+ if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) {
+ String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
+ if (currentGtid.contains(sourceConfig.getServerUUID())) {
+ transactionDataBuffer.add(entry);
+ }
+ } else {
+ // if not in gtid mode, need to check whether the entry is a loopback by the specified column value
+ needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0));
+ if (needSync) {
+ transactionDataBuffer.add(entry);
+ }
}
break;
case TRANSACTIONEND:
@@ -169,6 +177,14 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi
canalConnectRecord.setExecuteTime(entry.getHeader().getExecuteTime());
canalConnectRecord.setJournalName(entry.getHeader().getLogfileName());
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
+ // if gtid mode is enabled, gtid will not be null
+ if (canalSourceConfig.isGTIDMode()) {
+ String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
+ String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
+ canalConnectRecord.setGtid(gtidRange);
+ canalConnectRecord.setCurrentGtid(currentGtid);
+ }
+
EventType eventType = canalConnectRecord.getEventType();
List beforeColumns = rowData.getBeforeColumnsList();
@@ -248,6 +264,17 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi
return canalConnectRecord;
}
+ public static String replaceGtidRange(String gtid, String currentGtid, String serverUUID) {
+ String[] gtidRangeArray = gtid.split(",");
+ for (int i = 0; i < gtidRangeArray.length; i++) {
+ String gtidRange = gtidRangeArray[i];
+ if (gtidRange.startsWith(serverUUID)) {
+ gtidRangeArray[i] = gtidRange.replaceFirst("\\d+$", currentGtid.split(":")[1]);
+ }
+ }
+ return String.join(",", gtidRangeArray);
+ }
+
private static void checkUpdateKeyColumns(Map oldKeyColumns,
Map keyColumns) {
if (oldKeyColumns.isEmpty()) {
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index 4b96177319..6cd575cb77 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -150,6 +150,8 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup
return instance;
}
});
+ DatabaseConnection.sourceConfig = sourceConfig.getSourceConnectorConfig();
+ DatabaseConnection.initSourceConnection();
tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource);
}
@@ -180,6 +182,9 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName());
parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
+ // set whether gtid mode is enabled
+ parameter.setGtidEnable(sourceConfig.isGTIDMode());
+
// check positions
// example: Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}",
// "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}")
@@ -193,6 +198,14 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
recordPositionMap.put("journalName", canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp());
recordPositionMap.put("position", canalRecordOffset.getOffset());
+ String gtidRange = canalRecordOffset.getGtid();
+ if (gtidRange != null) {
+ if (canalRecordOffset.getCurrentGtid() != null) {
+ gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
+ sourceConfig.getServerUUID());
+ }
+ recordPositionMap.put("gtid", gtidRange);
+ }
positions.add(JsonUtils.toJSONString(recordPositionMap));
});
parameter.setPositions(positions);
@@ -237,7 +250,13 @@ public void start() throws Exception {
@Override
public void commit(ConnectRecord record) {
long batchId = Long.parseLong(record.getExtension("messageId"));
- canalServer.ack(clientIdentity, batchId);
+ int batchIndex = record.getExtension("batchIndex", Integer.class);
+ int totalBatches = record.getExtension("totalBatches", Integer.class);
+ if (batchIndex == totalBatches - 1) {
+ log.debug("ack records batchIndex:{}, totalBatches:{}, batchId:{}",
+ batchIndex, totalBatches, batchId);
+ canalServer.ack(clientIdentity, batchId);
+ }
}
@Override
@@ -301,21 +320,37 @@ public List poll() {
if (!connectorRecordMap.isEmpty()) {
Set>> entrySet = connectorRecordMap.entrySet();
for (Map.Entry> entry : entrySet) {
- // Xid offset
- Long binLogOffset = entry.getKey();
List connectRecordList = entry.getValue();
CanalConnectRecord lastRecord = entry.getValue().get(connectRecordList.size() - 1);
CanalRecordPartition canalRecordPartition = new CanalRecordPartition();
+ canalRecordPartition.setServerUUID(sourceConfig.getServerUUID());
canalRecordPartition.setJournalName(lastRecord.getJournalName());
canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
-
+ // Xid offset with gtid
+ Long binLogOffset = entry.getKey();
CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
canalRecordOffset.setOffset(binLogOffset);
+ if (StringUtils.isNotEmpty(lastRecord.getGtid()) && StringUtils.isNotEmpty(lastRecord.getCurrentGtid())) {
+ canalRecordOffset.setGtid(lastRecord.getGtid());
+ canalRecordOffset.setCurrentGtid(lastRecord.getCurrentGtid());
+ }
- ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis());
- connectRecord.addExtension("messageId", String.valueOf(message.getId()));
- connectRecord.setData(connectRecordList);
- result.add(connectRecord);
+ // split record list
+ List> splitLists = new ArrayList<>();
+ for (int i = 0; i < connectRecordList.size(); i += sourceConfig.getBatchSize()) {
+ int end = Math.min(i + sourceConfig.getBatchSize(), connectRecordList.size());
+ List subList = connectRecordList.subList(i, end);
+ splitLists.add(subList);
+ }
+
+ for (int i = 0; i < splitLists.size(); i++) {
+ ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis());
+ connectRecord.addExtension("messageId", String.valueOf(message.getId()));
+ connectRecord.addExtension("batchIndex", i);
+ connectRecord.addExtension("totalBatches", splitLists.size());
+ connectRecord.setData(splitLists.get(i));
+ result.add(connectRecord);
+ }
}
} else {
// for the message has been filtered need ack message