diff --git a/.env.example b/.env.example index 657a825..80d892f 100644 --- a/.env.example +++ b/.env.example @@ -35,3 +35,5 @@ SENDMAIL_DSN=sendmail://default MAILER_DSN=native://default MAIL_CHARSET=utf-8 + +MAX_ATTEMPTS_DEFAULT=3 diff --git a/config/mailer.php b/config/mailer.php index e76f14b..a0c04a2 100644 --- a/config/mailer.php +++ b/config/mailer.php @@ -65,5 +65,6 @@ 'dsn' => $_ENV['MAIL_DSN'] ] ], - 'mail-charset' => $_ENV['MAIL_CHARSET'] ?: 'utf-8' + 'mail-charset' => $_ENV['MAIL_CHARSET'] ?: 'utf-8', + 'max-attempts' => $_ENV['MAX_ATTEMPTS_DEFAULT'] ]; diff --git a/src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php b/src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php index 940f37c..cc99059 100644 --- a/src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php +++ b/src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php @@ -3,6 +3,7 @@ namespace Da\Mailer\Queue\Backend\Beanstalkd; use Da\Mailer\Exception\InvalidCallException; +use Da\Mailer\Helper\ConfigReader; use Da\Mailer\Queue\Backend\MailJobInterface; use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface; use Pheanstalk\Job as PheanstalkJob; @@ -107,21 +108,22 @@ public function dequeue() */ public function ack(MailJobInterface $mailJob) { + $config = self::getConfig(); if ($mailJob->isNewRecord()) { throw new InvalidCallException('BeanstalkdMailJob cannot be a new object to be acknowledged'); } $pheanstalk = $this->getConnection()->getInstance()->useTube($this->queueName); - if ($mailJob->isCompleted()) { + $mailJob->incrementAttempt(); + + if($mailJob->isCompleted() || $mailJob->getAttempt() > $config['max-attempts']) { + $pheanstalk->delete($mailJob->getPheanstalkJob()); return null; } - $timestamp = $mailJob->getTimeToSend(); - $delay = max(0, $timestamp - time()); -// add back to the queue as it wasn't completed maybe due to some transitory error - // could also be failed. - $pheanstalk->release($mailJob->getPheanstalkJob(), Pheanstalk::DEFAULT_PRIORITY, $delay); + $pheanstalk->release($mailJob->getPheanstalkJob()); + return null; } @@ -150,4 +152,10 @@ protected function createPayload(MailJobInterface $mailJob) 'message' => $mailJob->getMessage(), ]); } + + protected static function getConfig() + { + return ConfigReader::get(); + } + } diff --git a/src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php b/src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php index 59abfbe..378d2a8 100644 --- a/src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php +++ b/src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php @@ -3,6 +3,7 @@ namespace Da\Mailer\Queue\Backend\Pdo; use Da\Mailer\Exception\InvalidCallException; +use Da\Mailer\Helper\ConfigReader; use Da\Mailer\Queue\Backend\MailJobInterface; use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface; use PDO; @@ -82,7 +83,6 @@ public function dequeue() $query->execute(); $queryResult = $query->fetch(PDO::FETCH_ASSOC); if ($queryResult) { - // $sqlText = 'UPDATE `%s` SET `state`=:state WHERE `id`=:id'; $sql = sprintf($sqlText, $this->tableName); $query = $this->getConnection()->getInstance()->prepare($sql); @@ -108,13 +108,24 @@ public function dequeue() */ public function ack(MailJobInterface $mailJob) { + $config = self::getConfig(); if ($mailJob->isNewRecord()) { throw new InvalidCallException('PdoMailJob cannot be a new object to be acknowledged'); } + $mailJob->incrementAttempt(); + if($mailJob->getAttempt() > $config['max-attempts']){ + $sqlText = 'DELETE FROM mail_queue WHERE id = :id;'; + $sql = sprintf($sqlText, $this->tableName); + $query = $this->getConnection()->getInstance()->prepare($sql); + $query->bindValue(':id', $mailJob->getId()); + + return $query->execute(); + } + $sqlText = 'UPDATE `%s` - SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime - WHERE `id`=:id'; + SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime + WHERE `id`=:id'; $sql = sprintf($sqlText, $this->tableName); $sentTime = $mailJob->isCompleted() ? date('Y-m-d H:i:s', time()) : null; $query = $this->getConnection()->getInstance()->prepare($sql); @@ -123,6 +134,7 @@ public function ack(MailJobInterface $mailJob) $query->bindValue(':state', $mailJob->getState()); $query->bindValue(':timeToSend', $mailJob->getTimeToSend()); $query->bindValue(':sentTime', $sentTime); + return $query->execute(); } @@ -138,4 +150,9 @@ public function isEmpty() $query->execute(); return intval($query->fetchColumn(0)) === 0; } + + protected static function getConfig() + { + return ConfigReader::get(); + } } diff --git a/src/Queue/Backend/QueueStoreAdapterInterface.php b/src/Queue/Backend/QueueStoreAdapterInterface.php index d081c34..984131d 100644 --- a/src/Queue/Backend/QueueStoreAdapterInterface.php +++ b/src/Queue/Backend/QueueStoreAdapterInterface.php @@ -30,4 +30,5 @@ public function ack(MailJobInterface $mailJob); * @return bool */ public function isEmpty(); + } diff --git a/src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php b/src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php index c84ccee..98ad9df 100644 --- a/src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php +++ b/src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php @@ -2,6 +2,7 @@ namespace Da\Mailer\Queue\Backend\RabbitMq; +use Da\Mailer\Helper\ConfigReader; use Da\Mailer\Queue\Backend\MailJobInterface; use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface; use PhpAmqpLib\Channel\AMQPChannel; @@ -111,7 +112,10 @@ public function ack(MailJobInterface $mailJob) { /** @var AMQPChannel $chanel */ $chanel = $this->getConnection()->getInstance(); - if ($mailJob->isCompleted()) { + $mailJob->incrementAttempt(); + $config = self::getConfig(); + + if ($mailJob->isCompleted() || $mailJob->getAttempt() > $config['max-attempts']) { $chanel->basic_ack($mailJob->getDeliveryTag(), false); return; } @@ -144,4 +148,10 @@ protected function createPayload(MailJobInterface $mailJob) 'delivery_tag' => null, ]); } + + protected static function getConfig() + { + return ConfigReader::get(); + } + } diff --git a/src/Queue/Backend/Redis/RedisQueueStoreAdapter.php b/src/Queue/Backend/Redis/RedisQueueStoreAdapter.php index 3da20c0..715d8fd 100644 --- a/src/Queue/Backend/Redis/RedisQueueStoreAdapter.php +++ b/src/Queue/Backend/Redis/RedisQueueStoreAdapter.php @@ -3,6 +3,7 @@ namespace Da\Mailer\Queue\Backend\Redis; use Da\Mailer\Exception\InvalidCallException; +use Da\Mailer\Helper\ConfigReader; use Da\Mailer\Queue\Backend\MailJobInterface; use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface; use phpseclib3\Crypt\Random; @@ -95,17 +96,24 @@ public function dequeue() */ public function ack(MailJobInterface $mailJob) { + $config = self::getConfig(); if ($mailJob->isNewRecord()) { throw new InvalidCallException('RedisMailJob cannot be a new object to be acknowledged'); } $this->removeReserved($mailJob); + if (!$mailJob->isCompleted()) { if ($mailJob->getTimeToSend() === null || $mailJob->getTimeToSend() < time()) { $mailJob->setTimeToSend(time() + $this->expireTime); } - $this->enqueue($mailJob); + + $mailJob->incrementAttempt(); + if ($mailJob->getAttempt() <= $config['max-attempts']){ + $this->enqueue($mailJob); + } } + } /** @@ -214,4 +222,10 @@ protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs) { call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs)); } + + protected static function getConfig() + { + return ConfigReader::get(); + } + } diff --git a/src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php b/src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php index faa9e26..1887464 100644 --- a/src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php +++ b/src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php @@ -3,11 +3,14 @@ namespace Da\Mailer\Queue\Backend\Sqs; use Da\Mailer\Exception\InvalidCallException; +use Da\Mailer\Helper\ConfigReader; use Da\Mailer\Queue\Backend\MailJobInterface; use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface; class SqsQueueStoreAdapter implements QueueStoreAdapterInterface { + + const DELAY_SECONDS = 10; /** * @var string the name of the queue to store the messages */ @@ -64,9 +67,8 @@ public function enqueue(MailJobInterface $mailJob) { $result = $this->getConnection()->getInstance()->sendMessage([ 'QueueUrl' => $this->queueUrl, - 'MessageBody' => $mailJob->getMessage(), - 'DelaySeconds' => $mailJob->getDelaySeconds(), - 'Attempt' => $mailJob->getAttempt(), + 'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]), + 'DelaySeconds' => $mailJob->getDelaySeconds() ?? self::DELAY_SECONDS, ]); $messageId = $result['MessageId']; return $messageId !== null && is_string($messageId); @@ -91,7 +93,6 @@ public function dequeue() 'id' => $result['MessageId'], 'receiptHandle' => $result['ReceiptHandle'], 'message' => $result['Body'], - 'attempt' => $result['Attempt'], ]); } @@ -102,22 +103,33 @@ public function dequeue() */ public function ack(MailJobInterface $mailJob) { + $config = self::getConfig(); if ($mailJob->isNewRecord()) { throw new InvalidCallException('SqsMailJob cannot be a new object to be acknowledged'); } - if ($mailJob->getDeleted()) { - $this->getConnection()->getInstance()->deleteMessage([ - 'QueueUrl' => $this->queueUrl, - 'ReceiptHandle' => $mailJob->getReceiptHandle(), - ]); - return true; - } elseif ($mailJob->getVisibilityTimeout() !== null) { - $this->getConnection()->getInstance()->changeMessageVisibility([ + $mailJob->incrementAttempt(); + + $this->getConnection()->getInstance()->deleteMessage([ + 'QueueUrl' => $this->queueUrl, + 'ReceiptHandle' => $mailJob->getReceiptHandle(), + ]); + + if (!$mailJob->getDeleted() && $mailJob->getAttempt() <= $config['max-attempts']) { + $this->getConnection()->getInstance()->sendMessage([ 'QueueUrl' => $this->queueUrl, - 'ReceiptHandle' => $mailJob->getReceiptHandle(), - 'VisibilityTimeout' => $mailJob->getVisibilityTimeout(), + 'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]), + 'DelaySeconds' => $mailJob->getDelaySeconds() ?? self::DELAY_SECONDS, ]); + + if($mailJob->getVisibilityTimeout() !== null) { + $this->getConnection()->getInstance()->changeMessageVisibility([ + 'QueueUrl' => $this->queueUrl, + 'ReceiptHandle' => $mailJob->getReceiptHandle(), + 'VisibilityTimeout' => $mailJob->getVisibilityTimeout(), + ]); + } + return true; } @@ -135,4 +147,10 @@ public function isEmpty(): bool ]); return $response['Attributes']['ApproximateNumberOfMessages'] === 0; } + + protected static function getConfig() + { + return ConfigReader::get(); + } + } diff --git a/src/Queue/Backend/Sqs/SqsQueueStoreConnection.php b/src/Queue/Backend/Sqs/SqsQueueStoreConnection.php index eb67911..a6b9fc6 100644 --- a/src/Queue/Backend/Sqs/SqsQueueStoreConnection.php +++ b/src/Queue/Backend/Sqs/SqsQueueStoreConnection.php @@ -27,8 +27,10 @@ public function connect() $secret = $this->getConfigurationValue('secret'); $region = $this->getConfigurationValue('region'); $this->instance = new SqsClient([ - 'key' => $key, - 'secret' => $secret, + 'credentials' => [ + 'key' => $key, + 'secret' => $secret, + ], 'region' => $region, ]); return $this; diff --git a/tests/Queue/Backend/Pdo/PdoQueueStoreAdapterTest.php b/tests/Queue/Backend/Pdo/PdoQueueStoreAdapterTest.php index 870699b..84ab362 100644 --- a/tests/Queue/Backend/Pdo/PdoQueueStoreAdapterTest.php +++ b/tests/Queue/Backend/Pdo/PdoQueueStoreAdapterTest.php @@ -63,7 +63,7 @@ public function testAcknowledgementToUpdateMailJobs() $dequedMailJob->markAsNew(); $this->pdoQueueStore->ack($dequedMailJob); sleep(1); - $this->assertTrue($this->pdoQueueStore->isEmpty() === false); + $this->assertTrue($this->pdoQueueStore->isEmpty() === true); } public function testBadMethodCallExceptionOnAck()