[ISSUE #5144] update eventmesh-connector-http module (#5145)
* [ISSUE #5137] update connector runtime v2 module

* fix checkStyle error

* [ISSUE #5139] update canal connector module

* [ISSUE #5141] update eventmesh-admin-server module

* [ISSUE #5144] update eventmesh-connector-http module
xwm1992 authored Dec 11, 2024
1 parent f6aa097 commit b72d4f8
Showing 10 changed files with 153 additions and 52 deletions.
@@ -39,15 +39,24 @@ public class SinkConnectorConfig {
// timeunit: ms, default 5000ms
private int idleTimeout = 5000;

// maximum number of HTTP/1 connections a client will pool, default 5
private int maxConnectionPoolSize = 5;
// maximum number of HTTP/1 connections a client will pool, default 50
private int maxConnectionPoolSize = 50;

// retry config
private HttpRetryConfig retryConfig = new HttpRetryConfig();

// webhook config
private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();

private String deliveryStrategy = "ROUND_ROBIN";

private boolean skipDeliverException = false;

// managed pipelining param, default true
private boolean isParallelized = true;

private int parallelism = 2;


/**
* Fill default values if absent (When there are multiple default values for a field)
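For orientation, the sink-side knobs this commit introduces (deliveryStrategy, skipDeliverException, isParallelized, parallelism) plus the raised maxConnectionPoolSize could be wired up roughly as below. This is a minimal sketch: the field names and defaults come from the diff above, while the setter names assume the Lombok-generated accessors that EventMesh config classes usually expose.

    import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;

    public class SinkConfigSketch {
        public static void main(String[] args) {
            SinkConnectorConfig config = new SinkConnectorConfig();
            // Assumed Lombok setters; field names and defaults are taken from the diff above.
            config.setMaxConnectionPoolSize(50);        // raised from 5 to 50 in this commit
            config.setDeliveryStrategy("ROUND_ROBIN");  // or "BROADCAST"
            config.setSkipDeliverException(false);      // do not silently drop failed deliveries
            config.setParallelized(true);               // toggles pipelining and the worker pool
            config.setParallelism(2);                   // worker threads in HttpSinkConnector
        }
    }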
@@ -44,12 +44,18 @@ public class SourceConnectorConfig {
*/
private int maxFormAttributeSize = 1024 * 1024;

// protocol, default Common
// max size of the queue, default 1000
private int maxStorageSize = 1000;

// batch size, default 10
private int batchSize = 10;

// protocol, default CloudEvent
private String protocol = "Common";

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();

// data consistency enabled, default true
private boolean dataConsistencyEnabled = false;
private boolean dataConsistencyEnabled = true;
}
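The two new source-side fields bound an in-memory buffer: maxStorageSize caps how many records may be queued and batchSize caps how many are handed out per poll. The sketch below is illustrative only (it is not the connector's code) and shows the kind of bounded, batch-drained buffer those numbers describe.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative bounded buffer: the capacity plays the role of maxStorageSize,
    // drainBatch() the role of a batchSize-limited poll.
    public class BoundedBatchBuffer<T> {
        private final LinkedBlockingQueue<T> queue;
        private final int batchSize;

        public BoundedBatchBuffer(int maxStorageSize, int batchSize) {
            this.queue = new LinkedBlockingQueue<>(maxStorageSize); // e.g. 1000
            this.batchSize = batchSize;                             // e.g. 10
        }

        public boolean offer(T item) {
            return queue.offer(item); // rejects instead of blocking once full
        }

        public List<T> drainBatch() {
            List<T> batch = new ArrayList<>(batchSize);
            queue.drainTo(batch, batchSize);
            return batch;
        }
    }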
@@ -142,6 +142,7 @@ public synchronized List<E> fetchRange(int start, int end, boolean removed) {
count++;
}
return items;

}


@@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.http.sink;

import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
@@ -32,6 +33,10 @@

import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Getter;
import lombok.SneakyThrows;
@@ -45,6 +50,12 @@ public class HttpSinkConnector implements Sink, ConnectorCreateService<Sink> {
@Getter
private HttpSinkHandler sinkHandler;

private ThreadPoolExecutor executor;

private final LinkedBlockingQueue<ConnectRecord> queue = new LinkedBlockingQueue<>(10000);

private final AtomicBoolean isStart = new AtomicBoolean(true);

@Override
public Class<? extends Config> configClass() {
return HttpSinkConfig.class;
@@ -90,11 +101,30 @@ private void doInit() {
} else {
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
}
boolean isParallelized = this.httpSinkConfig.connectorConfig.isParallelized();
int parallelism = isParallelized ? this.httpSinkConfig.connectorConfig.getParallelism() : 1;
executor = new ThreadPoolExecutor(parallelism, parallelism, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new EventMeshThreadFactory("http-sink-handler"));
}

@Override
public void start() throws Exception {
this.sinkHandler.start();
for (int i = 0; i < this.httpSinkConfig.connectorConfig.getParallelism(); i++) {
executor.execute(() -> {
while (isStart.get()) {
ConnectRecord connectRecord = null;
try {
connectRecord = queue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (connectRecord != null) {
sinkHandler.handle(connectRecord);
}
}
});
}
}

@Override
@@ -114,7 +144,18 @@ public void onException(ConnectRecord record) {

@Override
public void stop() throws Exception {
isStart.set(false);
while (!queue.isEmpty()) {
ConnectRecord record = queue.poll();
this.sinkHandler.handle(record);
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.sinkHandler.stop();
log.info("All tasks completed, start shut down http sink connector");
}

@Override
@@ -125,8 +166,7 @@ public void put(List<ConnectRecord> sinkRecords) {
log.warn("ConnectRecord data is null, ignore.");
continue;
}
// Handle the ConnectRecord
this.sinkHandler.handle(sinkRecord);
queue.put(sinkRecord);
} catch (Exception e) {
log.error("Failed to sink message via HTTP. ", e);
}
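The connector now decouples put() from delivery: records go into a bounded queue, and a small worker pool (sized by parallelism) polls with a 2-second timeout so the threads can notice the shutdown flag instead of blocking forever; stop() flips the flag and flushes whatever is still queued. A self-contained sketch of that pattern, with String standing in for ConnectRecord and a println standing in for HttpSinkHandler#handle:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class QueueWorkerSketch {
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        private final AtomicBoolean isStart = new AtomicBoolean(true);

        public void start(int parallelism) {
            for (int i = 0; i < parallelism; i++) {
                new Thread(() -> {
                    while (isStart.get()) {
                        try {
                            // Bounded wait so the loop can re-check isStart after a stop().
                            String record = queue.poll(2, TimeUnit.SECONDS);
                            if (record != null) {
                                handle(record);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }, "http-sink-handler-" + i).start();
            }
        }

        public void put(String record) throws InterruptedException {
            queue.put(record); // blocks when the 10000-element buffer is full
        }

        public void stop() {
            isStart.set(false);
            String record;
            while ((record = queue.poll()) != null) {
                handle(record); // flush leftovers, as the connector's stop() does
            }
        }

        private void handle(String record) {
            System.out.println("delivered " + record);
        }
    }

One difference worth noting: the sketch restores the interrupt flag and exits the worker, whereas the committed code rethrows InterruptedException as a RuntimeException.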
@@ -40,8 +40,6 @@ public class HttpExportMetadata implements Serializable {

private LocalDateTime receivedTime;

private String httpRecordId;

private String recordId;

private String retriedBy;
@@ -30,32 +30,33 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.Getter;

/**
* AbstractHttpSinkHandler is an abstract class that provides a base implementation for HttpSinkHandler.
*/
public abstract class AbstractHttpSinkHandler implements HttpSinkHandler {

@Getter
private final SinkConnectorConfig sinkConnectorConfig;

@Getter
private final List<URI> urls;

private final HttpDeliveryStrategy deliveryStrategy;

private int roundRobinIndex = 0;

protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
this.sinkConnectorConfig = sinkConnectorConfig;
this.deliveryStrategy = HttpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy());
// Initialize URLs
String[] urlStrings = sinkConnectorConfig.getUrls();
this.urls = Arrays.stream(urlStrings)
.map(URI::create)
.collect(Collectors.toList());
}

public SinkConnectorConfig getSinkConnectorConfig() {
return sinkConnectorConfig;
}

public List<URI> getUrls() {
return urls;
}

/**
* Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
*
@@ -65,23 +66,38 @@ public List<URI> getUrls() {
public void handle(ConnectRecord record) {
// build attributes
Map<String, Object> attributes = new ConcurrentHashMap<>();
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size()));

// send the record to all URLs
for (URI url : urls) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s",
this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);

// add AttemptEvent to the attributes
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);

// deliver the record
deliver(url, httpConnectRecord, attributes, record);

switch (deliveryStrategy) {
case ROUND_ROBIN:
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(1));
URI url = urls.get(roundRobinIndex);
roundRobinIndex = (roundRobinIndex + 1) % urls.size();
sendRecordToUrl(record, attributes, url);
break;
case BROADCAST:
for (URI broadcastUrl : urls) {
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size()));
sendRecordToUrl(record, attributes, broadcastUrl);
}
break;
default:
throw new IllegalArgumentException("Unknown delivery strategy: " + deliveryStrategy);
}
}

private void sendRecordToUrl(ConnectRecord record, Map<String, Object> attributes, URI url) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s",
this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);

// add AttemptEvent to the attributes
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);

// deliver the record
deliver(url, httpConnectRecord, attributes, record);
}

}
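AbstractHttpSinkHandler now picks URLs per the configured strategy: ROUND_ROBIN rotates a single target per record, while BROADCAST fans each record out to every URL. Condensed into a standalone sketch (record payloads and the actual HTTP send are abstracted behind a callback):

    import java.net.URI;
    import java.util.List;
    import java.util.function.BiConsumer;

    // Illustrative dispatcher showing the two delivery modes introduced above.
    public class DeliveryDispatcher {
        private final List<URI> urls;
        private int roundRobinIndex = 0;

        public DeliveryDispatcher(List<URI> urls) {
            this.urls = urls;
        }

        public void dispatch(String strategy, String record, BiConsumer<URI, String> sender) {
            switch (strategy) {
                case "ROUND_ROBIN":
                    URI next = urls.get(roundRobinIndex);
                    roundRobinIndex = (roundRobinIndex + 1) % urls.size();
                    sender.accept(next, record);
                    break;
                case "BROADCAST":
                    for (URI url : urls) {
                        sender.accept(url, record);
                    }
                    break;
                default:
                    throw new IllegalArgumentException("Unknown delivery strategy: " + strategy);
            }
        }
    }

As in the handler above, the rotation index is a plain int, so with parallelism greater than 1 the round-robin order is only approximate unless access is synchronized.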
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.handler;

public enum HttpDeliveryStrategy {
ROUND_ROBIN,
BROADCAST
}
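The strategy enum itself is the whole new file. Not part of the commit, but if config values needed to be tolerated case-insensitively or with a fallback, a defensive lookup could look like the sketch below; the committed handler calls HttpDeliveryStrategy.valueOf directly and will throw on unknown values.

    import java.util.Locale;

    // Hypothetical helper, shown for illustration only.
    public final class DeliveryStrategyParser {
        private DeliveryStrategyParser() {
        }

        public static HttpDeliveryStrategy parse(String value) {
            if (value == null) {
                return HttpDeliveryStrategy.ROUND_ROBIN;
            }
            try {
                return HttpDeliveryStrategy.valueOf(value.trim().toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                return HttpDeliveryStrategy.ROUND_ROBIN;
            }
        }
    }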
@@ -93,7 +93,8 @@ private void doInitWebClient() {
.setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
.setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
.setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
.setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize())
.setPipelining(sinkConnectorConfig.isParallelized());
this.webClient = WebClient.create(vertx, options);
}

@@ -108,7 +109,7 @@ private void doInitWebClient() {
*/
@Override
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes,
ConnectRecord connectRecord) {
ConnectRecord connectRecord) {
// create headers
Map<String, Object> extensionMap = new HashMap<>();
Set<String> extensionKeySet = httpConnectRecord.getExtensions().keySet();
@@ -203,6 +204,9 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
// failure
record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
}
} else {
log.warn("still have requests to process, size {}|attempt num {}",
multiHttpRequestContext.getRemainingRequests(), attemptEvent.getAttempts());
}
}

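On the Vert.x side, the new isParallelized flag simply toggles HTTP/1.1 pipelining on the shared WebClient, alongside the larger connection pool. A minimal sketch of that client setup using standard Vert.x WebClientOptions (the literal values mirror the defaults shown above; the connect timeout is illustrative):

    import io.vertx.core.Vertx;
    import io.vertx.ext.web.client.WebClient;
    import io.vertx.ext.web.client.WebClientOptions;

    import java.util.concurrent.TimeUnit;

    public class WebClientSetupSketch {
        public static void main(String[] args) {
            Vertx vertx = Vertx.vertx();
            WebClientOptions options = new WebClientOptions();
            options.setIdleTimeout(5000);                 // ms, matches idleTimeout default
            options.setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
            options.setConnectTimeout(5000);              // illustrative value
            options.setMaxPoolSize(50);                   // new maxConnectionPoolSize default
            options.setPipelining(true);                  // what isParallelized() feeds into
            WebClient webClient = WebClient.create(vertx, options);
            // ... use webClient, then release resources when done
            webClient.close();
            vertx.close();
        }
    }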