Skip to content

Commit

Permalink
Fix take not completing without another emitted value
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 23, 2023
1 parent f59dcf9 commit 8a0ecc2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
12 changes: 9 additions & 3 deletions src/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,18 @@ public function take(int $count): self
return $this->flatMap(static function (mixed $value) use ($count) {
static $i = 0;

if ($i++ < $count) {
if (++$i < $count) {
return [$value];
}

/** @var T[] */
return [FlatMapOperation::getStopMarker()];
/** @var T $stopMarker Fake stop marker as type T. */
$stopMarker = FlatMapOperation::getStopMarker();

if ($i === $count) {
return [$value, $stopMarker];
}

return [$stopMarker];
});
}

Expand Down
21 changes: 19 additions & 2 deletions test/TakeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Amp\PHPUnit\AsyncTestCase;
use Amp\PHPUnit\TestException;
use function Amp\delay;

class TakeTest extends AsyncTestCase
{
Expand All @@ -15,10 +16,26 @@ public function testValuesEmitted(): void
self::assertSame([1, 2], $pipeline->toArray());
}

public function testCompleteBeforeSourceCompletes()
{
$count = 3;
$this->setTimeout(0.1 * $count + 0.1);

$emitted = Pipeline::fromIterable(function () use ($count): \Generator {
for ($i = 0; $i < $count; ++$i) {
delay(0.1);
yield $i;
}
delay(1);
})->take($count)->toArray();

self::assertSame(\range(0, $count - 1), $emitted);
}

public function testPipelineFails(): void
{
$exception = new TestException;
$source = new Queue;
$exception = new TestException();
$source = new Queue();

$iterator = $source->pipe()->take(2)->getIterator();

Expand Down

0 comments on commit 8a0ecc2

Please sign in to comment.