Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maxDelayBetweenPolls is not honored if there is long-running SQS task during LOW throughput mode #1187

Open
wmvsilva opened this issue Aug 8, 2024 · 2 comments
Labels
component: sqs SQS integration related issue type: bug Something isn't working

Comments

@wmvsilva
Copy link

wmvsilva commented Aug 8, 2024

Type: Bug

Component:
SQS
Version: 3.0.3
Config:

  • maxDelayBetweenPolls: 10000
  • maxConcurrentMessages: 121
  • listenerShutdownTimeout: 120000
    All other portions of the config are the defaults.

Describe the bug

Bug Description

Hi team,

My application uses @SqsListener to process messages from an SQS queue. Occasionally, it experiences an issue where no SQS messages are received or processed for several minutes, despite thousands of messages being present in the queue. I enabled DEBUG logs for io.awspring.cloud.sqs to investigate this problem.

During the last time the issue occurred, I observed the following log every 10 seconds (but no polling logs):

Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#4-0. Permits left: 120

Additionally, I could see logs in my application indicating that a single SQS message was actively being processed. As soon as that task finished processing, polling started up again and there were plenty of logs like:

Polling queue <MY-QUEUE> for 10 messages.

Bug Scenario

After looking through the awspring source code, I believe the following sequence of events occurs:

  1. awspring is in CurrentThroughputMode.HIGH mode
  2. An SQS message is received that will take several minutes to process
  3. While this SQS message is processing, awspring attempts to poll the queue for 10 messages, but no messages are received. This causes SemaphoreBackPressureHandler to enter CurrentThroughputMode.LOW mode.
  4. In the AbstractPollingMessageSource.pollAndEmitMessages() loop, when SemaphoreBackPressureHandler.requestInLowThroughputMode() is run, it tries to acquire all 121 permits. However, one permit remains unavailable due to the long-running SQS task that started earlier. Because requestInLowThroughputMode cannot acquire all permits, it acquires none, resulting in no SQS polling.
  5. Once the long-running SQS task completes, all permits are available, and polling begins again.

For step 5, I see the following logs:

...
Processed SQS notifications message (this log is from my own app)
Releasing permit for queue <MY-QUEUE>
1 unused permit(s), setting TM HIGH for io.awspring.cloud.sqs.sqsListenerEndpointContainer#4-0. Permits left: 120
Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#4-0. Permits left: 0
Polling queue <MY-QUEUE> for 10 messages.
...

I believe this happens because SemaphoreBackPressureHandler.requestInLowThroughputMode() uses the following code. With this approach, LOW can only acquire permits if the full totalPermits are available. While this prevents parallel SQS polling in LOW mode, it also prevents any polling if there ongoing SQS tasks consuming permits. With fast processing of SQS messages, this would not be noticeable, but I occasionally have SQS tasks that last several minutes.

	private int requestInLowThroughputMode() throws InterruptedException {
		// Although LTM can be set / unset by many processes, only the MessageSource thread gets here,
		// so no actual concurrency
		logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id,
				this.semaphore.availablePermits());
		boolean hasAcquired = tryAcquire(this.totalPermits, CurrentThroughputMode.LOW);
		if (hasAcquired) {
			logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits());
			// We've acquired all permits - there's no other process currently processing messages
			if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) {
				logger.warn("hasAcquiredFullPermits was already true. Permits left: {}",
						this.semaphore.availablePermits());
			}
			return this.batchSize;
		}
		else {
			return 0;
		}
	}

Expectation

I would expect that if my application is handling an SQS message that takes several minutes to process, and SemaphoreBackPressureHandler is in LOW mode, then SqsListener would continue to poll the queue for up to 10 messages every 10 seconds.

Workaround

As a workaround, I am using backPressureMode=FIXED_HIGH_THROUGHPUT to prevent my application from entering LOW mode. However, this results in additional polling.

Please let me know if there is any additional information I can provide or if any of my assumptions are incorrect.

@tomazfernandes
Copy link
Contributor

Hey @wmvsilva, thanks for the detailed investigation.

The scenario you described makes sense, and indeed it would prevent further polling until the message is done processing.

We could look into enabling partial polls for LTM, e.g. if one permit is being used, we'd try to acquire all permits, and if that fails, we could try to acquire 9 permits to do a partial poll.

The main issue I see is that it's not so simple to orchestrate this with the release of the permits afterwards.

I'll need to give this some more thought - do you have any ideas?

Thanks.

@tomazfernandes
Copy link
Contributor

tomazfernandes commented Jan 24, 2025

We should be able to solve this in a simple way after we merge #1251, by separating a ThroughputModeBPH from SemaphoreBPH, and having a CompositeBPH when maxInflightMessages is more than the batch size.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component: sqs SQS integration related issue type: bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants