diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 9209a22..3ee0dda 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -34,22 +34,33 @@ public abstract class AbstractSQSConnector implements SQSConnector { private final long _receiveCheckIntervalMs; private final boolean _isAsync; + private final int _visibilityTimeoutOnReset; protected AbstractSQSConnector(long receiveCheckIntervalMs) { this(receiveCheckIntervalMs, false); } - protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { _receiveCheckIntervalMs = receiveCheckIntervalMs; _isAsync = isAsync; + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; + } + + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) + { + this(receiveCheckIntervalMs, false, 0); } public boolean isAsync() { return _isAsync; } + public int getVisibilityTimeoutOnReset() { + return _visibilityTimeoutOnReset; + } + public void sendMessage(NevadoDestination destination, NevadoMessage message) throws JMSException { if (destination == null) @@ -127,7 +138,7 @@ public void resetMessage(NevadoMessage message) throws JMSException { "Did this come from an SQS queue?"); } SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); - sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, 0); + sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout } /** @@ -179,7 +190,7 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), 0); // Make it immediately available to the next requestor + sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize visibility timeout } catch (JMSException e) { String exMessage = "Unable to reset visibility timeout for message: " + e.getMessage(); _log.warn(exMessage, e); // Non-fatal. Just means the message will disappear until the visibility timeout expires. diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index 9ae5274..78557f7 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -55,7 +55,11 @@ public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean i } public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { - super(receiveCheckIntervalMs, isAsync); + this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync, 0); + } + + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { + super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); ClientConfiguration clientConfiguration = new ClientConfiguration(); String proxyHost = System.getProperty("http.proxyHost"); diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java index 1f6e6cd..6eb4a7c 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java @@ -10,11 +10,12 @@ */ public class AmazonAwsSQSConnectorFactory extends AbstractSQSConnectorFactory { protected boolean _useAsyncSend = false; + protected int _visibilityTimeoutOnReset = 0; @Override public AmazonAwsSQSConnector getInstance(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) { AmazonAwsSQSConnector amazonAwsSQSConnector = new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, _isSecure, - _receiveCheckIntervalMs, _useAsyncSend); + _receiveCheckIntervalMs, _useAsyncSend, _visibilityTimeoutOnReset); if (StringUtils.isNotEmpty(awsSQSEndpoint)) { amazonAwsSQSConnector.getAmazonSQS().setEndpoint(awsSQSEndpoint); } @@ -31,4 +32,12 @@ public void setUseAsyncSend(boolean useAsyncSend) { public boolean isUseAsyncSend() { return _useAsyncSend; } + + public int getVisibilityTimeoutOnReset() { + return _visibilityTimeoutOnReset; + } + + public void setVisibilityTimeoutOnReset(int visibilityTimeoutOnReset) { + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; + } }