Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5020] Optimize unit tests and code <HTTP Connector> #5023

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ dependencies {
implementation 'io.vertx:vertx-web-client:4.5.8'
implementation 'dev.failsafe:failsafe:3.3.2'


testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.3.1'
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne

// get timestamp and offset
Long timestamp = httpConnectRecord.getData().getTimestamp();
Map<String, ?> offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
Map<String, ?> offset = null;
try {
// May throw NullPointerException.
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
} catch (NullPointerException e) {
// ignore null pointer exception
}
final Map<String, ?> finalOffset = offset;

// send the request
return this.webClient.post(url.getPath())
Expand All @@ -143,26 +150,28 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
.ssl(Objects.equals(url.getScheme(), "https"))
.sendJson(httpConnectRecord)
.onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset);
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, offset, res.bodyAsString());
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, offset, res.bodyAsString());
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
}

})
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, err));
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
// store the received data, when webhook is enabled
private final SynchronizedCircularFifoQueue<HttpExportRecord> receivedDataQueue;

private volatile boolean exportStarted = false;

private volatile boolean exportDestroyed = false;

public boolean isExportStarted() {
return exportStarted;
}

public boolean isExportDestroyed() {
return exportDestroyed;
}

public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
this.sinkConnectorConfig = sinkConnectorConfig;
Expand Down Expand Up @@ -179,10 +191,15 @@ public void start() {
// start the webclient
super.start();
// start the export server
Throwable t = this.exportServer.listen().cause();
if (t != null) {
throw new EventMeshException("Failed to start Vertx server. ", t);
}
this.exportServer.listen(res -> {
if (res.succeeded()) {
this.exportStarted = true;
log.info("WebhookHttpExportServer started on port: {}", this.webhookConfig.getPort());
} else {
log.error("WebhookHttpExportServer failed to start on port: {}", this.webhookConfig.getPort());
throw new EventMeshException("Failed to start Vertx server. ", res.cause());
}
});
}

/**
Expand Down Expand Up @@ -250,10 +267,15 @@ public void stop() {
super.stop();
// stop the export server
if (this.exportServer != null) {
Throwable t = this.exportServer.close().cause();
if (t != null) {
throw new EventMeshException("Failed to stop Vertx server. ", t);
}
this.exportServer.close(res -> {
if (res.succeeded()) {
this.exportDestroyed = true;
log.info("WebhookHttpExportServer stopped on port: {}", this.webhookConfig.getPort());
} else {
log.error("WebhookHttpExportServer failed to stop on port: {}", this.webhookConfig.getPort());
throw new EventMeshException("Failed to stop Vertx server. ", res.cause());
}
});
} else {
log.warn("Callback server is null, ignore.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ public class HttpSourceConnector implements Source {

private HttpServer server;

private volatile boolean started = false;

private volatile boolean destroyed = false;

public boolean isStarted() {
return started;
}

public boolean isDestroyed() {
return destroyed;
}


@Override
public Class<? extends Config> configClass() {
Expand Down Expand Up @@ -105,10 +117,15 @@ private void doInit() {

@Override
public void start() {
Throwable t = this.server.listen().cause();
if (t != null) {
throw new EventMeshException("failed to start Vertx server", t);
}
this.server.listen(res -> {
if (res.succeeded()) {
this.started = true;
log.info("HttpSourceConnector started on port: {}", this.sourceConfig.getConnectorConfig().getPort());
} else {
log.error("HttpSourceConnector failed to start on port: {}", this.sourceConfig.getConnectorConfig().getPort());
throw new EventMeshException("failed to start Vertx server", res.cause());
}
});
}

@Override
Expand All @@ -123,9 +140,19 @@ public String name() {

@Override
public void stop() {
Throwable t = this.server.close().cause();
if (t != null) {
throw new EventMeshException("failed to stop Vertx server", t);
if (this.server != null) {
this.server.close(res -> {
if (res.succeeded()) {
this.destroyed = true;
log.info("HttpSourceConnector stopped on port: {}", this.sourceConfig.getConnectorConfig().getPort());
} else {
log.error("HttpSourceConnector failed to stop on port: {}", this.sourceConfig.getConnectorConfig().getPort());
throw new EventMeshException("failed to stop Vertx server", res.cause());
}
}
);
} else {
log.warn("HttpSourceConnector server is null, ignore.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.http.source.connector;


import static org.mockserver.model.HttpRequest.request;

import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
Expand All @@ -25,59 +26,64 @@
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;

import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.net.URIBuilder;

import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.MediaType;


import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;

import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

public class HttpSinkConnectorTest {

private HttpSinkConnector sinkConnector;

private HttpSinkConfig sinkConfig;

private URI severUri;
private URL url;

private ClientAndServer mockServer;

private static final AtomicInteger counter = new AtomicInteger(0);

@BeforeEach
void before() throws Exception {
// init sinkConnector
this.sinkConnector = new HttpSinkConnector();
this.sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
this.sinkConnector.init(this.sinkConfig);
this.sinkConnector.start();
sinkConnector = new HttpSinkConnector();
sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
sinkConnector.init(this.sinkConfig);
sinkConnector.start();

this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]);
url = new URL(sinkConfig.connectorConfig.getUrls()[0]);
// start mockServer
mockServer = ClientAndServer.startClientAndServer(severUri.getPort());
mockServer = ClientAndServer.startClientAndServer(url.getPort());
mockServer.reset()
.when(
request()
.withMethod("POST")
.withPath(severUri.getPath())
.withPath(url.getPath())
)
.respond(
httpRequest -> {
// Increase the number of requests received
counter.incrementAndGet();
JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString());
return HttpResponse.response()
.withContentType(MediaType.APPLICATION_JSON)
Expand All @@ -90,6 +96,7 @@ void before() throws Exception {
); // .withDelay(TimeUnit.SECONDS, 10);
}
);

}

@AfterEach
Expand All @@ -101,62 +108,57 @@ void after() throws Exception {
@Test
void testPut() throws Exception {
// Create a list of ConnectRecord
final int times = 10;
final int size = 10;
List<ConnectRecord> connectRecords = new ArrayList<>();
for (int i = 0; i < times; i++) {
for (int i = 0; i < size; i++) {
ConnectRecord record = createConnectRecord();
connectRecords.add(record);
}
// Put ConnectRecord
sinkConnector.put(connectRecords);

// sleep 5s
Thread.sleep(5000);

// verify request
HttpRequest[] recordedRequests = mockServer.retrieveRecordedRequests(null);
// assert recordedRequests.length == times;
// wait for receiving request
final int times = 5000; // 5 seconds
long start = System.currentTimeMillis();
while (counter.get() < size) {
if (System.currentTimeMillis() - start > times) {
// timeout
Assertions.fail("The number of requests received=" + counter.get() + " is less than the number of ConnectRecord=" + size);
} else {
Thread.sleep(100);
}
}

// verify response
HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
String url = new HttpUrl.Builder()
.scheme("http")
.host(severUri.getHost())
.port(webhookConfig.getPort())
.addPathSegments(webhookConfig.getExportPath())
.addQueryParameter("pageNum", "1")
.addQueryParameter("pageSize", "10")
.addQueryParameter("type", "poll")
.build().toString();

// build request
Request request = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/json")

URI exportUrl = new URIBuilder()
.setScheme("http")
.setHost(url.getHost())
.setPort(webhookConfig.getPort())
.setPath(webhookConfig.getExportPath())
.addParameter("pageNum", "1")
.addParameter("pageSize", "10")
.addParameter("type", "poll")
.build();

OkHttpClient client = new OkHttpClient();
try (Response response = client.newCall(request).execute()) {
// check response code
if (!response.isSuccessful()) {
throw new RuntimeException("Unexpected response code: " + response);
}
// check response body
ResponseBody responseBody = response.body();
if (responseBody != null) {
JSONObject jsonObject = JSON.parseObject(responseBody.string());
Request.get(exportUrl)
.execute()
.handleResponse(response -> {
// check response code
Assertions.assertEquals(HttpStatus.SC_OK, response.getCode());
// check response body
JSONObject jsonObject = JSON.parseObject(response.getEntity().getContent());
JSONArray pageItems = jsonObject.getJSONArray("pageItems");

assert pageItems != null && pageItems.size() == times;

for (int i = 0; i < times; i++) {
Assertions.assertNotNull(pageItems);
Assertions.assertEquals(size, pageItems.size());
for (int i = 0; i < size; i++) {
JSONObject pageItem = pageItems.getJSONObject(i);
assert pageItem != null;
// assert pageItem.getJSONObject("data") != null;
// assert pageItem.getJSONObject("metadata") != null;
Assertions.assertNotNull(pageItem);
}
}
}
return null;
});
}

private ConnectRecord createConnectRecord() {
Expand Down
Loading
Loading