From e5278a022dd5099106dfc82012f39308eae206d6 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Wed, 25 Jun 2014 09:52:39 +0530 Subject: [PATCH 01/17] Update AbstractSQSConnector.java To add custom visibility timeout for each message in SQS queue --- .../nevado/jms/connector/AbstractSQSConnector.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 9209a22..6b5a23b 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -34,16 +34,18 @@ public abstract class AbstractSQSConnector implements SQSConnector { private final long _receiveCheckIntervalMs; private final boolean _isAsync; + private int _visibilityTimeoutOnReset; protected AbstractSQSConnector(long receiveCheckIntervalMs) { - this(receiveCheckIntervalMs, false); + this(receiveCheckIntervalMs, false, 0); } - protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { _receiveCheckIntervalMs = receiveCheckIntervalMs; _isAsync = isAsync; + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; } public boolean isAsync() { @@ -127,7 +129,7 @@ public void resetMessage(NevadoMessage message) throws JMSException { "Did this come from an SQS queue?"); } SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); - sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, 0); + sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); //to have custom visibility timeout } /** @@ -179,7 +181,7 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), 0); // Make it immediately available to the next requestor + sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // to have a custom visibility timeout } catch (JMSException e) { String exMessage = "Unable to reset visibility timeout for message: " + e.getMessage(); _log.warn(exMessage, e); // Non-fatal. Just means the message will disappear until the visibility timeout expires. From f34294a5d759438f6bae4f91c049f4157f69a81c Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Wed, 25 Jun 2014 09:55:32 +0530 Subject: [PATCH 02/17] Update AmazonAwsSQSConnectorFactory.java Add custom visibility timeout to each message in SQS queue.You can set the value of visibilityTimeoutOnReset in spring configuration. Value is in seconds. --- .../amazonaws/AmazonAwsSQSConnectorFactory.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java index 1f6e6cd..2553f4b 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java @@ -10,11 +10,12 @@ */ public class AmazonAwsSQSConnectorFactory extends AbstractSQSConnectorFactory { protected boolean _useAsyncSend = false; + protected int visibilityTimeoutOnReset; @Override public AmazonAwsSQSConnector getInstance(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) { AmazonAwsSQSConnector amazonAwsSQSConnector = new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, _isSecure, - _receiveCheckIntervalMs, _useAsyncSend); + _receiveCheckIntervalMs, _useAsyncSend, visibilityTimeoutOnReset); if (StringUtils.isNotEmpty(awsSQSEndpoint)) { amazonAwsSQSConnector.getAmazonSQS().setEndpoint(awsSQSEndpoint); } @@ -31,4 +32,12 @@ public void setUseAsyncSend(boolean useAsyncSend) { public boolean isUseAsyncSend() { return _useAsyncSend; } + + public int getVisibilityTimeoutOnReset() { + return visibilityTimeoutOnReset; + } + + public void setVisibilityTimeoutOnReset(int visibilityTimeoutOnReset) { + this.visibilityTimeoutOnReset = visibilityTimeoutOnReset; + } } From 3b3fa8918c165194d804f812c60abb40019d50cd Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Wed, 25 Jun 2014 09:57:48 +0530 Subject: [PATCH 03/17] Update AmazonAwsSQSConnector.java To set custom visibility timeout for each message in SQS queue. --- .../jms/connector/amazonaws/AmazonAwsSQSConnector.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index 9ae5274..9247d36 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -50,12 +50,12 @@ public class AmazonAwsSQSConnector extends AbstractSQSConnector { private final AmazonSQS _amazonSQS; private final AmazonSNS _amazonSNS; - public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) { - this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false); + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, , int visibilityTimeoutOnReset) { + this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false, visibilityTimeoutOnReset); } - public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { - super(receiveCheckIntervalMs, isAsync); + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { + super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); ClientConfiguration clientConfiguration = new ClientConfiguration(); String proxyHost = System.getProperty("http.proxyHost"); From 120d14c16ed006f13322fad1a2ad9de4e4136454 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Wed, 2 Jul 2014 12:35:28 +0530 Subject: [PATCH 04/17] Update AmazonAwsSQSConnectorFactory.java Made changes as per your suggestions. --- .../amazonaws/AmazonAwsSQSConnectorFactory.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java index 2553f4b..dc48691 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java @@ -10,12 +10,12 @@ */ public class AmazonAwsSQSConnectorFactory extends AbstractSQSConnectorFactory { protected boolean _useAsyncSend = false; - protected int visibilityTimeoutOnReset; + protected int _visibilityTimeoutOnReset; @Override public AmazonAwsSQSConnector getInstance(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) { AmazonAwsSQSConnector amazonAwsSQSConnector = new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, _isSecure, - _receiveCheckIntervalMs, _useAsyncSend, visibilityTimeoutOnReset); + _receiveCheckIntervalMs, _useAsyncSend, _visibilityTimeoutOnReset); if (StringUtils.isNotEmpty(awsSQSEndpoint)) { amazonAwsSQSConnector.getAmazonSQS().setEndpoint(awsSQSEndpoint); } @@ -33,11 +33,11 @@ public boolean isUseAsyncSend() { return _useAsyncSend; } - public int getVisibilityTimeoutOnReset() { - return visibilityTimeoutOnReset; + public int get_visibilityTimeoutOnReset() { + return _visibilityTimeoutOnReset; } - public void setVisibilityTimeoutOnReset(int visibilityTimeoutOnReset) { - this.visibilityTimeoutOnReset = visibilityTimeoutOnReset; + public void set_visibilityTimeoutOnReset(int visibilityTimeoutOnReset) { + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; } } From e0d6d7e16351279ad6dfa166bb62dab4d032cc52 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Wed, 2 Jul 2014 12:39:21 +0530 Subject: [PATCH 05/17] Update AmazonAwsSQSConnector.java Made changes as per your suggestions. Created a new constructor. --- .../jms/connector/amazonaws/AmazonAwsSQSConnector.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index 9247d36..3d7c8bc 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -50,7 +50,11 @@ public class AmazonAwsSQSConnector extends AbstractSQSConnector { private final AmazonSQS _amazonSQS; private final AmazonSNS _amazonSNS; - public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, , int visibilityTimeoutOnReset) { + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) { + this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false, 0); + } + + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, int visibilityTimeoutOnReset) { this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false, visibilityTimeoutOnReset); } From 547bdced9eef8f4b1fe4678b7b81da5190f98172 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Wed, 2 Jul 2014 12:49:18 +0530 Subject: [PATCH 06/17] Update AbstractSQSConnector.java Made changes as per your suggestions. I was not able to set visibilitytimeout as final as it was causing error in your current method- protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) { _receiveCheckIntervalMs = receiveCheckIntervalMs; _isAsync = isAsync; } --- .../jms/connector/AbstractSQSConnector.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 6b5a23b..0575acd 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -29,6 +29,7 @@ */ public abstract class AbstractSQSConnector implements SQSConnector { protected static final String AWS_ERROR_CODE_AUTHENTICATION = "InvalidClientTokenId"; + protected static final String BLANK_STRING = ""; protected final Log _log = LogFactory.getLog(getClass()); @@ -38,19 +39,28 @@ public abstract class AbstractSQSConnector implements SQSConnector { protected AbstractSQSConnector(long receiveCheckIntervalMs) { - this(receiveCheckIntervalMs, false, 0); + this(receiveCheckIntervalMs, false); } - protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) { _receiveCheckIntervalMs = receiveCheckIntervalMs; _isAsync = isAsync; - _visibilityTimeoutOnReset = visibilityTimeoutOnReset; + } + + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) + { + this(receiveCheckIntervalMs, isAsync); + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; } public boolean isAsync() { return _isAsync; } + + public int getVisibilityTimeoutOnReset() { + return _visibilityTimeoutOnReset; + } public void sendMessage(NevadoDestination destination, NevadoMessage message) throws JMSException { @@ -129,7 +139,9 @@ public void resetMessage(NevadoMessage message) throws JMSException { "Did this come from an SQS queue?"); } SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); - sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); //to have custom visibility timeout + if (sqsReceiptHandle != null && !BLANK_STRING.equals(sqsReceiptHandle)){ + sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout + } } /** @@ -181,7 +193,9 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // to have a custom visibility timeout + if (sqsMessage.getReceiptHandle() != null && !BLANK_STRING.equals(sqsMessage.getReceiptHandle())) { + sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize message visibility timeout + } } catch (JMSException e) { String exMessage = "Unable to reset visibility timeout for message: " + e.getMessage(); _log.warn(exMessage, e); // Non-fatal. Just means the message will disappear until the visibility timeout expires. From 93eebe7e580caac3e6fdeb015971f068bd4ff418 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Wed, 2 Jul 2014 12:54:26 +0530 Subject: [PATCH 07/17] Update AmazonAwsSQSConnector.java removed unwanted constructor. --- .../nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index 3d7c8bc..37ab04f 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -53,10 +53,6 @@ public class AmazonAwsSQSConnector extends AbstractSQSConnector { public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) { this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false, 0); } - - public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, int visibilityTimeoutOnReset) { - this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false, visibilityTimeoutOnReset); - } public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); From fbcbe245bb62235198c3658e63cd6e7ad8593a99 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Thu, 3 Jul 2014 14:14:29 +0530 Subject: [PATCH 08/17] Update AbstractSQSConnector.java Removed BLANK_STRING as you suggested. I have kept the if conditions as sometimes it throws an exception "Unable to reset visibility timeout as receipthandle does not exist in queue" even if the message is there in the queue. --- .../nevado/jms/connector/AbstractSQSConnector.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 0575acd..6aa2024 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -29,8 +29,7 @@ */ public abstract class AbstractSQSConnector implements SQSConnector { protected static final String AWS_ERROR_CODE_AUTHENTICATION = "InvalidClientTokenId"; - protected static final String BLANK_STRING = ""; - + protected final Log _log = LogFactory.getLog(getClass()); private final long _receiveCheckIntervalMs; @@ -59,8 +58,8 @@ public boolean isAsync() { } public int getVisibilityTimeoutOnReset() { - return _visibilityTimeoutOnReset; - } + return _visibilityTimeoutOnReset; + } public void sendMessage(NevadoDestination destination, NevadoMessage message) throws JMSException { @@ -139,7 +138,7 @@ public void resetMessage(NevadoMessage message) throws JMSException { "Did this come from an SQS queue?"); } SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); - if (sqsReceiptHandle != null && !BLANK_STRING.equals(sqsReceiptHandle)){ + if (sqsReceiptHandle != null && StringUtils.isEmpty(sqsReceiptHandle) && sqsReceiptHandle.trim().length() == 0) { sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout } } @@ -193,7 +192,7 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - if (sqsMessage.getReceiptHandle() != null && !BLANK_STRING.equals(sqsMessage.getReceiptHandle())) { + if (sqsMessage.getReceiptHandle() != null && StringUtils.isEmpty(sqsMessage.getReceiptHandle()) && sqsMessage.getReceiptHandle().trim().length() == 0) { sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize message visibility timeout } } catch (JMSException e) { From 0b8ebf6d99f118cc2d166e1910578cbfe9bea815 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Thu, 3 Jul 2014 14:43:11 +0530 Subject: [PATCH 09/17] Update AbstractSQSConnector.java Correction made to if conditions for visibility timeout. --- .../nevado/jms/connector/AbstractSQSConnector.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 6aa2024..9ccf463 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -138,7 +138,7 @@ public void resetMessage(NevadoMessage message) throws JMSException { "Did this come from an SQS queue?"); } SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); - if (sqsReceiptHandle != null && StringUtils.isEmpty(sqsReceiptHandle) && sqsReceiptHandle.trim().length() == 0) { + if (sqsReceiptHandle != null && StringUtils.isNotEmpty(sqsReceiptHandle) && sqsReceiptHandle.trim().length() != 0){ sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout } } @@ -192,7 +192,7 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - if (sqsMessage.getReceiptHandle() != null && StringUtils.isEmpty(sqsMessage.getReceiptHandle()) && sqsMessage.getReceiptHandle().trim().length() == 0) { + if (sqsMessage.getReceiptHandle() != null && StringUtils.isNotEmpty(sqsMessage.getReceiptHandle()) && sqsMessage.getReceiptHandle().trim().length() != 0) { sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize message visibility timeout } } catch (JMSException e) { From 0c6610f55d4c1df32155acb9ef96ac4d3f4a7ae8 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Thu, 3 Jul 2014 14:45:02 +0530 Subject: [PATCH 10/17] Update AmazonAwsSQSConnector.java Made changes as per your suggestions. Created my own constructor. --- .../amazonaws/AmazonAwsSQSConnector.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index 37ab04f..d6a8dc7 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -54,6 +54,29 @@ public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean i this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false, 0); } + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { + super(receiveCheckIntervalMs, isAsync); + AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); + ClientConfiguration clientConfiguration = new ClientConfiguration(); + String proxyHost = System.getProperty("http.proxyHost"); + String proxyPort = System.getProperty("http.proxyPort"); + if(proxyHost != null){ + clientConfiguration.setProxyHost(proxyHost); + if(proxyPort != null){ + clientConfiguration.setProxyPort(Integer.parseInt(proxyPort)); + } + } + clientConfiguration.setProtocol(isSecure ? Protocol.HTTPS : Protocol.HTTP); + if (isAsync) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + _amazonSQS = new AmazonSQSAsyncClient(awsCredentials, clientConfiguration, executorService); + _amazonSNS = new AmazonSNSAsyncClient(awsCredentials, clientConfiguration, executorService); + } else { + _amazonSQS = new AmazonSQSClient(awsCredentials, clientConfiguration); + _amazonSNS = new AmazonSNSClient(awsCredentials, clientConfiguration); + } + } + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); From dbfab5afd9da4bea09bfe98f40e235c204ebffe9 Mon Sep 17 00:00:00 2001 From: tejas1990 Date: Thu, 3 Jul 2014 14:48:17 +0530 Subject: [PATCH 11/17] Update AmazonAwsSQSConnectorFactory.java Made changes as per your suggestions. --- .../amazonaws/AmazonAwsSQSConnectorFactory.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java index dc48691..b6a1a70 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java @@ -33,11 +33,11 @@ public boolean isUseAsyncSend() { return _useAsyncSend; } - public int get_visibilityTimeoutOnReset() { - return _visibilityTimeoutOnReset; - } + public int getVisibilityTimeoutOnReset() { + return _visibilityTimeoutOnReset; + } - public void set_visibilityTimeoutOnReset(int visibilityTimeoutOnReset) { - _visibilityTimeoutOnReset = visibilityTimeoutOnReset; - } + public void setVisibilityTimeoutOnReset(int visibilityTimeoutOnReset) { + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; + } } From 38fe205d14d522474ef6e66ecbb3d517beeaf7ea Mon Sep 17 00:00:00 2001 From: Tejas Raorane Date: Tue, 8 Jul 2014 15:45:20 +0530 Subject: [PATCH 12/17] Update AbstractSQSConnector.java As per your suggestions I have made changes. Created a final variable. --- .../jms/connector/AbstractSQSConnector.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 9ccf463..a9f9638 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -34,23 +34,23 @@ public abstract class AbstractSQSConnector implements SQSConnector { private final long _receiveCheckIntervalMs; private final boolean _isAsync; - private int _visibilityTimeoutOnReset; + private final int _visibilityTimeoutOnReset; protected AbstractSQSConnector(long receiveCheckIntervalMs) { this(receiveCheckIntervalMs, false); } - protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { _receiveCheckIntervalMs = receiveCheckIntervalMs; _isAsync = isAsync; + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; } - protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) { - this(receiveCheckIntervalMs, isAsync); - _visibilityTimeoutOnReset = visibilityTimeoutOnReset; + this(receiveCheckIntervalMs, isAsync, 0); } public boolean isAsync() { @@ -138,9 +138,7 @@ public void resetMessage(NevadoMessage message) throws JMSException { "Did this come from an SQS queue?"); } SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); - if (sqsReceiptHandle != null && StringUtils.isNotEmpty(sqsReceiptHandle) && sqsReceiptHandle.trim().length() != 0){ - sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout - } + sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout } /** @@ -192,7 +190,7 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - if (sqsMessage.getReceiptHandle() != null && StringUtils.isNotEmpty(sqsMessage.getReceiptHandle()) && sqsMessage.getReceiptHandle().trim().length() != 0) { + if(sqsMessage.getReceiptHandle() != null && StringUtils.isNotEmpty(sqsMessage.getReceiptHandle()) && sqsMessage.getReceiptHandle().trim().length() > 0) { sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize message visibility timeout } } catch (JMSException e) { From 43f159c8bcc897e39f36c9eaf625d7790d3a7d5b Mon Sep 17 00:00:00 2001 From: Tejas Raorane Date: Tue, 8 Jul 2014 15:49:37 +0530 Subject: [PATCH 13/17] Update AmazonAwsSQSConnector.java Here I made _amazonSQS and _amazonSNS non final to implement the cut-paste anti pattern. So i created a private method to initialize the connection and created my own constructor as you suggested. --- .../amazonaws/AmazonAwsSQSConnector.java | 32 ++++++------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index d6a8dc7..b16b291 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -47,39 +47,25 @@ public class AmazonAwsSQSConnector extends AbstractSQSConnector { public static final String MESSAGE_ATTRIBUTE_APPROXIMATE_RECEIVE_COUNT = "ApproximateReceiveCount"; - private final AmazonSQS _amazonSQS; - private final AmazonSNS _amazonSNS; + private AmazonSQS _amazonSQS; + private AmazonSNS _amazonSNS; public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) { - this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false, 0); + this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false); } public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { super(receiveCheckIntervalMs, isAsync); - AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); - ClientConfiguration clientConfiguration = new ClientConfiguration(); - String proxyHost = System.getProperty("http.proxyHost"); - String proxyPort = System.getProperty("http.proxyPort"); - if(proxyHost != null){ - clientConfiguration.setProxyHost(proxyHost); - if(proxyPort != null){ - clientConfiguration.setProxyPort(Integer.parseInt(proxyPort)); - } - } - clientConfiguration.setProtocol(isSecure ? Protocol.HTTPS : Protocol.HTTP); - if (isAsync) { - ExecutorService executorService = Executors.newSingleThreadExecutor(); - _amazonSQS = new AmazonSQSAsyncClient(awsCredentials, clientConfiguration, executorService); - _amazonSNS = new AmazonSNSAsyncClient(awsCredentials, clientConfiguration, executorService); - } else { - _amazonSQS = new AmazonSQSClient(awsCredentials, clientConfiguration); - _amazonSNS = new AmazonSNSClient(awsCredentials, clientConfiguration); - } + initializeConnection(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync); } public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); - AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); + initializeConnection(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync); + } + + private void initializeConnection(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync){ + AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); ClientConfiguration clientConfiguration = new ClientConfiguration(); String proxyHost = System.getProperty("http.proxyHost"); String proxyPort = System.getProperty("http.proxyPort"); From a45e68e2fb03488550b54609d7e1c2cc7d70e6ad Mon Sep 17 00:00:00 2001 From: Tejas Raorane Date: Tue, 8 Jul 2014 16:03:25 +0530 Subject: [PATCH 14/17] Update AmazonAwsSQSConnectorFactory.java Initialized _visibilityTimeoutOnReset with default value. --- .../jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java index b6a1a70..6eb4a7c 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java @@ -10,7 +10,7 @@ */ public class AmazonAwsSQSConnectorFactory extends AbstractSQSConnectorFactory { protected boolean _useAsyncSend = false; - protected int _visibilityTimeoutOnReset; + protected int _visibilityTimeoutOnReset = 0; @Override public AmazonAwsSQSConnector getInstance(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) { @@ -39,5 +39,5 @@ public int getVisibilityTimeoutOnReset() { public void setVisibilityTimeoutOnReset(int visibilityTimeoutOnReset) { _visibilityTimeoutOnReset = visibilityTimeoutOnReset; - } + } } From 6cd28f508ad8492875611941dd7d8e716160b31f Mon Sep 17 00:00:00 2001 From: Tejas Raorane Date: Wed, 9 Jul 2014 14:49:45 +0530 Subject: [PATCH 15/17] Update AbstractSQSConnector.java Made changes as per your suggestions. --- .../nevado/jms/connector/AbstractSQSConnector.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index a9f9638..3a7207f 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -29,12 +29,12 @@ */ public abstract class AbstractSQSConnector implements SQSConnector { protected static final String AWS_ERROR_CODE_AUTHENTICATION = "InvalidClientTokenId"; - + protected final Log _log = LogFactory.getLog(getClass()); private final long _receiveCheckIntervalMs; private final boolean _isAsync; - private final int _visibilityTimeoutOnReset; + private final int _visibilityTimeoutOnReset; protected AbstractSQSConnector(long receiveCheckIntervalMs) { @@ -50,15 +50,15 @@ protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) { - this(receiveCheckIntervalMs, isAsync, 0); + this(receiveCheckIntervalMs, false, 0); } public boolean isAsync() { return _isAsync; } - + public int getVisibilityTimeoutOnReset() { - return _visibilityTimeoutOnReset; + return _visibilityTimeoutOnReset; } public void sendMessage(NevadoDestination destination, NevadoMessage message) throws JMSException From 4b743bbde9aa143eac3a119e095bc916f77d9cd1 Mon Sep 17 00:00:00 2001 From: Tejas Raorane Date: Wed, 9 Jul 2014 14:53:06 +0530 Subject: [PATCH 16/17] Update AmazonAwsSQSConnector.java Added this() to second constructor and reverted the variables to final. --- .../amazonaws/AmazonAwsSQSConnector.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index b16b291..78557f7 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -47,25 +47,20 @@ public class AmazonAwsSQSConnector extends AbstractSQSConnector { public static final String MESSAGE_ATTRIBUTE_APPROXIMATE_RECEIVE_COUNT = "ApproximateReceiveCount"; - private AmazonSQS _amazonSQS; - private AmazonSNS _amazonSNS; + private final AmazonSQS _amazonSQS; + private final AmazonSNS _amazonSNS; public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) { this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false); } public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { - super(receiveCheckIntervalMs, isAsync); - initializeConnection(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync); + this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync, 0); } - + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); - initializeConnection(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync); - } - - private void initializeConnection(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync){ - AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); + AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); ClientConfiguration clientConfiguration = new ClientConfiguration(); String proxyHost = System.getProperty("http.proxyHost"); String proxyPort = System.getProperty("http.proxyPort"); From 2f08cfbb74a3e4dd089d0bff9ba74f791718ab62 Mon Sep 17 00:00:00 2001 From: Tejas Raorane Date: Wed, 9 Jul 2014 14:56:39 +0530 Subject: [PATCH 17/17] Update AbstractSQSConnector.java Removed the if check for receipt handle as you suggested. But now i get the exception. --- .../nevado/jms/connector/AbstractSQSConnector.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 3a7207f..3ee0dda 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -190,9 +190,7 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - if(sqsMessage.getReceiptHandle() != null && StringUtils.isNotEmpty(sqsMessage.getReceiptHandle()) && sqsMessage.getReceiptHandle().trim().length() > 0) { - sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize message visibility timeout - } + sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize visibility timeout } catch (JMSException e) { String exMessage = "Unable to reset visibility timeout for message: " + e.getMessage(); _log.warn(exMessage, e); // Non-fatal. Just means the message will disappear until the visibility timeout expires.