Skip to content

Commit

Permalink
[client] Redis driver
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Apr 21, 2017
1 parent 6b0efaf commit 14e0fd8
Show file tree
Hide file tree
Showing 10 changed files with 941 additions and 2 deletions.
151 changes: 151 additions & 0 deletions Client/RedisDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
<?php

namespace Enqueue\Redis\Client;

use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Psr\PsrMessage;
use Enqueue\Redis\RedisContext;
use Enqueue\Redis\RedisDestination;
use Enqueue\Redis\RedisMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class RedisDriver implements DriverInterface
{
/**
* @var RedisContext
*/
private $context;

/**
* @var Config
*/
private $config;

/**
* @var QueueMetaRegistry
*/
private $queueMetaRegistry;

/**
* @param RedisContext $context
* @param Config $config
* @param QueueMetaRegistry $queueMetaRegistry
*/
public function __construct(RedisContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
{
$this->context = $context;
$this->config = $config;
$this->queueMetaRegistry = $queueMetaRegistry;
}

/**
* {@inheritdoc}
*/
public function sendToRouter(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
throw new \LogicException('Topic name parameter is required but is not set');
}

$queue = $this->createQueue($this->config->getRouterQueueName());
$transportMessage = $this->createTransportMessage($message);

$this->context->createProducer()->send($queue, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function sendToProcessor(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
throw new \LogicException('Processor name parameter is required but is not set');
}

if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
throw new \LogicException('Queue name parameter is required but is not set');
}

$transportMessage = $this->createTransportMessage($message);
$destination = $this->createQueue($queueName);

$this->context->createProducer()->send($destination, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function setupBroker(LoggerInterface $logger = null)
{
}

/**
* {@inheritdoc}
*
* @return RedisDestination
*/
public function createQueue($queueName)
{
return $this->context->createQueue($this->config->createTransportQueueName($queueName));
}

/**
* {@inheritdoc}
*
* @return RedisMessage
*/
public function createTransportMessage(Message $message)
{
$properties = $message->getProperties();

$headers = $message->getHeaders();
$headers['content_type'] = $message->getContentType();

$transportMessage = $this->context->createMessage();
$transportMessage->setBody($message->getBody());
$transportMessage->setHeaders($headers);
$transportMessage->setProperties($properties);
$transportMessage->setMessageId($message->getMessageId());
$transportMessage->setTimestamp($message->getTimestamp());
$transportMessage->setReplyTo($message->getReplyTo());
$transportMessage->setCorrelationId($message->getCorrelationId());

return $transportMessage;
}

/**
* @param RedisMessage $message
*
* {@inheritdoc}
*/
public function createClientMessage(PsrMessage $message)
{
$clientMessage = new Message();

$clientMessage->setBody($message->getBody());
$clientMessage->setHeaders($message->getHeaders());
$clientMessage->setProperties($message->getProperties());

$clientMessage->setContentType($message->getHeader('content_type'));
$clientMessage->setMessageId($message->getMessageId());
$clientMessage->setTimestamp($message->getTimestamp());
$clientMessage->setPriority(MessagePriority::NORMAL);
$clientMessage->setReplyTo($message->getReplyTo());
$clientMessage->setCorrelationId($message->getCorrelationId());

return $clientMessage;
}

/**
* @return Config
*/
public function getConfig()
{
return $this->config;
}
}
2 changes: 1 addition & 1 deletion PRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function lpush($key, $value)
public function brpop($key, $timeout)
{
try {
if ($result = $this->redis->brpop($key, $timeout)) {
if ($result = $this->redis->brpop([$key], $timeout)) {
return $result[1];
}
} catch (PRedisServerException $e) {
Expand Down
11 changes: 10 additions & 1 deletion RedisConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,16 @@ public function getQueue()
*/
public function receive($timeout = 0)
{
if ($message = $this->getRedis()->brpop($this->queue->getName(), (int) $timeout / 1000)) {
$timeout = (int) ($timeout / 1000);
if (empty($timeout)) {
// Caused by
// Predis\Response\ServerException: ERR timeout is not an integer or out of range
// /mqdev/vendor/predis/predis/src/Client.php:370

return $this->receiveNoWait();
}

if ($message = $this->getRedis()->brpop($this->queue->getName(), $timeout)) {
return RedisMessage::jsonUnserialize($message);
}
}
Expand Down
114 changes: 114 additions & 0 deletions Symfony/RedisTransportFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

namespace Enqueue\Redis\Symfony;

use Enqueue\Redis\Client\RedisDriver;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Redis\RedisContext;
use Enqueue\Symfony\TransportFactoryInterface;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

class RedisTransportFactory implements TransportFactoryInterface
{
/**
* @var string
*/
private $name;

/**
* @param string $name
*/
public function __construct($name = 'redis')
{
$this->name = $name;
}

/**
* {@inheritdoc}
*/
public function addConfiguration(ArrayNodeDefinition $builder)
{
$builder
->children()
->scalarNode('host')
->isRequired()
->cannotBeEmpty()
->info('can be a host, or the path to a unix domain socket')
->end()
->integerNode('port')->end()
->enumNode('vendor')
->values(['phpredis', 'predis'])
->isRequired()
->cannotBeEmpty()
->info('The library used internally to interact with Redis server')
->end()
->booleanNode('persisted')
->defaultFalse()
->info('bool, Whether it use single persisted connection or open a new one for every context')
->end()
->booleanNode('lazy')
->defaultTrue()
->info('the connection will be performed as later as possible, if the option set to true')
->end()
;
}

/**
* {@inheritdoc}
*/
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
$factory = new Definition(RedisConnectionFactory::class);
$factory->setArguments([$config]);

$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
$container->setDefinition($factoryId, $factory);

return $factoryId;
}

/**
* {@inheritdoc}
*/
public function createContext(ContainerBuilder $container, array $config)
{
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());

$context = new Definition(RedisContext::class);
$context->setFactory([new Reference($factoryId), 'createContext']);

$contextId = sprintf('enqueue.transport.%s.context', $this->getName());
$container->setDefinition($contextId, $context);

return $contextId;
}

/**
* {@inheritdoc}
*/
public function createDriver(ContainerBuilder $container, array $config)
{
$driver = new Definition(RedisDriver::class);
$driver->setArguments([
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
new Reference('enqueue.client.config'),
new Reference('enqueue.client.meta.queue_meta_registry'),
]);

$driverId = sprintf('enqueue.client.%s.driver', $this->getName());
$container->setDefinition($driverId, $driver);

return $driverId;
}

/**
* {@inheritdoc}
*/
public function getName()
{
return $this->name;
}
}
Loading

0 comments on commit 14e0fd8

Please sign in to comment.