Skip to content

Commit

Permalink
ARTEMIS-5250 Add sender state to the management metrics for federation
Browse files Browse the repository at this point in the history
Adds management and metrics to the outgoing AMQP senders created by a
remote peer that has message federation configured from the local broker
including a view for the remote policies that tracks number of sent
messages for all producers. Also adds a top level federation management
view that shows counts for incoming and outgoing messages for all
federation consumer and producers.

Add a more declarative state handling API for initialization, start,
stop and shutdown to federation resources and the broker connection API
to make management resource create and cleanup more reliable and robust
to avoid and leaked management objects when reconnection attempts stop
or broker connections are manually stopped or are stopped by
configuration updates or configuration remove.
  • Loading branch information
tabish121 authored and gemmellr committed Jan 21, 2025
1 parent 7c99a60 commit d130d6b
Show file tree
Hide file tree
Showing 45 changed files with 4,268 additions and 1,143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2808,11 +2808,18 @@ static void getMessagesReceived(Object source) {
@LogMessage(id = 601789, value = "User {} is getting the number of messages received on target resource: {}", level = LogMessage.Level.INFO)
void getMessagesReceived(String user, Object source);


static void copyMessage(Object source, Object... args) {
BASE_LOGGER.copyMessage(getCaller(), source, parametersList(args));
}

@LogMessage(id = 601790, value = "User {} is copying a message to another queue on target resource: {} {}", level = LogMessage.Level.INFO)
void copyMessage(String user, Object source, String args);

static void getMessagesSent(Object source) {
BASE_LOGGER.getMessagesSent(getCaller(), source);
}

@LogMessage(id = 601791, value = "User {} is getting the number of messages sent on target resource: {}", level = LogMessage.Level.INFO)
void getMessagesSent(String user, Object source);

}
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,66 @@ public int getConnectionTimeout() {
return connectionTimeout;
}

@Override
public synchronized void initialize() throws Exception {
try {
server.registerBrokerConnection(this);
server.getManagementService().registerBrokerConnection(this);

if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
final AMQPBrokerConnectionAddressType elementType = connectionElement.getType();

if (elementType == AMQPBrokerConnectionAddressType.FEDERATION) {
installFederation((AMQPFederatedBrokerConnectionElement) connectionElement, server);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}

@Override
public synchronized void start() throws Exception {
if (!started) {
started = true;
server.getConfiguration().registerBrokerPlugin(this);

try {
if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
final AMQPBrokerConnectionAddressType elementType = connectionElement.getType();

// Preserve old behavior where mirror controller wasn't installed until the first time
// the broker connection was started. This won't add a new controller if stopped and then
// restarted, once added a controller cannot currently be rmoved.
if (elementType == AMQPBrokerConnectionAddressType.MIRROR) {
installMirrorController((AMQPMirrorBrokerConnectionElement) connectionElement, server);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

if (brokerFederation != null) {
try {
brokerFederation.start();
} catch (ActiveMQException e) {
logger.warn("Error caught while starting federation instance.", e);
}
}

connectExecutor.execute(() -> doConnect());
}
}

@Override
public synchronized void stop() {
if (started) {
started = false;
server.getConfiguration().unRegisterBrokerPlugin(this);

if (protonRemotingConnection != null) {
protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection"));
Expand All @@ -240,36 +296,28 @@ public synchronized void stop() {
brokerFederation.stop();
} catch (ActiveMQException e) {
logger.debug("Error caught while stopping federation instance.", e);
} finally {
brokerFederation = null;
}
}
}
}

@Override
public synchronized void start() throws Exception {
if (!started) {
started = true;

server.getConfiguration().registerBrokerPlugin(this);
try {
if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
final AMQPBrokerConnectionAddressType elementType = connectionElement.getType();

if (elementType == AMQPBrokerConnectionAddressType.MIRROR) {
installMirrorController((AMQPMirrorBrokerConnectionElement) connectionElement, server);
} else if (elementType == AMQPBrokerConnectionAddressType.FEDERATION) {
installFederation((AMQPFederatedBrokerConnectionElement) connectionElement, server);
}
}
public synchronized void shutdown() throws Exception {
try {
stop();
} finally {
if (brokerFederation != null) {
try {
brokerFederation.shutdown();
} catch (ActiveMQException e) {
logger.debug("Error caught while shutting down federation instance.", e);
} finally {
brokerFederation = null;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return;
}
connectExecutor.execute(() -> doConnect());

server.unregisterBrokerConnection(this);
server.getManagementService().unregisterBrokerConnection(getName());
}
}

Expand Down Expand Up @@ -470,9 +518,9 @@ private void doConnect() {
null,
requiredOfferedCapabilities);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
// Starting the Federation triggers rebuild of federation links
// Signal the Federation instance to start a rebuild of federation links
// based on current broker state.
brokerFederation.handleConnectionRestored(protonRemotingConnection.getAmqpConnection(), sessionContext);
brokerFederation.connectionRestored(protonRemotingConnection.getAmqpConnection(), sessionContext);
}
}
}
Expand Down Expand Up @@ -614,6 +662,9 @@ private static Queue checkCurrentMirror(AMQPBrokerConnection brokerConnection,
private void installFederation(AMQPFederatedBrokerConnectionElement connectionElement, ActiveMQServer server) throws Exception {
final AMQPFederationSource federation = new AMQPFederationSource(connectionElement.getName(), connectionElement.getProperties(), this);

// Ensure basic federation resources are initialized before adding policies and moving on
federation.initialize();

// Broker federation configuration for local resources that should be receiving from remote resources
// when there is local demand.
final Set<AMQPFederationAddressPolicyElement> localAddressPolicies = connectionElement.getLocalAddressPolicies();
Expand Down Expand Up @@ -649,7 +700,6 @@ private void installFederation(AMQPFederatedBrokerConnectionElement connectionEl
}

this.brokerFederation = federation;
this.brokerFederation.start();
}

private void connectReceiver(ActiveMQProtonRemotingConnection protonRemotingConnection,
Expand Down Expand Up @@ -953,7 +1003,7 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
}

@Override
public void close() throws Exception {
public void close(boolean remoteClose) throws Exception {

}

Expand Down Expand Up @@ -1029,7 +1079,7 @@ private void redoConnection() {

if (federation != null) {
try {
federation.handleConnectionDropped();
federation.connectionInterrupted();
} catch (ActiveMQException e) {
logger.debug("Broker Federation on connection {} threw an error on stop before connection attempt", getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public int getConfiguredConnectionsCount() {
private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration, boolean start) throws Exception {
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
server.registerBrokerConnection(amqpBrokerConnection);
server.getManagementService().registerBrokerConnection(amqpBrokerConnection);

amqpBrokerConnection.initialize();

if (start) {
amqpBrokerConnection.start();
Expand All @@ -110,9 +110,13 @@ public void updateConfiguration(List<AMQPBrokerConnectConfiguration> configurati
final List<AMQPBrokerConnectConfiguration> updatedConfigurations =
configurations != null ? configurations : Collections.EMPTY_LIST;

// We want to shutdown all broker connections before starting any new ones just to ensure
// we do not have any overlapping connections to the same broker from old to new configurations.
final Map<AMQPBrokerConnectConfiguration, Boolean> newConnections = new HashMap<>();

// Find any updated configurations and stop / and recreate as needed.
for (AMQPBrokerConnectConfiguration configuration : updatedConfigurations) {
final AMQPBrokerConnectConfiguration previous = amqpConnectionsConfig.put(configuration.getName(), configuration);
final AMQPBrokerConnectConfiguration previous = amqpConnectionsConfig.get(configuration.getName());

if (previous == null || !configuration.equals(previous)) {
// We don't currently allow updating broker connections with mirror configurations
Expand All @@ -127,6 +131,8 @@ public void updateConfiguration(List<AMQPBrokerConnectConfiguration> configurati
logger.info("Skipping update of broker connection {} which contains a mirror " +
"configuration which are not reloadable.", previous.getName());
continue;
} else {
amqpConnectionsConfig.put(configuration.getName(), configuration);
}

// If this was an update and the connection is active meaning the manager is
Expand All @@ -139,16 +145,11 @@ public void updateConfiguration(List<AMQPBrokerConnectConfiguration> configurati
configuration.isAutostart() : connection.isStarted() || configuration.isAutostart();

if (connection != null) {
try {
connection.stop();
} finally {
server.unregisterBrokerConnection(connection);
server.getManagementService().unregisterBrokerConnection(connection.getName());
}
connection.shutdown();
}

if (started) {
createBrokerConnection(configuration, autoStart);
newConnections.put(configuration, autoStart);
}
}
}
Expand Down Expand Up @@ -181,14 +182,14 @@ public void updateConfiguration(List<AMQPBrokerConnectConfiguration> configurati
final AMQPBrokerConnection connection = amqpBrokerConnections.remove(toRemove.getName());

if (connection != null) {
try {
connection.stop();
} finally {
server.unregisterBrokerConnection(connection);
server.getManagementService().unregisterBrokerConnection(connection.getName());
}
connection.shutdown();
}
}

// Start all new or updated broker connections now that all old connection have been shutdown.
for (Map.Entry<AMQPBrokerConnectConfiguration, Boolean> entry : newConnections.entrySet()) {
createBrokerConnection(entry.getKey(), entry.getValue());
}
}

private boolean containsMirrorConfiguration(AMQPBrokerConnectConfiguration configuration) {
Expand All @@ -210,12 +211,7 @@ public void stop() throws Exception {
started = false;
try {
for (AMQPBrokerConnection connection : amqpBrokerConnections.values()) {
try {
connection.stop();
} finally {
server.unregisterBrokerConnection(connection);
server.getManagementService().unregisterBrokerConnection(connection.getName());
}
connection.shutdown();
}
} finally {
amqpBrokerConnections.clear();
Expand Down
Loading

0 comments on commit d130d6b

Please sign in to comment.