diff --git a/src/Consumer/Consumer.php b/src/Consumer/Consumer.php index 5f0cb9f..558c605 100644 --- a/src/Consumer/Consumer.php +++ b/src/Consumer/Consumer.php @@ -4,6 +4,7 @@ namespace Basis\Nats\Consumer; +use Basis\Nats\Message\Payload; use Closure; use Basis\Nats\Client; @@ -115,10 +116,21 @@ public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack $this->create(); $this->client->subscribe($handlerSubject, function ($message, $replyTo) use ($handler, $runtime) { - if (!$message->isEmpty()) { + if (!($message instanceof Payload)) { + return; + } + + $kv_operation = $message->getHeader('KV-Operation'); + + // Consuming deleted or purged messages must not stop processing messages as more + // messages might arrive after this. + if (!$message->isEmpty() || $kv_operation === 'DEL' || $kv_operation === 'PURGE') { $runtime->empty = false; $runtime->processed++; - $handler($message, $replyTo); + + if (!$message->isEmpty()) { + $handler($message, $replyTo); + } } }); diff --git a/src/KeyValue/Bucket.php b/src/KeyValue/Bucket.php index c96a6ef..1caf414 100644 --- a/src/KeyValue/Bucket.php +++ b/src/KeyValue/Bucket.php @@ -5,6 +5,7 @@ namespace Basis\Nats\KeyValue; use Basis\Nats\Client; +use Basis\Nats\Consumer\Configuration as ConsumerConfiguration; use Basis\Nats\Stream\Stream; use Basis\Nats\Message\Payload; @@ -47,6 +48,39 @@ public function getEntry(string $key): ?Entry return new Entry($this->name, $key, $value, $revision); } + /** + * @return Entry[] + */ + public function getAll(): array + { + $entries = []; + + $stream = $this->getStream(); + if (!$stream->exists()) { + return $entries; + } + + $stream_name = $stream->getName(); + $configuration = new ConsumerConfiguration($stream_name); + $consumer = $stream->createEphemeralConsumer($configuration); + $subject_prefix_length = 1 + strlen(sprintf('$KV.%s', $this->name)); + + $consumer->handle(function (Payload $payload) use (&$entries, $subject_prefix_length) { + if ($payload->subject === null) { + return; + } + + $key = substr($payload->subject, $subject_prefix_length); + $entries[] = new Entry('', $key, $payload->body, 0); + }, function () use ($consumer) { + $consumer->interrupt(); + }); + + $consumer->delete(); + + return $entries; + } + public function getStatus(): Status { return new Status($this->name, $this->getStream()->info()); diff --git a/tests/Functional/KeyValue/BucketTest.php b/tests/Functional/KeyValue/BucketTest.php index bed783f..7306f44 100644 --- a/tests/Functional/KeyValue/BucketTest.php +++ b/tests/Functional/KeyValue/BucketTest.php @@ -4,6 +4,7 @@ namespace Tests\Functional\KeyValue; +use Basis\Nats\KeyValue\Entry; use Tests\FunctionalTestCase; class BucketTest extends FunctionalTestCase @@ -53,4 +54,66 @@ public function testBasics() $this->assertCount(1, json_decode($bucket->get('service_handlers'))); } + + public function testGetAll() + { + $bucket = $this->createClient() + ->getApi() + ->getBucket('test_bucket'); + + $this->assertSame(0, $bucket->getStatus()->values); + + $kv_pairs = [ + 'KEY1' => 'value1', + 'KEY2' => 'value2', + 'KEY3' => 'value3', + ]; + + foreach ($kv_pairs as $key => $value) { + $bucket->put($key, $value); + } + + $this->assertSame(count($kv_pairs), $bucket->getStatus()->values); + $actual_entries = $this->entriesAsAssocArray($bucket->getAll()); + $this->assertEquals($kv_pairs, $actual_entries); + } + + public function testGetAllAfterPurge() + { + $bucket = $this->createClient() + ->getApi() + ->getBucket('test_bucket'); + + $this->assertSame(0, $bucket->getStatus()->values); + + $bucket->put('KEY1', 'value1'); + $bucket->purge('KEY1'); + + $kv_pairs = [ + 'KEY2' => 'value2', + 'KEY3' => 'value3', + ]; + + foreach ($kv_pairs as $key => $value) { + $bucket->put($key, $value); + } + + $actual_entries = $this->entriesAsAssocArray($bucket->getAll()); + $this->assertEquals($kv_pairs, $actual_entries); + } + + /** + * @param Entry[] $entries + * @return array + */ + private function entriesAsAssocArray(array $entries): array + { + $assoc = []; + + foreach ($entries as $entry) { + $assoc[$entry->key] = $entry->value; + } + + return $assoc; + } }