Skip to content

Commit

Permalink
[improve] update activemq connector config option
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Jan 23, 2025
1 parent 3cf09f6 commit a79618a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.activemq.ActiveMQConnectionFactory;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import javax.jms.Connection;
Expand All @@ -35,22 +34,21 @@

import java.nio.charset.StandardCharsets;

import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CONSUMER_EXPIRY_CHECK_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SESSION_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SYNC_SEND;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CHECK_FOR_DUPLICATE;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLIENT_ID;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLOSE_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CONSUMER_EXPIRY_CHECK_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.DISPATCH_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.NESTED_MAP_AND_LIST_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.URI;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;

@Slf4j
@AllArgsConstructor
public class ActivemqClient {
private final ReadonlyConfig config;
private final ActiveMQConnectionFactory connectionFactory;
Expand Down Expand Up @@ -102,7 +100,6 @@ public ActiveMQConnectionFactory getConnectionFactory() {
if (config.get(DISPATCH_ASYNC) != null) {
factory.setDispatchAsync(config.get(DISPATCH_ASYNC));
}

if (config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT) != null) {
factory.setWarnAboutUnstartedConnectionTimeout(
config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,12 @@

package org.apache.seatunnel.connectors.seatunnel.activemq.config;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Setter
@Getter
@AllArgsConstructor
public class ActivemqConfig implements Serializable {
private String host;
private Integer port;
private String username;
private String password;
private String uri;
private String queueName;
private Boolean checkForDuplicate;
private String clientID;
private Integer closeTimeout;
private Boolean consumerExpiryCheckEnabled;
private Boolean copyMessageOnSend;
private Boolean disableTimeStampsByDefault;
private Boolean dispatchAsync;
private Boolean nestedMapAndListEnabled;
private Boolean useCompression;
private Boolean alwaysSessionAsync;
private Boolean alwaysSyncSend;
private Integer warnAboutUnstartedConnectionTimeout;

private final Map<String, Object> sinkOptionProps = new HashMap<>();

public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("the default host to use for connections");

public static final Option<Integer> PORT =
Options.key("port")
.intType()
.noDefaultValue()
.withDescription("the default port to use for connections");
public class ActivemqSinkOptions implements Serializable {

public static final Option<String> USERNAME =
Options.key("username")
Expand Down Expand Up @@ -106,29 +62,6 @@ public class ActivemqConfig implements Serializable {
.noDefaultValue()
.withDescription("Sets the JMS clientID to use for the connection.");

public static final Option<Boolean> COPY_MESSAGE_ON_SEND =
Options.key("copy_message_on_send")
.booleanType()
.noDefaultValue()
.withDescription(
"Should a JMS message be copied to a new JMS Message object as part of the send() method in JMS. "
+ "This is enabled by default to be compliant with the JMS specification. "
+ "For a performance boost set to false if you do not mutate JMS messages after they are sent.");

public static final Option<Boolean> DISABLE_TIMESTAMP_BY_DEFAULT =
Options.key("disable_timeStamps_by_default")
.booleanType()
.noDefaultValue()
.withDescription(
"Sets whether or not timestamps on messages should be disabled or not. "
+ "For a small performance boost set to false.");

public static final Option<Boolean> USE_COMPRESSION =
Options.key("use_compression")
.booleanType()
.noDefaultValue()
.withDescription("Enables the use of compression on the message’s body.");

public static final Option<Boolean> ALWAYS_SESSION_ASYNC =
Options.key("always_session_async")
.booleanType()
Expand Down Expand Up @@ -189,54 +122,4 @@ public class ActivemqConfig implements Serializable {
.withDescription(
"Controls whether message expiration checking is done in each "
+ "MessageConsumer prior to dispatching a message.");

public ActivemqConfig(Config config) {
this.host = config.getString(HOST.key());
this.port = config.getInt(PORT.key());
this.queueName = config.getString(QUEUE_NAME.key());
this.uri = config.getString(URI.key());
if (config.hasPath(USERNAME.key())) {
this.username = config.getString(USERNAME.key());
}
if (config.hasPath(PASSWORD.key())) {
this.password = config.getString(PASSWORD.key());
}
if (config.hasPath(CHECK_FOR_DUPLICATE.key())) {
this.checkForDuplicate = config.getBoolean(CHECK_FOR_DUPLICATE.key());
}
if (config.hasPath(CLIENT_ID.key())) {
this.clientID = config.getString(CLIENT_ID.key());
}
if (config.hasPath(COPY_MESSAGE_ON_SEND.key())) {
this.copyMessageOnSend = config.getBoolean(COPY_MESSAGE_ON_SEND.key());
}
if (config.hasPath(DISABLE_TIMESTAMP_BY_DEFAULT.key())) {
this.disableTimeStampsByDefault = config.getBoolean(DISABLE_TIMESTAMP_BY_DEFAULT.key());
}
if (config.hasPath(USE_COMPRESSION.key())) {
this.useCompression = config.getBoolean(USE_COMPRESSION.key());
}
if (config.hasPath(ALWAYS_SESSION_ASYNC.key())) {
this.alwaysSessionAsync = config.getBoolean(ALWAYS_SESSION_ASYNC.key());
}
if (config.hasPath(ALWAYS_SYNC_SEND.key())) {
this.alwaysSyncSend = config.getBoolean(ALWAYS_SYNC_SEND.key());
}
if (config.hasPath(CLOSE_TIMEOUT.key())) {
this.closeTimeout = config.getInt(CLOSE_TIMEOUT.key());
}
if (config.hasPath(DISPATCH_ASYNC.key())) {
this.dispatchAsync = config.getBoolean(DISPATCH_ASYNC.key());
}
if (config.hasPath(NESTED_MAP_AND_LIST_ENABLED.key())) {
this.nestedMapAndListEnabled = config.getBoolean(NESTED_MAP_AND_LIST_ENABLED.key());
}
if (config.hasPath(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT.key())) {
this.warnAboutUnstartedConnectionTimeout =
config.getInt(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT.key());
}
}

@VisibleForTesting
public ActivemqConfig() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,18 @@

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.COPY_MESSAGE_ON_SEND;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISABLE_TIMESTAMP_BY_DEFAULT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PORT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USE_COMPRESSION;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SESSION_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.ALWAYS_SYNC_SEND;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CHECK_FOR_DUPLICATE;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLIENT_ID;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.CLOSE_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.DISPATCH_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.NESTED_MAP_AND_LIST_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.URI;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSinkOptions.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;

@AutoService(Factory.class)
public class ActivemqSinkFactory implements TableSinkFactory {
Expand All @@ -57,13 +52,8 @@ public OptionRule optionRule() {
.required(QUEUE_NAME, URI)
.bundled(USERNAME, PASSWORD)
.optional(
HOST,
PORT,
CLIENT_ID,
CHECK_FOR_DUPLICATE,
COPY_MESSAGE_ON_SEND,
DISABLE_TIMESTAMP_BY_DEFAULT,
USE_COMPRESSION,
ALWAYS_SESSION_ASYNC,
ALWAYS_SYNC_SEND,
CLOSE_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import java.util.Optional;

public class ActivemqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private ActivemqClient activeMQClient;

Expand All @@ -42,11 +40,6 @@ public void write(SeaTunnelRow element) {
activeMQClient.write(serializationSchema.serialize(element));
}

@Override
public Optional prepareCommit() {
return Optional.empty();
}

@Override
public void close() {
if (activeMQClient != null) {
Expand Down

0 comments on commit a79618a

Please sign in to comment.