diff --git a/src/Internal/ConcurrentMergedIterator.php b/src/Internal/ConcurrentMergedIterator.php new file mode 100644 index 0000000..5020d4d --- /dev/null +++ b/src/Internal/ConcurrentMergedIterator.php @@ -0,0 +1,107 @@ + + */ +final class ConcurrentMergedIterator implements ConcurrentIterator +{ + /** @var ConcurrentIterator */ + private readonly ConcurrentIterator $iterator; + + private readonly DeferredCancellation $deferredCancellation; + + /** + * @param ConcurrentIterator[] $iterators + */ + public function __construct(array $iterators) + { + foreach ($iterators as $key => $iterator) { + if (!$iterator instanceof ConcurrentIterator) { + throw new \TypeError(\sprintf( + 'Argument #1 ($iterators) must be of type array<%s>, %s given at key %s', + ConcurrentIterator::class, + \get_debug_type($iterator), + $key + )); + } + } + + $queue = new Queue(\count($iterators)); + $this->iterator = $queue->iterate(); + + $this->deferredCancellation = $deferredCancellation = new DeferredCancellation(); + $cancellation = $this->deferredCancellation->getCancellation(); + + $futures = []; + foreach ($iterators as $iterator) { + $futures[] = async(static function () use ($iterator, $queue, $cancellation): void { + try { + while ($iterator->continue($cancellation)) { + if ($queue->isComplete()) { + return; + } + + $queue->push($iterator->getValue()); + } + } finally { + $iterator->dispose(); + } + }); + } + + EventLoop::queue(static function () use ($futures, $queue, $deferredCancellation): void { + try { + Future\await($futures); + $queue->complete(); + } catch (\Throwable $exception) { + $queue->error($exception); + } finally { + $deferredCancellation->cancel(); + } + }); + } + + public function continue(?Cancellation $cancellation = null): bool + { + return $this->iterator->continue($cancellation); + } + + public function getValue(): mixed + { + return $this->iterator->getValue(); + } + + public function getPosition(): int + { + return $this->iterator->getPosition(); + } + + public function isComplete(): bool + { + return $this->iterator->isComplete(); + } + + public function dispose(): void + { + $this->iterator->dispose(); + $this->deferredCancellation->cancel(); + } + + public function getIterator(): \Traversable + { + return $this->iterator; + } +} diff --git a/src/Pipeline.php b/src/Pipeline.php index 3d23ba5..f1c49ac 100644 --- a/src/Pipeline.php +++ b/src/Pipeline.php @@ -7,6 +7,7 @@ use Amp\Pipeline\Internal\ConcurrentChainedIterator; use Amp\Pipeline\Internal\ConcurrentClosureIterator; use Amp\Pipeline\Internal\ConcurrentIterableIterator; +use Amp\Pipeline\Internal\ConcurrentMergedIterator; use Amp\Pipeline\Internal\FlatMapOperation; use Amp\Pipeline\Internal\Sequence; use Amp\Pipeline\Internal\SortOperation; @@ -69,31 +70,57 @@ public static function generate(\Closure $supplier): Pipeline } /** - * Concatenates the given pipelines into a single pipeline in sequential order. + * Merges the given iterables into a single pipeline. The returned pipeline emits a value anytime one of the + * merged iterables produces a value. + * + * @template Ts + * + * @param array> $pipelines + *f + * @return self + */ + public static function merge(array $pipelines): self + { + return new self(new ConcurrentMergedIterator(self::mapToConcurrentIterators($pipelines))); + } + + /** + * Concatenates the given iterables into a single pipeline in sequential order. * * The prior pipeline must complete before values are taken from any subsequent pipelines. * * @template Ts * - * @param iterable[] $pipelines + * @param array> $pipelines * * @return self */ public static function concat(array $pipelines): self { - foreach ($pipelines as $key => $pipeline) { - if (!\is_iterable($pipeline)) { + return new self(new ConcurrentChainedIterator(self::mapToConcurrentIterators($pipelines))); + } + + /** + * @template Tk of array-key + * @template Ts + * + * @param array> $iterables + * + * @return array> + */ + private static function mapToConcurrentIterators(array $iterables): array + { + foreach ($iterables as $key => $iterable) { + if (!\is_iterable($iterable)) { throw new \TypeError(\sprintf( 'Argument #1 ($pipelines) must be of type array, %s given at key %s', - \get_debug_type($pipeline), + \get_debug_type($iterable), $key, )); } } - return new self(new ConcurrentChainedIterator( - \array_map(static fn (iterable $pipeline) => self::fromIterable($pipeline)->getIterator(), $pipelines) - )); + return \array_map(static fn (iterable $pipeline) => self::fromIterable($pipeline)->getIterator(), $iterables); } private int $concurrency = 1; diff --git a/test/MergeTest.php b/test/MergeTest.php new file mode 100644 index 0000000..2017af9 --- /dev/null +++ b/test/MergeTest.php @@ -0,0 +1,141 @@ + Pipeline::fromIterable($iterator) + ->tap(fn () => delay(0.01)), + $array, + ); + + $pipeline = Pipeline::merge($pipelines); + + self::assertSame($expected, $pipeline->toArray()); + } + + /** + * @dataProvider getArrays + * @depends testMerge + */ + public function testMergeWithConcurrentMap(array $array, array $expected): void + { + $mapper = static fn (int $value) => $value * 10; + + $pipelines = \array_map( + fn (array $iterator) => Pipeline::fromIterable($iterator) + ->concurrent(3) + ->map($mapper), + $array, + ); + + $pipeline = Pipeline::merge($pipelines); + + self::assertSame(\array_map($mapper, $expected), $pipeline->toArray()); + } + + /** + * @depends testMerge + */ + public function testMergeWithDelayedYields(): void + { + $pipelines = []; + $values1 = [$this->asyncValue(0.01, 1), $this->asyncValue(0.05, 2), $this->asyncValue(0.07, 3)]; + $values2 = [$this->asyncValue(0.02, 4), $this->asyncValue(0.04, 5), $this->asyncValue(0.06, 6)]; + + $pipelines[] = Pipeline::fromIterable(function () use ($values1) { + foreach ($values1 as $value) { + yield $value->await(); + } + }); + + $pipelines[] = Pipeline::fromIterable(function () use ($values2) { + foreach ($values2 as $value) { + yield $value->await(); + } + }); + + $pipeline = Pipeline::merge($pipelines); + + self::assertSame([1, 4, 5, 2, 6, 3], $pipeline->toArray()); + } + + /** + * @depends testMerge + */ + public function testDisposedMerge(): void + { + $pipelines = []; + + $pipelines[] = Pipeline::fromIterable([1, 2, 3, 4, 5])->tap(fn () => delay(0.1)); + $pipelines[] = Pipeline::fromIterable([6, 7, 8, 9, 10])->tap(fn () => delay(0.1)); + + $iterator = Pipeline::merge($pipelines)->getIterator(); + + $this->expectException(DisposedException::class); + $this->setTimeout(0.3); + + while ($iterator->continue()) { + if ($iterator->getValue() === 7) { + $iterator->dispose(); + } + } + } + + /** + * @depends testMerge + */ + public function testMergeWithFailedPipeline(): void + { + $exception = new TestException(); + $generator = Pipeline::fromIterable(function () use ($exception) { + yield 1; // Emit once before failing. + throw $exception; + }); + + $pipeline = Pipeline::merge([$generator, Pipeline::fromIterable(\range(1, 5))]); + + try { + $pipeline->forEach(fn () => null); + self::fail("The exception used to fail the pipeline should be thrown from continue()"); + } catch (TestException $reason) { + self::assertSame($exception, $reason); + } + } + + public function testNonPipeline(): void + { + $this->expectException(\TypeError::class); + + Pipeline::merge([1]); + } + + private function asyncValue(float $delay, mixed $value): Future + { + return async(static function () use ($delay, $value): mixed { + delay($delay); + return $value; + }); + } +}