Skip to content

Commit

Permalink
ARTEMIS-4510 - Add auto-create-destination logic to diverts
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonRoskvist committed Apr 8, 2024
1 parent 774d321 commit 87fc689
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ public static String getDefaultHapolicyBackupStrategy() {

private static final boolean DEFAULT_MANAGEMENT_MESSAGE_RBAC = false;

private static final boolean DEFAULT_DIVERT_REUSE_USER_SESSION = false;

/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
Expand Down Expand Up @@ -1918,4 +1920,9 @@ public static String getManagementRbacPrefix() {
public static boolean getManagementMessagesRbac() {
return DEFAULT_MANAGEMENT_MESSAGE_RBAC;
}

public static boolean isDefaultDivertReuseUserSession() {
return DEFAULT_DIVERT_REUSE_USER_SESSION;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,18 @@ void createDivert(@Parameter(name = "name", desc = "Name of the divert") String
@Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the divert's transformer in JSON form") String transformerPropertiesAsJSON,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;

@Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
@Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
@Parameter(name = "address", desc = "Address to divert from") String address,
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
@Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
@Parameter(name = "transformerProperties", desc = "Configuration properties of the divert's transformer") Map<String, String> transformerProperties,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType,
@Parameter(name = "reuseUserSession", desc = "Should the divert route messages using the senders session?") boolean reuseUserSession) throws Exception;

/**
* update a divert
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class DivertConfiguration implements Serializable, EncodingSupport {

private ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());

private boolean reuseUserSession = ActiveMQDefaultConfiguration.isDefaultDivertReuseUserSession();

public DivertConfiguration() {
}

Expand Down Expand Up @@ -82,6 +84,10 @@ public ComponentConfigurationRoutingType getRoutingType() {
return routingType;
}

public boolean isReuseUserSession() {
return reuseUserSession;
}

/**
* @param name the name to set
*/
Expand Down Expand Up @@ -150,6 +156,14 @@ public DivertConfiguration setRoutingType(final ComponentConfigurationRoutingTyp
return this;
}

/**
* @param reuseUserSession Should the divert route messages using the senders session
*/
public DivertConfiguration setReuseUserSession(final boolean reuseUserSession) {
this.reuseUserSession = reuseUserSession;
return this;
}

@Override
public int hashCode() {
final int prime = 31;
Expand All @@ -162,6 +176,7 @@ public int hashCode() {
result = prime * result + ((routingName == null) ? 0 : routingName.hashCode());
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
result = prime * result + (reuseUserSession ? 1231 : 1237);
return result;
}

Expand Down Expand Up @@ -211,6 +226,9 @@ public boolean equals(Object obj) {
return false;
} else if (!routingType.equals(other.routingType))
return false;
if (reuseUserSession != other.reuseUserSession) {
return false;
}
return true;
}

Expand Down Expand Up @@ -261,7 +279,7 @@ public void encode(ActiveMQBuffer buffer) {

@Override
public String toString() {
return "DivertConfiguration{" + "name='" + name + '\'' + ", routingName='" + routingName + '\'' + ", address='" + address + '\'' + ", forwardingAddress='" + forwardingAddress + '\'' + ", exclusive=" + exclusive + ", filterString='" + filterString + '\'' + ", transformerConfiguration=" + transformerConfiguration + '}';
return "DivertConfiguration{" + "name='" + name + '\'' + ", routingName='" + routingName + '\'' + ", address='" + address + '\'' + ", forwardingAddress='" + forwardingAddress + '\'' + ", exclusive=" + exclusive + ", filterString='" + filterString + '\'' + ", transformerConfiguration=" + transformerConfiguration + '\'' + ", reuseUserSession=" + reuseUserSession + '}';
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,8 @@ private void parseDivertConfiguration(final Element e, final Configuration mainC

ComponentConfigurationRoutingType routingType = ComponentConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), COMPONENT_ROUTING_TYPE));

boolean reuseUserSession = getBoolean(e, "reuse-user-session", ActiveMQDefaultConfiguration.isDefaultDivertReuseUserSession());

TransformerConfiguration transformerConfiguration = null;

String filterString = null;
Expand All @@ -2849,7 +2851,7 @@ private void parseDivertConfiguration(final Element e, final Configuration mainC
transformerConfiguration = getTransformerConfiguration(transformerClassName);
}

DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(routingType);
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(routingType).setReuseUserSession(reuseUserSession);

mainConfig.getDivertConfigurations().add(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3669,6 +3669,20 @@ public void createDivert(final String name,
final String transformerClassName,
final Map<String, String> transformerProperties,
final String routingType) throws Exception {
createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, transformerProperties, routingType, ActiveMQDefaultConfiguration.isDefaultDivertReuseUserSession());
}

@Override
public void createDivert(final String name,
final String routingName,
final String address,
final String forwardingAddress,
final boolean exclusive,
final String filterString,
final String transformerClassName,
final Map<String, String> transformerProperties,
final String routingType,
final boolean reuseUserSession) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.createDivert(this.server, name, routingName, address, forwardingAddress,
exclusive, filterString, transformerClassName, transformerProperties, routingType);
Expand All @@ -3678,7 +3692,7 @@ public void createDivert(final String name,
clearIO();
try {
TransformerConfiguration transformerConfiguration = transformerClassName == null || transformerClassName.isEmpty() ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType)).setReuseUserSession(reuseUserSession);
server.deployDivert(config);
} finally {
blockOnIO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2934,7 +2934,8 @@ public Divert deployDivert(DivertConfiguration config) throws Exception {

Divert divert = new DivertImpl(sName, sAddress, new SimpleString(config.getForwardingAddress()),
new SimpleString(config.getRoutingName()), config.isExclusive(),
filter, transformer, postOffice, storageManager, config.getRoutingType());
filter, transformer, postOffice, storageManager, config.getRoutingType(),
config.isReuseUserSession());

Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class DivertImpl implements Divert {

private volatile ComponentConfigurationRoutingType routingType;

private final boolean reuseUserSession;

public DivertImpl(final SimpleString uniqueName,
final SimpleString address,
final SimpleString forwardAddress,
Expand All @@ -68,7 +70,8 @@ public DivertImpl(final SimpleString uniqueName,
final Transformer transformer,
final PostOffice postOffice,
final StorageManager storageManager,
final ComponentConfigurationRoutingType routingType) {
final ComponentConfigurationRoutingType routingType,
final boolean reuseUserSession) {
this.address = address;

this.setForwardAddress(forwardAddress);
Expand All @@ -88,6 +91,8 @@ public DivertImpl(final SimpleString uniqueName,
this.storageManager = storageManager;

this.routingType = routingType;

this.reuseUserSession = reuseUserSession;
}

@Override
Expand Down Expand Up @@ -140,7 +145,11 @@ public void route(final Message message, final RoutingContext context) throws Ex
copy = message;
}

postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false);
if (!reuseUserSession) {
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false);
} else {
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()).setServerSession(context.getServerSession()), false);
}
}
}

Expand Down Expand Up @@ -233,6 +242,10 @@ public String toString() {
filter +
", transformer=" +
transformer +
", routingType=" +
routingType +
", reuseUserSession=" +
reuseUserSession +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2661,6 +2661,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>

<xsd:element name="reuse-user-session" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Should the divert route messages using the senders session
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>

<xsd:attribute name="name" type="xsd:ID" use="required">
Expand Down
3 changes: 3 additions & 0 deletions docs/user-manual/configuration-index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,9 @@ Default=false
| xref:diverts.adoc#diverting-and-splitting-message-flows[routing-type]
| how to set the routing-type on the diverted message.
Default=`STRIP`

| xref:diverts.adoc#reusing-the-senders-session[reuse-user-session]
| should the divert route messages using the senders session
|===

== address type
Expand Down
48 changes: 48 additions & 0 deletions docs/user-manual/diverts.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,51 @@ Just use a comma separated list in `forwarding-address`, e.g.:
<exclusive>false</exclusive>
</divert>
----

== Reusing the senders session

By default, after a message is received on the diverts `address`, its consequent routing is done by the divert.
This means it happens separate to the original sender, freeing it up to proceed with other tasks.

In some cases it's useful to keep the original session coupled with consequent routing.

An example of this would be to enable automatic creation of forwarded destinations if they are missing,
or being able to roll back message delivery if one of multiple destinations could not accept the message.

This behavior can be configured by setting `reuse-user-session` on the divert, which defaults to `false`.

== Computing destinations at runtime

Using logic in a diverts transformer, it's possible to perform complex routing
by altering properties on individual messages.

A simple example of this:

[,java]
----
@Override
public Message transform(Message message) {
if (message.getBooleanProperty("changeDestination")) {
message.setAddress(message.getStringProperty("newDestination"));
}
return message;
}
----

Here, the computed destination will get used instead of the diverts `forwarding-address`.

[NOTE]
====
There are two important things to consider when computing messages destination where the resulting destination might
not exist on the broker beforehand.
1. When the diverting to a destination that is not already present on the broker, auto-creation configuration of the
destination address applies. This configuration is tested against the sender so `reuse-user-session` needs
to be set to `true`.
2. To be able to create a new destination, the messages `routing-type` has to be known.
The default behavior of the divert is to remove this information from a message before handing it over to the `transformer`
according to the default `routingType = STRIP`.
To make sure the messages end up where they are supposed to, either set the diverts `routingType` or
make sure to add this information in the `transformer` with the messages `setRoutingType()` method.
====
Loading

0 comments on commit 87fc689

Please sign in to comment.