diff --git a/Client/RedisDriver.php b/Client/RedisDriver.php new file mode 100644 index 0000000..57ae0f2 --- /dev/null +++ b/Client/RedisDriver.php @@ -0,0 +1,151 @@ +context = $context; + $this->config = $config; + $this->queueMetaRegistry = $queueMetaRegistry; + } + + /** + * {@inheritdoc} + */ + public function sendToRouter(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { + throw new \LogicException('Topic name parameter is required but is not set'); + } + + $queue = $this->createQueue($this->config->getRouterQueueName()); + $transportMessage = $this->createTransportMessage($message); + + $this->context->createProducer()->send($queue, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function sendToProcessor(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException('Processor name parameter is required but is not set'); + } + + if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException('Queue name parameter is required but is not set'); + } + + $transportMessage = $this->createTransportMessage($message); + $destination = $this->createQueue($queueName); + + $this->context->createProducer()->send($destination, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function setupBroker(LoggerInterface $logger = null) + { + } + + /** + * {@inheritdoc} + * + * @return RedisDestination + */ + public function createQueue($queueName) + { + return $this->context->createQueue($this->config->createTransportQueueName($queueName)); + } + + /** + * {@inheritdoc} + * + * @return RedisMessage + */ + public function createTransportMessage(Message $message) + { + $properties = $message->getProperties(); + + $headers = $message->getHeaders(); + $headers['content_type'] = $message->getContentType(); + + $transportMessage = $this->context->createMessage(); + $transportMessage->setBody($message->getBody()); + $transportMessage->setHeaders($headers); + $transportMessage->setProperties($properties); + $transportMessage->setMessageId($message->getMessageId()); + $transportMessage->setTimestamp($message->getTimestamp()); + $transportMessage->setReplyTo($message->getReplyTo()); + $transportMessage->setCorrelationId($message->getCorrelationId()); + + return $transportMessage; + } + + /** + * @param RedisMessage $message + * + * {@inheritdoc} + */ + public function createClientMessage(PsrMessage $message) + { + $clientMessage = new Message(); + + $clientMessage->setBody($message->getBody()); + $clientMessage->setHeaders($message->getHeaders()); + $clientMessage->setProperties($message->getProperties()); + + $clientMessage->setContentType($message->getHeader('content_type')); + $clientMessage->setMessageId($message->getMessageId()); + $clientMessage->setTimestamp($message->getTimestamp()); + $clientMessage->setPriority(MessagePriority::NORMAL); + $clientMessage->setReplyTo($message->getReplyTo()); + $clientMessage->setCorrelationId($message->getCorrelationId()); + + return $clientMessage; + } + + /** + * @return Config + */ + public function getConfig() + { + return $this->config; + } +} diff --git a/PRedis.php b/PRedis.php index 55a0316..16a731a 100644 --- a/PRedis.php +++ b/PRedis.php @@ -37,7 +37,7 @@ public function lpush($key, $value) public function brpop($key, $timeout) { try { - if ($result = $this->redis->brpop($key, $timeout)) { + if ($result = $this->redis->brpop([$key], $timeout)) { return $result[1]; } } catch (PRedisServerException $e) { diff --git a/RedisConsumer.php b/RedisConsumer.php index 50cb7be..bcf656d 100644 --- a/RedisConsumer.php +++ b/RedisConsumer.php @@ -45,7 +45,16 @@ public function getQueue() */ public function receive($timeout = 0) { - if ($message = $this->getRedis()->brpop($this->queue->getName(), (int) $timeout / 1000)) { + $timeout = (int) ($timeout / 1000); + if (empty($timeout)) { +// Caused by +// Predis\Response\ServerException: ERR timeout is not an integer or out of range +// /mqdev/vendor/predis/predis/src/Client.php:370 + + return $this->receiveNoWait(); + } + + if ($message = $this->getRedis()->brpop($this->queue->getName(), $timeout)) { return RedisMessage::jsonUnserialize($message); } } diff --git a/Symfony/RedisTransportFactory.php b/Symfony/RedisTransportFactory.php new file mode 100644 index 0000000..20a8cdf --- /dev/null +++ b/Symfony/RedisTransportFactory.php @@ -0,0 +1,114 @@ +name = $name; + } + + /** + * {@inheritdoc} + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->children() + ->scalarNode('host') + ->isRequired() + ->cannotBeEmpty() + ->info('can be a host, or the path to a unix domain socket') + ->end() + ->integerNode('port')->end() + ->enumNode('vendor') + ->values(['phpredis', 'predis']) + ->isRequired() + ->cannotBeEmpty() + ->info('The library used internally to interact with Redis server') + ->end() + ->booleanNode('persisted') + ->defaultFalse() + ->info('bool, Whether it use single persisted connection or open a new one for every context') + ->end() + ->booleanNode('lazy') + ->defaultTrue() + ->info('the connection will be performed as later as possible, if the option set to true') + ->end() + ; + } + + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factory = new Definition(RedisConnectionFactory::class); + $factory->setArguments([$config]); + + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $context = new Definition(RedisContext::class); + $context->setFactory([new Reference($factoryId), 'createContext']); + + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); + $container->setDefinition($contextId, $context); + + return $contextId; + } + + /** + * {@inheritdoc} + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driver = new Definition(RedisDriver::class); + $driver->setArguments([ + new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), + new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), + ]); + + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); + $container->setDefinition($driverId, $driver); + + return $driverId; + } + + /** + * {@inheritdoc} + */ + public function getName() + { + return $this->name; + } +} diff --git a/Tests/Client/RedisDriverTest.php b/Tests/Client/RedisDriverTest.php new file mode 100644 index 0000000..3a8e8c9 --- /dev/null +++ b/Tests/Client/RedisDriverTest.php @@ -0,0 +1,354 @@ +assertClassImplements(DriverInterface::class, RedisDriver::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new RedisDriver( + $this->createPsrContextMock(), + Config::create(), + $this->createQueueMetaRegistryMock() + ); + } + + public function testShouldReturnConfigObject() + { + $config = Config::create();; + + $driver = new RedisDriver($this->createPsrContextMock(), $config, $this->createQueueMetaRegistryMock()); + + $this->assertSame($config, $driver->getConfig()); + } + + public function testShouldCreateAndReturnQueueInstance() + { + $expectedQueue = new RedisDestination('aQueueName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('name') + ->will($this->returnValue($expectedQueue)) + ; + + $driver = new RedisDriver($context, Config::create(), $this->createQueueMetaRegistryMock()); + + $queue = $driver->createQueue('name'); + + $this->assertSame($expectedQueue, $queue); + $this->assertSame('aQueueName', $queue->getQueueName()); + } + + public function testShouldConvertTransportMessageToClientMessage() + { + $transportMessage = new RedisMessage(); + $transportMessage->setBody('body'); + $transportMessage->setHeaders(['hkey' => 'hval']); + $transportMessage->setProperties(['key' => 'val']); + $transportMessage->setHeader('content_type', 'ContentType'); + $transportMessage->setMessageId('MessageId'); + $transportMessage->setTimestamp(1000); + $transportMessage->setReplyTo('theReplyTo'); + $transportMessage->setCorrelationId('theCorrelationId'); + $transportMessage->setReplyTo('theReplyTo'); + $transportMessage->setCorrelationId('theCorrelationId'); + + $driver = new RedisDriver( + $this->createPsrContextMock(), + Config::create(), + $this->createQueueMetaRegistryMock() + ); + + $clientMessage = $driver->createClientMessage($transportMessage); + + $this->assertInstanceOf(Message::class, $clientMessage); + $this->assertSame('body', $clientMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => 'theReplyTo', + 'correlation_id' => 'theCorrelationId', + ], $clientMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $clientMessage->getProperties()); + $this->assertSame('MessageId', $clientMessage->getMessageId()); + $this->assertSame('ContentType', $clientMessage->getContentType()); + $this->assertSame(1000, $clientMessage->getTimestamp()); + $this->assertSame('theReplyTo', $clientMessage->getReplyTo()); + $this->assertSame('theCorrelationId', $clientMessage->getCorrelationId()); + + $this->assertNull($clientMessage->getExpire()); + $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority()); + } + + public function testShouldConvertClientMessageToTransportMessage() + { + $clientMessage = new Message(); + $clientMessage->setBody('body'); + $clientMessage->setHeaders(['hkey' => 'hval']); + $clientMessage->setProperties(['key' => 'val']); + $clientMessage->setContentType('ContentType'); + $clientMessage->setExpire(123); + $clientMessage->setPriority(MessagePriority::VERY_HIGH); + $clientMessage->setMessageId('MessageId'); + $clientMessage->setTimestamp(1000); + $clientMessage->setReplyTo('theReplyTo'); + $clientMessage->setCorrelationId('theCorrelationId'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new RedisMessage()) + ; + + $driver = new RedisDriver( + $context, + Config::create(), + $this->createQueueMetaRegistryMock() + ); + + $transportMessage = $driver->createTransportMessage($clientMessage); + + $this->assertInstanceOf(RedisMessage::class, $transportMessage); + $this->assertSame('body', $transportMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => 'theReplyTo', + 'correlation_id' => 'theCorrelationId', + ], $transportMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $transportMessage->getProperties()); + $this->assertSame('MessageId', $transportMessage->getMessageId()); + $this->assertSame(1000, $transportMessage->getTimestamp()); + $this->assertSame('theReplyTo', $transportMessage->getReplyTo()); + $this->assertSame('theCorrelationId', $transportMessage->getCorrelationId()); + } + + public function testShouldSendMessageToRouterQueue() + { + $topic = new RedisDestination('aDestinationName'); + $transportMessage = new RedisMessage(); + $config = $this->createConfigMock(); + + $config + ->expects($this->once()) + ->method('getRouterQueueName') + ->willReturn('queueName'); + + $config + ->expects($this->once()) + ->method('createTransportQueueName') + ->with('queueName') + ->willReturn('app.queueName'); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('app.queueName') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RedisDriver( + $context, + $config, + $this->createQueueMetaRegistryMock() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); + + $driver->sendToRouter($message); + } + + public function testShouldThrowExceptionIfTopicParameterIsNotSet() + { + $driver = new RedisDriver( + $this->createPsrContextMock(), + Config::create(), + $this->createQueueMetaRegistryMock() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic name parameter is required but is not set'); + + $driver->sendToRouter(new Message()); + } + + public function testShouldSendMessageToProcessor() + { + $queue = new RedisDestination('aDestinationName'); + $transportMessage = new RedisMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RedisDriver( + $context, + Config::create(), + $this->createQueueMetaRegistryMock() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'queue'); + + $driver->sendToProcessor($message); + } + + public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() + { + $driver = new RedisDriver( + $this->createPsrContextMock(), + Config::create(), + $this->createQueueMetaRegistryMock() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor name parameter is required but is not set'); + + $driver->sendToProcessor(new Message()); + } + + public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() + { + $driver = new RedisDriver( + $this->createPsrContextMock(), + Config::create(), + $this->createQueueMetaRegistryMock() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Queue name parameter is required but is not set'); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + + $driver->sendToProcessor($message); + } + + public function testShouldDoNothingOnSetupBroker() + { + $context = $this->createPsrContextMock(); + // setup router + $context + ->expects($this->never()) + ->method('createTopic') + ; + $context + ->expects($this->never()) + ->method('createQueue') + ; + + $meta = new QueueMetaRegistry(Config::create(), [ + 'default' => [], + ], 'default'); + + $driver = new RedisDriver( + $context, + Config::create(), + $meta + ); + + $driver->setupBroker(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RedisContext + */ + private function createPsrContextMock() + { + return $this->createMock(RedisContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer + */ + private function createPsrProducerMock() + { + return $this->createMock(PsrProducer::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|QueueMetaRegistry + */ + private function createQueueMetaRegistryMock() + { + return $this->createMock(QueueMetaRegistry::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Config + */ + private function createConfigMock() + { + return $this->createMock(Config::class); + } +} diff --git a/Tests/Functional/ConsumptionUseCasesTrait.php b/Tests/Functional/ConsumptionUseCasesTrait.php new file mode 100644 index 0000000..9ef034f --- /dev/null +++ b/Tests/Functional/ConsumptionUseCasesTrait.php @@ -0,0 +1,76 @@ +getContext()->createQueue('enqueue.test_queue'); + + $message = $this->getContext()->createMessage(__METHOD__); + $this->getContext()->createProducer()->send($queue, $message); + + $queueConsumer = new QueueConsumer($this->getContext(), new ChainExtension([ + new LimitConsumedMessagesExtension(1), + new LimitConsumptionTimeExtension(new \DateTime('+3sec')), + ])); + + $processor = new StubProcessor(); + $queueConsumer->bind($queue, $processor); + + $queueConsumer->consume(); + + $this->assertInstanceOf(PsrMessage::class, $processor->lastProcessedMessage); + $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody()); + } + + public function testConsumeOneMessageAndSendReplyExit() + { + $queue = $this->getContext()->createQueue('enqueue.test_queue'); + + $replyQueue = $this->getContext()->createQueue('enqueue.test_queue_reply'); + + $message = $this->getContext()->createMessage(__METHOD__); + $message->setReplyTo($replyQueue->getQueueName()); + $this->getContext()->createProducer()->send($queue, $message); + + $queueConsumer = new QueueConsumer($this->getContext(), new ChainExtension([ + new LimitConsumedMessagesExtension(2), + new LimitConsumptionTimeExtension(new \DateTime('+3sec')), + new ReplyExtension(), + ])); + + $replyMessage = $this->getContext()->createMessage(__METHOD__.'.reply'); + + $processor = new StubProcessor(); + $processor->result = Result::reply($replyMessage); + + $replyProcessor = new StubProcessor(); + + $queueConsumer->bind($queue, $processor); + $queueConsumer->bind($replyQueue, $replyProcessor); + $queueConsumer->consume(); + + $this->assertInstanceOf(PsrMessage::class, $processor->lastProcessedMessage); + $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody()); + + $this->assertInstanceOf(PsrMessage::class, $replyProcessor->lastProcessedMessage); + $this->assertEquals(__METHOD__.'.reply', $replyProcessor->lastProcessedMessage->getBody()); + } + + /** + * @return RedisContext + */ + abstract protected function getContext(); +} + diff --git a/Tests/Functional/PRedisConsumptionUseCasesTest.php b/Tests/Functional/PRedisConsumptionUseCasesTest.php new file mode 100644 index 0000000..87fca37 --- /dev/null +++ b/Tests/Functional/PRedisConsumptionUseCasesTest.php @@ -0,0 +1,42 @@ +context = $this->buildPRedisContext(); + + $this->context->deleteQueue($this->context->createQueue('enqueue.test_queue')); + $this->context->deleteQueue($this->context->createQueue('enqueue.test_queue_reply')); + } + + public function tearDown() + { + $this->context->close(); + } + + /** + * {@inheritdoc} + */ + protected function getContext() + { + return $this->context; + } +} diff --git a/Tests/Functional/PhpRedisConsumptionUseCasesTest.php b/Tests/Functional/PhpRedisConsumptionUseCasesTest.php new file mode 100644 index 0000000..50c639f --- /dev/null +++ b/Tests/Functional/PhpRedisConsumptionUseCasesTest.php @@ -0,0 +1,42 @@ +context = $this->buildPhpRedisContext(); + + $this->context->deleteQueue($this->context->createQueue('enqueue.test_queue')); + $this->context->deleteQueue($this->context->createQueue('enqueue.test_queue_reply')); + } + + public function tearDown() + { + $this->context->close(); + } + + /** + * {@inheritdoc} + */ + protected function getContext() + { + return $this->context; + } +} diff --git a/Tests/Functional/StubProcessor.php b/Tests/Functional/StubProcessor.php new file mode 100644 index 0000000..a3ef5c7 --- /dev/null +++ b/Tests/Functional/StubProcessor.php @@ -0,0 +1,21 @@ +lastProcessedMessage = $message; + + return $this->result; + } +} diff --git a/Tests/Symfony/RedisTransportFactoryTest.php b/Tests/Symfony/RedisTransportFactoryTest.php new file mode 100644 index 0000000..1b4175e --- /dev/null +++ b/Tests/Symfony/RedisTransportFactoryTest.php @@ -0,0 +1,130 @@ +assertClassImplements(TransportFactoryInterface::class, RedisTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new RedisTransportFactory(); + + $this->assertEquals('redis', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new RedisTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfiguration() + { + $transport = new RedisTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + 'host' => 'localhost', + 'port' => 123, + 'vendor' => 'phpredis', + 'persisted' => true, + 'lazy' => false, + ]]); + + $this->assertEquals([ + 'host' => 'localhost', + 'port' => 123, + 'vendor' => 'phpredis', + 'persisted' => true, + 'lazy' => false, + ], $config); + } + + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new RedisTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'host' => 'localhost', + 'port' => 123, + 'vendor' => 'phpredis', + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(RedisConnectionFactory::class, $factory->getClass()); + $this->assertSame([[ + 'host' => 'localhost', + 'port' => 123, + 'vendor' => 'phpredis', + ]], $factory->getArguments()); + } + + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new RedisTransportFactory(); + + $serviceId = $transport->createContext($container, [ + 'host' => 'localhost', + 'port' => 123, + 'vendor' => 'predis', + ]); + + $this->assertEquals('enqueue.transport.redis.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.redis.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.redis.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + + public function testShouldCreateDriver() + { + $container = new ContainerBuilder(); + + $transport = new RedisTransportFactory(); + + $serviceId = $transport->createDriver($container, []); + + $this->assertEquals('enqueue.client.redis.driver', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $driver = $container->getDefinition($serviceId); + $this->assertSame(RedisDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.redis.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); + } +}