From 75de8fa8de2b897e8ebaeb0b1eb60869fec84125 Mon Sep 17 00:00:00 2001 From: zaki Date: Tue, 2 Jul 2024 21:27:33 +0800 Subject: [PATCH] fix: Optimize unit tests and code --- .../eventmesh-connector-http/build.gradle | 4 +- .../sink/handle/CommonHttpSinkHandler.java | 23 ++- .../sink/handle/WebhookHttpSinkHandler.java | 38 +++- .../source/connector/HttpSourceConnector.java | 41 +++- .../connector/HttpSinkConnectorTest.java | 110 ++++++----- .../connector/HttpSourceConnectorTest.java | 184 ++++++++++-------- 6 files changed, 239 insertions(+), 161 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle index b64f7903b1..cfc69259d5 100644 --- a/eventmesh-connectors/eventmesh-connector-http/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle @@ -24,8 +24,10 @@ dependencies { implementation 'io.vertx:vertx-web-client:4.5.8' implementation 'dev.failsafe:failsafe:3.3.2' + + testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1' + testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.3.1' testImplementation 'org.mock-server:mockserver-netty:5.15.0' - testImplementation 'com.squareup.okhttp3:okhttp:4.12.0' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java index 7eeba88d6a..c6cc90e0e0 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java @@ -133,7 +133,14 @@ public Future> deliver(URI url, HttpConnectRecord httpConne // get timestamp and offset Long timestamp = httpConnectRecord.getData().getTimestamp(); - Map offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap(); + Map offset = null; + try { + // May throw NullPointerException. + offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap(); + } catch (NullPointerException e) { + // ignore null pointer exception + } + final Map finalOffset = offset; // send the request return this.webClient.post(url.getPath()) @@ -143,26 +150,28 @@ public Future> deliver(URI url, HttpConnectRecord httpConne .ssl(Objects.equals(url.getScheme(), "https")) .sendJson(httpConnectRecord) .onSuccess(res -> { - log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset); + log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset); // log the response if (HttpUtils.is2xxSuccessful(res.statusCode())) { if (log.isDebugEnabled()) { log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", - res.statusCode(), timestamp, offset, res.bodyAsString()); + res.statusCode(), timestamp, finalOffset, res.bodyAsString()); } else { - log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset); + log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, + finalOffset); } } else { if (log.isDebugEnabled()) { log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", - res.statusCode(), timestamp, offset, res.bodyAsString()); + res.statusCode(), timestamp, finalOffset, res.bodyAsString()); } else { - log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset); + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, + finalOffset); } } }) - .onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, err)); + .onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err)); } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java index 9af246bc6f..4e64126a9d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java @@ -72,6 +72,18 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler { // store the received data, when webhook is enabled private final SynchronizedCircularFifoQueue receivedDataQueue; + private volatile boolean exportStarted = false; + + private volatile boolean exportDestroyed = false; + + public boolean isExportStarted() { + return exportStarted; + } + + public boolean isExportDestroyed() { + return exportDestroyed; + } + public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { super(sinkConnectorConfig); this.sinkConnectorConfig = sinkConnectorConfig; @@ -179,10 +191,15 @@ public void start() { // start the webclient super.start(); // start the export server - Throwable t = this.exportServer.listen().cause(); - if (t != null) { - throw new EventMeshException("Failed to start Vertx server. ", t); - } + this.exportServer.listen(res -> { + if (res.succeeded()) { + this.exportStarted = true; + log.info("WebhookHttpExportServer started on port: {}", this.webhookConfig.getPort()); + } else { + log.error("WebhookHttpExportServer failed to start on port: {}", this.webhookConfig.getPort()); + throw new EventMeshException("Failed to start Vertx server. ", res.cause()); + } + }); } /** @@ -250,10 +267,15 @@ public void stop() { super.stop(); // stop the export server if (this.exportServer != null) { - Throwable t = this.exportServer.close().cause(); - if (t != null) { - throw new EventMeshException("Failed to stop Vertx server. ", t); - } + this.exportServer.close(res -> { + if (res.succeeded()) { + this.exportDestroyed = true; + log.info("WebhookHttpExportServer stopped on port: {}", this.webhookConfig.getPort()); + } else { + log.error("WebhookHttpExportServer failed to stop on port: {}", this.webhookConfig.getPort()); + throw new EventMeshException("Failed to stop Vertx server. ", res.cause()); + } + }); } else { log.warn("Callback server is null, ignore."); } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java index b976fed9bd..c59915b202 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java @@ -55,6 +55,18 @@ public class HttpSourceConnector implements Source { private HttpServer server; + private volatile boolean started = false; + + private volatile boolean destroyed = false; + + public boolean isStarted() { + return started; + } + + public boolean isDestroyed() { + return destroyed; + } + @Override public Class configClass() { @@ -105,10 +117,15 @@ private void doInit() { @Override public void start() { - Throwable t = this.server.listen().cause(); - if (t != null) { - throw new EventMeshException("failed to start Vertx server", t); - } + this.server.listen(res -> { + if (res.succeeded()) { + this.started = true; + log.info("HttpSourceConnector started on port: {}", this.sourceConfig.getConnectorConfig().getPort()); + } else { + log.error("HttpSourceConnector failed to start on port: {}", this.sourceConfig.getConnectorConfig().getPort()); + throw new EventMeshException("failed to start Vertx server", res.cause()); + } + }); } @Override @@ -123,9 +140,19 @@ public String name() { @Override public void stop() { - Throwable t = this.server.close().cause(); - if (t != null) { - throw new EventMeshException("failed to stop Vertx server", t); + if (this.server != null) { + this.server.close(res -> { + if (res.succeeded()) { + this.destroyed = true; + log.info("HttpSourceConnector stopped on port: {}", this.sourceConfig.getConnectorConfig().getPort()); + } else { + log.error("HttpSourceConnector failed to stop on port: {}", this.sourceConfig.getConnectorConfig().getPort()); + throw new EventMeshException("failed to stop Vertx server", res.cause()); + } + } + ); + } else { + log.warn("HttpSourceConnector server is null, ignore."); } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java index eeba625b02..778d963b56 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.connector.http.source.connector; + import static org.mockserver.model.HttpRequest.request; import org.apache.eventmesh.connector.http.sink.HttpSinkConnector; @@ -25,28 +26,30 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.util.ConfigUtil; +import org.apache.hc.client5.http.fluent.Request; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.net.URIBuilder; + import java.net.URI; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockserver.integration.ClientAndServer; -import org.mockserver.model.HttpRequest; import org.mockserver.model.HttpResponse; import org.mockserver.model.MediaType; + import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.ResponseBody; public class HttpSinkConnectorTest { @@ -54,30 +57,33 @@ public class HttpSinkConnectorTest { private HttpSinkConfig sinkConfig; - private URI severUri; + private URL url; private ClientAndServer mockServer; + private static final AtomicInteger counter = new AtomicInteger(0); @BeforeEach void before() throws Exception { // init sinkConnector - this.sinkConnector = new HttpSinkConnector(); - this.sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass()); - this.sinkConnector.init(this.sinkConfig); - this.sinkConnector.start(); + sinkConnector = new HttpSinkConnector(); + sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass()); + sinkConnector.init(this.sinkConfig); + sinkConnector.start(); - this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]); + url = new URL(sinkConfig.connectorConfig.getUrls()[0]); // start mockServer - mockServer = ClientAndServer.startClientAndServer(severUri.getPort()); + mockServer = ClientAndServer.startClientAndServer(url.getPort()); mockServer.reset() .when( request() .withMethod("POST") - .withPath(severUri.getPath()) + .withPath(url.getPath()) ) .respond( httpRequest -> { + // Increase the number of requests received + counter.incrementAndGet(); JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString()); return HttpResponse.response() .withContentType(MediaType.APPLICATION_JSON) @@ -90,6 +96,7 @@ void before() throws Exception { ); // .withDelay(TimeUnit.SECONDS, 10); } ); + } @AfterEach @@ -101,62 +108,57 @@ void after() throws Exception { @Test void testPut() throws Exception { // Create a list of ConnectRecord - final int times = 10; + final int size = 10; List connectRecords = new ArrayList<>(); - for (int i = 0; i < times; i++) { + for (int i = 0; i < size; i++) { ConnectRecord record = createConnectRecord(); connectRecords.add(record); } // Put ConnectRecord sinkConnector.put(connectRecords); - // sleep 5s - Thread.sleep(5000); - - // verify request - HttpRequest[] recordedRequests = mockServer.retrieveRecordedRequests(null); - // assert recordedRequests.length == times; + // wait for receiving request + final int times = 5000; // 5 seconds + long start = System.currentTimeMillis(); + while (counter.get() < size) { + if (System.currentTimeMillis() - start > times) { + // timeout + Assertions.fail("The number of requests received=" + counter.get() + " is less than the number of ConnectRecord=" + size); + } else { + Thread.sleep(100); + } + } // verify response HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig(); - String url = new HttpUrl.Builder() - .scheme("http") - .host(severUri.getHost()) - .port(webhookConfig.getPort()) - .addPathSegments(webhookConfig.getExportPath()) - .addQueryParameter("pageNum", "1") - .addQueryParameter("pageSize", "10") - .addQueryParameter("type", "poll") - .build().toString(); - - // build request - Request request = new Request.Builder() - .url(url) - .addHeader("Content-Type", "application/json") + + URI exportUrl = new URIBuilder() + .setScheme("http") + .setHost(url.getHost()) + .setPort(webhookConfig.getPort()) + .setPath(webhookConfig.getExportPath()) + .addParameter("pageNum", "1") + .addParameter("pageSize", "10") + .addParameter("type", "poll") .build(); - OkHttpClient client = new OkHttpClient(); - try (Response response = client.newCall(request).execute()) { - // check response code - if (!response.isSuccessful()) { - throw new RuntimeException("Unexpected response code: " + response); - } - // check response body - ResponseBody responseBody = response.body(); - if (responseBody != null) { - JSONObject jsonObject = JSON.parseObject(responseBody.string()); + Request.get(exportUrl) + .execute() + .handleResponse(response -> { + // check response code + Assertions.assertEquals(HttpStatus.SC_OK, response.getCode()); + // check response body + JSONObject jsonObject = JSON.parseObject(response.getEntity().getContent()); JSONArray pageItems = jsonObject.getJSONArray("pageItems"); - assert pageItems != null && pageItems.size() == times; - - for (int i = 0; i < times; i++) { + Assertions.assertNotNull(pageItems); + Assertions.assertEquals(size, pageItems.size()); + for (int i = 0; i < size; i++) { JSONObject pageItem = pageItems.getJSONObject(i); - assert pageItem != null; - // assert pageItem.getJSONObject("data") != null; - // assert pageItem.getJSONObject("metadata") != null; + Assertions.assertNotNull(pageItem); } - } - } + return null; + }); } private ConnectRecord createConnectRecord() { diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java index b764b4a989..8e3735dd21 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java @@ -24,50 +24,83 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.util.ConfigUtil; +import org.apache.hc.client5.http.fluent.Request; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.io.entity.StringEntity; + +import java.io.IOException; import java.net.URL; import java.util.List; import java.util.Objects; import java.util.UUID; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; - class HttpSourceConnectorTest { - private HttpSourceConnector connector; - private SourceConnectorConfig config; - private OkHttpClient httpClient; - private String url; - private final String expectedMessage = "testHttpMessage"; + private static HttpSourceConnector connector; + private static String url; + private static final String expectedMessage = "testHttpMessage"; + private static final int batchSize = 10; - @BeforeEach - void setUp() throws Exception { + + @BeforeAll + static void setUpAll() throws Exception { connector = new HttpSourceConnector(); - HttpSourceConfig sourceConfig = (HttpSourceConfig) ConfigUtil.parse(connector.configClass()); - config = sourceConfig.getConnectorConfig(); + final HttpSourceConfig sourceConfig = (HttpSourceConfig) ConfigUtil.parse(connector.configClass()); + final SourceConnectorConfig config = sourceConfig.getConnectorConfig(); + // initialize and start the connector connector.init(sourceConfig); connector.start(); + // wait for the connector to start + long timeout = 5000; // 5 seconds + long start = System.currentTimeMillis(); + while (!connector.isStarted()) { + if (System.currentTimeMillis() - start > timeout) { + // timeout + Assertions.fail("Failed to start the connector"); + } else { + Thread.sleep(100); + } + } + url = new URL("http", "127.0.0.1", config.getPort(), config.getPath()).toString(); - httpClient = new OkHttpClient(); } + @AfterAll + static void tearDownAll() throws IOException { + connector.stop(); + } + + @Test - void testPoll() throws Exception { - final int batchSize = 10; - // test binary content mode + void testPollForBinaryRequest() { for (int i = 0; i < batchSize; i++) { - try (Response resp = mockBinaryRequest()) { - Assertions.assertEquals(200, resp.code()); + try { + // Set the request body + StringEntity entity = new StringEntity(expectedMessage, ContentType.TEXT_PLAIN); + + Request.post(url) + .addHeader("Content-Type", "text/plain") + .addHeader("ce-id", String.valueOf(UUID.randomUUID())) + .addHeader("ce-specversion", "1.0") + .addHeader("ce-type", "com.example.someevent") + .addHeader("ce-source", "/mycontext") + .addHeader("ce-subject", "test") + .body(entity) + .execute() + .handleResponse(res -> { + Assertions.assertEquals(HttpStatus.SC_OK, res.getCode()); + return null; + }); + } catch (IOException e) { + Assertions.fail("Failed to send request", e); } } List res = connector.poll(); @@ -75,78 +108,61 @@ void testPoll() throws Exception { for (ConnectRecord r : res) { Assertions.assertEquals(expectedMessage, new String((byte[]) r.getData())); } + } - // test structured content mode + @Test + void testPollForStructuredRequest() { for (int i = 0; i < batchSize; i++) { - try (Response resp = mockStructuredRequest()) { - Assertions.assertEquals(200, resp.code()); + try { + // Create a CloudEvent + TestEvent event = new TestEvent(); + event.id = String.valueOf(UUID.randomUUID()); + event.specversion = "1.0"; + event.type = "com.example.someevent"; + event.source = "/mycontext"; + event.subject = "test"; + event.datacontenttype = "text/plain"; + event.data = expectedMessage; + + // Set the request body + StringEntity entity = new StringEntity(Objects.requireNonNull(JsonUtils.toJSONString(event)), ContentType.APPLICATION_JSON); + + // Send the request and return the response + Request.post(url) + .addHeader("Content-Type", "application/cloudevents+json") + .body(entity) + .execute() + .handleResponse(res -> { + Assertions.assertEquals(HttpStatus.SC_OK, res.getCode()); + return null; + }); + } catch (IOException e) { + Assertions.fail("Failed to send request", e); } } - res = connector.poll(); + List res = connector.poll(); Assertions.assertEquals(batchSize, res.size()); for (ConnectRecord r : res) { Assertions.assertEquals(expectedMessage, new String((byte[]) r.getData())); } - - // test invalid requests - Request request = new Request.Builder() - .url(url) - .addHeader("Content-Type", "text/plain") - .addHeader("ce-id", String.valueOf(UUID.randomUUID())) - .build(); - - try (Response resp = httpClient.newCall(request).execute()) { - // verify the response code - Assertions.assertEquals(405, resp.code()); - } - - } - - Response mockBinaryRequest() throws Exception { - - RequestBody body = RequestBody.create(expectedMessage, MediaType.parse("text/plain")); - - Request request = new Request.Builder() - .url(url) - .addHeader("Content-Type", "text/plain") - .addHeader("ce-id", String.valueOf(UUID.randomUUID())) - .addHeader("ce-specversion", "1.0") - .addHeader("ce-type", "com.example.someevent") - .addHeader("ce-source", "/mycontext") - .addHeader("ce-subject", "test") - .post(body) - .build(); - - return httpClient.newCall(request).execute(); } - Response mockStructuredRequest() throws Exception { - // create a CloudEvent - TestEvent event = new TestEvent(); - event.id = String.valueOf(UUID.randomUUID()); - event.specversion = "1.0"; - event.type = "com.example.someevent"; - event.source = "/mycontext"; - event.subject = "test"; - event.datacontenttype = "text/plain"; - event.data = expectedMessage; - - RequestBody body = RequestBody.create(Objects.requireNonNull(JsonUtils.toJSONString(event)), MediaType.parse("application/cloudevents+json")); - Request request = new Request.Builder() - .url(url) - .addHeader("Content-Type", "application/cloudevents+json") - .post(body) - .build(); - - return httpClient.newCall(request).execute(); - - } - - @AfterEach - void tearDown() { - connector.stop(); - httpClient.dispatcher().executorService().shutdown(); + @Test + void testPollForInvalidRequest() { + // Send a bad request. + try { + Request.post(url) + .addHeader("Content-Type", "text/plain") + .execute() + .handleResponse(res -> { + // Check the response code + Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, res.getCode()); + return null; + }); + } catch (IOException e) { + Assertions.fail("Failed to send request", e); + } } class TestEvent {