Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue mechanism to stop processing jobs #24

Open
wants to merge 7 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ SENDMAIL_DSN=sendmail://default
MAILER_DSN=native://default

MAIL_CHARSET=utf-8

MAX_ATTEMPTS_DEFAULT=10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 its a lot of to be set as default, you can change this to be 3

17 changes: 10 additions & 7 deletions src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ani2amigos as we have a config file to deal with the envs and load them all at once, you can move this $_ENV['MAX_ATTEMPTS_DEFAULT'] there so we can easily have access to all env setup

Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,18 @@ public function ack(MailJobInterface $mailJob)
}

$pheanstalk = $this->getConnection()->getInstance()->useTube($this->queueName);
if ($mailJob->isCompleted()) {
$pheanstalk->delete($mailJob->getPheanstalkJob());
return null;
}

$timestamp = $mailJob->getTimeToSend();
$delay = max(0, $timestamp - time());
$pheanstalk->delete($mailJob->getPheanstalkJob());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pheanstalk has a method to re-queue the job, the release as it was set.
set the algorithm to be like:

  • if not finished, increase the attempt number;
  • if not overtaking the number of attempts defined, use release method;
  • if it's finished or reached the attempts limit, delete

$mailJob->incrementAttempt();
if (!$mailJob->isCompleted() && $mailJob->getAttempt() <= $_ENV['MAX_ATTEMPTS_DEFAULT']) {
$timestamp = $mailJob->getTimeToSend();
$delay = max(0, $timestamp - time());
// add back to the queue as it wasn't completed maybe due to some transitory error
Copy link
Collaborator

@2amjsouza 2amjsouza Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove any comment from code not related to docs

// could also be failed.
$pheanstalk->release($mailJob->getPheanstalkJob(), Pheanstalk::DEFAULT_PRIORITY, $delay);

$pheanstalk->put($this->createPayload($mailJob), Pheanstalk::DEFAULT_PRIORITY, $delay);
}

return null;
}

Expand Down Expand Up @@ -150,4 +152,5 @@ protected function createPayload(MailJobInterface $mailJob)
'message' => $mailJob->getMessage(),
]);
}

}
28 changes: 19 additions & 9 deletions src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,26 @@ public function ack(MailJobInterface $mailJob)
throw new InvalidCallException('PdoMailJob cannot be a new object to be acknowledged');
}

$sqlText = 'UPDATE `%s`
$mailJob->incrementAttempt();
if($mailJob->getAttempt() > $_ENV['MAX_ATTEMPTS_DEFAULT']){
$sqlText = 'DELETE FROM mail_queue WHERE id = :max_attempt;';
$sql = sprintf($sqlText, $this->tableName);
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':id', $mailJob->getId());
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid using else when writing the algs, return query execute instead, them outside the if, the other case:

if($mailJob->getAttempt() > $_ENV['MAX_ATTEMPTS_DEFAULT']){
            $sqlText = 'DELETE FROM mail_queue WHERE id = :max_attempt;';
            $sql = sprintf($sqlText, $this->tableName);
            $query = $this->getConnection()->getInstance()->prepare($sql);
            $query->bindValue(':id', $mailJob->getId());

            return $query->execute();
        }

$sqlText = 'UPDATE `%s`
                SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime
                WHERE `id`=:id';

... rest of your code

$sqlText = 'UPDATE `%s`
SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime
WHERE `id`=:id';
$sql = sprintf($sqlText, $this->tableName);
$sentTime = $mailJob->isCompleted() ? date('Y-m-d H:i:s', time()) : null;
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':id', $mailJob->getId(), PDO::PARAM_INT);
$query->bindValue(':attempt', $mailJob->getAttempt(), PDO::PARAM_INT);
$query->bindValue(':state', $mailJob->getState());
$query->bindValue(':timeToSend', $mailJob->getTimeToSend());
$query->bindValue(':sentTime', $sentTime);
$sql = sprintf($sqlText, $this->tableName);
$sentTime = $mailJob->isCompleted() ? date('Y-m-d H:i:s', time()) : null;
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':id', $mailJob->getId(), PDO::PARAM_INT);
$query->bindValue(':attempt', $mailJob->getAttempt(), PDO::PARAM_INT);
$query->bindValue(':state', $mailJob->getState());
$query->bindValue(':timeToSend', $mailJob->getTimeToSend());
$query->bindValue(':sentTime', $sentTime);
}

return $query->execute();
}

Expand All @@ -138,4 +147,5 @@ public function isEmpty()
$query->execute();
return intval($query->fetchColumn(0)) === 0;
}

}
1 change: 1 addition & 0 deletions src/Queue/Backend/QueueStoreAdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ public function ack(MailJobInterface $mailJob);
* @return bool
*/
public function isEmpty();

}
6 changes: 5 additions & 1 deletion src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ public function ack(MailJobInterface $mailJob)
{
/** @var AMQPChannel $chanel */
$chanel = $this->getConnection()->getInstance();
if ($mailJob->isCompleted()) {

$mailJob->incrementAttempt();

if ($mailJob->isCompleted() || $mailJob->getAttempt() > $_ENV['MAX_ATTEMPTS_DEFAULT']) {
$chanel->basic_ack($mailJob->getDeliveryTag(), false);
return;
}
Expand Down Expand Up @@ -144,4 +147,5 @@ protected function createPayload(MailJobInterface $mailJob)
'delivery_tag' => null,
]);
}

}
9 changes: 8 additions & 1 deletion src/Queue/Backend/Redis/RedisQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,18 @@ public function ack(MailJobInterface $mailJob)
}

$this->removeReserved($mailJob);

if (!$mailJob->isCompleted()) {
if ($mailJob->getTimeToSend() === null || $mailJob->getTimeToSend() < time()) {
$mailJob->setTimeToSend(time() + $this->expireTime);
}
$this->enqueue($mailJob);

$mailJob->incrementAttempt();
if ($mailJob->getAttempt() <= $_ENV['MAX_ATTEMPTS_DEFAULT']){
$this->enqueue($mailJob);
}
}

}

/**
Expand Down Expand Up @@ -214,4 +220,5 @@ protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
{
call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
}

}
6 changes: 6 additions & 0 deletions src/Queue/Backend/Sqs/SqsMailJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ class SqsMailJob extends MailJob
public function __construct(array $config = [])
{
parent::__construct($config);

if ($this->id){
$messageBody = json_decode($this->getMessage(), true);
$this->setAttempt($messageBody['attempt']);
$this->setMessage($messageBody['message']);
}
}

/**
Expand Down
40 changes: 26 additions & 14 deletions src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

class SqsQueueStoreAdapter implements QueueStoreAdapterInterface
{

const DELAY_SECONDS = 10;
/**
* @var string the name of the queue to store the messages
*/
Expand Down Expand Up @@ -64,9 +66,8 @@ public function enqueue(MailJobInterface $mailJob)
{
$result = $this->getConnection()->getInstance()->sendMessage([
'QueueUrl' => $this->queueUrl,
'MessageBody' => $mailJob->getMessage(),
'DelaySeconds' => $mailJob->getDelaySeconds(),
'Attempt' => $mailJob->getAttempt(),
'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]),
'DelaySeconds' => $mailJob->getDelaySeconds() ?? self::DELAY_SECONDS,
]);
$messageId = $result['MessageId'];
return $messageId !== null && is_string($messageId);
Expand All @@ -91,7 +92,7 @@ public function dequeue()
'id' => $result['MessageId'],
'receiptHandle' => $result['ReceiptHandle'],
'message' => $result['Body'],
'attempt' => $result['Attempt'],
// 'attempt' => $result['Attempt'],
]);
}

Expand All @@ -106,18 +107,28 @@ public function ack(MailJobInterface $mailJob)
throw new InvalidCallException('SqsMailJob cannot be a new object to be acknowledged');
}

if ($mailJob->getDeleted()) {
$this->getConnection()->getInstance()->deleteMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
]);
return true;
} elseif ($mailJob->getVisibilityTimeout() !== null) {
$this->getConnection()->getInstance()->changeMessageVisibility([
$mailJob->incrementAttempt();

$this->getConnection()->getInstance()->deleteMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
]);

if (!$mailJob->getDeleted() && $mailJob->getAttempt() <= $_ENV['MAX_ATTEMPTS_DEFAULT']) {
$this->getConnection()->getInstance()->sendMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
'VisibilityTimeout' => $mailJob->getVisibilityTimeout(),
'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]),
'DelaySeconds' => $mailJob->getDelaySeconds() ?? self::DELAY_SECONDS,
]);

if($mailJob->getVisibilityTimeout() !== null) {
$this->getConnection()->getInstance()->changeMessageVisibility([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $mailJob->getReceiptHandle(),
'VisibilityTimeout' => $mailJob->getVisibilityTimeout(),
]);
}

return true;
}

Expand All @@ -135,4 +146,5 @@ public function isEmpty(): bool
]);
return $response['Attributes']['ApproximateNumberOfMessages'] === 0;
}

}
6 changes: 4 additions & 2 deletions src/Queue/Backend/Sqs/SqsQueueStoreConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public function connect()
$secret = $this->getConfigurationValue('secret');
$region = $this->getConfigurationValue('region');
$this->instance = new SqsClient([
'key' => $key,
'secret' => $secret,
'credentials' => [
'key' => $key,
'secret' => $secret,
],
'region' => $region,
]);
return $this;
Expand Down
Loading