-
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
283 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
<?php declare(strict_types=1); | ||
|
||
namespace Amp\Pipeline\Internal; | ||
|
||
use Amp\Cancellation; | ||
use Amp\DeferredCancellation; | ||
use Amp\Future; | ||
use Amp\Pipeline\ConcurrentIterator; | ||
use Amp\Pipeline\Queue; | ||
use Revolt\EventLoop; | ||
use function Amp\async; | ||
|
||
/** | ||
* @internal | ||
* | ||
* @template-covariant T | ||
* @template-implements ConcurrentIterator<T> | ||
*/ | ||
final class ConcurrentMergedIterator implements ConcurrentIterator | ||
{ | ||
/** @var ConcurrentIterator<T> */ | ||
private readonly ConcurrentIterator $iterator; | ||
|
||
private readonly DeferredCancellation $deferredCancellation; | ||
|
||
/** | ||
* @param ConcurrentIterator<T>[] $iterators | ||
*/ | ||
public function __construct(array $iterators) | ||
{ | ||
foreach ($iterators as $key => $iterator) { | ||
if (!$iterator instanceof ConcurrentIterator) { | ||
throw new \TypeError(\sprintf( | ||
'Argument #1 ($iterators) must be of type array<%s>, %s given at key %s', | ||
ConcurrentIterator::class, | ||
\get_debug_type($iterator), | ||
$key | ||
)); | ||
} | ||
} | ||
|
||
$queue = new Queue(\count($iterators)); | ||
$this->iterator = $queue->iterate(); | ||
|
||
$this->deferredCancellation = $deferredCancellation = new DeferredCancellation(); | ||
$cancellation = $this->deferredCancellation->getCancellation(); | ||
|
||
$futures = []; | ||
foreach ($iterators as $iterator) { | ||
$futures[] = async(static function () use ($iterator, $queue, $cancellation): void { | ||
try { | ||
while ($iterator->continue($cancellation)) { | ||
if ($queue->isComplete()) { | ||
return; | ||
} | ||
|
||
$queue->push($iterator->getValue()); | ||
} | ||
} finally { | ||
$iterator->dispose(); | ||
} | ||
}); | ||
} | ||
|
||
EventLoop::queue(static function () use ($futures, $queue, $deferredCancellation): void { | ||
try { | ||
Future\await($futures); | ||
$queue->complete(); | ||
} catch (\Throwable $exception) { | ||
$queue->error($exception); | ||
} finally { | ||
$deferredCancellation->cancel(); | ||
} | ||
}); | ||
} | ||
|
||
public function continue(?Cancellation $cancellation = null): bool | ||
{ | ||
return $this->iterator->continue($cancellation); | ||
} | ||
|
||
public function getValue(): mixed | ||
{ | ||
return $this->iterator->getValue(); | ||
} | ||
|
||
public function getPosition(): int | ||
{ | ||
return $this->iterator->getPosition(); | ||
} | ||
|
||
public function isComplete(): bool | ||
{ | ||
return $this->iterator->isComplete(); | ||
} | ||
|
||
public function dispose(): void | ||
{ | ||
$this->iterator->dispose(); | ||
$this->deferredCancellation->cancel(); | ||
} | ||
|
||
public function getIterator(): \Traversable | ||
{ | ||
return $this->iterator; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
<?php declare(strict_types=1); | ||
|
||
namespace Amp\Pipeline; | ||
|
||
use Amp\Future; | ||
use Amp\PHPUnit\AsyncTestCase; | ||
use Amp\PHPUnit\TestException; | ||
use function Amp\async; | ||
use function Amp\delay; | ||
|
||
class MergeTest extends AsyncTestCase | ||
{ | ||
public function getArrays(): array | ||
{ | ||
return [ | ||
[[\range(1, 3), \range(4, 6)], [1, 4, 2, 5, 3, 6]], | ||
[[\range(1, 5), \range(6, 8)], [1, 6, 2, 7, 3, 8, 4, 5]], | ||
[[\range(1, 4), \range(5, 10)], [1, 5, 2, 6, 3, 7, 4, 8, 9, 10]], | ||
]; | ||
} | ||
|
||
/** | ||
* @dataProvider getArrays | ||
*/ | ||
public function testMerge(array $array, array $expected): void | ||
{ | ||
$pipelines = \array_map( | ||
fn (array $iterator) => Pipeline::fromIterable($iterator) | ||
->tap(fn () => delay(0.01)), | ||
$array, | ||
); | ||
|
||
$pipeline = Pipeline::merge($pipelines); | ||
|
||
self::assertSame($expected, $pipeline->toArray()); | ||
} | ||
|
||
/** | ||
* @dataProvider getArrays | ||
* @depends testMerge | ||
*/ | ||
public function testMergeWithConcurrentMap(array $array, array $expected): void | ||
{ | ||
$mapper = static fn (int $value) => $value * 10; | ||
|
||
$pipelines = \array_map( | ||
fn (array $iterator) => Pipeline::fromIterable($iterator) | ||
->concurrent(3) | ||
->map($mapper), | ||
$array, | ||
); | ||
|
||
$pipeline = Pipeline::merge($pipelines); | ||
|
||
self::assertSame(\array_map($mapper, $expected), $pipeline->toArray()); | ||
} | ||
|
||
/** | ||
* @depends testMerge | ||
*/ | ||
public function testMergeWithDelayedYields(): void | ||
{ | ||
$pipelines = []; | ||
$values1 = [$this->asyncValue(0.01, 1), $this->asyncValue(0.05, 2), $this->asyncValue(0.07, 3)]; | ||
$values2 = [$this->asyncValue(0.02, 4), $this->asyncValue(0.04, 5), $this->asyncValue(0.06, 6)]; | ||
|
||
$pipelines[] = Pipeline::fromIterable(function () use ($values1) { | ||
foreach ($values1 as $value) { | ||
yield $value->await(); | ||
} | ||
}); | ||
|
||
$pipelines[] = Pipeline::fromIterable(function () use ($values2) { | ||
foreach ($values2 as $value) { | ||
yield $value->await(); | ||
} | ||
}); | ||
|
||
$pipeline = Pipeline::merge($pipelines); | ||
|
||
self::assertSame([1, 4, 5, 2, 6, 3], $pipeline->toArray()); | ||
} | ||
|
||
/** | ||
* @depends testMerge | ||
*/ | ||
public function testDisposedMerge(): void | ||
{ | ||
$pipelines = []; | ||
|
||
$pipelines[] = Pipeline::fromIterable([1, 2, 3, 4, 5])->tap(fn () => delay(0.1)); | ||
$pipelines[] = Pipeline::fromIterable([6, 7, 8, 9, 10])->tap(fn () => delay(0.1)); | ||
|
||
$iterator = Pipeline::merge($pipelines)->getIterator(); | ||
|
||
$this->expectException(DisposedException::class); | ||
$this->setTimeout(0.3); | ||
|
||
while ($iterator->continue()) { | ||
if ($iterator->getValue() === 7) { | ||
$iterator->dispose(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* @depends testMerge | ||
*/ | ||
public function testMergeWithFailedPipeline(): void | ||
{ | ||
$exception = new TestException(); | ||
$generator = Pipeline::fromIterable(function () use ($exception) { | ||
yield 1; // Emit once before failing. | ||
throw $exception; | ||
}); | ||
|
||
$pipeline = Pipeline::merge([$generator, Pipeline::fromIterable(\range(1, 5))]); | ||
|
||
try { | ||
$pipeline->forEach(fn () => null); | ||
self::fail("The exception used to fail the pipeline should be thrown from continue()"); | ||
} catch (TestException $reason) { | ||
self::assertSame($exception, $reason); | ||
} | ||
} | ||
|
||
public function testNonPipeline(): void | ||
{ | ||
$this->expectException(\TypeError::class); | ||
|
||
Pipeline::merge([1]); | ||
} | ||
|
||
private function asyncValue(float $delay, mixed $value): Future | ||
{ | ||
return async(static function () use ($delay, $value): mixed { | ||
delay($delay); | ||
return $value; | ||
}); | ||
} | ||
} |