Skip to content
This repository has been archived by the owner on Nov 30, 2021. It is now read-only.

Add support for reply_to parameter #121

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions src/AbstractAMQPConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
* Abstract base class
* @package celery-php
*/

require_once __DIR__ . "/PECLAMQPConnector.php";
require_once __DIR__ . "/AMQPLibConnector.php";
require_once __DIR__ . "/AMQPLibConnectorSsl.php";
require_once __DIR__ . "/RedisConnector.php";

abstract class AbstractAMQPConnector
{
/**
Expand Down
7 changes: 5 additions & 2 deletions src/Celery.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

namespace Celery;

require_once __DIR__ . "/CeleryAbstract.php";

/**
* Simple client for a Celery server
*
Expand All @@ -64,7 +66,7 @@ class Celery extends CeleryAbstract
* @param int result_expire Expire time for result queue, milliseconds (for AMQP exchanges only)
* @param array ssl_options Used only for 'php-amqplib-ssl' connections, an associative array with values as defined here: http://php.net/manual/en/context.ssl.php
*/
public function __construct($host, $login, $password, $vhost, $exchange='celery', $binding='celery', $port=5672, $connector=false, $result_expire=0, $ssl_options=[])
public function __construct($host, $login, $password, $vhost, $exchange='celery', $binding='celery', $port=5672, $connector=false, $result_expire=0, $ssl_options=[], $reply_to="celeryresults")
{
$broker_connection = [
'host' => $host,
Expand All @@ -76,7 +78,8 @@ public function __construct($host, $login, $password, $vhost, $exchange='celery'
'port' => $port,
'connector' => $connector,
'result_expire' => $result_expire,
'ssl_options' => $ssl_options
'ssl_options' => $ssl_options,
'reply_to' => $reply_to
];
$backend_connection = $broker_connection;

Expand Down
11 changes: 9 additions & 2 deletions src/CeleryAbstract.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

use PhpAmqpLib\Exception\AMQPProtocolConnectionException;

require_once __DIR__ . "/AbstractAMQPConnector.php";
require_once __DIR__ . "/AsyncResult.php";
require_once __DIR__ . "/CeleryConnectionException.php";

/**
* Client for a Celery server - abstract base class implementing actual logic
* @package celery-php
Expand All @@ -20,13 +24,15 @@ abstract class CeleryAbstract

private $isConnected = false;

public function getBackendConnection() { return $this->backend_connection;}

private function SetDefaultValues($details)
{
$defaultValues = ["host" => "", "login" => "", "password" => "", "vhost" => "", "exchange" => "celery", "binding" => "celery", "port" => 5672, "connector" => false, "persistent_messages" => false, "result_expire" => 0, "ssl_options" => []];
$defaultValues = ["host" => "", "login" => "", "password" => "", "vhost" => "", "exchange" => "celery", "binding" => "celery", "port" => 5672, "connector" => false, "persistent_messages" => false, "result_expire" => 0, "ssl_options" => [], "reply_to" => "celeryresults"];

$returnValue = [];

foreach (['host', 'login', 'password', 'vhost', 'exchange', 'binding', 'port', 'connector', 'persistent_messages', 'result_expire', 'ssl_options'] as $detail) {
foreach (['host', 'login', 'password', 'vhost', 'exchange', 'binding', 'port', 'connector', 'persistent_messages', 'result_expire', 'ssl_options', 'reply_to'] as $detail) {
if (!array_key_exists($detail, $details)) {
$returnValue[$detail] = $defaultValues[$detail];
} else {
Expand Down Expand Up @@ -131,6 +137,7 @@ public function PostTask($task, $args, $async_result=true, $routing_key="celery"
'content_type' => 'application/json',
'content_encoding' => 'UTF-8',
'immediate' => false,
'reply_to' => $this->broker_connection_details['reply_to']
];

if ($this->broker_connection_details['persistent_messages']) {
Expand Down
2 changes: 2 additions & 0 deletions src/CeleryConnectionException.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Celery;

require_once __DIR__ . "/CeleryException.php";

/**
* Emited by CeleryAbstract::PostTask() connection failures etc
* @package celery-php
Expand Down