diff --git a/src/AbstractAMQPConnector.php b/src/AbstractAMQPConnector.php index d90c314..3ef02b4 100644 --- a/src/AbstractAMQPConnector.php +++ b/src/AbstractAMQPConnector.php @@ -7,6 +7,12 @@ * Abstract base class * @package celery-php */ + +require_once __DIR__ . "/PECLAMQPConnector.php"; +require_once __DIR__ . "/AMQPLibConnector.php"; +require_once __DIR__ . "/AMQPLibConnectorSsl.php"; +require_once __DIR__ . "/RedisConnector.php"; + abstract class AbstractAMQPConnector { /** diff --git a/src/Celery.php b/src/Celery.php index f16bd50..5037258 100644 --- a/src/Celery.php +++ b/src/Celery.php @@ -43,6 +43,8 @@ namespace Celery; +require_once __DIR__ . "/CeleryAbstract.php"; + /** * Simple client for a Celery server * @@ -64,7 +66,7 @@ class Celery extends CeleryAbstract * @param int result_expire Expire time for result queue, milliseconds (for AMQP exchanges only) * @param array ssl_options Used only for 'php-amqplib-ssl' connections, an associative array with values as defined here: http://php.net/manual/en/context.ssl.php */ - public function __construct($host, $login, $password, $vhost, $exchange='celery', $binding='celery', $port=5672, $connector=false, $result_expire=0, $ssl_options=[]) + public function __construct($host, $login, $password, $vhost, $exchange='celery', $binding='celery', $port=5672, $connector=false, $result_expire=0, $ssl_options=[], $reply_to="celeryresults") { $broker_connection = [ 'host' => $host, @@ -76,7 +78,8 @@ public function __construct($host, $login, $password, $vhost, $exchange='celery' 'port' => $port, 'connector' => $connector, 'result_expire' => $result_expire, - 'ssl_options' => $ssl_options + 'ssl_options' => $ssl_options, + 'reply_to' => $reply_to ]; $backend_connection = $broker_connection; diff --git a/src/CeleryAbstract.php b/src/CeleryAbstract.php index 019cd6b..cc59262 100644 --- a/src/CeleryAbstract.php +++ b/src/CeleryAbstract.php @@ -4,6 +4,10 @@ use PhpAmqpLib\Exception\AMQPProtocolConnectionException; +require_once __DIR__ . "/AbstractAMQPConnector.php"; +require_once __DIR__ . "/AsyncResult.php"; +require_once __DIR__ . "/CeleryConnectionException.php"; + /** * Client for a Celery server - abstract base class implementing actual logic * @package celery-php @@ -20,13 +24,15 @@ abstract class CeleryAbstract private $isConnected = false; + public function getBackendConnection() { return $this->backend_connection;} + private function SetDefaultValues($details) { - $defaultValues = ["host" => "", "login" => "", "password" => "", "vhost" => "", "exchange" => "celery", "binding" => "celery", "port" => 5672, "connector" => false, "persistent_messages" => false, "result_expire" => 0, "ssl_options" => []]; + $defaultValues = ["host" => "", "login" => "", "password" => "", "vhost" => "", "exchange" => "celery", "binding" => "celery", "port" => 5672, "connector" => false, "persistent_messages" => false, "result_expire" => 0, "ssl_options" => [], "reply_to" => "celeryresults"]; $returnValue = []; - foreach (['host', 'login', 'password', 'vhost', 'exchange', 'binding', 'port', 'connector', 'persistent_messages', 'result_expire', 'ssl_options'] as $detail) { + foreach (['host', 'login', 'password', 'vhost', 'exchange', 'binding', 'port', 'connector', 'persistent_messages', 'result_expire', 'ssl_options', 'reply_to'] as $detail) { if (!array_key_exists($detail, $details)) { $returnValue[$detail] = $defaultValues[$detail]; } else { @@ -131,6 +137,7 @@ public function PostTask($task, $args, $async_result=true, $routing_key="celery" 'content_type' => 'application/json', 'content_encoding' => 'UTF-8', 'immediate' => false, + 'reply_to' => $this->broker_connection_details['reply_to'] ]; if ($this->broker_connection_details['persistent_messages']) { diff --git a/src/CeleryConnectionException.php b/src/CeleryConnectionException.php index a03ed3a..c15ec3b 100644 --- a/src/CeleryConnectionException.php +++ b/src/CeleryConnectionException.php @@ -2,6 +2,8 @@ namespace Celery; +require_once __DIR__ . "/CeleryException.php"; + /** * Emited by CeleryAbstract::PostTask() connection failures etc * @package celery-php