Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom visibility timeout for each message in SQS queue #89

Open
wants to merge 17 commits into
base: master
Choose a base branch
from

Conversation

tejasraorane
Copy link

Configure visibility timeout for each message in SQS queue.

tejas1990 and others added 14 commits June 25, 2014 09:52
To add custom visibility timeout for each message in SQS queue
Add custom visibility timeout to each message in SQS queue.You can set the value of visibilityTimeoutOnReset in spring configuration. Value is in seconds.
To set custom visibility timeout for each message in SQS queue.
Made changes as per your suggestions.
Made changes as per your suggestions. Created a new constructor.
Made changes as per your suggestions. I was not able to set visibilitytimeout as final as it was causing error in your current method- 
protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync)
    {
        _receiveCheckIntervalMs = receiveCheckIntervalMs;
        _isAsync = isAsync;
    }
removed unwanted constructor.
Removed BLANK_STRING as you suggested. I have kept the if conditions as sometimes it throws an exception "Unable to reset visibility timeout as receipthandle does not exist in queue" even if the message is there in the queue.
Correction made to if conditions for visibility timeout.
Made changes as per your suggestions. Created my own constructor.
Made changes as per your suggestions.
As per your suggestions I have made changes. Created a final variable.
Here I made _amazonSQS and _amazonSNS non final to implement the cut-paste anti pattern. So i created a private method to initialize the connection and created my own constructor as you suggested.
Initialized _visibilityTimeoutOnReset with default value.
@tejasraorane
Copy link
Author

Hey sorry for the trouble but we need the visibility timeout feature. I have made changes as per your suggestion. Please can you review it.

@@ -29,26 +29,37 @@
*/
public abstract class AbstractSQSConnector implements SQSConnector {
protected static final String AWS_ERROR_CODE_AUTHENTICATION = "InvalidClientTokenId";

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra space. (But not the blank line. There are three spaces here.)

@carterpage
Copy link
Member

Needs more work.

Made changes as per your suggestions.
Added this() to second constructor and reverted the variables to final.
Removed the if check for receipt handle as you suggested. But now i get the exception.
@tejasraorane
Copy link
Author

I have removed the "if check" for receipt handle. But now I get the exception.

@carterpage
Copy link
Member

Looks much better. You'll still need to add a unit test to confirm the functionality. Easiest way to do this is to add a test to MessageListenerTest. Do something similar to testThrowRuntimeAutoAck(), using the TestMessageListenerRuntimeException. You'll need to create a special ConnectorFactory and set your visibility timeout high (like 15 seconds). You should test that the message listener does not return a message for 5 seconds. (The default case is already covered in the other methods.)

This may take a couple of iterations to get it right, but we'll walk through it.

@carterpage
Copy link
Member

Any progress on the unit test?

Also, any idea why you get an exception getting the receipt handle? I've never had a check there and no one has complained. Are you doing something unusual?

@tejasraorane
Copy link
Author

Yes.I'm working on it. I have followed the instructions you gave.I'm facing some issues in acknowledging the message.How can I do a session rollback to send the message back to queue.

@carterpage
Copy link
Member

If you use the TestMessageListenerRuntimeException it will throw an exception when the message comes through. If you are using auto_ack, then the message should automatically reset.

@tejasraorane
Copy link
Author

So I should use the messageListener.getMessage() to fetch the message from queue or the listener will automatically listen to the queue. I wrote a method as you told similar to testThrowRuntimeAutoAck() but I have some doubts regarding it. While using the methods messageListener.getMessage() and messageListener.getFirstMessage() should I receive the message or not. I have set the messagevisibilitytimeout to 15 seconds.

@carterpage
Copy link
Member

They way to know it's working is to try to retrieve the message a second
time. If the message visibility timeout is set to 0 it will immediately
appear, if it is set to 15 seconds it won't appear until 15 seconds have
passed.

On Wed, Jul 30, 2014 at 2:36 AM, Tejas Raorane [email protected]
wrote:

So I should use the messageListener.getMessage() to fetch the message from
queue or the listener will automatically listen to the queue. I wrote a
method as you told similar to testThrowRuntimeAutoAck() but I have some
doubts regarding it. While using the methods messageListener.getMessage()
and messageListener.getFirstMessage() should I receive the message or not.
I have set the messagevisibilitytimeout to 15 seconds.


Reply to this email directly or view it on GitHub
#89 (comment).

@tejasraorane
Copy link
Author

Hey need some help. I'm trying to receive the message by using messageListener.getMessage().But I'm unable to find a way to send it back again. I tried to resend using producer.send() but it didn't worked. I'm using auto_ack. When i do session.rollback which is I'm using in my product code and it is working fine but in test it also didn't worked. Please can you tell me how to resend the message and also whether to use messageListener.getMessage() to get the message.

@carterpage
Copy link
Member

I have a question. When a message fails, do you need it to always be sent
back to SQS? If so, you may need to wait on another contributor who is
making a fix to bypass message parking (whereby we hold onto a message that
recently failed).

As far as resending the message, I thinking you mean resetting it. No need
to resend the same message through a send().

On Wed, Aug 6, 2014 at 10:59 AM, Tejas Raorane [email protected]
wrote:

Hey need some help. I'm trying to receive the message by using
messageListener.getMessage().But I'm unable to find a way to send it back
again. I tried to resend using producer.send() but it didn't worked. I'm
using auto_ack. When i do session.rollback which is I'm using in my product
code and it is working fine but in test it also didn't worked. Please can
you tell me how to resend the message and also whether to use
messageListener.getMessage() to get the message.


Reply to this email directly or view it on GitHub
#89 (comment).

@tejasraorane
Copy link
Author

In my code I use SessionAwareMessageListener in which the onMessage method has two parameters javax.jms.Message and javax.jms.Session. When processing of any message fails i simply do session.rollback and the message goes back to SQS. After that the visibilityTimeout is set and then I can fetch the message again only after the given interval is elapsed. The visibility timeout is set by my new code in nevado jms lib. But in your code I'm unable to send it back that's why i need your help.

@carterpage
Copy link
Member

Do you have your code in a branch on github? Can you point me to it?

On Tue, Aug 12, 2014 at 6:03 AM, Tejas Raorane [email protected]
wrote:

In my code I use SessionAwareMessageListener in which the onMessage method
has two parameters javax.jms.Message and javax.jms.Session. When processing
of any message fails i simply do session.rollback and the message goes back
to SQS. After that the visibilityTimeout is set and then I can fetch the
message again only after the given interval is elapsed. The visibility
timeout is set by my new code in nevado jms lib. But in your code I'm
unable to send it back that's why i need your help.


Reply to this email directly or view it on GitHub
#89 (comment).

@tejasraorane
Copy link
Author

Its not on github but I'll mail you the snippet of code.

@tejasraorane
Copy link
Author

Let me explain you what exactly my problem is. From the start i create a message and send it to the SQS queue. My code which I mailed you is the listener which fetch the message from the queue. The same code processes the message. If the processing fails I do the session.rollback() which sends the message back to queue. Now the new code of visibility timeout in nevado library comes in picture. It sets the visibility timeout to the given value. Now my listener can fetch the message only after the interval is elapsed. So the whole scenario is the visibility timeout is set only after the failure or session.rollback() i.e. sending the message back to the queue again. In the test I'm able to send the message to the queue and fetch it but the problem is how can I send it back again to the queue to test the visibility timeout. I tried using session.rollback() but it terminates the execution. I need some help from you regarding this.

@carterpage
Copy link
Member

Two things:

  1. Rollback does not immediately send the message back to the queue, but to
    an internal cache, to limit the need to hit SQS again.

  2. I'm very sorry about this, but I have zero free time, and this feature
    is not a high priority for me or any other users, from what I can tell. If
    other engineers at your job can help you figure this out, great. What I
    can offer you is code reviews, and tell you what needs to be included (like
    good tests, clean code) to be submitted. What I can't offer is to do the
    work for you. If you need to understand better what's going on, set up a
    debugger, walk through it. Experiment. Programming is hard work and
    patience. Practice it year after year, and it slowly gets easier. But not
    much.

On Thu, Aug 28, 2014 at 12:59 AM, Tejas Raorane [email protected]
wrote:

Let me explain you what exactly my problem is. From the start i create a
message and send it to the SQS queue. My code which I mailed you is the
listener which fetch the message from the queue. The same code processes
the message. If the processing fails I do the session.rollback() which
sends the message back to queue. Now the new code of visibility timeout in
nevado library comes in picture. It sets the visibility timeout to the
given value. Now my listener can fetch the message only after the interval
is elapsed. So the whole scenario is the visibility timeout is set only
after the failure or session.rollback() i.e. sending the message back to
the queue again. In the test I'm able to send the message to the queue and
fetch it but the problem is how can I send it back again to the queue to
test the visibility timeout. I tried using session.rollback() but it
terminates the execution. I need some help from you regarding this.


Reply to this email directly or view it on GitHub
#89 (comment).

@tejasraorane
Copy link
Author

Ya sure I'll try my best. Hey I'm facing another issue. The duplicate messages issue which you mentioned under Grey area section. When we process the message and if the message is not acknowledged within 10 seconds we receive the same message again and it is processed twice. It’s a high priority issue, Could you please give some pointers to fix this issue.

@carterpage
Copy link
Member

I assume you have your default queue timeout set to 10 seconds? You'll
either need to increase the timeout, or find a way to de-dup the messages.

Nevado may be the wrong software for what you are trying to do, since it
assumes it can control the queue. If you are using custom queue settings,
you might have better luck programming directly against the Amazon SDK.

Regardless of whether or not you use Nevado, SQS makes no guarantee of
once-and-only-once delivery. Dups will still happen occasionally.

On Fri, Sep 19, 2014 at 11:38 PM, Tejas Raorane [email protected]
wrote:

Ya sure I'll try my best. Hey I'm facing another issue. The duplicate
messages issue which you mentioned under Grey area section. When we process
the message and if the message is not acknowledged within 10 seconds we
receive the same message again and it is processed twice. It’s a high
priority issue, Could you please give some pointers to fix this issue.


Reply to this email directly or view it on GitHub
#89 (comment).

@tejasraorane
Copy link
Author

Hi Carter. I have mailed you some exception. It is a class not found exception. could you please provide me any pointers on this so that i can try to fix it. It is an intermittent exception.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants