diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index 3493fca..1909b23 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -341,6 +341,23 @@ curl -s -X POST \ }' http://localhost:8083/connectors ``` +#### Error Handling And Logging + + + + +| Config | Description | +|---------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------| +| "errors.tolerance": "none" | Default Behaviour.Connector will stop working on error | +| "errors.tolerance": "all","errors.log.enable": "false" | Connector will continue on error and error message will not be logged | +| "errors.tolerance": "all","errors.log.enable": "true","errors.log.include.message": "false" | Connector will continue on error and error occurrence will be logged and failed message will not be logged | +| "errors.tolerance": "all","errors.log.enable": "true","errors.log.include.message": "true" | Connector will continue on error and error occurrence will be logged and failed message will be logged | +| "errors.tolerance": "all","errors.log.enable": "true or false","errors.deadletterqueue.topic.name": "topic-name"|Connector will continue on error and error occurrence will be logged as per the option and failed message will be send to dead letter topic | +| "errors.tolerance": "all","errors.log.enable": "true or false","errors.deadletterqueue.topic.name": "topic-name","errors.deadletterqueue.context.headers.enable": "true" | Connector will continue on error and error occurrence will be logged as per the option and failed message will be send to dead letter topic and failure reason will be logged in message header. | +Please follow documentation for more information : `https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues` + +Note : In case of Authentication error such as 401 and 403 connector will stop working irrespective of what error tolerance value we have set. + #### Dead Letter Configuration To send error records to dead letter topic please use standard kafka connector error configuration. diff --git a/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java b/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java index ca5ffc9..9231d82 100644 --- a/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java +++ b/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java @@ -46,6 +46,8 @@ public class AEPPublisher extends AbstractAEPPublisher { private static final String MESSAGES_KEY = "messages"; private static final String RESPONSES_KEY = "responses"; private static final String STATUS_KEY = "status"; + private static final String XACTIONID_KEY = "xactionId"; + private static final String DASH = "-"; private int count; private final HttpProducer producer; @@ -68,6 +70,7 @@ public void publishData(List> messages) throws AEPStrea int totalMessageCount; final ArrayNode jsonMessages = JacksonFactory.OBJECT_MAPPER.createArrayNode(); + try { messages.stream() .map(Pair::getKey) @@ -100,7 +103,14 @@ public void publishData(List> messages) throws AEPStrea for (JsonNode messageResponse : publishMessagesResponses) { if (messageResponse.hasNonNull(STATUS_KEY)) { failedMessageCount++; - LOG.debug("Failed to publish message to Adobe Experience Platform: {}", messageResponse); + final Pair failedMessage = messages.get(getFailedMessageIndex(messageResponse)); + LOG.debug("Failed to publish message: {} to Adobe Experience Platform due to the error: {}", + failedMessage, messageResponse); + if (Objects.nonNull(errorReporter)) { + final int responseCode = messageResponse.get(STATUS_KEY).asInt(); + errorReporter.report(failedMessage.getRight(), + new HttpException(String.format("error response= %s", messageResponse), responseCode)); + } } else { successMessageCount++; } @@ -110,25 +120,32 @@ public void publishData(List> messages) throws AEPStrea } else { LOG.error("Invalid Response received while publishing data to Adobe Experience Platform: {}", response); } - } catch (JsonProcessingException jsonException) { LOG.error("Failed to publish data to Adobe Experience Platform", jsonException); if (Objects.nonNull(errorReporter)) { messages.forEach(message -> errorReporter.report(message.getValue(), jsonException)); } - throw new AEPStreamingException("Failed to publish invalid JSON", jsonException); } catch (HttpException httpException) { LOG.error("Failed to publish data to Adobe Experience Platform", httpException); if (Objects.nonNull(errorReporter)) { messages.forEach(message -> errorReporter.report(message.getValue(), httpException)); } final int responseCode = httpException.getResponseCode(); - if (HttpUtil.is500(responseCode) || HttpUtil.isUnauthorized(responseCode)) { + if (HttpUtil.isUnauthorized(responseCode)) { throw new AEPStreamingException("Failed to publish", httpException); } } } + private Integer getFailedMessageIndex(final JsonNode messageResponse) throws HttpException { + if (messageResponse.hasNonNull(XACTIONID_KEY)) { + final String xactionId = messageResponse.get(XACTIONID_KEY).asText(); + return Integer.parseInt(xactionId.substring(xactionId.lastIndexOf(DASH) + 1)); + } + throw new HttpException(String.format("xactionId is missing in the failed message error response : %s", + messageResponse)); + } + public void stop() { LOG.info("Stopping AEP Data Publisher after publishing {} messages", count); } diff --git a/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorErrorReporterTest.java b/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorErrorReporterTest.java index 896e8fe..84ce786 100644 --- a/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorErrorReporterTest.java +++ b/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AEPSinkConnectorErrorReporterTest.java @@ -47,6 +47,7 @@ public class AEPSinkConnectorErrorReporterTest extends AbstractConnectorTest { private static final String AEP_KAFKA_ERROR_CONNECTOR_CONFIG = "aep-connector-error-reporter.json"; private static final String AEP_KAFKA_ERROR_CONNECTOR_HEADER_CONFIG = "aep-connector-error-reporter-header.json"; private static final String XDM_PAYLOAD_FILE = "xdm-data.json"; + private static final String XDM_MULTI_MESSAGE_PAYLOAD_FILE = "xdm-data-multiple-messages.json"; private static final String DEAD_LETTER_TOPIC = "errors.deadletterqueue.topic.name"; private static final String ERROR_CLASS_NAME = "__connect.errors.exception.class.name"; private static final String ERROR_HEADER_MESSAGE = "__connect.errors.exception.message"; @@ -70,7 +71,7 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME); getConnect().configureConnector(CONNECTOR_NAME, connectorConfig); - String xdmData = xdmData(); + String xdmData = xdmData(XDM_PAYLOAD_FILE); getConnect().kafka().produce(TOPIC_NAME, xdmData); waitForConnectorStart(CONNECTOR_NAME, 1, 8000); @@ -87,7 +88,89 @@ public void kafkaErrorReporterTest() throws HttpException, IOException, Interrup // Verify inlet endpoint received 1 XDM record getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl())) - .withRequestBody(equalToJson(payloadReceivedXdmData()))); + .withRequestBody(equalToJson(payloadReceivedXdmData(XDM_PAYLOAD_FILE)))); + } + + @Test + public void kafkaErrorReporterMultiMessageTest() throws HttpException, IOException, InterruptedException { + inletMultiStatusSuccessfulResponse(); + getConnect().kafka().createTopic(TOPIC_NAME, TOPIC_PARTITION); + + // Create error topic to dump failed data + Map connectorConfig = connectorConfig(AEP_KAFKA_ERROR_CONNECTOR_CONFIG); + getConnect().kafka().createTopic(connectorConfig.get(DEAD_LETTER_TOPIC), TOPIC_PARTITION); + + LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME); + getConnect().configureConnector(CONNECTOR_NAME, connectorConfig); + + String xdmData = xdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE); + ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData); + String failedMessage = xdmDataValues.get(0).toString(); + String successMessage = xdmDataValues.get(1).toString(); + + getConnect().kafka().produce(TOPIC_NAME, failedMessage); + getConnect().kafka().produce(TOPIC_NAME, successMessage); + + waitForConnectorStart(CONNECTOR_NAME, 1, 8000); + + // Check if error record sent to error topic + ConsumerRecords consumerRecords = getConnect().kafka() + .consume(1, 8000, connectorConfig.get(DEAD_LETTER_TOPIC)); + + Assertions.assertEquals(1, consumerRecords.count()); + + ConsumerRecord consumerRecord = consumerRecords.iterator().next(); + JsonNode record = JacksonFactory.OBJECT_MAPPER.readTree(consumerRecord.value()); + + Assertions.assertEquals(JacksonFactory.OBJECT_MAPPER.readTree(failedMessage).toString(), record.toString()); + + // Verify inlet endpoint received 2 XDM record + getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl())) + .withRequestBody(equalToJson(payloadReceivedMultiMessageXdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE)))); + } + + @Test + public void kafkaErrorReporterMultiMessageWithHeadersTest() throws HttpException, IOException, InterruptedException { + inletMultiStatusSuccessfulResponse(); + + getConnect().kafka().createTopic(TOPIC_NAME, TOPIC_PARTITION); + + // Create error topic to dump failed data + Map connectorConfig = connectorConfig(AEP_KAFKA_ERROR_CONNECTOR_HEADER_CONFIG); + getConnect().kafka().createTopic(connectorConfig.get(DEAD_LETTER_TOPIC), TOPIC_PARTITION); + + LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME); + getConnect().configureConnector(CONNECTOR_NAME, connectorConfig); + + String xdmData = xdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE); + ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData); + String failedMessage = xdmDataValues.get(0).toString(); + String successMessage = xdmDataValues.get(1).toString(); + getConnect().kafka().produce(TOPIC_NAME, failedMessage); + getConnect().kafka().produce(TOPIC_NAME, successMessage); + + waitForConnectorStart(CONNECTOR_NAME, 1, 8000); + + // Check if error record sent to error topic + ConsumerRecords consumerRecords = getConnect().kafka() + .consume(1, 8000, connectorConfig.get(DEAD_LETTER_TOPIC)); + + Assertions.assertEquals(1, consumerRecords.count()); + + ConsumerRecord consumerRecord = consumerRecords.iterator().next(); + JsonNode record = JacksonFactory.OBJECT_MAPPER.readTree(consumerRecord.value()); + + Assertions.assertEquals(JacksonFactory.OBJECT_MAPPER.readTree(failedMessage).toString(), record.toString()); + + final Headers errorHeaders = consumerRecord.headers(); + errorHeaders.headers(ERROR_CLASS_NAME) + .forEach(header -> Assertions.assertEquals(EXPECTED_EXCEPTION_CLASS, new String(header.value()))); + errorHeaders.headers(ERROR_HEADER_MESSAGE).forEach(header -> + Assertions.assertTrue(new String(header.value()).contains(String.valueOf(HTTP_BAD_REQUEST_ERROR_CODE)))); + + // Verify inlet endpoint received 2 XDM record + getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl())) + .withRequestBody(equalToJson(payloadReceivedMultiMessageXdmData(XDM_MULTI_MESSAGE_PAYLOAD_FILE)))); } @Test @@ -101,7 +184,7 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio LOG.info("Starting connector cluster with connector : {}", CONNECTOR_NAME); getConnect().configureConnector(CONNECTOR_NAME, connectorConfig); - String xdmData = xdmData(); + String xdmData = xdmData(XDM_PAYLOAD_FILE); getConnect().kafka().produce(TOPIC_NAME, xdmData); waitForConnectorStart(CONNECTOR_NAME, 1, 8000); @@ -122,13 +205,21 @@ public void kafkaErrorReporterWithHeadersTest() throws HttpException, IOExceptio errorHeaders.headers(ERROR_HEADER_MESSAGE).forEach(header -> Assertions.assertTrue(new String(header.value()).contains(String.valueOf(HTTP_SERVER_SIDE_ERROR_CODE)))); - // Verify inlet endpoint received 1 XDM record + // Verify inlet endpoint received 2 XDM record getWiremockServer().verify(postRequestedFor(urlEqualTo(getRelativeUrl())) - .withRequestBody(equalToJson(payloadReceivedXdmData()))); + .withRequestBody(equalToJson(payloadReceivedXdmData(XDM_PAYLOAD_FILE)))); } - public String payloadReceivedXdmData() throws HttpException, JsonProcessingException { - String xdmData = xdmData(); + public String payloadReceivedMultiMessageXdmData(String payloadfile) throws HttpException, JsonProcessingException { + String xdmData = xdmData(payloadfile); + ObjectNode messageNode = JacksonFactory.OBJECT_MAPPER.createObjectNode(); + ArrayNode xdmDataValues = (ArrayNode)JacksonFactory.OBJECT_MAPPER.readTree(xdmData); + messageNode.set("messages", xdmDataValues); + return JacksonFactory.OBJECT_MAPPER.writeValueAsString(messageNode); + } + + public String payloadReceivedXdmData(String payloadfile) throws HttpException, JsonProcessingException { + String xdmData = xdmData(payloadfile); ObjectNode messageNode = JacksonFactory.OBJECT_MAPPER.createObjectNode(); ArrayNode xdmDataValues = JacksonFactory.OBJECT_MAPPER.createArrayNode(); xdmDataValues.add(JacksonFactory.OBJECT_MAPPER.readTree(xdmData)); @@ -137,8 +228,8 @@ public String payloadReceivedXdmData() throws HttpException, JsonProcessingExcep return JacksonFactory.OBJECT_MAPPER.writeValueAsString(messageNode); } - public String xdmData() throws HttpException { - return HttpUtil.streamToString(this.getClass().getClassLoader().getResourceAsStream(XDM_PAYLOAD_FILE)); + public String xdmData(String payloadfile) throws HttpException { + return HttpUtil.streamToString(this.getClass().getClassLoader().getResourceAsStream(payloadfile)); } public Map connectorConfig(String configFile) throws HttpException, JsonProcessingException { diff --git a/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AbstractConnectorTest.java b/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AbstractConnectorTest.java index d2b8bd0..58f5b91 100644 --- a/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AbstractConnectorTest.java +++ b/streaming-connect-sink/src/test/java/com/adobe/platform/streaming/integration/AbstractConnectorTest.java @@ -48,6 +48,8 @@ public abstract class AbstractConnectorTest { .constructMapLikeType(TreeMap.class, String.class, String.class); private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); protected static final int HTTP_SERVER_SIDE_ERROR_CODE = 500; + protected static final int HTTP_BAD_REQUEST_ERROR_CODE = 400; + private static final String AUTH_TOKEN_RESPONSE = "{\"access_token\":\"accessToken\"," + "\"refresh_token\":\"refreshToken\",\"token_type\":\"bearer\",\"expires_in\":82399996}"; private static final String AUTH_TOKEN_RESPONSE_OAUTH2 = "{\"access_token\":\"accessToken\"," + @@ -56,6 +58,8 @@ public abstract class AbstractConnectorTest { private static final String AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE = "aep-connector-inlet-successful-response.json"; + private static final String AEP_CONNECTOR_INLET_MULTI_STATUS_SUCCESSFUL_RESPONSE = + "aep-connector-inlet-multi-message-response.json"; protected static final int TOPIC_PARTITION = 1; protected static final int NUMBER_OF_TASKS = 1; protected static final String CONNECTOR_NAME = "aep-sink-connector"; @@ -132,6 +136,15 @@ public void inletSuccessfulResponse() throws IOException { .getResourceAsStream(AEP_CONNECTOR_INLET_SUCCESSFUL_RESPONSE))))); } + public void inletMultiStatusSuccessfulResponse() throws IOException { + wiremockExtension.getWireMockServer() + .stubFor(WireMock + .post(WireMock.urlEqualTo(getRelativeUrl())) + .willReturn(ResponseDefinitionBuilder.responseDefinition() + .withJsonBody(JacksonFactory.OBJECT_MAPPER.readTree(this.getClass().getClassLoader() + .getResourceAsStream(AEP_CONNECTOR_INLET_MULTI_STATUS_SUCCESSFUL_RESPONSE))))); + } + public void inletIMSAuthenticationSuccessfulResponse() throws JsonProcessingException { wiremockExtension.getWireMockServer() .stubFor(WireMock diff --git a/streaming-connect-sink/src/test/resources/aep-connector-inlet-multi-message-response.json b/streaming-connect-sink/src/test/resources/aep-connector-inlet-multi-message-response.json new file mode 100644 index 0000000..ffc6e50 --- /dev/null +++ b/streaming-connect-sink/src/test/resources/aep-connector-inlet-multi-message-response.json @@ -0,0 +1,15 @@ +{ + "inletId": "9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5", + "batchId": "1565638336649:1750:244", + "receivedTimeMs": 1565638336705, + "responses": [ + { + "xactionId":"9341f8eb-494a-4c89-9879-4d06a58d2dc7-0", + "status":400, + "message":"The 'header' field is mandatory. Provide a valid 'header' value and try again." + }, + { + "xactionId": "9341f8eb-494a-4c89-9879-4d06a58d2dc7-1" + } + ] +} \ No newline at end of file diff --git a/streaming-connect-sink/src/test/resources/aep-connector-inlet-successful-response.json b/streaming-connect-sink/src/test/resources/aep-connector-inlet-successful-response.json index 683318f..ffe9655 100644 --- a/streaming-connect-sink/src/test/resources/aep-connector-inlet-successful-response.json +++ b/streaming-connect-sink/src/test/resources/aep-connector-inlet-successful-response.json @@ -5,18 +5,6 @@ "responses": [ { "xactionId": "1565650704337:2124:92:3" - }, - { - "status": 400, - "message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format" - }, - { - "status": 400, - "message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has an absent or wrong ims org in the header" - }, - { - "status": 400, - "message": "inletId: [9b0cb233972f3b0092992284c7353f5eead496218e8441a79b25e9421ea127f5] imsOrgId: [{ORG_ID}] Message has unknown xdm format" } ] } \ No newline at end of file diff --git a/streaming-connect-sink/src/test/resources/xdm-data-multiple-messages.json b/streaming-connect-sink/src/test/resources/xdm-data-multiple-messages.json new file mode 100644 index 0000000..452e79e --- /dev/null +++ b/streaming-connect-sink/src/test/resources/xdm-data-multiple-messages.json @@ -0,0 +1,39 @@ +[ + { + "body": { + "xdmMeta": { + "schemaRef": { + "id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}", + "contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}" + } + }, + "xdmEntity": { + "firstname": "abc", + "lastname": "def" + } + } + }, + { + "header": { + "schemaRef": { + "id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}", + "contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}" + }, + "source": { + "name": "aep-sink-connector" + } + }, + "body": { + "xdmMeta": { + "schemaRef": { + "id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}", + "contentType": "application/vnd.adobe.xed-full+json;version={SCHEMA_VERSION}" + } + }, + "xdmEntity": { + "firstname": "abc", + "lastname": "def" + } + } + } +] \ No newline at end of file