Feel free to contribute or give any feedback.
- Installation
- Connection
- Publish Subscribe
- Request Response
- JetStream Api Usage
- Key Value Storage
- Using NKeys with JWT
- Performance
- Configuration Options
The recommended way to install the library is through Composer:
$ composer require basis-company/nats
The NKeys functionality requires Ed25519, which is provided in libsodium
extension or sodium_compat
package.
use Basis\Nats\Client;
use Basis\Nats\Configuration;
// this is default options, you can override anyone
$configuration = new Configuration([
'host' => 'localhost',
'jwt' => null,
'lang' => 'php',
'pass' => null,
'pedantic' => false,
'port' => 4222,
'reconnect' => true,
'timeout' => 1,
'token' => null,
'user' => null,
'nkey' => null,
'verbose' => false,
'version' => 'dev',
]);
// default delay mode is constant - first retry be in 1ms, second in 1ms, third in 1ms
$configuration->setDelay(0.001);
// linear delay mode - first retry be in 1ms, second in 2ms, third in 3ms, fourth in 4ms, etc...
$configuration->setDelay(0.001, Configuration::DELAY_LINEAR);
// exponential delay mode - first retry be in 10ms, second in 100ms, third in 1s, fourth if 10 seconds, etc...
$configuration->setDelay(0.01, Configuration::DELAY_EXPONENTIAL);
$client = new Client($configuration);
$client->ping(); // true
Typically, when connecting to a cluster with TLS enabled the connection settings do not change. The client lib will automatically switch over to TLS 1.2. However, if you're using a self-signed certificate you may have to point to your local CA file using the tlsCaFile setting.
When connecting to a nats cluster that requires the client to provide TLS certificates use the tlsCertFile and tlsKeyFile to point at your local TLS certificate and private key file.
Nats Server documentation for:
Connection settings when connecting to a nats server that has TLS and TLS Client verify enabled.
use Basis\Nats\Client;
use Basis\Nats\Configuration;
// this is default options, you can override anyone
$configuration = new Configuration([
'host' => 'localhost',
'jwt' => null,
'lang' => 'php',
'pass' => null,
'pedantic' => false,
'port' => 4222,
'reconnect' => true,
'timeout' => 1,
'token' => null,
'user' => null,
'nkey' => null,
'verbose' => false,
'version' => 'dev',
'tlsCertFile' => "./certs/client-cert.pem",
'tlsKeyFile' => "./certs/client-key.pem",
'tlsCaFile' => "./certs/client-key.pem",
]);
$configuration->setDelay(0.001);
$client = new Client($configuration);
$client->ping(); // true
$client->subscribe('hello', function ($message) {
var_dump('got message', $message); // tester
});
$client->publish('hello', 'tester');
$client->process();
// if you want to append some headers, construct payload manually
use Basis\Nats\Message\Payload;
$payload = new Payload('tester', [
'Nats-Msg-Id' => 'payload-example'
]);
$client->publish('hello', $payload);
There is a simple wrapper over publish and feedback processing, so payload can be constructed manually same way.
$client->subscribe('hello.request', function ($name) {
return "Hello, " . $name;
});
// async interaction
$client->request('hello.request', 'Nekufa1', function ($response) {
var_dump($response); // Hello, Nekufa1
});
$client->process(); // process request
// sync interaction (block until response get back)
$client->dispatch('hello.request', 'Nekufa2'); // Hello, Nekufa2
use Basis\Nats\Stream\RetentionPolicy;
use Basis\Nats\Stream\StorageBackend;
$accountInfo = $client->getApi()->getInfo(); // account_info_response object
$stream = $client->getApi()->getStream('mailer');
$stream->getConfiguration()
->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)
->setStorageBackend(StorageBackend::MEMORY)
->setSubjects(['mailer.greet', 'mailer.bye']);
// stream is created with given configuration
$stream->create();
// and put some tasks so workers would be doing something
$stream->put('mailer.greet', '[email protected]');
$stream->put('mailer.bye', '[email protected]');
var_dump($stream->info()); // can stream info
// this should be set in your worker
$greeter = $stream->getConsumer('greeter');
$greeter->getConfiguration()->setSubjectFilter('mailer.greet');
// consumer would be created would on first handle call
$greeter->handle(function ($address) {
mail($address, "Hi there!");
});
var_dump($greater->info()); // can consumer info
$goodbyer = $stream->getConsumer('goodbyer');
$goodbyer->getConfiguration()->setSubjectFilter('mailer.bye');
$goodbyer->create(); // create consumer if you don't want to handle anything right now
$goodbyer->handle(function ($address) {
mail($address, "See you later");
});
// you can configure batching and iteration count using chain api
$goodbyer
->setBatching(2) // how many messages would be requested from nats stream
->setIterations(3) // how many times message request should be sent
->handle(function () {
// if you need to break on next iteration simply call interrupt method
// batch will be processed to the end and the handling would be stopped
// $goodbyer->interrupt();
});
// you also can create ephemeral consumer
// the only thing that ephemeral consumer is created as soon as object is created
// you have to create full consumer configuration first
use Basis\Nats\Consumer\Configuration as ConsumerConfiguration;
use Basis\Nats\Consumer\DeliverPolicy;
$configuration = (new ConsumerConfiguration($stream->getName()))
->setDeliverPolicy(DeliverPolicy::NEW)
->setSubjectFilter('mailer.greet');
$ephemeralConsumer = $stream->createEphemeralConsumer($configuration);
// now you can use ephemeral consumer in the same way as durable consumer
$ephemeralConsumer->handle(function ($address) {
mail($address, "Hi there!");
});
// the only difference - you don't have to remove it manually, it will be deleted by NATS when socket connection is closed
// be aware that NATS will not remove that consumer immediately, process can take few seconds
var_dump(
$ephemeralConsumer->getName(),
$ephemeralConsumer->info(),
);
// if you need to append some headers, construct payload manually
use Basis\Nats\Message\Payload;
$payload = new Payload('[email protected]', [
'Nats-Msg-Id' => 'single-send'
]);
$stream->put('mailer.bye', $payload);
$bucket = $client->getApi()->getBucket('bucket_name');
// basics
$bucket->put('username', 'nekufa');
echo $bucket->get('username'); // nekufa
// safe update (given revision)
$entry = $bucket->getEntry('username');
echo $entry->value; // nekufa
$bucket->update('username', 'bazyaba', $entry->revision);
// delete value
$bucket->delete('username');
// purge value history
$bucket->purge('username');
// get bucket stats
var_dump($bucket->getStatus());
// in advance, you can fetch all bucket values
$bucket->update('email', '[email protected]');
var_dump($bucket->getAll()); // ['email' => '[email protected]', 'username' => 'nekufa']
To use NKeys with JWT, simply provide them in the Configuration
options as jwt
and nkey
.
You can also provide a credentials file with CredentialsParser
use Basis\Nats\Client;
use Basis\Nats\Configuration;
use Basis\Nats\NKeys\CredentialsParser;
$configuration = new Configuration(
[
'host' => 'localhost',
'port' => 4222
],
CredentialsParser::fromFile($credentialPath)
);
$client = new Client($configuration);
Testing on i5-4670k with nats running in docker gives 420k rps for publish and 350k rps for receive in non-verbose mode.
You can run tests on your environment.
% wget https://getcomposer.org/download/latest-stable/composer.phar
...
Saving to: ‘composer.phar’
% ./composer.phar install
Installing dependencies from lock file (including require-dev)
...
% export NATS_HOST=0.0.0.0
% export NATS_PORT=4222
% export NATS_CLIENT_LOG=1
% composer run perf-test
PHPUnit 9.5.10 by Sebastian Bergmann and contributors.
Runtime: PHP 8.1.1
Configuration: /home/nekufa/software/github/nats.php/phpunit.xml.dist
Warning: No code coverage driver available
[2022-01-19T10:42:14.008230+00:00] SubjectTest.testPerformance.INFO: start performance test [] []
[2022-01-19T10:42:14.246606+00:00] SubjectTest.testPerformance.INFO: publishing {"rps":421871.0,"length":100000,"time":0.23703885078430176} []
[2022-01-19T10:42:14.530670+00:00] SubjectTest.testPerformance.INFO: processing {"rps":355120.0,"length":100000,"time":0.2839939594268799} []
% export NATS_CLIENT_VERBOSE=1
% composer run perf-test
PHPUnit 9.5.10 by Sebastian Bergmann and contributors.
Runtime: PHP 8.1.1
Configuration: /home/nekufa/software/github/nats.php/phpunit.xml.dist
Warning: No code coverage driver available
[2022-01-19T10:42:21.319838+00:00] SubjectTest.testPerformance.INFO: start performance test [] []
[2022-01-19T10:42:21.766501+00:00] SubjectTest.testPerformance.INFO: publishing {"rps":224640.0,"length":100000,"time":0.4451560974121094} []
[2022-01-19T10:42:21.922010+00:00] SubjectTest.testPerformance.INFO: processing {"rps":353317.0,"length":100000,"time":0.15544414520263672} []
. 1 / 1 (100%)
nekufa@fasiga ~ % cat /proc/cpuinfo | grep i5
model name : Intel(R) Core(TM) i5-4670K CPU @ 3.40GHz
The following is the list of configuration options and default values.
Option | Default | Description |
---|---|---|
inboxPrefix |
"_INBOX" |
Sets de prefix for automatically created inboxes |
jwt |
Token for JWT Authentication. Alternatively you can use CredentialsParser | |
nkey |
Ed25519 based public key signature used for NKEY Authentication. | |
pass |
Sets the password for a connection. | |
pedantic |
false |
Turns on strict subject format checks. |
pingInterval |
2 |
Number of seconds between client-sent pings. |
port |
4222 |
Port to connect to (only used if servers is not specified). |
timeout |
1 | Number of seconds the client will wait for a connection to be established. |
token |
Sets a authorization token for a connection. | |
tlsKeyFile |
TLS 1.2 Client key file path. | |
tlsCertFile |
TLS 1.2 Client certificate file path. | |
tlsCaFile |
TLS 1.2 CA certificate filepath. | |
user |
Sets the username for a connection. | |
verbose |
false |
Turns on +OK protocol acknowledgements. |