Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5048] Add report verify request to admin for connector runtime #5049

Merged
merged 9 commits into from
Jul 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig {

private boolean isGTIDMode = true;

private boolean isMariaDB = true;

// skip sink process exception
private Boolean skipException = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig {

private String serverUUID;

private boolean isMariaDB = true;

private boolean isGTIDMode = true;

private Integer batchSize = 10000;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.enums;

public enum ConnectorStage {
SOURCE,
SINK

Check warning on line 22 in eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/enums/ConnectorStage.java#L20-L22

Added lines #L20 - L22 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.remote.request;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class ReportVerifyRequest extends BaseRemoteRequest {

private String taskID;

private String recordID;

private String recordSig;

private String connectorName;

private String connectorStage;

private String position;
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,11 @@ private Exception doCall() {
}
JdbcTemplate template = dbDialect.getJdbcTemplate();
String sourceGtid = context.getGtid();
if (StringUtils.isNotEmpty(sourceGtid)) {
String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
template.execute(setGtid);
if (StringUtils.isNotEmpty(sourceGtid) && !sinkConfig.isMariaDB()) {
String setMySQLGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
template.execute(setMySQLGtid);
} else if (StringUtils.isNotEmpty(sourceGtid) && sinkConfig.isMariaDB()) {
throw new RuntimeException("unsupport gtid mode for mariaDB");
} else {
log.error("gtid is empty in gtid mode");
throw new RuntimeException("gtid is empty in gtid mode");
Expand Down Expand Up @@ -510,8 +512,13 @@ public void setValues(PreparedStatement ps) throws SQLException {
});

// reset gtid
String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
dbDialect.getJdbcTemplate().execute(resetGtid);
if (sinkConfig.isMariaDB()) {
throw new RuntimeException("unsupport gtid mode for mariaDB");
} else {
String resetMySQLGtid = "SET @@session.gtid_next = 'AUTOMATIC';";
dbDialect.getJdbcTemplate().execute(resetMySQLGtid);
}

error = null;
exeResult = ExecuteResult.SUCCESS;
} catch (DeadlockLoserDataAccessException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class EntryParser {

public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig sourceConfig, List<Entry> datas,
RdbTableMgr tables) {
RdbTableMgr tables) {
List<CanalConnectRecord> recordList = new ArrayList<>();
List<Entry> transactionDataBuffer = new ArrayList<>();
// need check weather the entry is loopback
Expand All @@ -60,9 +60,9 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
switch (entry.getEntryType()) {
case ROWDATA:
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
if (currentGtid.contains(sourceConfig.getServerUUID())) {
// don't support gtid for mariadb
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
if (checkGtidForEntry(entry, sourceConfig)) {
transactionDataBuffer.add(entry);
}
} else {
Expand Down Expand Up @@ -90,9 +90,14 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
return recordMap;
}

private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig sourceConfig) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
return currentGtid.contains(sourceConfig.getServerUUID());
}

private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig,
List<CanalConnectRecord> recordList,
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
List<CanalConnectRecord> recordList,
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
for (Entry bufferEntry : transactionDataBuffer) {
List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry, tables);
if (CollectionUtils.isEmpty(recordParsedList)) {
Expand Down Expand Up @@ -130,7 +135,7 @@ private static Column getColumnIgnoreCase(List<Column> columns, String columName
}

private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Entry entry,
RdbTableMgr tableMgr) {
RdbTableMgr tableMgr) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
if (tableMgr.getTable(schemaName, tableName) == null) {
Expand Down Expand Up @@ -169,7 +174,7 @@ private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConf
}

private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry,
RowChange rowChange, RowData rowData) {
RowChange rowChange, RowData rowData) {
CanalConnectRecord canalConnectRecord = new CanalConnectRecord();
canalConnectRecord.setTableName(entry.getHeader().getTableName());
canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName());
Expand All @@ -179,10 +184,16 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
// if enabled gtid mode, gtid not null
if (canalSourceConfig.isGTIDMode()) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
canalConnectRecord.setGtid(gtidRange);
canalConnectRecord.setCurrentGtid(currentGtid);
if (canalSourceConfig.isMariaDB()) {
String currentGtid = entry.getHeader().getGtid();
canalConnectRecord.setGtid(currentGtid);
canalConnectRecord.setCurrentGtid(currentGtid);
} else {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
canalConnectRecord.setGtid(gtidRange);
canalConnectRecord.setCurrentGtid(currentGtid);
}
}

EventType eventType = canalConnectRecord.getEventType();
Expand Down Expand Up @@ -276,7 +287,7 @@ public static String replaceGtidRange(String gtid, String currentGtid, String se
}

private static void checkUpdateKeyColumns(Map<String, EventColumn> oldKeyColumns,
Map<String, EventColumn> keyColumns) {
Map<String, EventColumn> keyColumns) {
if (oldKeyColumns.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,16 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
recordPositionMap.put("journalName", canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp());
recordPositionMap.put("position", canalRecordOffset.getOffset());
String gtidRange = canalRecordOffset.getGtid();
if (gtidRange != null) {
if (canalRecordOffset.getCurrentGtid() != null) {
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
sourceConfig.getServerUUID());
// for mariaDB not support gtid mode
if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
String gtidRange = canalRecordOffset.getGtid();
if (gtidRange != null) {
if (canalRecordOffset.getCurrentGtid() != null) {
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
sourceConfig.getServerUUID());
}
recordPositionMap.put("gtid", gtidRange);
}
recordPositionMap.put("gtid", gtidRange);
}
positions.add(JsonUtils.toJSONString(recordPositionMap));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.apache.eventmesh.common.config.connector.SinkConfig;
import org.apache.eventmesh.common.config.connector.SourceConfig;
import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig;
import org.apache.eventmesh.common.enums.ConnectorStage;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
import org.apache.eventmesh.common.remote.request.FetchJobRequest;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.FetchJobResponse;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
Expand All @@ -55,10 +57,13 @@

import org.apache.commons.collections4.CollectionUtils;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -281,8 +286,9 @@
try {
this.stop();
} catch (Exception ex) {
throw new RuntimeException(ex);
log.error("Failed to stop after exception", ex);
}
throw new RuntimeException(e);
}
});
// start
Expand All @@ -294,8 +300,9 @@
try {
this.stop();
} catch (Exception ex) {
throw new RuntimeException(ex);
log.error("Failed to stop after exception", ex);
}
throw new RuntimeException(e);
}
});
}
Expand All @@ -304,6 +311,8 @@
public void stop() throws Exception {
sourceConnector.stop();
sinkConnector.stop();
sourceService.shutdown();
sinkService.shutdown();
heartBeatExecutor.shutdown();
requestObserver.onCompleted();
if (channel != null && !channel.isShutdown()) {
Expand All @@ -318,6 +327,11 @@
// TODO: use producer pub record to storage replace below
if (connectorRecordList != null && !connectorRecordList.isEmpty()) {
for (ConnectRecord record : connectorRecordList) {
// if enabled incremental data reporting consistency check
if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE);
}

queue.put(record);
Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToUpdateRecordOffset(record);
Optional<SendMessageCallback> callback =
Expand All @@ -336,6 +350,43 @@
}
}

private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
UUID uuid = UUID.randomUUID();
String recordId = uuid.toString();
String md5Str = md5(record.toString());
ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
reportVerifyRequest.setRecordID(recordId);
reportVerifyRequest.setRecordSig(md5Str);
reportVerifyRequest.setConnectorName(
IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
reportVerifyRequest.setConnectorStage(connectorStage.name());
reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));

Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();

Payload request = Payload.newBuilder().setMetadata(metadata)
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
.build())
.build();

requestObserver.onNext(request);
}

private String md5(String input) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
Dismissed Show dismissed Hide dismissed
byte[] messageDigest = md.digest(input.getBytes());
StringBuilder sb = new StringBuilder();
for (byte b : messageDigest) {
sb.append(String.format("%02x", b));
}
return sb.toString();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}

public Optional<RecordOffsetManagement.SubmittedPosition> prepareToUpdateRecordOffset(ConnectRecord record) {
return Optional.of(this.offsetManagement.submitRecord(record.getPosition()));
}
Expand Down Expand Up @@ -426,6 +477,10 @@
List<ConnectRecord> connectRecordList = new ArrayList<>();
connectRecordList.add(connectRecord);
sinkConnector.put(connectRecordList);
// if enabled incremental data reporting consistency check
if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
reportVerifyRequest(connectRecord, connectorRuntimeConfig, ConnectorStage.SINK);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public class ConnectorRuntimeConfig {

private String connectorRuntimeInstanceId;

private String taskID;

private String jobID;

private String region;

private String sourceConnectorType;

private String sourceConnectorDesc;
Expand All @@ -45,4 +49,6 @@ public class ConnectorRuntimeConfig {

private Map<String, Object> sinkConnectorConfig;

public boolean enableIncrementalDataConsistencyCheck = true;

}
2 changes: 2 additions & 0 deletions eventmesh-runtime-v2/src/main/resources/connector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@
# limitations under the License.
#

taskID: 1
jobID: 1
region: region1
Loading