Skip to content

Commit

Permalink
PLAT-188185:Send all the failed message to the error reporter topic a…
Browse files Browse the repository at this point in the history
…nd throw exception (#69)

and Updated the documentation
  • Loading branch information
maniskum authored May 31, 2024
1 parent c776465 commit d5257f0
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 25 deletions.
17 changes: 17 additions & 0 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,23 @@ curl -s -X POST \
}' http://localhost:8083/connectors
```
#### Error Handling And Logging
| Config | Description |
|---------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------|
| "errors.tolerance": "none" | Default Behaviour.Connector will stop working on error |
| "errors.tolerance": "all","errors.log.enable": "false" | Connector will continue on error and error message will not be logged |
| "errors.tolerance": "all","errors.log.enable": "true","errors.log.include.message": "false" | Connector will continue on error and error occurrence will be logged and failed message will not be logged |
| "errors.tolerance": "all","errors.log.enable": "true","errors.log.include.message": "true" | Connector will continue on error and error occurrence will be logged and failed message will be logged |
| "errors.tolerance": "all","errors.log.enable": "true or false","errors.deadletterqueue.topic.name": "topic-name"|Connector will continue on error and error occurrence will be logged as per the option and failed message will be send to dead letter topic |
| "errors.tolerance": "all","errors.log.enable": "true or false","errors.deadletterqueue.topic.name": "topic-name","errors.deadletterqueue.context.headers.enable": "true" | Connector will continue on error and error occurrence will be logged as per the option and failed message will be send to dead letter topic and failure reason will be logged in message header. |
Please follow documentation for more information : `https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues`
Note : In case of Authentication error such as 401 and 403 connector will stop working irrespective of what error tolerance value we have set.
#### Dead Letter Configuration
To send error records to dead letter topic please use standard kafka connector error configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class AEPPublisher extends AbstractAEPPublisher {
private static final String MESSAGES_KEY = "messages";
private static final String RESPONSES_KEY = "responses";
private static final String STATUS_KEY = "status";
private static final String XACTIONID_KEY = "xactionId";
private static final String DASH = "-";

private int count;
private final HttpProducer producer;
Expand All @@ -68,6 +70,7 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
int totalMessageCount;

final ArrayNode jsonMessages = JacksonFactory.OBJECT_MAPPER.createArrayNode();

try {
messages.stream()
.map(Pair::getKey)
Expand Down Expand Up @@ -100,7 +103,14 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
for (JsonNode messageResponse : publishMessagesResponses) {
if (messageResponse.hasNonNull(STATUS_KEY)) {
failedMessageCount++;
LOG.debug("Failed to publish message to Adobe Experience Platform: {}", messageResponse);
final Pair<String, SinkRecord> failedMessage = messages.get(getFailedMessageIndex(messageResponse));
LOG.debug("Failed to publish message: {} to Adobe Experience Platform due to the error: {}",
failedMessage, messageResponse);
if (Objects.nonNull(errorReporter)) {
final int responseCode = messageResponse.get(STATUS_KEY).asInt();
errorReporter.report(failedMessage.getRight(),
new HttpException(String.format("error response= %s", messageResponse), responseCode));
}
} else {
successMessageCount++;
}
Expand All @@ -110,25 +120,32 @@ public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStrea
} else {
LOG.error("Invalid Response received while publishing data to Adobe Experience Platform: {}", response);
}

} catch (JsonProcessingException jsonException) {
LOG.error("Failed to publish data to Adobe Experience Platform", jsonException);
if (Objects.nonNull(errorReporter)) {
messages.forEach(message -> errorReporter.report(message.getValue(), jsonException));
}
throw new AEPStreamingException("Failed to publish invalid JSON", jsonException);
} catch (HttpException httpException) {
LOG.error("Failed to publish data to Adobe Experience Platform", httpException);
if (Objects.nonNull(errorReporter)) {
messages.forEach(message -> errorReporter.report(message.getValue(), httpException));
}
final int responseCode = httpException.getResponseCode();
if (HttpUtil.is500(responseCode) || HttpUtil.isUnauthorized(responseCode)) {
if (HttpUtil.isUnauthorized(responseCode)) {
throw new AEPStreamingException("Failed to publish", httpException);
}
}
}

private Integer getFailedMessageIndex(final JsonNode messageResponse) throws HttpException {
if (messageResponse.hasNonNull(XACTIONID_KEY)) {
final String xactionId = messageResponse.get(XACTIONID_KEY).asText();
return Integer.parseInt(xactionId.substring(xactionId.lastIndexOf(DASH) + 1));
}
throw new HttpException(String.format("xactionId is missing in the failed message error response : %s",
messageResponse));
}

public void stop() {
LOG.info("Stopping AEP Data Publisher after publishing {} messages", count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class AEPSinkConnectorErrorReporterTest extends AbstractConnectorTest {
private static final String AEP_KAFKA_ERROR_CONNECTOR_CONFIG = "aep-connector-error-reporter.json";
private static final String AEP_KAFKA_ERROR_CONNECTOR_HEADER_CONFIG = "aep-connector-error-reporter-header.json";
private static final String XDM_PAYLOAD_FILE = "xdm-data.json";
private static final String XDM_MULTI_MESSAGE_PAYLOAD_FILE = "xdm-data-multiple-messages.json";
private static final String DEAD_LETTER_TOPIC = "errors.deadletterqueue.topic.name";
private static final String ERROR_CLASS_NAME = "__connect.errors.exception.class.name";
private static final String ERROR_HEADER_MESSAGE = "__connect.errors.exception.message";
Expand All @@ -70,7 +71,7 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup
LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);

String xdmData = xdmData();
String xdmData = xdmData(XDM_PAYLOAD_FILE);
getConnect().kafka().produce(TOPIC_NAME, xdmData);
waitForConnectorStart(CONNECTOR_NAME, 1, 8000);

Expand All @@ -87,7 +88,89 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup

// Verify inlet endpoint received 1 XDM record
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
.withRequestBody(equalToJson(payloadReceivedXdmData())));
.withRequestBody(equalToJson(payloadReceivedXdmData(XDM_PAYLOAD_FILE))));
}

@Test
public void kafkaErrorReporterMultiMessageTest() throws HttpException, IOException, InterruptedException {
inletMultiStatusSuccessfulResponse();
getConnect().kafka().createTopic(TOPIC_NAME, TOPIC_PARTITION);

// Create error topic to dump failed data
Map<String, String> connectorConfig = connectorConfig(AEP_KAFKA_ERROR_CONNECTOR_CONFIG);
getConnect().kafka().createTopic(connectorConfig.get(DEAD_LETTER_TOPIC), TOPIC_PARTITION);

LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);

String xdmData = xdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE);
ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData);
String failedMessage = xdmDataValues.get(0).toString();
String successMessage = xdmDataValues.get(1).toString();

getConnect().kafka().produce(TOPIC_NAME, failedMessage);
getConnect().kafka().produce(TOPIC_NAME, successMessage);

waitForConnectorStart(CONNECTOR_NAME, 1, 8000);

// Check if error record sent to error topic
ConsumerRecords<byte[], byte[]> consumerRecords = getConnect().kafka()
.consume(1, 8000, connectorConfig.get(DEAD_LETTER_TOPIC));

Assertions.assertEquals(1, consumerRecords.count());

ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.iterator().next();
JsonNode record = JacksonFactory.OBJECT_MAPPER.readTree(consumerRecord.value());

Assertions.assertEquals(JacksonFactory.OBJECT_MAPPER.readTree(failedMessage).toString(), record.toString());

// Verify inlet endpoint received 2 XDM record
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
.withRequestBody(equalToJson(payloadReceivedMultiMessageXdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE))));
}

@Test
public void kafkaErrorReporterMultiMessageWithHeadersTest() throws HttpException, IOException, InterruptedException {
inletMultiStatusSuccessfulResponse();

getConnect().kafka().createTopic(TOPIC_NAME, TOPIC_PARTITION);

// Create error topic to dump failed data
Map<String, String> connectorConfig = connectorConfig(AEP_KAFKA_ERROR_CONNECTOR_HEADER_CONFIG);
getConnect().kafka().createTopic(connectorConfig.get(DEAD_LETTER_TOPIC), TOPIC_PARTITION);

LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);

String xdmData = xdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE);
ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData);
String failedMessage = xdmDataValues.get(0).toString();
String successMessage = xdmDataValues.get(1).toString();
getConnect().kafka().produce(TOPIC_NAME, failedMessage);
getConnect().kafka().produce(TOPIC_NAME, successMessage);

waitForConnectorStart(CONNECTOR_NAME, 1, 8000);

// Check if error record sent to error topic
ConsumerRecords<byte[], byte[]> consumerRecords = getConnect().kafka()
.consume(1, 8000, connectorConfig.get(DEAD_LETTER_TOPIC));

Assertions.assertEquals(1, consumerRecords.count());

ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.iterator().next();
JsonNode record = JacksonFactory.OBJECT_MAPPER.readTree(consumerRecord.value());

Assertions.assertEquals(JacksonFactory.OBJECT_MAPPER.readTree(failedMessage).toString(), record.toString());

final Headers errorHeaders = consumerRecord.headers();
errorHeaders.headers(ERROR_CLASS_NAME)
.forEach(header -> Assertions.assertEquals(EXPECTED_EXCEPTION_CLASS, new String(header.value())));
errorHeaders.headers(ERROR_HEADER_MESSAGE).forEach(header ->
Assertions.assertTrue(new String(header.value()).contains(String.valueOf(HTTP_BAD_REQUEST_ERROR_CODE))));

// Verify inlet endpoint received 2 XDM record
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
.withRequestBody(equalToJson(payloadReceivedMultiMessageXdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE))));
}

@Test
Expand All @@ -101,7 +184,7 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio
LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME);
getConnect().configureConnector(CONNECTOR_NAME, connectorConfig);

String xdmData = xdmData();
String xdmData = xdmData(XDM_PAYLOAD_FILE);
getConnect().kafka().produce(TOPIC_NAME, xdmData);
waitForConnectorStart(CONNECTOR_NAME, 1, 8000);

Expand All @@ -122,13 +205,21 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio
errorHeaders.headers(ERROR_HEADER_MESSAGE).forEach(header ->
Assertions.assertTrue(new String(header.value()).contains(String.valueOf(HTTP_SERVER_SIDE_ERROR_CODE))));

// Verify inlet endpoint received 1 XDM record
// Verify inlet endpoint received 2 XDM record
getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl()))
.withRequestBody(equalToJson(payloadReceivedXdmData())));
.withRequestBody(equalToJson(payloadReceivedXdmData(XDM_PAYLOAD_FILE))));
}

public String payloadReceivedXdmData() throws HttpException, JsonProcessingException {
String xdmData = xdmData();
public String payloadReceivedMultiMessageXdmData(String payloadfile) throws HttpException, JsonProcessingException {
String xdmData = xdmData(payloadfile);
ObjectNode messageNode = JacksonFactory.OBJECT_MAPPER.createObjectNode();
ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData);
messageNode.set("messages", xdmDataValues);
return JacksonFactory.OBJECT_MAPPER.writeValueAsString(messageNode);
}

public String payloadReceivedXdmData(String payloadfile) throws HttpException, JsonProcessingException {
String xdmData = xdmData(payloadfile);
ObjectNode messageNode = JacksonFactory.OBJECT_MAPPER.createObjectNode();
ArrayNode xdmDataValues = JacksonFactory.OBJECT_MAPPER.createArrayNode();
xdmDataValues.add(JacksonFactory.OBJECT_MAPPER.readTree(xdmData));
Expand All @@ -137,8 +228,8 @@ public String payloadReceivedXdmData() throws HttpException, JsonProcessingExcep
return JacksonFactory.OBJECT_MAPPER.writeValueAsString(messageNode);
}

public String xdmData() throws HttpException {
return HttpUtil.streamToString(this.getClass().getClassLoader().getResourceAsStream(XDM_PAYLOAD_FILE));
public String xdmData(String payloadfile) throws HttpException {
return HttpUtil.streamToString(this.getClass().getClassLoader().getResourceAsStream(payloadfile));
}

public Map<String, String> connectorConfig(String configFile) throws HttpException, JsonProcessingException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public abstract class AbstractConnectorTest {
.constructMapLikeType(TreeMap.class, String.class, String.class);
private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);
protected static final int HTTP_SERVER_SIDE_ERROR_CODE = 500;
protected static final int HTTP_BAD_REQUEST_ERROR_CODE = 400;

private static final String AUTH_TOKEN_RESPONSE = "{\"access_token\":\"accessToken\"," +
"\"refresh_token\":\"refreshToken\",\"token_type\":\"bearer\",\"expires_in\":82399996}";
private static final String AUTH_TOKEN_RESPONSE_OAUTH2 = "{\"access_token\":\"accessToken\"," +
Expand All @@ -56,6 +58,8 @@ public abstract class AbstractConnectorTest {
private static final String AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE =
"aep-connector-inlet-successful-response.json";

private static final String AEP_CONNECTOR_INLET_MULTI_STATUS_SUCCESSFUL_RESPONSE =
"aep-connector-inlet-multi-message-response.json";
protected static final int TOPIC_PARTITION = 1;
protected static final int NUMBER_OF_TASKS = 1;
protected static final String CONNECTOR_NAME = "aep-sink-connector";
Expand Down Expand Up @@ -132,6 +136,15 @@ public void inletSuccessfulResponse() throws IOException {
.getResourceAsStream(AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE)))));
}

public void inletMultiStatusSuccessfulResponse() throws IOException {
wiremockExtension.getWireMockServer()
.stubFor(WireMock
.post(WireMock.urlEqualTo(getRelativeUrl()))
.willReturn(ResponseDefinitionBuilder.responseDefinition()
.withJsonBody(JacksonFactory.OBJECT_MAPPER.readTree(this.getClass().getClassLoader()
.getResourceAsStream(AEP_CONNECTOR_INLET_MULTI_STATUS_SUCCESSFUL_RESPONSE)))));
}

public void inletIMSAuthenticationSuccessfulResponse() throws JsonProcessingException {
wiremockExtension.getWireMockServer()
.stubFor(WireMock
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"inletId": "9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5",
"batchId": "1565638336649:1750:244",
"receivedTimeMs": 1565638336705,
"responses": [
{
"xactionId":"9341f8eb-494a-4c89-9879-4d06a58d2dc7-0",
"status":400,
"message":"The 'header' field is mandatory. Provide a valid 'header' value and try again."
},
{
"xactionId": "9341f8eb-494a-4c89-9879-4d06a58d2dc7-1"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,6 @@
"responses": [
{
"xactionId": "1565650704337:2124:92:3"
},
{
"status": 400,
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format"
},
{
"status": 400,
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has an absent or wrong ims org in the header"
},
{
"status": 400,
"message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[
{
"body": {
"xdmMeta": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}"
}
},
"xdmEntity": {
"firstname": "abc",
"lastname": "def"
}
}
},
{
"header": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}"
},
"source": {
"name": "aep-sink-connector"
}
},
"body": {
"xdmMeta": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}"
}
},
"xdmEntity": {
"firstname": "abc",
"lastname": "def"
}
}
}
]

0 comments on commit d5257f0

Please sign in to comment.