Skip to content

Commit

Permalink
Merge pull request #351 from DedunuKarunarathne/master
Browse files Browse the repository at this point in the history
RabbitMQ Proxy level throttling implementation
  • Loading branch information
DedunuKarunarathne authored Oct 16, 2024
2 parents 39c1d8a + 3ae9fcf commit 6bc1900
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.axis2.transport.rabbitmq;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Map;

/**
* The {@code RabbitMQConfigUtils} class is responsible for retrieving
* RabbitMQ configuration properties, including throttle limit,
* throttle mode, and throttle time unit.
*/
public class RabbitMQConfigUtils {

private static final Log log = LogFactory.getLog(RabbitMQConfigUtils.class);

/**
*
* Retrieves the throttle count from rabbitMQProperties.
*
* @param rabbitMQProperties the map containing the configuration properties
* @return the throttle limit, defaulting to 60
*/
public static int getThrottleCount(Map<String, String> rabbitMQProperties) {
String throttleCountStr = rabbitMQProperties.get(RabbitMQConstants.RABBITMQ_PROXY_THROTTLE_COUNT);
int throttleCount = RabbitMQConstants.RABBITMQ_PROXY_DEFAULT_THROTTLE_LIMIT; // Default value

if (throttleCountStr != null) {
try {
throttleCount = Integer.parseInt(throttleCountStr);
if (throttleCount <= 0) {
log.error("Throttle limit value is zero or negative, using default: "
+ RabbitMQConstants.RABBITMQ_PROXY_DEFAULT_THROTTLE_LIMIT);
throttleCount = RabbitMQConstants.RABBITMQ_PROXY_DEFAULT_THROTTLE_LIMIT;
}
} catch (NumberFormatException e) {
log.error("Invalid throttle limit value '" + throttleCountStr + "', using default: "
+ RabbitMQConstants.RABBITMQ_PROXY_DEFAULT_THROTTLE_LIMIT, e);
}
} else {
log.warn("Throttle count property is not set, using default: "
+ RabbitMQConstants.RABBITMQ_PROXY_DEFAULT_THROTTLE_LIMIT);
}

return throttleCount;
}

/**
* Retrieves the throttle mode from rabbitMQProperties.
*
* @param rabbitMQProperties the map containing the configuration properties
* @return the throttle mode as a string, defaulting to FIXED_INTERVAL
*/
public static RabbitMQConstants.ThrottleMode getThrottleMode(Map<String, String> rabbitMQProperties) {
String throttleModeStr = rabbitMQProperties.get(RabbitMQConstants.RABBITMQ_PROXY_THROTTLE_MODE);
RabbitMQConstants.ThrottleMode throttleMode = null;

if (throttleModeStr != null) {
throttleMode = RabbitMQConstants.ThrottleMode.fromString(throttleModeStr);
}

if (throttleMode == null) {
log.error("Invalid or missing throttle mode: " + throttleModeStr + ", using default: "
+ RabbitMQConstants.ThrottleMode.FIXED_INTERVAL);
throttleMode = RabbitMQConstants.ThrottleMode.FIXED_INTERVAL; // default mode
}

return throttleMode;
}

/**
* Retrieves the throttle time unit from rabbitMQProperties.
*
* @param rabbitMQProperties the map containing the configuration properties
* @return the throttle time unit as a string, defaulting to MINUTE
*/
public static RabbitMQConstants.ThrottleTimeUnit getThrottleTimeUnit(Map<String, String> rabbitMQProperties) {
String timeUnitStr = rabbitMQProperties.get(RabbitMQConstants.RABBITMQ_PROXY_THROTTLE_TIME_UNIT);
RabbitMQConstants.ThrottleTimeUnit timeUnit = null;

if (timeUnitStr != null) {
timeUnit = RabbitMQConstants.ThrottleTimeUnit.fromString(timeUnitStr);
}

if (timeUnit == null) {
log.error("Invalid or missing throttle time unit: " + timeUnitStr + ", using default: "
+ RabbitMQConstants.ThrottleTimeUnit.MINUTE);
timeUnit = RabbitMQConstants.ThrottleTimeUnit.MINUTE;
}

return timeUnit;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,45 @@ public class RabbitMQConstants {
public static final String QUEUE = "queue";
public static final String EXCHANGE = "exchange";

/**
* Throttling related constants
*/
public static final String RABBITMQ_PROXY_THROTTLE_ENABLED = "rabbitmq.proxy.throttle.enabled";
public static final String RABBITMQ_PROXY_THROTTLE_MODE = "rabbitmq.proxy.throttle.mode";
public static final String RABBITMQ_PROXY_THROTTLE_TIME_UNIT = "rabbitmq.proxy.throttle.timeUnit";
public static final String RABBITMQ_PROXY_THROTTLE_COUNT = "rabbitmq.proxy.throttle.count";
public static final int RABBITMQ_PROXY_DEFAULT_THROTTLE_LIMIT = 60;

/**
* Throttle mode
*/
public enum ThrottleMode {
FIXED_INTERVAL,
BATCH;

public static ThrottleMode fromString(String mode) {
try {
return ThrottleMode.valueOf(mode.toUpperCase());
} catch (IllegalArgumentException e) {
return null;
}
}
}
/**
* Throttle time unit
*/
public enum ThrottleTimeUnit {
MINUTE,
HOUR,
DAY;

public static ThrottleTimeUnit fromString(String unit) {
try {
return ThrottleTimeUnit.valueOf(unit.toUpperCase());
} catch (IllegalArgumentException e) {
return null;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -90,6 +91,14 @@ private class MessageListenerTask implements Runnable, Consumer {
private long maxDeadLetteredCount;
private long requeueDelay;

// Throttling variables
private boolean isThrottlingEnabled;
private RabbitMQConstants.ThrottleMode throttleMode;
private RabbitMQConstants.ThrottleTimeUnit throttleTimeUnit;
private int throttleCount;
private long consumptionStartedTime;
private int consumedMessageCount = 0;

private MessageListenerTask() throws IOException {
this.channel = connection.createChannel();
((Recoverable) this.channel).addRecoveryListener(new RabbitMQRecoveryListener());
Expand Down Expand Up @@ -139,6 +148,15 @@ private void initConsumer() throws IOException {
autoAck = BooleanUtils.toBooleanDefaultIfNull(BooleanUtils.toBooleanObject(rabbitMQProperties
.get(RabbitMQConstants.QUEUE_AUTO_ACK)), true);

// Get throttle configurations if throttling is enabled
isThrottlingEnabled = Boolean.parseBoolean(rabbitMQProperties.getOrDefault(
RabbitMQConstants.RABBITMQ_PROXY_THROTTLE_ENABLED, "false"));
if (isThrottlingEnabled) {
this.throttleMode = RabbitMQConfigUtils.getThrottleMode(rabbitMQProperties);
this.throttleTimeUnit = RabbitMQConfigUtils.getThrottleTimeUnit(rabbitMQProperties);
this.throttleCount = RabbitMQConfigUtils.getThrottleCount(rabbitMQProperties);
}

if (StringUtils.isNotEmpty(consumerTag)) {
channel.basicConsume(queueName, autoAck, consumerTag, this);
} else {
Expand Down Expand Up @@ -227,6 +245,61 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
throws IOException {
AcknowledgementMode acknowledgementMode =
rabbitMQMessageReceiver.processThroughAxisEngine(properties, body);

if (isThrottlingEnabled) {
try {
switch (throttleMode) {
case FIXED_INTERVAL: {
long throttleSleepDelay = getSleepDelay();
if (log.isDebugEnabled()) {
log.debug("Sleeping " + throttleSleepDelay
+ " ms with Fixed-Interval throttling for service :" + serviceName);
}
Thread.sleep(throttleSleepDelay);
break;
}
case BATCH: {
if (consumedMessageCount == 0) {
consumptionStartedTime = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("Batch throttling started at " + consumptionStartedTime
+ " for service :" + serviceName);
}
}

consumedMessageCount++;
if (consumedMessageCount >= throttleCount) {
long consumptionDuration = System.currentTimeMillis() - consumptionStartedTime;
// consumed messages have exceeded the defined count
long remainingDuration = getRemainingDuration(consumptionDuration);
if (remainingDuration >= 0) {
// if time is remaining, we need to sleep while it exceeds
if (log.isDebugEnabled()) {
log.debug("Sleeping " + remainingDuration
+ " ms with Batch throttling for service :" + serviceName);
}
Thread.sleep(remainingDuration);
}
consumedMessageCount = 0;
}
if (log.isDebugEnabled()) {
log.debug("Consumed Message Count per min: " + consumedMessageCount);
}
break;
}
default:
throw new AxisRabbitMQException("Invalid Throttling mode " + throttleMode
+ " specified for service : " + serviceName);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Error in sleeping with " + throttleMode + " throttling", e);
} catch (AxisRabbitMQException e) {
log.error("Invalid Throttling mode " + throttleMode + " specified for service : " + serviceName,
e);
}
}

switch (acknowledgementMode) {
case REQUEUE_TRUE:
try {
Expand Down Expand Up @@ -322,6 +395,46 @@ public void close() {
channel = null;
connection = null;
}

private long getSleepDelay() {
long sleepDelay;
switch (throttleTimeUnit) {
case MINUTE:
sleepDelay = DateUtils.MILLIS_PER_MINUTE / throttleCount;
break;
case HOUR:
sleepDelay = DateUtils.MILLIS_PER_HOUR / throttleCount;
break;
case DAY:
sleepDelay = DateUtils.MILLIS_PER_DAY / throttleCount;
break;
default:
log.error("Unrecognized throttle time unit, defaulting to MINUTE.");
sleepDelay = DateUtils.MILLIS_PER_MINUTE / throttleCount;
break;
}
return sleepDelay;
}

private long getRemainingDuration(long consumptionDuration) {
long remainingDuration;

switch (throttleTimeUnit) {
case HOUR:
remainingDuration = DateUtils.MILLIS_PER_HOUR - consumptionDuration;
break;
case MINUTE:
remainingDuration = DateUtils.MILLIS_PER_MINUTE - consumptionDuration;
break;
case DAY:
remainingDuration = DateUtils.MILLIS_PER_DAY - consumptionDuration;
break;
default:
log.error("Unrecognized throttle time unit, defaulting to MINUTE.");
remainingDuration = DateUtils.MILLIS_PER_MINUTE - consumptionDuration;
}
return remainingDuration;
}
}

/**
Expand Down

0 comments on commit 6bc1900

Please sign in to comment.