Releases: amphp/pipeline

1.2.2

19 Jan 15:57
v1.2.2
97cbf28

What's Changed

  • Fixed clearing back-pressure when a Queue was completed while still holding emitted values that had not been consumed, and the consumer then explicitly disposed of the associated iterator (by calling ConcurrentIterator::dispose()) without consuming those values. Previously, the pending futures were not resolved with a DisposedException; they now resolve as expected.
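
A minimal sketch of the scenario described above, assuming amphp/pipeline 1.2.2 or later is installed through Composer and that `vendor/autoload.php` sits next to the script (the path and the pushed value are illustrative). The future returned by `pushAsync()` represents back-pressure for a value that is never consumed:

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\DisposedException;
use Amp\Pipeline\Queue;

$queue = new Queue();
$iterator = $queue->iterate();

// pushAsync() returns a Future that resolves once the value is consumed.
$future = $queue->pushAsync('never consumed');

// Complete the queue while the value above is still pending.
$queue->complete();

// The consumer gives up without reading the pending value.
$iterator->dispose();

try {
    $future->await();
    $outcome = 'consumed';
} catch (DisposedException $exception) {
    // Per the release note, this is the branch taken from 1.2.2 onward.
    $outcome = 'disposed';
}

echo $outcome, "\n";
```

On earlier versions the future would stay unresolved, so awaiting it at the top level would not reach either branch.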

Full Changelog: v1.2.1...v1.2.2

1.2.1

04 Jul 01:12
v1.2.1
66c0956

What's Changed

  • Fix a potential race condition when using a Queue to create a ConcurrentIterator. An Error with message "Must call suspend() before calling throw()" was thrown when a Cancellation provided to ConcurrentIterator::continue() was cancelled and the underlying Queue instance was simultaneously completed. See #22.
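
For context, this is roughly how a Cancellation is passed to `ConcurrentIterator::continue()`. The sketch assumes amphp/pipeline and Amp v3 are installed through Composer; the 0.1 second timeout is an arbitrary illustrative value, and the queue is deliberately left empty so that the wait is cancelled:

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\CancelledException;
use Amp\Pipeline\Queue;
use Amp\TimeoutCancellation;

$queue = new Queue();
$iterator = $queue->iterate();

try {
    // Nothing is ever pushed, so the cancellation fires while waiting.
    $iterator->continue(new TimeoutCancellation(0.1));
    $result = 'value available';
} catch (CancelledException $exception) {
    $result = 'cancelled';
}

// Completing the queue after the cancellation is the ordinary case;
// the race fixed in 1.2.1 involved both happening at the same moment.
$queue->complete();

echo $result, "\n";
```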

Full Changelog: v1.2.0...v1.2.1

1.2.0

10 Mar 14:57
v1.2.0
f1c2ce3

What's Changed

  • Added Pipeline::buffer(), which provides control of the number of values buffered by the pipeline before back-pressure is applied to the data source by @trowski in #21
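
A short sketch of how `Pipeline::buffer()` might be used, assuming the library is installed through Composer. The buffer size of 5 and the 0.05 second consumer delay are illustrative values, not recommendations:

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// Allow up to 5 values to be produced ahead of the consumer before
// back-pressure is applied to the generator.
$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 20; ++$i) {
        yield $i;
    }
})->buffer(5);

$seen = [];
foreach ($pipeline as $value) {
    delay(0.05); // Simulate a slow consumer.
    $seen[] = $value;
}

echo \count($seen), " values consumed\n";
```

A larger buffer lets a bursty producer run further ahead of a slow consumer, at the cost of holding more values in memory.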

Full Changelog: v1.1.0...v1.2.0

1.1.0

23 Dec 04:41
v1.1.0
8a0ecc2
  • Added Pipeline::merge() which combines multiple iterators, emitting a value whenever any iterator emits a value.
  • Fixed Pipeline::take() not completing until a value beyond the given count was emitted. The pipeline now completes immediately after emitting the last value.
  • Marked the template type of ConcurrentIterator as covariant.
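
The first two items can be sketched together, assuming version 1.1.0 or later installed through Composer. The source values and delays are made up for illustration, and the interleaving of the two sources is not guaranteed, so only the number of results is relied on here:

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;
use function Amp\delay;

// Two sources that emit at different rates.
$fast = Pipeline::fromIterable(['a1', 'a2', 'a3'])->tap(fn () => delay(0.01));
$slow = Pipeline::fromIterable(['b1', 'b2', 'b3'])->tap(fn () => delay(0.03));

// merge() emits a value whenever either source emits one.
// take(4) completes right after the fourth value is emitted.
$values = Pipeline::merge([$fast, $slow])->take(4)->toArray();

echo \implode(', ', $values), "\n";
```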

1.0.0

23 Dec 16:56
v1.0.0
810dee4

Initial stable release 🎉

Changes from 1.0.0 Beta 7

  • Marked ConcurrentArrayIterator, ConcurrentChainedIterator, and ConcurrentIterableIterator as @internal. Instead of these classes, use Pipeline::fromIterable() or Pipeline::concat().
  • Pipeline::concat() now accepts an array of any iterable, not only other Pipeline objects
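
As an illustration of the second item, the sketch below passes a plain array, an ArrayIterator, and a Pipeline to `Pipeline::concat()`. It assumes the library is installed through Composer; the values are arbitrary:

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;

// Each source is consumed fully, in order, before the next one starts.
$values = Pipeline::concat([
    [1, 2],                         // Plain array.
    new \ArrayIterator([3, 4]),     // Any Traversable.
    Pipeline::fromIterable([5, 6]), // Another Pipeline.
])->toArray();

echo \implode(', ', $values), "\n";
```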

1.0.0 Beta 7

18 Nov 15:40
v1.0.0-beta.7
419f1ff
Pre-release
  • A Queue that is destructed without being completed no longer fails. Because PHP does not guarantee destructor order, the Queue destructor was sometimes invoked before another destructor that would have completed the queue.

1.0.0 Beta 6

07 Nov 21:37
v1.0.0-beta.6
c1580cf
Pre-release
  • Add compatibility with Revolt v1.x
  • Improve ConcurrentIterableIterator

1.0.0 Beta 5

10 Apr 01:13
v1.0.0-beta.5
dc38b82
Pre-release
  • Added isComplete() to the ConcurrentIterator interface, which returns true when the iterator has been completed (either successfully or with an error) and no further values are pending.
  • Fixed an issue where a reference to the prior value emitted on a ConcurrentIterator was held while awaiting the next value.
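
A small sketch of manual iteration with `isComplete()`, assuming the library is installed through Composer. The input values are illustrative:

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;

$iterator = Pipeline::fromIterable([1, 2, 3])->getIterator();

$sum = 0;
// continue() waits for the next value and returns false once none remain.
while ($iterator->continue()) {
    $sum += $iterator->getValue();
}

// With every value consumed, the iterator should report completion.
$complete = $iterator->isComplete();

echo $sum, ' ', $complete ? 'complete' : 'pending', "\n";
```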

1.0.0 Beta 4

24 Feb 20:43
v1.0.0-beta.4
c21db4d
Pre-release
  • PHP 8.1 is now required.
  • Fixed circular references in ConcurrentIterableIterator and ConcurrentFlatMapIterator that prevented quick garbage collection, particularly problematic with instances created from Pipeline::fromIterable() using a generator.

1.0.0 Beta 3

30 Jan 20:03
c2a5be2
Pre-release
  • Pipeline has been changed from an interface to a final class. ConcurrentIterator acts as the interface replacement.
  • Pipeline::pipe() has been removed in favor of operator methods directly on Pipeline, such as map() and filter()
  • Emitter has been renamed to Queue
    • yield() has been renamed to push()
    • emit() has been renamed to pushAsync()
  • All functions in the Amp\Pipeline namespace have been removed.
    • fromIterable() is available as Pipeline::fromIterable()
    • concat() is now Pipeline::concat()
    • Most operators are available directly on Pipeline
  • Added Pipeline::generate() that invokes a closure to create each pipeline value.
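
The renamed Queue API and `Pipeline::generate()` might be used as follows. This is a sketch under the assumption that the library and Amp v3 are installed through Composer; the pushed strings and the dice-roll closure are invented for illustration:

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader location.

use Amp\Pipeline\Pipeline;
use Amp\Pipeline\Queue;
use function Amp\async;

$queue = new Queue();

// Produce values in a separate coroutine so the consumer below can run.
async(function () use ($queue): void {
    foreach (['a', 'b', 'c'] as $value) {
        $queue->push($value); // Formerly yield(); waits until the value is consumed.
    }
    $queue->complete();
});

// Operators are called directly on the Pipeline returned by pipe().
$upper = $queue->pipe()
    ->map(fn (string $value): string => \strtoupper($value))
    ->toArray();

// generate() invokes the closure for each value; take() bounds the output.
$rolls = Pipeline::generate(fn (): int => \random_int(1, 6))->take(3)->toArray();

echo \implode(', ', $upper), "\n";
echo \count($rolls), " generated values\n";
```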

Example of using Pipeline for concurrency:

use Amp\Pipeline\Pipeline;
use function Amp\delay;

$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 100; ++$i) {
        yield $i;
    }
});

$results = $pipeline->concurrent(10)
    ->tap(fn () => delay(\random_int(1, 10) / 10)) // Delay for 0.1 to 1 seconds, simulating I/O.
    ->map(fn (int $input): int => $input * 10)
    ->filter(fn (int $input) => $input % 3 === 0); // Keep only values divisible by 3.

foreach ($results as $value) {
    echo $value, "\n";
}