From 07ed3bab495862f62c8385562d00c9914b247cd1 Mon Sep 17 00:00:00 2001 From: Dmitry Tomin Date: Thu, 26 Apr 2018 12:16:59 +0300 Subject: [PATCH 01/24] Mongo db transport --- bin/test | 1 + composer.json | 5 + docker-compose.yml | 9 +- pkg/mongodb/.gitignore | 6 + pkg/mongodb/MongodbConnectionFactory.php | 101 ++++++ pkg/mongodb/MongodbConsumer.php | 174 ++++++++++ pkg/mongodb/MongodbContext.php | 102 ++++++ pkg/mongodb/MongodbDestination.php | 38 +++ pkg/mongodb/MongodbMessage.php | 318 ++++++++++++++++++ pkg/mongodb/MongodbProducer.php | 181 ++++++++++ .../Tests/Functional/MongodbConsumerTest.php | 104 ++++++ .../Tests/MongodbConnectionFactoryTest.php | 52 +++ pkg/mongodb/Tests/MongodbConsumerTest.php | 209 ++++++++++++ pkg/mongodb/Tests/MongodbContextTest.php | 166 +++++++++ pkg/mongodb/Tests/MongodbDestinationTest.php | 37 ++ pkg/mongodb/Tests/MongodbMessageTest.php | 91 +++++ pkg/mongodb/Tests/MongodbProducerTest.php | 66 ++++ .../Tests/Spec/CreateMongodbContextTrait.php | 21 ++ .../Spec/MongodbConnectionFactoryTest.php | 17 + pkg/mongodb/Tests/Spec/MongodbContextTest.php | 21 ++ pkg/mongodb/Tests/Spec/MongodbMessageTest.php | 17 + .../Tests/Spec/MongodbProducerTest.php | 21 ++ pkg/mongodb/Tests/Spec/MongodbQueueTest.php | 17 + .../Tests/Spec/MongodbRequeueMessageTest.php | 21 ++ ...dAndReceiveDelayedMessageFromQueueTest.php | 21 ++ ...ndReceivePriorityMessagesFromQueueTest.php | 51 +++ ...ReceiveTimeToLiveMessagesFromQueueTest.php | 21 ++ .../MongodbSendToAndReceiveFromQueueTest.php | 21 ++ .../MongodbSendToAndReceiveFromTopicTest.php | 21 ++ ...odbSendToAndReceiveNoWaitFromQueueTest.php | 21 ++ ...odbSendToAndReceiveNoWaitFromTopicTest.php | 21 ++ pkg/mongodb/Tests/Spec/MongodbTopicTest.php | 17 + pkg/mongodb/composer.json | 48 +++ 33 files changed, 2036 insertions(+), 1 deletion(-) create mode 100644 pkg/mongodb/.gitignore create mode 100644 pkg/mongodb/MongodbConnectionFactory.php create mode 100644 pkg/mongodb/MongodbConsumer.php create mode 100644 pkg/mongodb/MongodbContext.php create mode 100644 pkg/mongodb/MongodbDestination.php create mode 100644 pkg/mongodb/MongodbMessage.php create mode 100644 pkg/mongodb/MongodbProducer.php create mode 100644 pkg/mongodb/Tests/Functional/MongodbConsumerTest.php create mode 100644 pkg/mongodb/Tests/MongodbConnectionFactoryTest.php create mode 100644 pkg/mongodb/Tests/MongodbConsumerTest.php create mode 100644 pkg/mongodb/Tests/MongodbContextTest.php create mode 100644 pkg/mongodb/Tests/MongodbDestinationTest.php create mode 100644 pkg/mongodb/Tests/MongodbMessageTest.php create mode 100644 pkg/mongodb/Tests/MongodbProducerTest.php create mode 100644 pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbContextTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbMessageTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbProducerTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbQueueTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbTopicTest.php create mode 100644 pkg/mongodb/composer.json diff --git a/bin/test b/bin/test index 7a91fb8c3..6607d84c9 100755 --- a/bin/test +++ b/bin/test @@ -36,6 +36,7 @@ waitForService redis 6379 50 waitForService beanstalkd 11300 50 waitForService gearmand 4730 50 waitForService kafka 9092 50 +waitForService mongo 27017 50 php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force diff --git a/composer.json b/composer.json index 35777310d..544c884d9 100644 --- a/composer.json +++ b/composer.json @@ -16,6 +16,7 @@ "enqueue/fs": "*@dev", "enqueue/null": "*@dev", "enqueue/dbal": "*@dev", + "enqueue/mongodb": "*@dev", "enqueue/sqs": "*@dev", "enqueue/pheanstalk": "*@dev", "enqueue/gearman": "*@dev", @@ -143,6 +144,10 @@ { "type": "path", "url": "pkg/async-event-dispatcher" + }, + { + "type": "path", + "url": "pkg/mongodb" } ] } diff --git a/docker-compose.yml b/docker-compose.yml index 01be96cee..f004f825d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,7 @@ services: - zookeeper - google-pubsub - rabbitmqssl + - mongo volumes: - './:/mqdev' environment: @@ -24,7 +25,7 @@ services: - RABBITMQ_PASSWORD=guest - RABBITMQ_VHOST=mqdev - RABBITMQ_AMQP__PORT=5672 - - RABBITMQ_STOMP_PORT=61613 + - RABBITMQ_STOMP_PORT=61613 - DOCTRINE_DRIVER=pdo_mysql - DOCTRINE_HOST=mysql - DOCTRINE_PORT=3306 @@ -44,6 +45,7 @@ services: - RDKAFKA_PORT=9092 - PUBSUB_EMULATOR_HOST=http://google-pubsub:8085 - GCLOUD_PROJECT=mqdev + - MONGO_DSN=mongodb://mongo rabbitmq: image: 'enqueue/rabbitmq:latest' @@ -102,6 +104,11 @@ services: image: 'google/cloud-sdk:latest' entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085' + mongo: + image: mongo + ports: + - "27017:27017" + volumes: mysql-data: driver: local diff --git a/pkg/mongodb/.gitignore b/pkg/mongodb/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/mongodb/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/mongodb/MongodbConnectionFactory.php b/pkg/mongodb/MongodbConnectionFactory.php new file mode 100644 index 000000000..0e4e75023 --- /dev/null +++ b/pkg/mongodb/MongodbConnectionFactory.php @@ -0,0 +1,101 @@ + 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/ + * 'dbname' => 'enqueue', - database name. + * 'collection_name' => 'enqueue' - collection name + * 'polling_interval' => '1000', - How often query for new messages (milliseconds) + * ] + * + * or + * + * mongodb://127.0.0.1:27017/dbname/collection_name?polling_interval=1000 + * + * @param array|string|null $config + */ + public function __construct($config = 'mongodb:') + { + if (empty($config)) { + $config = $this->parseDsn('mongodb:'); + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + $config = array_replace([ + 'uri' => 'mongodb://127.0.0.1/', + ], $config); + + $this->config = $config; + } + + public function createContext() + { + $client = new Client($this->config['uri']); + + return new MongodbContext($client, $this->config); + } + + public static function parseDsn($dsn) + { + $parsedUrl = parse_url($dsn); + if (false === $parsedUrl) { + throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn)); + } + if (empty($parsedUrl['scheme'])) { + throw new \LogicException('Schema is empty'); + } + $supported = [ + 'mongodb' => true, + ]; + if (false == isset($parsedUrl['scheme'])) { + throw new \LogicException(sprintf( + 'The given DSN schema "%s" is not supported. There are supported schemes: "%s".', + $parsedUrl['scheme'], + implode('", "', array_keys($supported)) + )); + } + if ('mongodb:' === $dsn) { + return [ + 'uri' => 'mongodb://127.0.0.1/', + ]; + } + $config['uri'] = $dsn; + if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) { + $pathParts = explode('/', $parsedUrl['path']); + //DB name + if ($pathParts[1]) { + $config['dbname'] = $pathParts[1]; + } + } + if (isset($parsedUrl['query'])) { + $queryParts = null; + parse_str($parsedUrl['query'], $queryParts); + //get enqueue attributes values + if (!empty($queryParts['polling_interval'])) { + $config['polling_interval'] = $queryParts['polling_interval']; + } + if (!empty($queryParts['enqueue_collection'])) { + $config['collection_name'] = $queryParts['enqueue_collection']; + } + } + + return $config; + } +} diff --git a/pkg/mongodb/MongodbConsumer.php b/pkg/mongodb/MongodbConsumer.php new file mode 100644 index 000000000..4986d662b --- /dev/null +++ b/pkg/mongodb/MongodbConsumer.php @@ -0,0 +1,174 @@ +context = $context; + $this->queue = $queue; + } + + /** + * Set polling interval in milliseconds. + * + * @param int $msec + */ + public function setPollingInterval($msec) + { + $this->pollingInterval = $msec * 1000; + } + + /** + * Get polling interval in milliseconds. + * + * @return int + */ + public function getPollingInterval() + { + return (int) $this->pollingInterval / 1000; + } + + /** + * {@inheritdoc} + * + * @return MongodbDestination + */ + public function getQueue() + { + return $this->queue; + } + + /** + * {@inheritdoc} + * + * @return MongodbMessage|null + */ + public function receive($timeout = 0) + { + $timeout /= 1000; + $startAt = microtime(true); + + while (true) { + $message = $this->receiveMessage(); + + if ($message) { + return $message; + } + + if ($timeout && (microtime(true) - $startAt) >= $timeout) { + return; + } + + usleep($this->pollingInterval); + + if ($timeout && (microtime(true) - $startAt) >= $timeout) { + return; + } + } + } + + /** + * {@inheritdoc} + * + * @return MongodbMessage|null + */ + public function receiveNoWait() + { + return $this->receiveMessage(); + } + + /** + * {@inheritdoc} + * + * @param MongodbMessage $message + */ + public function acknowledge(PsrMessage $message) + { + // does nothing + } + + /** + * {@inheritdoc} + * + * @param MongodbMessage $message + */ + public function reject(PsrMessage $message, $requeue = false) + { + InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class); + + if ($requeue) { + $this->context->createProducer()->send($this->queue, $message); + + return; + } + } + + /** + * @return MongodbMessage|null + */ + protected function receiveMessage() + { + $now = time(); + $collection = $this->context->getCollection(); + $message = $collection->findOneAndDelete( + [ + '$or' => [ + ['delayed_until' => ['$exists' => false]], + ['delayed_until' => ['$lte' => $now]], + ], + ], + [ + 'sort' => ['priority' => -1, 'published_at' => 1], + 'typeMap' => ['root' => 'array', 'document' => 'array'], + ] + ); + + if (!$message) { + return null; + } + if (empty($message['time_to_live']) || $message['time_to_live'] > time()) { + return $this->convertMessage($message); + } + } + + /** + * @param array $dbalMessage + * + * @return MongodbMessage + */ + protected function convertMessage(array $mongodbMessage) + { + $message = $this->context->createMessage($mongodbMessage['body'], $mongodbMessage['properties'], $mongodbMessage['headers']); + $message->setId((string) $mongodbMessage['_id']); + $message->setPriority((int) $mongodbMessage['priority']); + $message->setRedelivered((bool) $mongodbMessage['redelivered']); + $message->setPublishedAt((int) $mongodbMessage['published_at']); + + return $message; + } +} diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php new file mode 100644 index 000000000..c8669edd7 --- /dev/null +++ b/pkg/mongodb/MongodbContext.php @@ -0,0 +1,102 @@ +config = array_replace([ + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => null, + ], $config); + $this->client = $client; + } + + public function createMessage($body = '', array $properties = [], array $headers = []) + { + $message = new MongodbMessage(); + $message->setBody($body); + $message->setProperties($properties); + $message->setHeaders($headers); + + return $message; + } + + public function createTopic($name) + { + return new MongodbDestination($name); + } + + public function createQueue($queueName) + { + return new MongodbDestination($queueName); + } + + public function createTemporaryQueue() + { + throw new \BadMethodCallException('Mongodb transport does not support temporary queues'); + } + + public function createProducer() + { + return new MongodbProducer($this); + } + + public function createConsumer(PsrDestination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class); + + $consumer = new MongodbConsumer($this, $destination); + + if (isset($this->config['polling_interval'])) { + $consumer->setPollingInterval($this->config['polling_interval']); + } + + return $consumer; + } + + public function close() + { + // TODO: Implement close() method. + } + + public function getCollection() + { + return $this->client + ->selectDatabase($this->config['dbname']) + ->selectCollection($this->config['collection_name']) + ; + } + + /** + * @return Client + */ + public function getClient() + { + return $this->client; + } + + /** + * @return array + */ + public function getConfig() + { + return $this->config; + } +} diff --git a/pkg/mongodb/MongodbDestination.php b/pkg/mongodb/MongodbDestination.php new file mode 100644 index 000000000..78b07ccb7 --- /dev/null +++ b/pkg/mongodb/MongodbDestination.php @@ -0,0 +1,38 @@ +destinationName = $name; + } + + /** + * {@inheritdoc} + */ + public function getQueueName() + { + return $this->destinationName; + } + + /** + * {@inheritdoc} + */ + public function getTopicName() + { + return $this->destinationName; + } +} diff --git a/pkg/mongodb/MongodbMessage.php b/pkg/mongodb/MongodbMessage.php new file mode 100644 index 000000000..0ef7ce0e0 --- /dev/null +++ b/pkg/mongodb/MongodbMessage.php @@ -0,0 +1,318 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + $this->redelivered = false; + $this->priority = 0; + $this->deliveryDelay = null; + } + + /** + * @param string $id + */ + public function setId($id) + { + $this->id = $id; + } + + /** + * @return string $id + */ + public function getId() + { + return $this->id; + } + + /** + * @param string $body + */ + public function setBody($body) + { + $this->body = $body; + } + + /** + * {@inheritdoc} + */ + public function getBody() + { + return $this->body; + } + + /** + * {@inheritdoc} + */ + public function setProperties(array $properties) + { + $this->properties = $properties; + } + + /** + * {@inheritdoc} + */ + public function setProperty($name, $value) + { + $this->properties[$name] = $value; + } + + /** + * {@inheritdoc} + */ + public function getProperties() + { + return $this->properties; + } + + /** + * {@inheritdoc} + */ + public function getProperty($name, $default = null) + { + return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; + } + + /** + * {@inheritdoc} + */ + public function setHeader($name, $value) + { + $this->headers[$name] = $value; + } + + /** + * @param array $headers + */ + public function setHeaders(array $headers) + { + $this->headers = $headers; + } + + /** + * {@inheritdoc} + */ + public function getHeaders() + { + return $this->headers; + } + + /** + * {@inheritdoc} + */ + public function getHeader($name, $default = null) + { + return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; + } + + /** + * {@inheritdoc} + */ + public function isRedelivered() + { + return $this->redelivered; + } + + /** + * {@inheritdoc} + */ + public function setRedelivered($redelivered) + { + $this->redelivered = $redelivered; + } + + /** + * {@inheritdoc} + */ + public function setReplyTo($replyTo) + { + $this->setHeader('reply_to', $replyTo); + } + + /** + * {@inheritdoc} + */ + public function getReplyTo() + { + return $this->getHeader('reply_to'); + } + + /** + * @return int + */ + public function getPriority() + { + return $this->priority; + } + + /** + * @param int $priority + */ + public function setPriority($priority) + { + $this->priority = $priority; + } + + /** + * @return int + */ + public function getDeliveryDelay() + { + return $this->deliveryDelay; + } + + /** + * Set delay in milliseconds. + * + * @param int $deliveryDelay + */ + public function setDeliveryDelay($deliveryDelay) + { + $this->deliveryDelay = $deliveryDelay; + } + + /** + * @return int|float|null + */ + public function getTimeToLive() + { + return $this->timeToLive; + } + + /** + * Set time to live in milliseconds. + * + * @param int|float|null $timeToLive + */ + public function setTimeToLive($timeToLive) + { + $this->timeToLive = $timeToLive; + } + + /** + * {@inheritdoc} + */ + public function setCorrelationId($correlationId) + { + $this->setHeader('correlation_id', $correlationId); + } + + /** + * {@inheritdoc} + */ + public function getCorrelationId() + { + return $this->getHeader('correlation_id', null); + } + + /** + * {@inheritdoc} + */ + public function setMessageId($messageId) + { + $this->setHeader('message_id', $messageId); + } + + /** + * {@inheritdoc} + */ + public function getMessageId() + { + return $this->getHeader('message_id', null); + } + + /** + * {@inheritdoc} + */ + public function getTimestamp() + { + $value = $this->getHeader('timestamp'); + + return null === $value ? null : (int) $value; + } + + /** + * {@inheritdoc} + */ + public function setTimestamp($timestamp) + { + $this->setHeader('timestamp', $timestamp); + } + + /** + * @return int + */ + public function getPublishedAt() + { + return $this->publishedAt; + } + + /** + * @param int $publishedAt + */ + public function setPublishedAt($publishedAt) + { + $this->publishedAt = $publishedAt; + } +} diff --git a/pkg/mongodb/MongodbProducer.php b/pkg/mongodb/MongodbProducer.php new file mode 100644 index 000000000..304a9ae5c --- /dev/null +++ b/pkg/mongodb/MongodbProducer.php @@ -0,0 +1,181 @@ +context = $context; + } + + /** + * {@inheritdoc} + * + * @param MongodbDestination $destination + * @param MongodbMessage $message + * + * @throws Exception + */ + public function send(PsrDestination $destination, PsrMessage $message) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class); + InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class); + + if (null !== $this->priority && 0 === $message->getPriority()) { + $message->setPriority($this->priority); + } + if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { + $message->setDeliveryDelay($this->deliveryDelay); + } + if (null !== $this->timeToLive && null === $message->getTimeToLive()) { + $message->setTimeToLive($this->timeToLive); + } + + $body = $message->getBody(); + if (is_scalar($body) || null === $body) { + $body = (string) $body; + } else { + throw new InvalidMessageException(sprintf( + 'The message body must be a scalar or null. Got: %s', + is_object($body) ? get_class($body) : gettype($body) + )); + } + + $publishedAt = null !== $message->getPublishedAt() ? + $message->getPublishedAt() : + (int) (microtime(true) * 10000) + ; + + $mongoMessage = [ + 'published_at' => $publishedAt, + 'body' => $body, + 'headers' => $message->getHeaders(), + 'properties' => $message->getProperties(), + 'priority' => $message->getPriority(), + 'queue' => $destination->getQueueName(), + 'redelivered' => $message->isRedelivered(), + ]; + + $delay = $message->getDeliveryDelay(); + if ($delay) { + if (!is_int($delay)) { + throw new \LogicException(sprintf( + 'Delay must be integer but got: "%s"', + is_object($delay) ? get_class($delay) : gettype($delay) + )); + } + + if ($delay <= 0) { + throw new \LogicException(sprintf('Delay must be positive integer but got: "%s"', $delay)); + } + + $mongoMessage['delayed_until'] = time() + (int) $delay / 1000; + } + + $timeToLive = $message->getTimeToLive(); + if ($timeToLive) { + if (!is_int($timeToLive)) { + throw new \LogicException(sprintf( + 'TimeToLive must be integer but got: "%s"', + is_object($timeToLive) ? get_class($timeToLive) : gettype($timeToLive) + )); + } + + if ($timeToLive <= 0) { + throw new \LogicException(sprintf('TimeToLive must be positive integer but got: "%s"', $timeToLive)); + } + + $mongoMessage['time_to_live'] = time() + (int) $timeToLive / 1000; + } + + try { + $collection = $this->context->getCollection(); + $collection->insertOne($mongoMessage); + } catch (\Exception $e) { + throw new Exception('The transport has failed to send the message due to some internal error.', null, $e); + } + } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + $this->deliveryDelay = $deliveryDelay; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return $this->deliveryDelay; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + $this->priority = $priority; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return $this->priority; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + $this->timeToLive = $timeToLive; + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return $this->timeToLive; + } +} diff --git a/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php b/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php new file mode 100644 index 000000000..eddf213fe --- /dev/null +++ b/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php @@ -0,0 +1,104 @@ +context = $this->createMongodbContext(); + } + + protected function tearDown() + { + if ($this->context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function testShouldSetPublishedAtDateToReceivedMessage() + { + $context = $this->context; + $queue = $context->createQueue(__METHOD__); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $time = (int) (microtime(true) * 10000); + + $expectedBody = __CLASS__.$time; + + $producer = $context->createProducer(); + + $message = $context->createMessage($expectedBody); + $message->setPublishedAt($time); + $producer->send($queue, $message); + + $message = $consumer->receive(8000); // 8 sec + + $this->assertInstanceOf(MongodbMessage::class, $message); + $consumer->acknowledge($message); + $this->assertSame($expectedBody, $message->getBody()); + $this->assertSame($time, $message->getPublishedAt()); + } + + public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() + { + $context = $this->context; + $queue = $context->createQueue(__METHOD__); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $time = (int) (microtime(true) * 10000); + $olderTime = $time - 10000; + + $expectedPriority5Body = __CLASS__.'_priority5_'.$time; + $expectedPriority5BodyOlderTime = __CLASS__.'_priority5Old_'.$olderTime; + + $producer = $context->createProducer(); + + $message = $context->createMessage($expectedPriority5Body); + $message->setPriority(5); + $message->setPublishedAt($time); + $producer->send($queue, $message); + + $message = $context->createMessage($expectedPriority5BodyOlderTime); + $message->setPriority(5); + $message->setPublishedAt($olderTime); + $producer->send($queue, $message); + + $message = $consumer->receive(8000); // 8 sec + + $this->assertInstanceOf(MongodbMessage::class, $message); + $consumer->acknowledge($message); + $this->assertSame($expectedPriority5BodyOlderTime, $message->getBody()); + + $message = $consumer->receive(8000); // 8 sec + + $this->assertInstanceOf(MongodbMessage::class, $message); + $consumer->acknowledge($message); + $this->assertSame($expectedPriority5Body, $message->getBody()); + } +} diff --git a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php new file mode 100644 index 000000000..50afca0eb --- /dev/null +++ b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php @@ -0,0 +1,52 @@ +assertClassImplements(PsrConnectionFactory::class, MongodbConnectionFactory::class); + } + + public function testCouldBeConstructedWithEmptyConfiguration() + { + $params = [ + 'uri' => 'mongodb://127.0.0.1/', + 'uriOptions' => [], + 'driverOptions' => [], + ]; + + $factory = new MongodbConnectionFactory(); + $this->assertAttributeEquals($params, 'config', $factory); + } + + public function testCouldBeConstructedWithCustomConfiguration() + { + $params = [ + 'uri' => 'mongodb://127.0.0.3/', + 'uriOptions' => ['testValue' => 123], + 'driverOptions' => ['testValue' => 123], + ]; + + $factory = new MongodbConnectionFactory($params); + + $this->assertAttributeEquals($params, 'config', $factory); + } + + public function testShouldCreateContext() + { + $factory = new MongodbConnectionFactory(); + + $context = $factory->createContext(); + + $this->assertInstanceOf(MongodbContext::class, $context); + } +} diff --git a/pkg/mongodb/Tests/MongodbConsumerTest.php b/pkg/mongodb/Tests/MongodbConsumerTest.php new file mode 100644 index 000000000..d4f59b26a --- /dev/null +++ b/pkg/mongodb/Tests/MongodbConsumerTest.php @@ -0,0 +1,209 @@ +assertClassImplements(PsrConsumer::class, MongodbConsumer::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue')); + } + + public function testShouldReturnInstanceOfDestination() + { + $destination = new MongodbDestination('queue'); + + $consumer = new MongodbConsumer($this->createContextMock(), $destination); + + $this->assertSame($destination, $consumer->getQueue()); + } + + public function testCouldCallAcknowledgedMethod() + { + $consumer = new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue')); + $consumer->acknowledge(new MongodbMessage()); + } + + public function testCouldSetAndGetPollingInterval() + { + $destination = new MongodbDestination('queue'); + + $consumer = new MongodbConsumer($this->createContextMock(), $destination); + $consumer->setPollingInterval(123456); + + $this->assertEquals(123456, $consumer->getPollingInterval()); + } + + public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() + { + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage( + 'The message must be an instance of '. + 'Enqueue\Mongodb\MongodbMessage '. + 'but it is Enqueue\Mongodb\Tests\InvalidMessage.' + ); + + $consumer = new MongodbConsumer($this->createContextMock(), new MongodbDestination('queue')); + $consumer->reject(new InvalidMessage()); + } + + public function testShouldDoNothingOnReject() + { + $queue = new MongodbDestination('queue'); + + $message = new MongodbMessage(); + $message->setBody('theBody'); + + $context = $this->createContextMock(); + $context + ->expects($this->never()) + ->method('createProducer') + ; + + $consumer = new MongodbConsumer($context, $queue); + + $consumer->reject($message); + } + + public function testRejectShouldReSendMessageToSameQueueOnRequeue() + { + $queue = new MongodbDestination('queue'); + + $message = new MongodbMessage(); + $message->setBody('theBody'); + + $producerMock = $this->createProducerMock(); + $producerMock + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($message)) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->will($this->returnValue($producerMock)) + ; + + $consumer = new MongodbConsumer($context, $queue); + + $consumer->reject($message, true); + } + + /** + * @return MongodbProducer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createProducerMock() + { + return $this->createMock(MongodbProducer::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext + */ + private function createContextMock() + { + return $this->createMock(MongodbContext::class); + } +} + +class InvalidMessage implements PsrMessage +{ + public function getBody() + { + } + + public function setBody($body) + { + } + + public function setProperties(array $properties) + { + } + + public function getProperties() + { + } + + public function setProperty($name, $value) + { + } + + public function getProperty($name, $default = null) + { + } + + public function setHeaders(array $headers) + { + } + + public function getHeaders() + { + } + + public function setHeader($name, $value) + { + } + + public function getHeader($name, $default = null) + { + } + + public function setRedelivered($redelivered) + { + } + + public function isRedelivered() + { + } + + public function setCorrelationId($correlationId) + { + } + + public function getCorrelationId() + { + } + + public function setMessageId($messageId) + { + } + + public function getMessageId() + { + } + + public function getTimestamp() + { + } + + public function setTimestamp($timestamp) + { + } + + public function setReplyTo($replyTo) + { + } + + public function getReplyTo() + { + } +} diff --git a/pkg/mongodb/Tests/MongodbContextTest.php b/pkg/mongodb/Tests/MongodbContextTest.php new file mode 100644 index 000000000..01f44abfe --- /dev/null +++ b/pkg/mongodb/Tests/MongodbContextTest.php @@ -0,0 +1,166 @@ +assertClassImplements(PsrContext::class, MongodbContext::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new MongodbContext($this->createClientMock()); + } + + public function testCouldBeConstructedWithEmptyConfiguration() + { + $context = new MongodbContext($this->createClientMock(), []); + + $this->assertAttributeEquals([ + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => null, + ], 'config', $context); + } + + public function testCouldBeConstructedWithCustomConfiguration() + { + $client = new MongodbContext($this->createClientMock(), [ + 'dbname' => 'testDbName', + 'collection_name' => 'testCollectionName', + 'polling_interval' => 123456, + ]); + + $this->assertAttributeEquals([ + 'dbname' => 'testDbName', + 'collection_name' => 'testCollectionName', + 'polling_interval' => 123456, + ], 'config', $client); + } + + public function testShouldCreateMessage() + { + $context = new MongodbContext($this->createClientMock()); + $message = $context->createMessage('body', ['pkey' => 'pval'], ['hkey' => 'hval']); + + $this->assertInstanceOf(MongodbMessage::class, $message); + $this->assertEquals('body', $message->getBody()); + $this->assertEquals(['pkey' => 'pval'], $message->getProperties()); + $this->assertEquals(['hkey' => 'hval'], $message->getHeaders()); + $this->assertSame(0, $message->getPriority()); + $this->assertFalse($message->isRedelivered()); + } + + public function testShouldCreateTopic() + { + $context = new MongodbContext($this->createClientMock()); + $topic = $context->createTopic('topic'); + + $this->assertInstanceOf(MongodbDestination::class, $topic); + $this->assertEquals('topic', $topic->getTopicName()); + } + + public function testShouldCreateQueue() + { + $context = new MongodbContext($this->createClientMock()); + $queue = $context->createQueue('queue'); + + $this->assertInstanceOf(MongodbDestination::class, $queue); + $this->assertEquals('queue', $queue->getQueueName()); + } + + public function testShouldCreateProducer() + { + $context = new MongodbContext($this->createClientMock()); + + $this->assertInstanceOf(MongodbProducer::class, $context->createProducer()); + } + + public function testShouldCreateConsumer() + { + $context = new MongodbContext($this->createClientMock()); + + $this->assertInstanceOf(MongodbConsumer::class, $context->createConsumer(new MongodbDestination(''))); + } + + public function testShouldCreateMessageConsumerAndSetPollingInterval() + { + $context = new MongodbContext($this->createClientMock(), [ + 'polling_interval' => 123456, + ]); + + $consumer = $context->createConsumer(new MongodbDestination('')); + + $this->assertInstanceOf(MongodbConsumer::class, $consumer); + $this->assertEquals(123456, $consumer->getPollingInterval()); + } + + public function testShouldThrowIfDestinationIsInvalidInstanceType() + { + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage( + 'The destination must be an instance of '. + 'Enqueue\Mongodb\MongodbDestination but got '. + 'Enqueue\Mongodb\Tests\NotSupportedDestination2.' + ); + + $context = new MongodbContext($this->createClientMock()); + + $this->assertInstanceOf(MongodbConsumer::class, $context->createConsumer(new NotSupportedDestination2())); + } + + public function testShouldReturnInstanceOfClient() + { + $context = new MongodbContext($client = $this->createClientMock()); + + $this->assertSame($client, $context->getClient()); + } + + public function testShouldReturnConfig() + { + $context = new MongodbContext($this->createClientMock()); + + $this->assertSame([ + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => null, + ], $context->getConfig()); + } + + public function testShouldThrowBadMethodCallExceptionOncreateTemporaryQueueCall() + { + $context = new MongodbContext($this->createClientMock()); + + $this->expectException(\BadMethodCallException::class); + $this->expectExceptionMessage('Mongodb transport does not support temporary queues'); + + $context->createTemporaryQueue(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Client + */ + private function createClientMock() + { + return $this->createMock(Client::class); + } +} + +class NotSupportedDestination2 implements PsrDestination +{ +} diff --git a/pkg/mongodb/Tests/MongodbDestinationTest.php b/pkg/mongodb/Tests/MongodbDestinationTest.php new file mode 100644 index 000000000..ddde2110c --- /dev/null +++ b/pkg/mongodb/Tests/MongodbDestinationTest.php @@ -0,0 +1,37 @@ +assertClassImplements(PsrDestination::class, MongodbDestination::class); + } + + public function testShouldImplementTopicInterface() + { + $this->assertClassImplements(PsrTopic::class, MongodbDestination::class); + } + + public function testShouldImplementQueueInterface() + { + $this->assertClassImplements(PsrQueue::class, MongodbDestination::class); + } + + public function testShouldReturnTopicAndQueuePreviouslySetInConstructor() + { + $destination = new MongodbDestination('topic-or-queue-name'); + + $this->assertSame('topic-or-queue-name', $destination->getQueueName()); + $this->assertSame('topic-or-queue-name', $destination->getTopicName()); + } +} diff --git a/pkg/mongodb/Tests/MongodbMessageTest.php b/pkg/mongodb/Tests/MongodbMessageTest.php new file mode 100644 index 000000000..50af98e80 --- /dev/null +++ b/pkg/mongodb/Tests/MongodbMessageTest.php @@ -0,0 +1,91 @@ +assertSame('', $message->getBody()); + $this->assertSame([], $message->getProperties()); + $this->assertSame([], $message->getHeaders()); + } + + public function testCouldBeConstructedWithOptionalArguments() + { + $message = new MongodbMessage('theBody', ['barProp' => 'barPropVal'], ['fooHeader' => 'fooHeaderVal']); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties()); + $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); + } + + public function testShouldSetPriorityToZeroInConstructor() + { + $message = new MongodbMessage(); + + $this->assertSame(0, $message->getPriority()); + } + + public function testShouldSetDelayToNullInConstructor() + { + $message = new MongodbMessage(); + + $this->assertNull($message->getDeliveryDelay()); + } + + public function testShouldSetCorrelationIdAsHeader() + { + $message = new MongodbMessage(); + $message->setCorrelationId('theCorrelationId'); + + $this->assertSame(['correlation_id' => 'theCorrelationId'], $message->getHeaders()); + } + + public function testShouldSetPublishedAtToNullInConstructor() + { + $message = new MongodbMessage(); + + $this->assertNull($message->getPublishedAt()); + } + + public function testShouldSetMessageIdAsHeader() + { + $message = new MongodbMessage(); + $message->setMessageId('theMessageId'); + + $this->assertSame(['message_id' => 'theMessageId'], $message->getHeaders()); + } + + public function testShouldSetTimestampAsHeader() + { + $message = new MongodbMessage(); + $message->setTimestamp(12345); + + $this->assertSame(['timestamp' => 12345], $message->getHeaders()); + } + + public function testShouldSetReplyToAsHeader() + { + $message = new MongodbMessage(); + $message->setReplyTo('theReply'); + + $this->assertSame(['reply_to' => 'theReply'], $message->getHeaders()); + } + + public function testShouldAllowGetPreviouslySetPublishedAtTime() + { + $message = new MongodbMessage(); + + $message->setPublishedAt(123); + + $this->assertSame(123, $message->getPublishedAt()); + } +} diff --git a/pkg/mongodb/Tests/MongodbProducerTest.php b/pkg/mongodb/Tests/MongodbProducerTest.php new file mode 100644 index 000000000..3b32d1a8b --- /dev/null +++ b/pkg/mongodb/Tests/MongodbProducerTest.php @@ -0,0 +1,66 @@ +assertClassImplements(PsrProducer::class, MongodbProducer::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new MongodbProducer($this->createContextMock()); + } + + public function testShouldThrowIfBodyOfInvalidType() + { + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message body must be a scalar or null. Got: stdClass'); + + $producer = new MongodbProducer($this->createContextMock()); + + $message = new MongodbMessage(new \stdClass()); + + $producer->send(new MongodbDestination(''), $message); + } + + public function testShouldThrowIfDestinationOfInvalidType() + { + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage( + 'The destination must be an instance of '. + 'Enqueue\Mongodb\MongodbDestination but got '. + 'Enqueue\Mongodb\Tests\NotSupportedDestination1.' + ); + + $producer = new MongodbProducer($this->createContextMock()); + + $producer->send(new NotSupportedDestination1(), new MongodbMessage()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext + */ + private function createContextMock() + { + return $this->createMock(MongodbContext::class); + } +} + +class NotSupportedDestination1 implements PsrDestination +{ +} diff --git a/pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php b/pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php new file mode 100644 index 000000000..703ea7fc5 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php @@ -0,0 +1,21 @@ +markTestSkipped('The MONGO_DSN env is not available. Skip tests'); + } + + $factory = new MongodbConnectionFactory(['uri' => $env]); + + $context = $factory->createContext(); + + return $context; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php new file mode 100644 index 000000000..748ed516b --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php @@ -0,0 +1,17 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbMessageTest.php b/pkg/mongodb/Tests/Spec/MongodbMessageTest.php new file mode 100644 index 000000000..1a4e8d77d --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbMessageTest.php @@ -0,0 +1,17 @@ +createMongodbContext()->createProducer(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbQueueTest.php new file mode 100644 index 000000000..5945975f5 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbQueueTest.php @@ -0,0 +1,17 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php new file mode 100644 index 000000000..6cf600326 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php @@ -0,0 +1,21 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php new file mode 100644 index 000000000..f06eeb3ff --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php @@ -0,0 +1,51 @@ +publishedAt = (int) (microtime(true) * 10000); + } + + /** + * @return PsrContext + */ + protected function createContext() + { + return $this->createMongodbContext(); + } + + /** + * {@inheritdoc} + * + * @param MongodbContext $context + * + * @return MongodbMessage + */ + protected function createMessage(PsrContext $context, $body) + { + /** @var MongodbMessage $message */ + $message = parent::createMessage($context, $body); + + // in order to test priorities correctly we have to make sure the messages were sent in the same time. + $message->setPublishedAt($this->publishedAt); + + return $message; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php new file mode 100644 index 000000000..766a02e33 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php @@ -0,0 +1,21 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php new file mode 100644 index 000000000..b67dd570f --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php @@ -0,0 +1,21 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php new file mode 100644 index 000000000..e8ef8396d --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php @@ -0,0 +1,21 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..955d41495 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,21 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 000000000..511485af0 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,21 @@ +createMongodbContext(); + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbTopicTest.php new file mode 100644 index 000000000..8010874cd --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbTopicTest.php @@ -0,0 +1,17 @@ + Date: Fri, 27 Apr 2018 16:03:24 +0300 Subject: [PATCH 02/24] client --- pkg/mongodb/.gitignore | 1 + pkg/mongodb/Client/MongodbDriver.php | 186 ++++++++++ pkg/mongodb/MongodbConnectionFactory.php | 2 +- pkg/mongodb/MongodbContext.php | 10 +- .../Tests/Client/MongodbDriverTest.php | 351 ++++++++++++++++++ 5 files changed, 547 insertions(+), 3 deletions(-) create mode 100644 pkg/mongodb/Client/MongodbDriver.php create mode 100644 pkg/mongodb/Tests/Client/MongodbDriverTest.php diff --git a/pkg/mongodb/.gitignore b/pkg/mongodb/.gitignore index a770439e5..57bbbe0bb 100644 --- a/pkg/mongodb/.gitignore +++ b/pkg/mongodb/.gitignore @@ -4,3 +4,4 @@ /phpunit.xml /vendor/ /.idea/ +/examples/ diff --git a/pkg/mongodb/Client/MongodbDriver.php b/pkg/mongodb/Client/MongodbDriver.php new file mode 100644 index 000000000..51ded8cc6 --- /dev/null +++ b/pkg/mongodb/Client/MongodbDriver.php @@ -0,0 +1,186 @@ + 0, + MessagePriority::LOW => 1, + MessagePriority::NORMAL => 2, + MessagePriority::HIGH => 3, + MessagePriority::VERY_HIGH => 4, + ]; + + /** + * @param MongodbContext $context + * @param Config $config + * @param QueueMetaRegistry $queueMetaRegistry + */ + public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) + { + $this->context = $context; + $this->config = $config; + $this->queueMetaRegistry = $queueMetaRegistry; + } + + /** + * {@inheritdoc} + * + * @return MongodbMessage + */ + public function createTransportMessage(Message $message) + { + $properties = $message->getProperties(); + + $headers = $message->getHeaders(); + $headers['content_type'] = $message->getContentType(); + + $transportMessage = $this->context->createMessage(); + $transportMessage->setBody($message->getBody()); + $transportMessage->setHeaders($headers); + $transportMessage->setProperties($properties); + $transportMessage->setMessageId($message->getMessageId()); + $transportMessage->setTimestamp($message->getTimestamp()); + $transportMessage->setDeliveryDelay($message->getDelay()); + $transportMessage->setReplyTo($message->getReplyTo()); + $transportMessage->setCorrelationId($message->getCorrelationId()); + if (array_key_exists($message->getPriority(), self::$priorityMap)) { + $transportMessage->setPriority(self::$priorityMap[$message->getPriority()]); + } + + return $transportMessage; + } + + /** + * @param MongodbMessage $message + * + * {@inheritdoc} + */ + public function createClientMessage(PsrMessage $message) + { + $clientMessage = new Message(); + + $clientMessage->setBody($message->getBody()); + $clientMessage->setHeaders($message->getHeaders()); + $clientMessage->setProperties($message->getProperties()); + + $clientMessage->setContentType($message->getHeader('content_type')); + $clientMessage->setMessageId($message->getMessageId()); + $clientMessage->setTimestamp($message->getTimestamp()); + $clientMessage->setDelay($message->getDeliveryDelay()); + $clientMessage->setReplyTo($message->getReplyTo()); + $clientMessage->setCorrelationId($message->getCorrelationId()); + + $priorityMap = array_flip(self::$priorityMap); + $priority = array_key_exists($message->getPriority(), $priorityMap) ? + $priorityMap[$message->getPriority()] : + MessagePriority::NORMAL; + $clientMessage->setPriority($priority); + + return $clientMessage; + } + + /** + * {@inheritdoc} + */ + public function sendToRouter(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { + throw new \LogicException('Topic name parameter is required but is not set'); + } + + $queue = $this->createQueue($this->config->getRouterQueueName()); + $transportMessage = $this->createTransportMessage($message); + + $this->context->createProducer()->send($queue, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function sendToProcessor(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException('Processor name parameter is required but is not set'); + } + + if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException('Queue name parameter is required but is not set'); + } + + $transportMessage = $this->createTransportMessage($message); + $destination = $this->createQueue($queueName); + + $this->context->createProducer()->send($destination, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function createQueue($queueName) + { + $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); + + return $this->context->createQueue($transportName); + } + + /** + * {@inheritdoc} + */ + public function setupBroker(LoggerInterface $logger = null) + { + $logger = $logger ?: new NullLogger(); + $log = function ($text, ...$args) use ($logger) { + $logger->debug(sprintf('[MongodbDriver] ' . $text, ...$args)); + }; + $contextConfig = $this->context->getConfig(); + $log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']); + $this->context->createCollection(); + } + + /** + * {@inheritdoc} + */ + public function getConfig() + { + return $this->config; + } + + /** + * @return array + */ + public static function getPriorityMap() + { + return self::$priorityMap; + } +} diff --git a/pkg/mongodb/MongodbConnectionFactory.php b/pkg/mongodb/MongodbConnectionFactory.php index 0e4e75023..23ff86e0d 100644 --- a/pkg/mongodb/MongodbConnectionFactory.php +++ b/pkg/mongodb/MongodbConnectionFactory.php @@ -24,7 +24,7 @@ class MongodbConnectionFactory implements PsrConnectionFactory * * or * - * mongodb://127.0.0.1:27017/dbname/collection_name?polling_interval=1000 + * mongodb://127.0.0.1:27017/dbname?polling_interval=1000&enqueue_collection=enqueue * * @param array|string|null $config */ diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php index c8669edd7..17f96daf2 100644 --- a/pkg/mongodb/MongodbContext.php +++ b/pkg/mongodb/MongodbContext.php @@ -80,8 +80,7 @@ public function getCollection() { return $this->client ->selectDatabase($this->config['dbname']) - ->selectCollection($this->config['collection_name']) - ; + ->selectCollection($this->config['collection_name']); } /** @@ -99,4 +98,11 @@ public function getConfig() { return $this->config; } + + public function createCollection() + { + $collection = $this->getCollection(); + $collection->createIndex(['priority' => -1, 'published_at' => 1], ['name' => 'enqueue_priority']); + $collection->createIndex(['delayed_until' => 1], ['name' => 'enqueue_delayed']); + } } diff --git a/pkg/mongodb/Tests/Client/MongodbDriverTest.php b/pkg/mongodb/Tests/Client/MongodbDriverTest.php new file mode 100644 index 000000000..1d4bb60ad --- /dev/null +++ b/pkg/mongodb/Tests/Client/MongodbDriverTest.php @@ -0,0 +1,351 @@ +assertClassImplements(DriverInterface::class, MongodbDriver::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new MongodbDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + } + + public function testShouldReturnConfigObject() + { + $config = $this->createDummyConfig(); + + $driver = new MongodbDriver( + $this->createPsrContextMock(), + $config, + $this->createDummyQueueMetaRegistry() + ); + + $this->assertSame($config, $driver->getConfig()); + } + + public function testShouldCreateAndReturnQueueInstance() + { + $expectedQueue = new MongodbDestination('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aprefix.afooqueue') + ->willReturn($expectedQueue) + ; + + $driver = new MongodbDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aFooQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName() + { + $expectedQueue = new MongodbDestination('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aBarQueue') + ->willReturn($expectedQueue) + ; + + $driver = new MongodbDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aBarQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldConvertTransportMessageToClientMessage() + { + $transportMessage = new MongodbMessage(); + $transportMessage->setBody('body'); + $transportMessage->setHeaders(['hkey' => 'hval']); + $transportMessage->setProperties(['key' => 'val']); + $transportMessage->setHeader('content_type', 'ContentType'); + $transportMessage->setMessageId('MessageId'); + $transportMessage->setTimestamp(1000); + $transportMessage->setPriority(2); + $transportMessage->setDeliveryDelay(12345); + + $driver = new MongodbDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $clientMessage = $driver->createClientMessage($transportMessage); + + $this->assertInstanceOf(Message::class, $clientMessage); + $this->assertSame('body', $clientMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + ], $clientMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $clientMessage->getProperties()); + $this->assertSame('MessageId', $clientMessage->getMessageId()); + $this->assertSame('ContentType', $clientMessage->getContentType()); + $this->assertSame(1000, $clientMessage->getTimestamp()); + $this->assertSame(12345, $clientMessage->getDelay()); + + $this->assertNull($clientMessage->getExpire()); + $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority()); + } + + public function testShouldConvertClientMessageToTransportMessage() + { + $clientMessage = new Message(); + $clientMessage->setBody('body'); + $clientMessage->setHeaders(['hkey' => 'hval']); + $clientMessage->setProperties(['key' => 'val']); + $clientMessage->setContentType('ContentType'); + $clientMessage->setExpire(123); + $clientMessage->setPriority(MessagePriority::VERY_HIGH); + $clientMessage->setMessageId('MessageId'); + $clientMessage->setTimestamp(1000); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new MongodbMessage()) + ; + + $driver = new MongodbDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $transportMessage = $driver->createTransportMessage($clientMessage); + + $this->assertInstanceOf(MongodbMessage::class, $transportMessage); + $this->assertSame('body', $transportMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => null, + 'correlation_id' => null, + ], $transportMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $transportMessage->getProperties()); + $this->assertSame('MessageId', $transportMessage->getMessageId()); + $this->assertSame(1000, $transportMessage->getTimestamp()); + } + + public function testShouldSendMessageToRouter() + { + $topic = new MongodbDestination('queue-name'); + $transportMessage = new MongodbMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aprefix.default') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new MongodbDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); + + $driver->sendToRouter($message); + } + + public function testShouldThrowExceptionIfTopicParameterIsNotSet() + { + $driver = new MongodbDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic name parameter is required but is not set'); + + $driver->sendToRouter(new Message()); + } + + public function testShouldSendMessageToProcessor() + { + $queue = new MongodbDestination('queue-name'); + $transportMessage = new MongodbMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new MongodbDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue'); + + $driver->sendToProcessor($message); + } + + public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() + { + $driver = new MongodbDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor name parameter is required but is not set'); + + $driver->sendToProcessor(new Message()); + } + + public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() + { + $driver = new MongodbDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Queue name parameter is required but is not set'); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + + $driver->sendToProcessor($message); + } + + public function testShouldSetupBroker() + { + $context = $this->createPsrContextMock(); + + $context + ->expects($this->once()) + ->method('createCollection') + ; + + $driver = new MongodbDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $driver->setupBroker(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|MongodbContext + */ + private function createPsrContextMock() + { + return $this->createMock(MongodbContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer + */ + private function createPsrProducerMock() + { + return $this->createMock(PsrProducer::class); + } + + /** + * @return QueueMetaRegistry + */ + private function createDummyQueueMetaRegistry() + { + $registry = new QueueMetaRegistry($this->createDummyConfig(), []); + $registry->add('default'); + $registry->add('aFooQueue'); + $registry->add('aBarQueue', 'aBarQueue'); + + return $registry; + } + + /** + * @return Config + */ + private function createDummyConfig() + { + return Config::create('aPrefix'); + } +} From 83d4b855e3cf76d52359b45b01305f769bebd656 Mon Sep 17 00:00:00 2001 From: Dmitry Tomin Date: Fri, 27 Apr 2018 18:12:32 +0300 Subject: [PATCH 03/24] smfony transport --- pkg/mongodb/Client/MongodbDriver.php | 6 +- .../Symfony/MongodbTransportFactory.php | 123 +++++++++++++ .../Tests/MongodbConnectionFactoryTest.php | 2 - .../Symfony/MongodbTransportFactoryTest.php | 161 ++++++++++++++++++ 4 files changed, 287 insertions(+), 5 deletions(-) create mode 100644 pkg/mongodb/Symfony/MongodbTransportFactory.php create mode 100644 pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php diff --git a/pkg/mongodb/Client/MongodbDriver.php b/pkg/mongodb/Client/MongodbDriver.php index 51ded8cc6..aa0d4f999 100644 --- a/pkg/mongodb/Client/MongodbDriver.php +++ b/pkg/mongodb/Client/MongodbDriver.php @@ -42,8 +42,8 @@ class MongodbDriver implements DriverInterface ]; /** - * @param MongodbContext $context - * @param Config $config + * @param MongodbContext $context + * @param Config $config * @param QueueMetaRegistry $queueMetaRegistry */ public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) @@ -161,7 +161,7 @@ public function setupBroker(LoggerInterface $logger = null) { $logger = $logger ?: new NullLogger(); $log = function ($text, ...$args) use ($logger) { - $logger->debug(sprintf('[MongodbDriver] ' . $text, ...$args)); + $logger->debug(sprintf('[MongodbDriver] '.$text, ...$args)); }; $contextConfig = $this->context->getConfig(); $log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']); diff --git a/pkg/mongodb/Symfony/MongodbTransportFactory.php b/pkg/mongodb/Symfony/MongodbTransportFactory.php new file mode 100644 index 000000000..84d163239 --- /dev/null +++ b/pkg/mongodb/Symfony/MongodbTransportFactory.php @@ -0,0 +1,123 @@ +name = $name; + } + + /** + * {@inheritdoc} + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return ['dsn' => $v]; + }) + ->end() + ->children() + ->scalarNode('dsn') + ->info('The Mongodb DSN. Other parameters are ignored if set') + ->end() + ->scalarNode('dbname') + ->defaultValue('enqueue') + ->info('Database name.') + ->end() + ->scalarNode('collection_name') + ->defaultValue('enqueue') + ->info('Collection') + ->end() + ->integerNode('polling_interval') + ->defaultValue(1000) + ->min(100) + ->info('How often query for new messages.') + ->end() + ; + } + + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + if (false == empty($config['dsn'])) { + $factory = new Definition(MongodbConnectionFactory::class); + $factory->setArguments([$config]); + } else { + throw new \LogicException('Set "dsn" options when you want ot use Mongodb.'); + } + + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $context = new Definition(MongodbContext::class); + $context->setPublic(true); + $context->setFactory([new Reference($factoryId), 'createContext']); + + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); + $container->setDefinition($contextId, $context); + + return $contextId; + } + + /** + * {@inheritdoc} + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driver = new Definition(MongodbDriver::class); + $driver->setPublic(true); + $driver->setArguments([ + new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), + new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), + ]); + + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); + $container->setDefinition($driverId, $driver); + + return $driverId; + } + + /** + * {@inheritdoc} + */ + public function getName() + { + return $this->name; + } +} diff --git a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php index 50afca0eb..3fb1dfdc2 100644 --- a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php +++ b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php @@ -20,8 +20,6 @@ public function testCouldBeConstructedWithEmptyConfiguration() { $params = [ 'uri' => 'mongodb://127.0.0.1/', - 'uriOptions' => [], - 'driverOptions' => [], ]; $factory = new MongodbConnectionFactory(); diff --git a/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php new file mode 100644 index 000000000..e38b78adc --- /dev/null +++ b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php @@ -0,0 +1,161 @@ +assertClassImplements(TransportFactoryInterface::class, MongodbTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new MongodbTransportFactory(); + + $this->assertEquals('Mongodb', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new MongodbTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfiguration() + { + $transport = new MongodbTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + 'dsn' => 'mongodb://127.0.0.1/', + ]]); + + $this->assertEquals([ + 'dsn' => 'mongodb://127.0.0.1/', + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => 1000, + ], $config); + } + + public function testShouldAllowAddConfigurationAsString() + { + $transport = new MongodbTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['mysqlDSN']); + + $this->assertEquals([ + 'dsn' => 'mysqlDSN', + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => 1000, + ], $config); + } + + public function testShouldCreateMongodbConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new MongodbTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'dsn' => 'mysqlDSN', + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => 1000, + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(MongodbConnectionFactory::class, $factory->getClass()); + + $this->assertSame([ + 'dsn' => 'mysqlDSN', + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => 1000, + ], $factory->getArgument(0)); + } + + public function testShouldCreateConnectionFactoryFromDsnString() + { + $container = new ContainerBuilder(); + + $transport = new MongodbTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'dsn' => 'theDSN', + 'connection' => [], + 'lazy' => true, + 'table_name' => 'enqueue', + 'polling_interval' => 1000, + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(MongodbConnectionFactory::class, $factory->getClass()); + $this->assertSame('theDSN', $factory->getArgument(0)['dsn']); + } + + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new MongodbTransportFactory(); + + $serviceId = $transport->createContext($container, []); + + $this->assertEquals('enqueue.transport.Mongodb.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.Mongodb.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.Mongodb.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + + public function testShouldCreateDriver() + { + $container = new ContainerBuilder(); + + $transport = new MongodbTransportFactory(); + + $serviceId = $transport->createDriver($container, []); + + $this->assertEquals('enqueue.client.Mongodb.driver', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $driver = $container->getDefinition($serviceId); + $this->assertSame(MongodbDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.Mongodb.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); + } +} From 2e8f931f1089e1057b344ee0b6309d4607fb393e Mon Sep 17 00:00:00 2001 From: Dmitry Tomin Date: Fri, 27 Apr 2018 18:45:08 +0300 Subject: [PATCH 04/24] fix --- pkg/mongodb/Symfony/MongodbTransportFactory.php | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/mongodb/Symfony/MongodbTransportFactory.php b/pkg/mongodb/Symfony/MongodbTransportFactory.php index 84d163239..2efb7627a 100644 --- a/pkg/mongodb/Symfony/MongodbTransportFactory.php +++ b/pkg/mongodb/Symfony/MongodbTransportFactory.php @@ -42,6 +42,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->children() ->scalarNode('dsn') ->info('The Mongodb DSN. Other parameters are ignored if set') + ->isRequired() ->end() ->scalarNode('dbname') ->defaultValue('enqueue') @@ -64,12 +65,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) */ public function createConnectionFactory(ContainerBuilder $container, array $config) { - if (false == empty($config['dsn'])) { - $factory = new Definition(MongodbConnectionFactory::class); - $factory->setArguments([$config]); - } else { - throw new \LogicException('Set "dsn" options when you want ot use Mongodb.'); - } + $factory = new Definition(MongodbConnectionFactory::class, [$config]); $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); $container->setDefinition($factoryId, $factory); From 7e9f62d4a50b231b045e59188c8a53caf2114bf2 Mon Sep 17 00:00:00 2001 From: Dmitry Tomin Date: Fri, 27 Apr 2018 19:25:15 +0300 Subject: [PATCH 05/24] develop --- pkg/mongodb/MongodbConnectionFactory.php | 2 ++ pkg/mongodb/MongodbDestination.php | 9 +++++++++ pkg/mongodb/MongodbProducer.php | 2 +- pkg/mongodb/Tests/MongodbContextTest.php | 2 +- pkg/mongodb/Tests/MongodbDestinationTest.php | 2 +- 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/mongodb/MongodbConnectionFactory.php b/pkg/mongodb/MongodbConnectionFactory.php index 23ff86e0d..818fcfade 100644 --- a/pkg/mongodb/MongodbConnectionFactory.php +++ b/pkg/mongodb/MongodbConnectionFactory.php @@ -40,6 +40,8 @@ public function __construct($config = 'mongodb:') } $config = array_replace([ 'uri' => 'mongodb://127.0.0.1/', + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', ], $config); $this->config = $config; diff --git a/pkg/mongodb/MongodbDestination.php b/pkg/mongodb/MongodbDestination.php index 78b07ccb7..360f58a25 100644 --- a/pkg/mongodb/MongodbDestination.php +++ b/pkg/mongodb/MongodbDestination.php @@ -28,6 +28,15 @@ public function getQueueName() return $this->destinationName; } + /** + * Alias for getQueueName() + * {@inheritdoc} + */ + public function getName() + { + return $this->getQueueName(); + } + /** * {@inheritdoc} */ diff --git a/pkg/mongodb/MongodbProducer.php b/pkg/mongodb/MongodbProducer.php index 304a9ae5c..3135d84f0 100644 --- a/pkg/mongodb/MongodbProducer.php +++ b/pkg/mongodb/MongodbProducer.php @@ -83,7 +83,7 @@ public function send(PsrDestination $destination, PsrMessage $message) 'headers' => $message->getHeaders(), 'properties' => $message->getProperties(), 'priority' => $message->getPriority(), - 'queue' => $destination->getQueueName(), + 'queue' => $destination->getName(), 'redelivered' => $message->isRedelivered(), ]; diff --git a/pkg/mongodb/Tests/MongodbContextTest.php b/pkg/mongodb/Tests/MongodbContextTest.php index 01f44abfe..c9eb1cbd7 100644 --- a/pkg/mongodb/Tests/MongodbContextTest.php +++ b/pkg/mongodb/Tests/MongodbContextTest.php @@ -81,7 +81,7 @@ public function testShouldCreateQueue() $queue = $context->createQueue('queue'); $this->assertInstanceOf(MongodbDestination::class, $queue); - $this->assertEquals('queue', $queue->getQueueName()); + $this->assertEquals('queue', $queue->getName()); } public function testShouldCreateProducer() diff --git a/pkg/mongodb/Tests/MongodbDestinationTest.php b/pkg/mongodb/Tests/MongodbDestinationTest.php index ddde2110c..2651b0e0b 100644 --- a/pkg/mongodb/Tests/MongodbDestinationTest.php +++ b/pkg/mongodb/Tests/MongodbDestinationTest.php @@ -31,7 +31,7 @@ public function testShouldReturnTopicAndQueuePreviouslySetInConstructor() { $destination = new MongodbDestination('topic-or-queue-name'); - $this->assertSame('topic-or-queue-name', $destination->getQueueName()); + $this->assertSame('topic-or-queue-name', $destination->getName()); $this->assertSame('topic-or-queue-name', $destination->getTopicName()); } } From 9cbbce8167bf6ebde02bb07fbbf26015636f9ff8 Mon Sep 17 00:00:00 2001 From: Dmitry Tomin Date: Wed, 2 May 2018 12:11:38 +0300 Subject: [PATCH 06/24] develop --- pkg/mongodb/MongodbMessage.php | 2 -- pkg/mongodb/MongodbProducer.php | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/mongodb/MongodbMessage.php b/pkg/mongodb/MongodbMessage.php index 0ef7ce0e0..9a10e5f7e 100644 --- a/pkg/mongodb/MongodbMessage.php +++ b/pkg/mongodb/MongodbMessage.php @@ -66,8 +66,6 @@ public function __construct($body = '', array $properties = [], array $headers = $this->properties = $properties; $this->headers = $headers; $this->redelivered = false; - $this->priority = 0; - $this->deliveryDelay = null; } /** diff --git a/pkg/mongodb/MongodbProducer.php b/pkg/mongodb/MongodbProducer.php index 3135d84f0..2a06791cd 100644 --- a/pkg/mongodb/MongodbProducer.php +++ b/pkg/mongodb/MongodbProducer.php @@ -52,7 +52,7 @@ public function send(PsrDestination $destination, PsrMessage $message) InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class); InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class); - if (null !== $this->priority && 0 === $message->getPriority()) { + if (null !== $this->priority && null === $message->getPriority()) { $message->setPriority($this->priority); } if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { From 947e00f0598f28554bbb5a3100b046b90215a626 Mon Sep 17 00:00:00 2001 From: Max Kotliar Date: Thu, 3 May 2018 12:13:34 +0300 Subject: [PATCH 07/24] Update .travis.yml --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index 328164818..f64d1928b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -48,6 +48,9 @@ cache: directories: - $HOME/.composer/cache +before_install: + - echo "extension = mongodb.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini + install: - rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini; - echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini From a7ea2f1962c873d06836f3b80fb0afda74d2e15f Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 12:25:49 +0300 Subject: [PATCH 08/24] update enqueue/dev docker image --- docker/Dockerfile | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 7c23fb7ec..323e241d5 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -13,10 +13,6 @@ RUN set -x && \ git clone https://github.com/pdezwart/php-amqp.git . && git checkout v1.9.3 && \ phpize --clean && phpize && ./configure && make install -## confis - -# RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini - ## librdkafka RUN set -x && \ apt-get update && \ @@ -27,10 +23,10 @@ RUN set -x && \ git checkout v0.11.1 && \ ./configure && make && make install && \ pecl install rdkafka && \ - echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini && \ - echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini + echo "extension=rdkafka.so" > /etc/php/7.2/cli/conf.d/10-rdkafka.ini && \ + echo "extension=rdkafka.so" > /etc/php/7.2/fpm/conf.d/10-rdkafka.ini -COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini +COPY ./php/cli.ini /etc/php/7.2/cli/conf.d/1-dev_cli.ini COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh RUN chmod u+x /usr/local/bin/entrypoint.sh From e924c23027081ee82ff03c816f771da8899bc86b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 12:25:54 +0300 Subject: [PATCH 09/24] debug travis. --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index c368c000e..2d1036e4a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,6 +46,8 @@ cache: before_install: - echo "extension = mongodb.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini + - php -m + - php -i | grep -C 15 mongo install: - rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini; From e8de54a8a03885a6c251b3ed9d8b0b68a8bffe90 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 12:34:32 +0300 Subject: [PATCH 10/24] use $HOME --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2d1036e4a..19f623a12 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,7 +45,7 @@ cache: - $HOME/.composer/cache before_install: - - echo "extension = mongodb.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini + - echo "extension = mongodb.so" >> $HOME/.phpenv/versions/$(phpenv version-name)/etc/php.ini - php -m - php -i | grep -C 15 mongo From f76e48d414901b7a60b33d7b827aab30486b9c4c Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 13:13:08 +0300 Subject: [PATCH 11/24] add group --- pkg/mongodb/Tests/MongodbConnectionFactoryTest.php | 3 +++ pkg/mongodb/Tests/MongodbConsumerTest.php | 3 +++ pkg/mongodb/Tests/MongodbContextTest.php | 3 +++ pkg/mongodb/Tests/MongodbDestinationTest.php | 3 +++ pkg/mongodb/Tests/MongodbMessageTest.php | 3 +++ pkg/mongodb/Tests/MongodbProducerTest.php | 3 +++ pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php | 3 +++ pkg/mongodb/Tests/Spec/MongodbContextTest.php | 1 + pkg/mongodb/Tests/Spec/MongodbMessageTest.php | 3 +++ pkg/mongodb/Tests/Spec/MongodbProducerTest.php | 1 + pkg/mongodb/Tests/Spec/MongodbQueueTest.php | 3 +++ pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php | 1 + .../Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php | 1 + .../MongodbSendAndReceivePriorityMessagesFromQueueTest.php | 1 + .../MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php | 1 + .../Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php | 1 + .../Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php | 1 + .../Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php | 1 + .../Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php | 1 + pkg/mongodb/Tests/Spec/MongodbTopicTest.php | 3 +++ pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php | 3 +++ pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php | 3 +++ pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 3 +++ 23 files changed, 49 insertions(+) diff --git a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php index 3fb1dfdc2..541d8c404 100644 --- a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php +++ b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php @@ -7,6 +7,9 @@ use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\PsrConnectionFactory; +/** + * @group mongodb + */ class MongodbConnectionFactoryTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/mongodb/Tests/MongodbConsumerTest.php b/pkg/mongodb/Tests/MongodbConsumerTest.php index d4f59b26a..0c2e5c664 100644 --- a/pkg/mongodb/Tests/MongodbConsumerTest.php +++ b/pkg/mongodb/Tests/MongodbConsumerTest.php @@ -12,6 +12,9 @@ use Interop\Queue\PsrConsumer; use Interop\Queue\PsrMessage; +/** + * @group mongodb + */ class MongodbConsumerTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/mongodb/Tests/MongodbContextTest.php b/pkg/mongodb/Tests/MongodbContextTest.php index c9eb1cbd7..85707a6fc 100644 --- a/pkg/mongodb/Tests/MongodbContextTest.php +++ b/pkg/mongodb/Tests/MongodbContextTest.php @@ -13,6 +13,9 @@ use Interop\Queue\PsrDestination; use MongoDB\Client; +/** + * @group mongodb + */ class MongodbContextTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/mongodb/Tests/MongodbDestinationTest.php b/pkg/mongodb/Tests/MongodbDestinationTest.php index 2651b0e0b..ef81e8fc9 100644 --- a/pkg/mongodb/Tests/MongodbDestinationTest.php +++ b/pkg/mongodb/Tests/MongodbDestinationTest.php @@ -8,6 +8,9 @@ use Interop\Queue\PsrQueue; use Interop\Queue\PsrTopic; +/** + * @group mongodb + */ class MongodbDestinationTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/mongodb/Tests/MongodbMessageTest.php b/pkg/mongodb/Tests/MongodbMessageTest.php index 50af98e80..df6ca2595 100644 --- a/pkg/mongodb/Tests/MongodbMessageTest.php +++ b/pkg/mongodb/Tests/MongodbMessageTest.php @@ -5,6 +5,9 @@ use Enqueue\Mongodb\MongodbMessage; use Enqueue\Test\ClassExtensionTrait; +/** + * @group mongodb + */ class MongodbMessageTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/mongodb/Tests/MongodbProducerTest.php b/pkg/mongodb/Tests/MongodbProducerTest.php index 3b32d1a8b..ca59d5520 100644 --- a/pkg/mongodb/Tests/MongodbProducerTest.php +++ b/pkg/mongodb/Tests/MongodbProducerTest.php @@ -12,6 +12,9 @@ use Interop\Queue\PsrDestination; use Interop\Queue\PsrProducer; +/** + * @group mongodb + */ class MongodbProducerTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php index 748ed516b..324fe52a6 100644 --- a/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbConnectionFactoryTest.php @@ -5,6 +5,9 @@ use Enqueue\Mongodb\MongodbConnectionFactory; use Interop\Queue\Spec\PsrConnectionFactorySpec; +/** + * @group mongodb + */ class MongodbConnectionFactoryTest extends PsrConnectionFactorySpec { /** diff --git a/pkg/mongodb/Tests/Spec/MongodbContextTest.php b/pkg/mongodb/Tests/Spec/MongodbContextTest.php index c892f0c62..a53efa077 100644 --- a/pkg/mongodb/Tests/Spec/MongodbContextTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbContextTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbContextTest extends PsrContextSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbMessageTest.php b/pkg/mongodb/Tests/Spec/MongodbMessageTest.php index 1a4e8d77d..51ab55ac5 100644 --- a/pkg/mongodb/Tests/Spec/MongodbMessageTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbMessageTest.php @@ -5,6 +5,9 @@ use Enqueue\Mongodb\MongodbMessage; use Interop\Queue\Spec\PsrMessageSpec; +/** + * @group mongodb + */ class MongodbMessageTest extends PsrMessageSpec { /** diff --git a/pkg/mongodb/Tests/Spec/MongodbProducerTest.php b/pkg/mongodb/Tests/Spec/MongodbProducerTest.php index d5c518119..4f6dc8522 100644 --- a/pkg/mongodb/Tests/Spec/MongodbProducerTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbProducerTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbProducerTest extends PsrProducerSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbQueueTest.php index 5945975f5..a555461a7 100644 --- a/pkg/mongodb/Tests/Spec/MongodbQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbQueueTest.php @@ -5,6 +5,9 @@ use Enqueue\Mongodb\MongodbDestination; use Interop\Queue\Spec\PsrQueueSpec; +/** + * @group mongodb + */ class MongodbQueueTest extends PsrQueueSpec { /** diff --git a/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php b/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php index ff7048faf..fa5832b4a 100644 --- a/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbRequeueMessageTest extends RequeueMessageSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php index 6cf600326..aa7ebf4d7 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbSendAndReceiveDelayedMessageFromQueueTest extends SendAndReceiveDelayedMessageFromQueueSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php index f06eeb3ff..d105bc90e 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php @@ -9,6 +9,7 @@ /** * @group functional + * @group mongodb */ class MongodbSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePriorityMessagesFromQueueSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php index 766a02e33..5f591eb43 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest extends SendAndReceiveTimeToLiveMessagesFromQueueSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php index b67dd570f..cb420e368 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php index e8ef8396d..1078291d3 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php index 955d41495..596d17735 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbSendToAndReceiveNoWaitFromQueueTest extends SendToAndReceiveNoWaitFromQueueSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php index 511485af0..c8ca6d1ec 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php @@ -6,6 +6,7 @@ /** * @group functional + * @group mongodb */ class MongodbSendToAndReceiveNoWaitFromTopicTest extends SendToAndReceiveNoWaitFromTopicSpec { diff --git a/pkg/mongodb/Tests/Spec/MongodbTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbTopicTest.php index 8010874cd..14a79f5a5 100644 --- a/pkg/mongodb/Tests/Spec/MongodbTopicTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbTopicTest.php @@ -5,6 +5,9 @@ use Enqueue\Mongodb\MongodbDestination; use Interop\Queue\Spec\PsrTopicSpec; +/** + * @group mongodb + */ class MongodbTopicTest extends PsrTopicSpec { /** diff --git a/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php index e38b78adc..6499651ed 100644 --- a/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php +++ b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php @@ -12,6 +12,9 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; +/** + * @group mongodb + */ class MongodbTransportFactoryTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php index c0cfab616..af9de4110 100644 --- a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php +++ b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php @@ -13,6 +13,9 @@ use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\PsrProducer; +/** + * @group rdkafka + */ class RdKafkaDriverTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index 950034205..6816e1690 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -13,6 +13,9 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; +/** + * @group rdkafka + */ class RdKafkaTransportFactoryTest extends TestCase { use ClassExtensionTrait; From aa3178f456589d0073be73dbb6807ce0d60e52ea Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 13:13:28 +0300 Subject: [PATCH 12/24] add license, readme, phpunit cfg --- pkg/mongodb/LICENSE | 20 ++++++++++++++++++++ pkg/mongodb/README.md | 27 +++++++++++++++++++++++++++ pkg/mongodb/phpunit.xml.dist | 30 ++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 pkg/mongodb/LICENSE create mode 100644 pkg/mongodb/README.md create mode 100644 pkg/mongodb/phpunit.xml.dist diff --git a/pkg/mongodb/LICENSE b/pkg/mongodb/LICENSE new file mode 100644 index 000000000..20211e5fd --- /dev/null +++ b/pkg/mongodb/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2018 Max Kotliar + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pkg/mongodb/README.md b/pkg/mongodb/README.md new file mode 100644 index 000000000..d29cde0f2 --- /dev/null +++ b/pkg/mongodb/README.md @@ -0,0 +1,27 @@ +# Mongodb Transport + +[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) +[![Build Status](https://travis-ci.org/php-enqueue/mongodb.png?branch=master)](https://travis-ci.org/php-enqueue/mongodb) +[![Total Downloads](https://poser.pugx.org/enqueue/mongodb/d/total.png)](https://packagist.org/packages/enqueue/mongodb) +[![Latest Stable Version](https://poser.pugx.org/enqueue/mongodb/version.png)](https://packagist.org/packages/enqueue/mongodb) + +This is an implementation of the queue specification. It allows you to use MongoDB database as a message broker. + +## Resources + +* [Site](https://enqueue.forma-pro.com/) +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## Developed by Forma-Pro + +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. + +If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com + +## License + +It is released under the [MIT License](LICENSE). \ No newline at end of file diff --git a/pkg/mongodb/phpunit.xml.dist b/pkg/mongodb/phpunit.xml.dist new file mode 100644 index 000000000..1f34af01d --- /dev/null +++ b/pkg/mongodb/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + + From f460e6c2d448b2c52c2241833e574b2771f69ff9 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 13:18:18 +0300 Subject: [PATCH 13/24] mongolib ^1.2, ext-mongodb ^1.3 --- pkg/mongodb/composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/mongodb/composer.json b/pkg/mongodb/composer.json index 8889713f1..118001a27 100644 --- a/pkg/mongodb/composer.json +++ b/pkg/mongodb/composer.json @@ -12,7 +12,8 @@ "require": { "php": "^7.1", "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1", - "mongodb/mongodb": "^1.3" + "mongodb/mongodb": "^1.2", + "ext-mongodb": "^1.3" }, "require-dev": { "phpunit/phpunit": "~5.4.0", From 41f2b47c985724cf03dd9c7d4c2a6bda47e1ea0a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 13:22:36 +0300 Subject: [PATCH 14/24] require php7.0 --- pkg/mongodb/composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mongodb/composer.json b/pkg/mongodb/composer.json index 118001a27..e4a08da8c 100644 --- a/pkg/mongodb/composer.json +++ b/pkg/mongodb/composer.json @@ -10,7 +10,7 @@ "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": "^7.1", + "php": "^7.0", "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1", "mongodb/mongodb": "^1.2", "ext-mongodb": "^1.3" From a9fd835dbf75f7640b0ed73aff98e2f10972236b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 14:04:50 +0300 Subject: [PATCH 15/24] [doc][skip ci] add docs. --- README.md | 4 ++ docs/transport/mongodb.md | 142 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 docs/transport/mongodb.md diff --git a/README.md b/README.md index 59b0ae68e..b6df3127b 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,10 @@ Features: [![Build Status](https://travis-ci.org/php-enqueue/fs.png?branch=master)](https://travis-ci.org/php-enqueue/fs) [![Total Downloads](https://poser.pugx.org/enqueue/fs/d/total.png)](https://packagist.org/packages/enqueue/fs) [![Latest Stable Version](https://poser.pugx.org/enqueue/fs/version.png)](https://packagist.org/packages/enqueue/fs) + * [Mongodb](docs/transport/mongodb.md) +[![Build Status](https://travis-ci.org/php-enqueue/mongodb.png?branch=master)](https://travis-ci.org/php-enqueue/mongodb) +[![Total Downloads](https://poser.pugx.org/enqueue/mongodb/d/total.png)](https://packagist.org/packages/enqueue/mongodb) +[![Latest Stable Version](https://poser.pugx.org/enqueue/mongodb/version.png)](https://packagist.org/packages/enqueue/mongodb) * [Null](docs/transport/null.md). [![Build Status](https://travis-ci.org/php-enqueue/null.png?branch=master)](https://travis-ci.org/php-enqueue/null) [![Total Downloads](https://poser.pugx.org/enqueue/null/d/total.png)](https://packagist.org/packages/enqueue/null) diff --git a/docs/transport/mongodb.md b/docs/transport/mongodb.md new file mode 100644 index 000000000..8bae7bd97 --- /dev/null +++ b/docs/transport/mongodb.md @@ -0,0 +1,142 @@ +# Mongodb transport + +Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker. + +* [Installation](#installation) +* [Create context](#create-context) +* [Send message to topic](#send-message-to-topic) +* [Send message to queue](#send-message-to-queue) +* [Send priority message](#send-priority-message) +* [Send expiration message](#send-expiration-message) +* [Send delayed message](#send-delayed-message) +* [Consume message](#consume-message) + +## Installation + +```bash +$ composer require enqueue/mongodb +``` + +## Create context + +```php + 'mongodb://localhost:27017/db_name', + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', + 'polling_interval' => '1000', +]); + +$psrContext = $factory->createContext(); + +// if you have enqueue/enqueue library installed you can use a function from there to create the context +$psrContext = \Enqueue\dsn_to_context('mongodb:'); +``` + +## Send message to topic + +```php +createMessage('Hello world!'); + +$psrContext->createProducer()->send($fooTopic, $message); +``` + +## Send message to queue + +```php +createMessage('Hello world!'); + +$psrContext->createProducer()->send($fooQueue, $message); +``` + +## Send priority message + +```php +createQueue('foo'); + +$message = $psrContext->createMessage('Hello world!'); + +$psrContext->createProducer() + ->setPriority(5) // the higher priority the sooner a message gets to a consumer + // + ->send($fooQueue, $message) +; +``` + +## Send expiration message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setTimeToLive(60000) // 60 sec + // + ->send($fooQueue, $message) +; +``` + +## Send delayed message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setDeliveryDelay(5000) // 5 sec + + ->send($fooQueue, $message) +; +```` + +## Consume message: + +```php +createConsumer($fooQueue); + +$message = $consumer->receive(); + +// process a message + +$consumer->acknowledge($message); +// $consumer->reject($message); +``` + +[back to index](../index.md) From 368b608817d0dd579a331713e3a2628b210939bf Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 14:07:42 +0300 Subject: [PATCH 16/24] [symfony] add mongodb to default transport and dsn_to_function --- pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php | 7 +++++++ pkg/enqueue/Symfony/DefaultTransportFactory.php | 6 ++++++ .../Tests/Functions/DsnToConnectionFactoryFunctionTest.php | 3 +++ pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php | 2 ++ pkg/enqueue/functions.php | 5 +++++ 5 files changed, 23 insertions(+) diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index cec646a7c..34adc2c68 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -206,6 +206,13 @@ public function provideEnqueueConfigs() ]]; } + yield 'mongodb_dsn' => [[ + 'transport' => [ + 'default' => 'mongodb', + 'mongodb' => getenv('MONGO_DSN'), + ], + ]]; + // yield 'gps' => [[ // 'transport' => [ // 'default' => 'gps', diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index 65b877a30..fa45d2544 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -8,6 +8,8 @@ use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Gps\GpsConnectionFactory; use Enqueue\Gps\Symfony\GpsTransportFactory; +use Enqueue\Mongodb\MongodbConnectionFactory; +use Enqueue\Mongodb\Symfony\MongodbTransportFactory; use Enqueue\Null\NullConnectionFactory; use Enqueue\Null\Symfony\NullTransportFactory; use Enqueue\RdKafka\RdKafkaConnectionFactory; @@ -215,6 +217,10 @@ private function findFactory($dsn) return new RdKafkaTransportFactory('default_kafka'); } + if ($factory instanceof MongodbConnectionFactory) { + return new MongodbTransportFactory('default_mongodb'); + } + throw new \LogicException(sprintf( 'There is no supported transport factory for the connection factory "%s" created from DSN "%s"', get_class($factory), diff --git a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php b/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php index f123b116b..2c65eb6d9 100644 --- a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php +++ b/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php @@ -9,6 +9,7 @@ use Enqueue\Fs\FsConnectionFactory; use Enqueue\Gearman\GearmanConnectionFactory; use Enqueue\Gps\GpsConnectionFactory; +use Enqueue\Mongodb\MongodbConnectionFactory; use Enqueue\Null\NullConnectionFactory; use Enqueue\Pheanstalk\PheanstalkConnectionFactory; use Enqueue\RdKafka\RdKafkaConnectionFactory; @@ -97,5 +98,7 @@ public static function provideDSNs() yield ['sqs:', SqsConnectionFactory::class]; yield ['gps:', GpsConnectionFactory::class]; + + yield ['mongodb:', MongodbConnectionFactory::class]; } } diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 8f2df545b..a97fb86b8 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -291,5 +291,7 @@ public static function provideDSNs() yield ['stomp:', 'default_stomp']; yield ['kafka:', 'default_kafka']; + + yield ['mongodb:', 'default_mongodb']; } } diff --git a/pkg/enqueue/functions.php b/pkg/enqueue/functions.php index 8960785b7..537f9964a 100644 --- a/pkg/enqueue/functions.php +++ b/pkg/enqueue/functions.php @@ -10,6 +10,7 @@ use Enqueue\Fs\FsConnectionFactory; use Enqueue\Gearman\GearmanConnectionFactory; use Enqueue\Gps\GpsConnectionFactory; +use Enqueue\Mongodb\MongodbConnectionFactory; use Enqueue\Null\NullConnectionFactory; use Enqueue\Pheanstalk\PheanstalkConnectionFactory; use Enqueue\RdKafka\RdKafkaConnectionFactory; @@ -108,6 +109,10 @@ function dsn_to_connection_factory($dsn) $map['gps'] = GpsConnectionFactory::class; } + if (class_exists(MongodbConnectionFactory::class)) { + $map['mongodb'] = MongodbConnectionFactory::class; + } + list($scheme) = explode(':', $dsn, 2); if (false == $scheme || false === strpos($dsn, ':')) { throw new \LogicException(sprintf('The scheme could not be parsed from DSN "%s"', $dsn)); From 0f8d1dc245ba060e33229292b6f1cb073432de52 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 14:09:59 +0300 Subject: [PATCH 17/24] run mongodb tests. --- phpunit.xml.dist | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/phpunit.xml.dist b/phpunit.xml.dist index b3b93f5a2..61c7430cb 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -77,6 +77,10 @@ pkg/gps/Tests + + pkg/mongodb/Tests + + pkg/enqueue-bundle/Tests From 41254f9ef71418917156f5b0b7b0048517e9a08b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 14:38:22 +0300 Subject: [PATCH 18/24] fix tests. --- .../Tests/Symfony/MongodbTransportFactoryTest.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php index 6499651ed..7f88a9636 100644 --- a/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php +++ b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php @@ -129,12 +129,12 @@ public function testShouldCreateContext() $serviceId = $transport->createContext($container, []); - $this->assertEquals('enqueue.transport.Mongodb.context', $serviceId); + $this->assertEquals('enqueue.transport.mongodb.context', $serviceId); $this->assertTrue($container->hasDefinition($serviceId)); - $context = $container->getDefinition('enqueue.transport.Mongodb.context'); + $context = $container->getDefinition('enqueue.transport.mongodb.context'); $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); - $this->assertEquals('enqueue.transport.Mongodb.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.mongodb.connection_factory', (string) $context->getFactory()[0]); $this->assertEquals('createContext', $context->getFactory()[1]); } @@ -146,14 +146,14 @@ public function testShouldCreateDriver() $serviceId = $transport->createDriver($container, []); - $this->assertEquals('enqueue.client.Mongodb.driver', $serviceId); + $this->assertEquals('enqueue.client.mongodb.driver', $serviceId); $this->assertTrue($container->hasDefinition($serviceId)); $driver = $container->getDefinition($serviceId); $this->assertSame(MongodbDriver::class, $driver->getClass()); $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); - $this->assertEquals('enqueue.transport.Mongodb.context', (string) $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.mongodb.context', (string) $driver->getArgument(0)); $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); From f011a8b24a7601d051c375b50b006ab63258a194 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 15:06:53 +0300 Subject: [PATCH 19/24] fix tests. --- pkg/mongodb/MongodbContext.php | 2 ++ pkg/mongodb/Tests/MongodbConnectionFactoryTest.php | 4 ++++ pkg/mongodb/Tests/MongodbContextTest.php | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php index 17f96daf2..ce8945e53 100644 --- a/pkg/mongodb/MongodbContext.php +++ b/pkg/mongodb/MongodbContext.php @@ -13,6 +13,7 @@ class MongodbContext implements PsrContext * @var array */ private $config; + /** * @var Client */ @@ -25,6 +26,7 @@ public function __construct($client, array $config = []) 'collection_name' => 'enqueue', 'polling_interval' => null, ], $config); + $this->client = $client; } diff --git a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php index 541d8c404..b46f8563e 100644 --- a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php +++ b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php @@ -23,6 +23,8 @@ public function testCouldBeConstructedWithEmptyConfiguration() { $params = [ 'uri' => 'mongodb://127.0.0.1/', + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', ]; $factory = new MongodbConnectionFactory(); @@ -35,6 +37,8 @@ public function testCouldBeConstructedWithCustomConfiguration() 'uri' => 'mongodb://127.0.0.3/', 'uriOptions' => ['testValue' => 123], 'driverOptions' => ['testValue' => 123], + 'dbname' => 'enqueue', + 'collection_name' => 'enqueue', ]; $factory = new MongodbConnectionFactory($params); diff --git a/pkg/mongodb/Tests/MongodbContextTest.php b/pkg/mongodb/Tests/MongodbContextTest.php index 85707a6fc..0c06397d9 100644 --- a/pkg/mongodb/Tests/MongodbContextTest.php +++ b/pkg/mongodb/Tests/MongodbContextTest.php @@ -65,7 +65,7 @@ public function testShouldCreateMessage() $this->assertEquals('body', $message->getBody()); $this->assertEquals(['pkey' => 'pval'], $message->getProperties()); $this->assertEquals(['hkey' => 'hval'], $message->getHeaders()); - $this->assertSame(0, $message->getPriority()); + $this->assertNull($message->getPriority()); $this->assertFalse($message->isRedelivered()); } From b128a65e0c8babf3480e1f2f61641f914eee4731 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 17:21:23 +0300 Subject: [PATCH 20/24] fix tests --- pkg/mongodb/Symfony/MongodbTransportFactory.php | 2 +- pkg/mongodb/Tests/MongodbMessageTest.php | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/mongodb/Symfony/MongodbTransportFactory.php b/pkg/mongodb/Symfony/MongodbTransportFactory.php index 2efb7627a..3486d8a8e 100644 --- a/pkg/mongodb/Symfony/MongodbTransportFactory.php +++ b/pkg/mongodb/Symfony/MongodbTransportFactory.php @@ -22,7 +22,7 @@ class MongodbTransportFactory implements TransportFactoryInterface, DriverFactor /** * @param string $name */ - public function __construct($name = 'Mongodb') + public function __construct($name = 'mongodb') { $this->name = $name; } diff --git a/pkg/mongodb/Tests/MongodbMessageTest.php b/pkg/mongodb/Tests/MongodbMessageTest.php index df6ca2595..b9e5e7501 100644 --- a/pkg/mongodb/Tests/MongodbMessageTest.php +++ b/pkg/mongodb/Tests/MongodbMessageTest.php @@ -30,11 +30,11 @@ public function testCouldBeConstructedWithOptionalArguments() $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); } - public function testShouldSetPriorityToZeroInConstructor() + public function testShouldSetNullPriorityInConstructor() { $message = new MongodbMessage(); - $this->assertSame(0, $message->getPriority()); + $this->assertNull($message->getPriority()); } public function testShouldSetDelayToNullInConstructor() From e15e92f380fa16f8f84d31e83efd57d76ea20bac Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 17:34:03 +0300 Subject: [PATCH 21/24] fix test. --- pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php index 7f88a9636..3621acafe 100644 --- a/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php +++ b/pkg/mongodb/Tests/Symfony/MongodbTransportFactoryTest.php @@ -28,7 +28,7 @@ public function testCouldBeConstructedWithDefaultName() { $transport = new MongodbTransportFactory(); - $this->assertEquals('Mongodb', $transport->getName()); + $this->assertEquals('mongodb', $transport->getName()); } public function testCouldBeConstructedWithCustomName() From 3b8c997d65a2fe6397be96fa1cec4512126ce6b4 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 17:51:06 +0300 Subject: [PATCH 22/24] Add mongodb transport. --- pkg/enqueue-bundle/EnqueueBundle.php | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index a62f2db2b..843cd498d 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -21,6 +21,7 @@ use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Gps\GpsConnectionFactory; use Enqueue\Gps\Symfony\GpsTransportFactory; +use Enqueue\Mongodb\Symfony\MongodbTransportFactory; use Enqueue\RdKafka\RdKafkaConnectionFactory; use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; @@ -112,6 +113,12 @@ class_exists(AmqpLibConnectionFactory::class) $extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka'])); } + if (class_exists(MongodbTransportFactory::class)) { + $extension->setTransportFactory(new MongodbTransportFactory('mongodb')); + } else { + $extension->setTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb'])); + } + $container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); $container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); } From 1b2327099d9104fd916fa76546c5a233d2f6ecdb Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 22:59:49 +0300 Subject: [PATCH 23/24] mongodb transport, fixes --- bin/run-fun-test.sh | 2 +- composer.json | 3 +- docker-compose.yml | 2 +- .../Tests/Functional/UseCasesTest.php | 14 ++--- pkg/mongodb/JSON.php | 59 +++++++++++++++++++ pkg/mongodb/MongodbConnectionFactory.php | 10 ++-- pkg/mongodb/MongodbConsumer.php | 5 +- pkg/mongodb/MongodbProducer.php | 4 +- .../Tests/Functional/MongodbConsumerTest.php | 6 +- .../Tests/MongodbConnectionFactoryTest.php | 4 +- pkg/mongodb/Tests/Spec/MongodbContextTest.php | 5 +- .../Tests/Spec/MongodbProducerTest.php | 5 +- .../Tests/Spec/MongodbRequeueMessageTest.php | 5 +- ...dAndReceiveDelayedMessageFromQueueTest.php | 5 +- ...ndReceivePriorityMessagesFromQueueTest.php | 5 +- ...ReceiveTimeToLiveMessagesFromQueueTest.php | 5 +- .../MongodbSendToAndReceiveFromQueueTest.php | 5 +- .../MongodbSendToAndReceiveFromTopicTest.php | 5 +- ...odbSendToAndReceiveNoWaitFromQueueTest.php | 5 +- ...odbSendToAndReceiveNoWaitFromTopicTest.php | 5 +- pkg/mongodb/composer.json | 2 +- pkg/simple-client/SimpleClient.php | 16 +++++ .../Tests/Functional/SimpleClientTest.php | 23 +++++++- .../MongodbExtensionTrait.php} | 8 +-- 24 files changed, 157 insertions(+), 51 deletions(-) create mode 100644 pkg/mongodb/JSON.php rename pkg/{mongodb/Tests/Spec/CreateMongodbContextTrait.php => test/MongodbExtensionTrait.php} (62%) diff --git a/bin/run-fun-test.sh b/bin/run-fun-test.sh index b8150ac54..626741366 100755 --- a/bin/run-fun-test.sh +++ b/bin/run-fun-test.sh @@ -3,4 +3,4 @@ set -x set -e -COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@" \ No newline at end of file +docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@" \ No newline at end of file diff --git a/composer.json b/composer.json index 544c884d9..c5bbe060f 100644 --- a/composer.json +++ b/composer.json @@ -61,7 +61,8 @@ "platform": { "ext-amqp": "1.9.3", "ext-gearman": "1.1", - "ext-rdkafka": "3.3" + "ext-rdkafka": "3.3", + "ext-mongodb": "1.3" } }, "repositories": [ diff --git a/docker-compose.yml b/docker-compose.yml index f004f825d..77a4773a0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -105,7 +105,7 @@ services: entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085' mongo: - image: mongo + image: mongo:3.7 ports: - "27017:27017" diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index 34adc2c68..92533c216 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -14,7 +14,6 @@ use Interop\Queue\PsrQueue; use Symfony\Component\Console\Tester\CommandTester; use Symfony\Component\Filesystem\Filesystem; -use Symfony\Component\HttpKernel\Kernel; /** * @group functional @@ -93,14 +92,11 @@ public function provideEnqueueConfigs() ], ]]; - // Symfony 2.x does not such env syntax - if (version_compare(Kernel::VERSION, '3.2', '>=')) { - yield 'default_dsn_as_env' => [[ - 'transport' => [ - 'default' => '%env(AMQP_DSN)%', - ], - ]]; - } + yield 'default_dsn_as_env' => [[ + 'transport' => [ + 'default' => '%env(AMQP_DSN)%', + ], + ]]; yield 'default_dbal_as_dsn' => [[ 'transport' => [ diff --git a/pkg/mongodb/JSON.php b/pkg/mongodb/JSON.php new file mode 100644 index 000000000..84cac50da --- /dev/null +++ b/pkg/mongodb/JSON.php @@ -0,0 +1,59 @@ + 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/ + * 'dsn' => 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/ * 'dbname' => 'enqueue', - database name. * 'collection_name' => 'enqueue' - collection name * 'polling_interval' => '1000', - How often query for new messages (milliseconds) @@ -39,7 +39,7 @@ public function __construct($config = 'mongodb:') throw new \LogicException('The config must be either an array of options, a DSN string or null'); } $config = array_replace([ - 'uri' => 'mongodb://127.0.0.1/', + 'dsn' => 'mongodb://127.0.0.1/', 'dbname' => 'enqueue', 'collection_name' => 'enqueue', ], $config); @@ -49,7 +49,7 @@ public function __construct($config = 'mongodb:') public function createContext() { - $client = new Client($this->config['uri']); + $client = new Client($this->config['dsn']); return new MongodbContext($client, $this->config); } @@ -75,10 +75,10 @@ public static function parseDsn($dsn) } if ('mongodb:' === $dsn) { return [ - 'uri' => 'mongodb://127.0.0.1/', + 'dsn' => 'mongodb://127.0.0.1/', ]; } - $config['uri'] = $dsn; + $config['dsn'] = $dsn; if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) { $pathParts = explode('/', $parsedUrl['path']); //DB name diff --git a/pkg/mongodb/MongodbConsumer.php b/pkg/mongodb/MongodbConsumer.php index 4986d662b..6fe34289c 100644 --- a/pkg/mongodb/MongodbConsumer.php +++ b/pkg/mongodb/MongodbConsumer.php @@ -163,7 +163,10 @@ protected function receiveMessage() */ protected function convertMessage(array $mongodbMessage) { - $message = $this->context->createMessage($mongodbMessage['body'], $mongodbMessage['properties'], $mongodbMessage['headers']); + $properties = JSON::decode($mongodbMessage['properties']); + $headers = JSON::decode($mongodbMessage['headers']); + + $message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers); $message->setId((string) $mongodbMessage['_id']); $message->setPriority((int) $mongodbMessage['priority']); $message->setRedelivered((bool) $mongodbMessage['redelivered']); diff --git a/pkg/mongodb/MongodbProducer.php b/pkg/mongodb/MongodbProducer.php index 2a06791cd..c5132b62c 100644 --- a/pkg/mongodb/MongodbProducer.php +++ b/pkg/mongodb/MongodbProducer.php @@ -80,8 +80,8 @@ public function send(PsrDestination $destination, PsrMessage $message) $mongoMessage = [ 'published_at' => $publishedAt, 'body' => $body, - 'headers' => $message->getHeaders(), - 'properties' => $message->getProperties(), + 'headers' => JSON::encode($message->getHeaders()), + 'properties' => JSON::encode($message->getProperties()), 'priority' => $message->getPriority(), 'queue' => $destination->getName(), 'redelivered' => $message->isRedelivered(), diff --git a/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php b/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php index eddf213fe..609a22e16 100644 --- a/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php +++ b/pkg/mongodb/Tests/Functional/MongodbConsumerTest.php @@ -4,7 +4,7 @@ use Enqueue\Mongodb\MongodbContext; use Enqueue\Mongodb\MongodbMessage; -use Enqueue\Mongodb\Tests\Spec\CreateMongodbContextTrait; +use Enqueue\Test\MongodbExtensionTrait; use PHPUnit\Framework\TestCase; /** @@ -12,7 +12,7 @@ */ class MongodbConsumerTest extends TestCase { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * @var MongodbContext @@ -21,7 +21,7 @@ class MongodbConsumerTest extends TestCase public function setUp() { - $this->context = $this->createMongodbContext(); + $this->context = $this->buildMongodbContext(); } protected function tearDown() diff --git a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php index b46f8563e..daaef5102 100644 --- a/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php +++ b/pkg/mongodb/Tests/MongodbConnectionFactoryTest.php @@ -22,7 +22,7 @@ public function testShouldImplementConnectionFactoryInterface() public function testCouldBeConstructedWithEmptyConfiguration() { $params = [ - 'uri' => 'mongodb://127.0.0.1/', + 'dsn' => 'mongodb://127.0.0.1/', 'dbname' => 'enqueue', 'collection_name' => 'enqueue', ]; @@ -34,7 +34,7 @@ public function testCouldBeConstructedWithEmptyConfiguration() public function testCouldBeConstructedWithCustomConfiguration() { $params = [ - 'uri' => 'mongodb://127.0.0.3/', + 'dsn' => 'mongodb://127.0.0.3/', 'uriOptions' => ['testValue' => 123], 'driverOptions' => ['testValue' => 123], 'dbname' => 'enqueue', diff --git a/pkg/mongodb/Tests/Spec/MongodbContextTest.php b/pkg/mongodb/Tests/Spec/MongodbContextTest.php index a53efa077..51d4c4b88 100644 --- a/pkg/mongodb/Tests/Spec/MongodbContextTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbContextTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\PsrContextSpec; /** @@ -10,13 +11,13 @@ */ class MongodbContextTest extends PsrContextSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbProducerTest.php b/pkg/mongodb/Tests/Spec/MongodbProducerTest.php index 4f6dc8522..54eb096d0 100644 --- a/pkg/mongodb/Tests/Spec/MongodbProducerTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbProducerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\PsrProducerSpec; /** @@ -10,13 +11,13 @@ */ class MongodbProducerTest extends PsrProducerSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createProducer() { - return $this->createMongodbContext()->createProducer(); + return $this->buildMongodbContext()->createProducer(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php b/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php index fa5832b4a..454d357ad 100644 --- a/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbRequeueMessageTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\RequeueMessageSpec; /** @@ -10,13 +11,13 @@ */ class MongodbRequeueMessageTest extends RequeueMessageSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php index aa7ebf4d7..a5eb3511d 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveDelayedMessageFromQueueTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; /** @@ -10,13 +11,13 @@ */ class MongodbSendAndReceiveDelayedMessageFromQueueTest extends SendAndReceiveDelayedMessageFromQueueSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php index d105bc90e..5400820b8 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceivePriorityMessagesFromQueueTest.php @@ -4,6 +4,7 @@ use Enqueue\Mongodb\MongodbContext; use Enqueue\Mongodb\MongodbMessage; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec; @@ -13,7 +14,7 @@ */ class MongodbSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePriorityMessagesFromQueueSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; private $publishedAt; @@ -29,7 +30,7 @@ public function setUp() */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } /** diff --git a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php index 5f591eb43..d87ac10e9 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\SendAndReceiveTimeToLiveMessagesFromQueueSpec; /** @@ -10,13 +11,13 @@ */ class MongodbSendAndReceiveTimeToLiveMessagesFromQueueTest extends SendAndReceiveTimeToLiveMessagesFromQueueSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php index cb420e368..992c0626e 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromQueueTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec; /** @@ -10,13 +11,13 @@ */ class MongodbSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php index 1078291d3..c539386f7 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveFromTopicTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\SendToAndReceiveFromTopicSpec; /** @@ -10,13 +11,13 @@ */ class MongodbSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php index 596d17735..ea4febcc2 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromQueueTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\SendToAndReceiveNoWaitFromQueueSpec; /** @@ -10,13 +11,13 @@ */ class MongodbSendToAndReceiveNoWaitFromQueueTest extends SendToAndReceiveNoWaitFromQueueSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php index c8ca6d1ec..1e1be32c1 100644 --- a/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php +++ b/pkg/mongodb/Tests/Spec/MongodbSendToAndReceiveNoWaitFromTopicTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Mongodb\Tests\Spec; +use Enqueue\Test\MongodbExtensionTrait; use Interop\Queue\Spec\SendToAndReceiveNoWaitFromTopicSpec; /** @@ -10,13 +11,13 @@ */ class MongodbSendToAndReceiveNoWaitFromTopicTest extends SendToAndReceiveNoWaitFromTopicSpec { - use CreateMongodbContextTrait; + use MongodbExtensionTrait; /** * {@inheritdoc} */ protected function createContext() { - return $this->createMongodbContext(); + return $this->buildMongodbContext(); } } diff --git a/pkg/mongodb/composer.json b/pkg/mongodb/composer.json index e4a08da8c..59a2b6caa 100644 --- a/pkg/mongodb/composer.json +++ b/pkg/mongodb/composer.json @@ -18,7 +18,7 @@ "require-dev": { "phpunit/phpunit": "~5.4.0", "queue-interop/queue-spec": "^0.5.5@dev", - "enqueue/test": "^0.8@dev", + "enqueue/test": "^0.8.25@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev" }, diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 640211781..000743259 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -22,7 +22,11 @@ use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Gps\GpsConnectionFactory; use Enqueue\Gps\Symfony\GpsTransportFactory; +use Enqueue\Mongodb\MongodbConnectionFactory; +use Enqueue\Mongodb\Symfony\MongodbTransportFactory; use Enqueue\Null\Symfony\NullTransportFactory; +use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Rpc\Promise; @@ -351,6 +355,18 @@ class_exists(AmqpLibConnectionFactory::class) $extension->addTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps'])); } + if (class_exists(RdKafkaConnectionFactory::class)) { + $extension->addTransportFactory(new RdKafkaTransportFactory('rdkafka')); + } else { + $extension->addTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka'])); + } + + if (class_exists(MongodbConnectionFactory::class)) { + $extension->addTransportFactory(new MongodbTransportFactory('mongodb')); + } else { + $extension->addTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb'])); + } + return $extension; } diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php index 75d508c8d..87d60034b 100644 --- a/pkg/simple-client/Tests/Functional/SimpleClientTest.php +++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php @@ -7,8 +7,8 @@ use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; use Enqueue\Consumption\Result; use Enqueue\SimpleClient\SimpleClient; -use Enqueue\Test\RabbitmqAmqpExtension; use Enqueue\Test\RabbitManagementExtensionTrait; +use Enqueue\Test\RabbitmqAmqpExtension; use Interop\Queue\PsrMessage; use PHPUnit\Framework\TestCase; @@ -73,6 +73,27 @@ public function transportConfigDataProvider() ], ], ]]; + + yield [[ + 'transport' => [ + 'default' => 'rabbitmq_amqp', + 'rabbitmq_amqp' => [ + 'driver' => 'ext', + 'host' => getenv('RABBITMQ_HOST'), + 'port' => getenv('RABBITMQ_AMQP__PORT'), + 'user' => getenv('RABBITMQ_USER'), + 'pass' => getenv('RABBITMQ_PASSWORD'), + 'vhost' => getenv('RABBITMQ_VHOST'), + ], + ], + ]]; + + yield 'mongodb_dsn' => [[ + 'transport' => [ + 'default' => 'mongodb', + 'mongodb' => getenv('MONGO_DSN'), + ], + ]]; } /** diff --git a/pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php b/pkg/test/MongodbExtensionTrait.php similarity index 62% rename from pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php rename to pkg/test/MongodbExtensionTrait.php index 703ea7fc5..4d94fca40 100644 --- a/pkg/mongodb/Tests/Spec/CreateMongodbContextTrait.php +++ b/pkg/test/MongodbExtensionTrait.php @@ -1,18 +1,18 @@ markTestSkipped('The MONGO_DSN env is not available. Skip tests'); } - $factory = new MongodbConnectionFactory(['uri' => $env]); + $factory = new MongodbConnectionFactory(['dsn' => $env]); $context = $factory->createContext(); From c407d247046a65142c1c48750d6cca9bdc0405af Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 3 May 2018 23:16:22 +0300 Subject: [PATCH 24/24] revert sf version check --- .../Tests/Functional/UseCasesTest.php | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index 92533c216..34adc2c68 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -14,6 +14,7 @@ use Interop\Queue\PsrQueue; use Symfony\Component\Console\Tester\CommandTester; use Symfony\Component\Filesystem\Filesystem; +use Symfony\Component\HttpKernel\Kernel; /** * @group functional @@ -92,11 +93,14 @@ public function provideEnqueueConfigs() ], ]]; - yield 'default_dsn_as_env' => [[ - 'transport' => [ - 'default' => '%env(AMQP_DSN)%', - ], - ]]; + // Symfony 2.x does not such env syntax + if (version_compare(Kernel::VERSION, '3.2', '>=')) { + yield 'default_dsn_as_env' => [[ + 'transport' => [ + 'default' => '%env(AMQP_DSN)%', + ], + ]]; + } yield 'default_dbal_as_dsn' => [[ 'transport' => [