Skip to content

Commit

Permalink
[redis] Add unit and func tests, fix bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Apr 20, 2017
1 parent ea9676c commit 6b0efaf
Show file tree
Hide file tree
Showing 14 changed files with 745 additions and 231 deletions.
20 changes: 18 additions & 2 deletions PRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public function lpush($key, $value)
public function brpop($key, $timeout)
{
try {
return $this->brpop($key, (int) $timeout / 1000);
if ($result = $this->redis->brpop($key, $timeout)) {
return $result[1];
}
} catch (PRedisServerException $e) {
throw new ServerException('brpop command has failed', null, $e);
}
Expand All @@ -48,19 +50,33 @@ public function brpop($key, $timeout)
*/
public function rpop($key)
{ try {
return $this->rpop($key);
return $this->redis->rpop($key);
} catch (PRedisServerException $e) {
throw new ServerException('rpop command has failed', null, $e);
}
}

/**
* {@inheritdoc}
*/
public function connect()
{
$this->redis->connect();
}

/**
* {@inheritdoc}
*/
public function disconnect()
{
$this->redis->disconnect();
}

/**
* {@inheritdoc}
*/
public function del($key)
{
$this->redis->del([$key]);
}
}
19 changes: 15 additions & 4 deletions PhpRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ class PhpRedis implements Redis
private $config;

/**
* @param \Redis $redis
* @param array $config
*/
public function __construct(\Redis $redis, array $config)
public function __construct(array $config)
{
$this->redis = $redis;

$this->config = array_replace([
'host' => null,
'port' => null,
Expand Down Expand Up @@ -59,6 +56,9 @@ public function rpop($key)
return $this->redis->rPop($key);
}

/**
* {@inheritdoc}
*/
public function connect()
{
if (false == $this->redis) {
Expand All @@ -84,10 +84,21 @@ public function connect()
return $this->redis;
}

/**
* {@inheritdoc}
*/
public function disconnect()
{
if ($this->redis) {
$this->redis->close();
}
}

/**
* {@inheritdoc}
*/
public function del($key)
{
$this->redis->del($key);
}
}
5 changes: 5 additions & 0 deletions Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ public function rpop($key);
public function connect();

public function disconnect();

/**
* @param string $key
*/
public function del($key);
}
19 changes: 10 additions & 9 deletions RedisConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ public function createContext()
{
if ($this->config['lazy']) {
return new RedisContext(function () {
$redis = $this->createRedis();
$redis->connect();

return $redis;
return $this->createRedis();
});
}

Expand All @@ -78,12 +75,16 @@ public function createContext()
*/
private function createRedis()
{
if ('phpredis' == $this->config['vendor'] && false == $this->redis) {
$this->redis = new PhpRedis(new \Redis(), $this->config);
}
if (false == $this->redis) {
if ('phpredis' == $this->config['vendor'] && false == $this->redis) {
$this->redis = new PhpRedis($this->config);
}

if ('predis' == $this->config['vendor'] && false == $this->redis) {
$this->redis = new PRedis(new Client($this->config, ['exceptions' => true]));
}

if ('predis' == $this->config['vendor'] && false == $this->redis) {
$this->redis = new PRedis(new Client($this->config, ['exceptions' => true]));
$this->redis->connect();
}

return $this->redis;
Expand Down
24 changes: 23 additions & 1 deletion RedisContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Enqueue\Psr\InvalidDestinationException;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrDestination;
use Enqueue\Psr\PsrQueue;
use Enqueue\Psr\PsrTopic;

class RedisContext implements PsrContext
{
Expand Down Expand Up @@ -68,6 +70,26 @@ public function createQueue($queueName)
return new RedisDestination($queueName);
}

/**
* @param RedisDestination|PsrQueue $queue
*/
public function deleteQueue(PsrQueue $queue)
{
InvalidDestinationException::assertDestinationInstanceOf($queue, RedisDestination::class);

$this->getRedis()->del($queue->getName());
}

/**
* @param RedisDestination|PsrTopic $topic
*/
public function deleteTopic(PsrTopic $topic)
{
InvalidDestinationException::assertDestinationInstanceOf($topic, RedisDestination::class);

$this->getRedis()->del($topic->getName());
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -102,7 +124,7 @@ public function createConsumer(PsrDestination $destination)

public function close()
{
$this->getRedis()->close();
$this->getRedis()->disconnect();
}

/**
Expand Down
127 changes: 127 additions & 0 deletions Tests/Functional/CommonUseCasesTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
<?php

namespace Enqueue\Redis\Tests\Functional;

use Enqueue\Redis\RedisContext;
use Enqueue\Redis\RedisMessage;

trait CommonUseCasesTrait
{
public function testWaitsForTwoSecondsAndReturnNullOnReceive()
{
$queue = $this->getContext()->createQueue('enqueue.test_queue');

$startAt = microtime(true);

$consumer = $this->getContext()->createConsumer($queue);
$message = $consumer->receive(2000);

$endAt = microtime(true);

$this->assertNull($message);

$this->assertGreaterThan(1.5, $endAt - $startAt);
$this->assertLessThan(2.5, $endAt - $startAt);
}

public function testReturnNullImmediatelyOnReceiveNoWait()
{
$queue = $this->getContext()->createQueue('enqueue.test_queue');

$startAt = microtime(true);

$consumer = $this->getContext()->createConsumer($queue);
$message = $consumer->receiveNoWait();

$endAt = microtime(true);

$this->assertNull($message);

$this->assertLessThan(0.5, $endAt - $startAt);
}

public function testProduceAndReceiveOneMessageSentDirectlyToQueue()
{
$queue = $this->getContext()->createQueue('enqueue.test_queue');

$message = $this->getContext()->createMessage(
__METHOD__,
['FooProperty' => 'FooVal'],
['BarHeader' => 'BarVal']
);

$producer = $this->getContext()->createProducer();
$producer->send($queue, $message);

$consumer = $this->getContext()->createConsumer($queue);
$message = $consumer->receive(1000);

$this->assertInstanceOf(RedisMessage::class, $message);
$consumer->acknowledge($message);

$this->assertEquals(__METHOD__, $message->getBody());
$this->assertEquals(['FooProperty' => 'FooVal'], $message->getProperties());
$this->assertEquals(['BarHeader' => 'BarVal'], $message->getHeaders());
}

public function testProduceAndReceiveOneMessageSentDirectlyToTopic()
{
$topic = $this->getContext()->createTopic('enqueue.test_topic');

$message = $this->getContext()->createMessage(__METHOD__);

$producer = $this->getContext()->createProducer();
$producer->send($topic, $message);

$consumer = $this->getContext()->createConsumer($topic);
$message = $consumer->receive(1000);

$this->assertInstanceOf(RedisMessage::class, $message);
$consumer->acknowledge($message);

$this->assertEquals(__METHOD__, $message->getBody());
}

public function testConsumerReceiveMessageWithZeroTimeout()
{
$topic = $this->getContext()->createTopic('enqueue.test_topic');

$consumer = $this->getContext()->createConsumer($topic);

//guard
$this->assertNull($consumer->receive(1000));

$message = $this->getContext()->createMessage(__METHOD__);

$producer = $this->getContext()->createProducer();
$producer->send($topic, $message);
usleep(100);
$actualMessage = $consumer->receive(0);

$this->assertInstanceOf(RedisMessage::class, $actualMessage);
$consumer->acknowledge($message);

$this->assertEquals(__METHOD__, $message->getBody());
}

public function testShouldReceiveMessagesInExpectedOrder()
{
$queue = $this->getContext()->createQueue('enqueue.test_queue');

$producer = $this->getContext()->createProducer();
$producer->send($queue, $this->getContext()->createMessage(1));
$producer->send($queue, $this->getContext()->createMessage(2));
$producer->send($queue, $this->getContext()->createMessage(3));

$consumer = $this->getContext()->createConsumer($queue);

$this->assertSame(1, $consumer->receiveNoWait()->getBody());
$this->assertSame(2, $consumer->receiveNoWait()->getBody());
$this->assertSame(3, $consumer->receiveNoWait()->getBody());
}

/**
* @return RedisContext
*/
abstract protected function getContext();
}
42 changes: 42 additions & 0 deletions Tests/Functional/PRedisCommonUseCasesTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace Enqueue\Redis\Tests\Functional;

use Enqueue\Redis\RedisContext;
use Enqueue\Test\RedisExtension;
use PHPUnit\Framework\TestCase;

/**
* @group functional
*/
class PRedisCommonUseCasesTest extends TestCase
{
use RedisExtension;
use CommonUseCasesTrait;

/**
* @var RedisContext
*/
private $context;

public function setUp()
{
$this->context = $this->buildPRedisContext();

$this->context->deleteQueue($this->context->createQueue('enqueue.test_queue'));
$this->context->deleteTopic($this->context->createTopic('enqueue.test_topic'));
}

public function tearDown()
{
$this->context->close();
}

/**
* {@inheritdoc}
*/
protected function getContext()
{
return $this->context;
}
}
42 changes: 42 additions & 0 deletions Tests/Functional/PhpRedisCommonUseCasesTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace Enqueue\Redis\Tests\Functional;

use Enqueue\Redis\RedisContext;
use Enqueue\Test\RedisExtension;
use PHPUnit\Framework\TestCase;

/**
* @group functional
*/
class PhpRedisCommonUseCasesTest extends TestCase
{
use RedisExtension;
use CommonUseCasesTrait;

/**
* @var RedisContext
*/
private $context;

public function setUp()
{
$this->context = $this->buildPhpRedisContext();

$this->context->deleteQueue($this->context->createQueue('enqueue.test_queue'));
$this->context->deleteTopic($this->context->createTopic('enqueue.test_topic'));
}

public function tearDown()
{
$this->context->close();
}

/**
* {@inheritdoc}
*/
protected function getContext()
{
return $this->context;
}
}
Loading

0 comments on commit 6b0efaf

Please sign in to comment.