diff --git a/Dockerfile b/Dockerfile index faff564..3bd181e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,8 @@ ENV XMR_DEBUG=false ENV XMR_QUEUE_POLL=5 ENV XMR_QUEUE_SIZE=10 ENV XMR_IPV6PUBSUPPORT=false +ENV XMR_RELAY_OLD_MESSAGES=false +ENV XMR_RELAY_MESSAGES=false RUN apt-get update && apt-get install -y libzmq3-dev git \ && rm -rf /var/lib/apt/lists/* diff --git a/composer.json b/composer.json index 739f8a4..967997a 100644 --- a/composer.json +++ b/composer.json @@ -31,7 +31,8 @@ "react/react": "^1.4", "react/socket": "^1.16", "react/zmq": "^0.4.0", - "cboden/ratchet": "^0.4.4" + "cboden/ratchet": "^0.4.4", + "guzzlehttp/guzzle": "^7.9" }, "autoload": { "psr-4": { diff --git a/composer.lock b/composer.lock index 226ad28..3b65780 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "2962bb0981206b9c30166891cdec6983", + "content-hash": "58cf90f353cd3f8d2862b0ebb1dff300", "packages": [ { "name": "cboden/ratchet", @@ -172,6 +172,215 @@ }, "time": "2020-11-24T22:02:12+00:00" }, + { + "name": "guzzlehttp/guzzle", + "version": "7.9.2", + "source": { + "type": "git", + "url": "https://github.com/guzzle/guzzle.git", + "reference": "d281ed313b989f213357e3be1a179f02196ac99b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/guzzle/zipball/d281ed313b989f213357e3be1a179f02196ac99b", + "reference": "d281ed313b989f213357e3be1a179f02196ac99b", + "shasum": "" + }, + "require": { + "ext-json": "*", + "guzzlehttp/promises": "^1.5.3 || ^2.0.3", + "guzzlehttp/psr7": "^2.7.0", + "php": "^7.2.5 || ^8.0", + "psr/http-client": "^1.0", + "symfony/deprecation-contracts": "^2.2 || ^3.0" + }, + "provide": { + "psr/http-client-implementation": "1.0" + }, + "require-dev": { + "bamarni/composer-bin-plugin": "^1.8.2", + "ext-curl": "*", + "guzzle/client-integration-tests": "3.0.2", + "php-http/message-factory": "^1.1", + "phpunit/phpunit": "^8.5.39 || ^9.6.20", + "psr/log": "^1.1 || ^2.0 || ^3.0" + }, + "suggest": { + "ext-curl": "Required for CURL handler support", + "ext-intl": "Required for Internationalized Domain Name (IDN) support", + "psr/log": "Required for using the Log middleware" + }, + "type": "library", + "extra": { + "bamarni-bin": { + "bin-links": true, + "forward-command": false + } + }, + "autoload": { + "files": [ + "src/functions_include.php" + ], + "psr-4": { + "GuzzleHttp\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Graham Campbell", + "email": "hello@gjcampbell.co.uk", + "homepage": "https://github.com/GrahamCampbell" + }, + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + }, + { + "name": "Jeremy Lindblom", + "email": "jeremeamia@gmail.com", + "homepage": "https://github.com/jeremeamia" + }, + { + "name": "George Mponos", + "email": "gmponos@gmail.com", + "homepage": "https://github.com/gmponos" + }, + { + "name": "Tobias Nyholm", + "email": "tobias.nyholm@gmail.com", + "homepage": "https://github.com/Nyholm" + }, + { + "name": "Márk Sági-Kazár", + "email": "mark.sagikazar@gmail.com", + "homepage": "https://github.com/sagikazarmark" + }, + { + "name": "Tobias Schultze", + "email": "webmaster@tubo-world.de", + "homepage": "https://github.com/Tobion" + } + ], + "description": "Guzzle is a PHP HTTP client library", + "keywords": [ + "client", + "curl", + "framework", + "http", + "http client", + "psr-18", + "psr-7", + "rest", + "web service" + ], + "support": { + "issues": "https://github.com/guzzle/guzzle/issues", + "source": "https://github.com/guzzle/guzzle/tree/7.9.2" + }, + "funding": [ + { + "url": "https://github.com/GrahamCampbell", + "type": "github" + }, + { + "url": "https://github.com/Nyholm", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/guzzlehttp/guzzle", + "type": "tidelift" + } + ], + "time": "2024-07-24T11:22:20+00:00" + }, + { + "name": "guzzlehttp/promises", + "version": "2.0.4", + "source": { + "type": "git", + "url": "https://github.com/guzzle/promises.git", + "reference": "f9c436286ab2892c7db7be8c8da4ef61ccf7b455" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/guzzle/promises/zipball/f9c436286ab2892c7db7be8c8da4ef61ccf7b455", + "reference": "f9c436286ab2892c7db7be8c8da4ef61ccf7b455", + "shasum": "" + }, + "require": { + "php": "^7.2.5 || ^8.0" + }, + "require-dev": { + "bamarni/composer-bin-plugin": "^1.8.2", + "phpunit/phpunit": "^8.5.39 || ^9.6.20" + }, + "type": "library", + "extra": { + "bamarni-bin": { + "bin-links": true, + "forward-command": false + } + }, + "autoload": { + "psr-4": { + "GuzzleHttp\\Promise\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Graham Campbell", + "email": "hello@gjcampbell.co.uk", + "homepage": "https://github.com/GrahamCampbell" + }, + { + "name": "Michael Dowling", + "email": "mtdowling@gmail.com", + "homepage": "https://github.com/mtdowling" + }, + { + "name": "Tobias Nyholm", + "email": "tobias.nyholm@gmail.com", + "homepage": "https://github.com/Nyholm" + }, + { + "name": "Tobias Schultze", + "email": "webmaster@tubo-world.de", + "homepage": "https://github.com/Tobion" + } + ], + "description": "Guzzle promises library", + "keywords": [ + "promise" + ], + "support": { + "issues": "https://github.com/guzzle/promises/issues", + "source": "https://github.com/guzzle/promises/tree/2.0.4" + }, + "funding": [ + { + "url": "https://github.com/GrahamCampbell", + "type": "github" + }, + { + "url": "https://github.com/Nyholm", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/guzzlehttp/promises", + "type": "tidelift" + } + ], + "time": "2024-10-17T10:06:22+00:00" + }, { "name": "guzzlehttp/psr7", "version": "2.7.0", @@ -374,6 +583,58 @@ ], "time": "2022-06-09T08:53:42+00:00" }, + { + "name": "psr/http-client", + "version": "1.0.3", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-client.git", + "reference": "bb5906edc1c324c9a05aa0873d40117941e5fa90" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-client/zipball/bb5906edc1c324c9a05aa0873d40117941e5fa90", + "reference": "bb5906edc1c324c9a05aa0873d40117941e5fa90", + "shasum": "" + }, + "require": { + "php": "^7.0 || ^8.0", + "psr/http-message": "^1.0 || ^2.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Client\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "Common interface for HTTP clients", + "homepage": "https://github.com/php-fig/http-client", + "keywords": [ + "http", + "http-client", + "psr", + "psr-18" + ], + "support": { + "source": "https://github.com/php-fig/http-client" + }, + "time": "2023-09-23T14:17:50+00:00" + }, { "name": "psr/http-factory", "version": "1.1.0", diff --git a/entrypoint.sh b/entrypoint.sh index adfeb5b..1d7d085 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,5 +1,26 @@ #!/bin/sh +# +# Copyright (C) 2024 Xibo Signage Ltd +# +# Xibo - Digital Signage - https://xibosignage.com +# +# This file is part of Xibo. +# +# Xibo is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# Xibo is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Xibo. If not, see . +# + # Write config.json echo '{' > /opt/xmr/config.json echo ' "sockets": {' >> /opt/xmr/config.json @@ -10,7 +31,9 @@ echo ' },' >> /opt/xmr/config.json echo ' "queuePoll": '$XMR_QUEUE_POLL',' >> /opt/xmr/config.json echo ' "queueSize": '$XMR_QUEUE_SIZE',' >> /opt/xmr/config.json echo ' "debug": '$XMR_DEBUG',' >> /opt/xmr/config.json -echo ' "ipv6PubSupport": '$XMR_IPV6PUBSUPPORT >> /opt/xmr/config.json +echo ' "ipv6PubSupport": '$XMR_IPV6PUBSUPPORT',' >> /opt/xmr/config.json +echo ' "relayOldMessages": '$XMR_RELAY_OLD_MESSAGES',' >> /opt/xmr/config.json +echo ' "relayMessages": '$XMR_RELAY_MESSAGES >> /opt/xmr/config.json echo '}' >> /opt/xmr/config.json /usr/local/bin/php /opt/xmr/index.php \ No newline at end of file diff --git a/index.php b/index.php index df99b4c..47a81a9 100644 --- a/index.php +++ b/index.php @@ -29,6 +29,7 @@ use React\EventLoop\Loop; use React\Http\Message\Response; use Xibo\Controller\Api; +use Xibo\Controller\Relay; use Xibo\Controller\Server; use Xibo\Entity\Queue; @@ -68,8 +69,15 @@ $log->pushHandler(new StreamHandler(STDOUT, $logLevel)); // Queue settings -$queuePoll = (property_exists($config, 'queuePoll')) ? $config->queuePoll : 5; -$queueSize = (property_exists($config, 'queueSize')) ? $config->queueSize : 10; +$queuePoll = $config->queuePoll ?? 5; +$queueSize = $config->queueSize ?? 10; + +// Create a client to relay messages +$relay = new Relay( + $log, + $config->relayMessages ?? '', + $config->relayOldMessages ?? '', +); // Create an in memory message queue. $messageQueue = new Queue(); @@ -77,37 +85,10 @@ try { $loop = Loop::get(); - // Web Socket server - $messagingServer = new Server($messageQueue, $log); - $wsSocket = new React\Socket\SocketServer($config->sockets->ws); - $wsServer = new WsServer($messagingServer); - $ioServer = new IoServer( - new HttpServer($wsServer), - $wsSocket, - $loop - ); - - // Enable keep alive - $wsServer->enableKeepAlive($ioServer->loop); - - $log->info('WS listening on ' . $config->sockets->ws); - - // LEGACY: Pub socket for messages to Players (subs) - $publisher = (new React\ZMQ\Context($loop))->getSocket(ZMQ::SOCKET_PUB); - - // Set PUB socket options - if (isset($config->ipv6PubSupport) && $config->ipv6PubSupport === true) { - $log->debug('Pub MQ Setting socket option for IPv6 to TRUE'); - $publisher->setSockOpt(\ZMQ::SOCKOPT_IPV6, true); - } - - foreach ($config->sockets->zmq as $pubOn) { - $log->info(sprintf('Bind to %s for Publish.', $pubOn)); - $publisher->bind($pubOn); - } - + // Private API + // ----------- // Create a private API to receive messages from the CMS - $api = new Api($messageQueue, $log); + $api = new Api($messageQueue, $log, $relay); // Create a HTTP server to handle requests to the API $http = new React\Http\HttpServer(function (Psr\Http\Message\ServerRequestInterface $request) use ($log, $api) { @@ -139,9 +120,52 @@ $log->info('HTTP listening'); + // WS + // ---- + // Web Socket server + $messagingServer = new Server($messageQueue, $log); + $wsSocket = new React\Socket\SocketServer($config->sockets->ws); + $wsServer = new WsServer($messagingServer); + $ioServer = new IoServer( + new HttpServer($wsServer), + $wsSocket, + $loop + ); + + // Enable keep alive + $wsServer->enableKeepAlive($ioServer->loop); + + $log->info('WS listening on ' . $config->sockets->ws); + + // PUB/SUB + // ------- + // LEGACY: Pub socket for messages to Players (subs) + if ($relay->isRelayOld()) { + $log->info('Legacy: relaying old messages'); + + $publisher = null; + $relay->configureZmq(); + } else { + $log->info('Legacy: handling old messages'); + + $publisher = (new React\ZMQ\Context($loop))->getSocket(ZMQ::SOCKET_PUB); + + // Set PUB socket options + if (isset($config->ipv6PubSupport) && $config->ipv6PubSupport === true) { + $log->debug('Pub MQ Setting socket option for IPv6 to TRUE'); + $publisher->setSockOpt(\ZMQ::SOCKOPT_IPV6, true); + } + + foreach ($config->sockets->zmq as $pubOn) { + $log->info(sprintf('Bind to %s for Publish.', $pubOn)); + $publisher->bind($pubOn); + } + } + // Queue Processor + // --------------- $log->debug('Adding a queue processor for every ' . $queuePoll . ' seconds'); - $loop->addPeriodicTimer($queuePoll, function() use ($log, $messagingServer, $publisher, $messageQueue, $queueSize) { + $loop->addPeriodicTimer($queuePoll, function() use ($log, $messagingServer, $relay, $publisher, $messageQueue, $queueSize) { // Is there work to be done if ($messageQueue->hasItems()) { $log->debug('Queue Poll - work to be done.'); @@ -167,12 +191,20 @@ if ($msg->isWebSocket) { $display = $messagingServer->getDisplayById($msg->channel); if ($display === null) { - $log->info('Display ' . $msg->channel . ' not connected'); - continue; + if ($relay->isRelay()) { + $relay->relay($msg); + } else { + $log->info('Display ' . $msg->channel . ' not connected'); + } + } else { + $display->connection->send($msg->message); } - $display->connection->send($msg->message); - } else { + } else if ($relay->isRelayOld()) { + $relay->relay($msg); + } else if ($publisher !== null) { $publisher->sendmulti([$msg->channel, $msg->key, $msg->message], \ZMQ::MODE_DONTWAIT); + } else { + $log->error('No route to send'); } $log->debug('Popped ' . $i . ' from the queue, new queue size ' . $messageQueue->queueSize()); @@ -188,7 +220,7 @@ $messagingServer->heartbeat(); // Send to PUB queue - $publisher->sendmulti(["H", "", ""], \ZMQ::MODE_DONTWAIT); + $publisher?->sendmulti(["H", "", ""], \ZMQ::MODE_DONTWAIT); }); // Key management diff --git a/src/Controller/Api.php b/src/Controller/Api.php index 655e955..8aa591e 100644 --- a/src/Controller/Api.php +++ b/src/Controller/Api.php @@ -29,7 +29,8 @@ class Api { public function __construct( private readonly Queue $queue, - private readonly LoggerInterface $logger + private readonly LoggerInterface $logger, + private readonly Relay $relay, ) { } @@ -50,6 +51,11 @@ public function handleMessage(array $message): Response } else if ($type === 'keys') { // Register new keys for this CMS. $this->queue->addKey($message['id'], $message['key']); + + // Relay new keys. + if ($this->relay->isRelay()) { + $this->relay->relayArray($message); + } } else if ($type === 'multi') { $this->logger->debug('Queuing multiple messages'); foreach ($message['messages'] as $message) { diff --git a/src/Controller/Relay.php b/src/Controller/Relay.php new file mode 100644 index 0000000..59dbb58 --- /dev/null +++ b/src/Controller/Relay.php @@ -0,0 +1,128 @@ +. + */ + +namespace Xibo\Controller; + + +use GuzzleHttp\Client; +use GuzzleHttp\Exception\GuzzleException; +use Psr\Log\LoggerInterface; +use Xibo\Entity\Message; + +class Relay +{ + private readonly ?Client $client; + private ?\ZMQSocket $socket; + + public function __construct( + private readonly LoggerInterface $logger, + private readonly string $relayMessages, + private string $relayOldMessages, + ) { + // Create a client for us to use + if (!empty($this->relayMessages)) { + $this->client = new Client([ + 'base_uri' => $this->relayMessages, + ]); + } else { + $this->client = null; + } + } + + public function configureZmq(): void + { + // Create a socket for us to use. + try { + $this->socket = (new \ZMQContext())->getSocket(\ZMQ::SOCKET_REQ); + $this->socket->setSockOpt(\ZMQ::SOCKOPT_LINGER, 2000); + $this->socket->connect($this->relayOldMessages); + } catch (\Exception $exception) { + $this->socket = null; + $this->relayOldMessages = null; + + $this->logger->critical('Unable to connect to old message relay: ' + . $this->relayOldMessages . ', e = ' . $exception->getMessage()); + } + } + + public function isRelay(): bool + { + return !empty($this->relayMessages); + } + + public function isRelayOld(): bool + { + return !empty($this->relayOldMessages); + } + + /** + * Relay a message appropriately + * @param \Xibo\Entity\Message $message + * @return void + */ + public function relay(Message $message): void + { + if ($message->isWebSocket) { + $this->relayArray($message->jsonSerialize()); + } else { + try { + $this->socket->send(json_encode($message)); + } catch (\ZMQSocketException $socketException) { + $this->logger->error('relay: [' . $socketException->getCode() . '] ' . $socketException->getMessage()); + return; + } + + $retries = 15; + + do { + try { + $reply = $this->socket->recv(\ZMQ::MODE_DONTWAIT); + + if ($reply !== false) { + break; + } + } catch (\ZMQSocketException $socketException) { + $this->logger->error('relay: [' . $socketException->getCode() . '] ' . $socketException->getMessage()); + break; + } + + usleep(100000); + } while (--$retries); + } + } + + /** + * Relay array (only ever a message over private API) + * @param array $message + * @return void + */ + public function relayArray(array $message): void + { + try { + $this->client?->post('/', [ + 'json' => $message, + ]); + } catch (GuzzleException | \Exception $e) { + $this->logger->error('relayArray: Unable to relay, e = ' . $e->getMessage()); + } + } +} \ No newline at end of file diff --git a/src/Entity/Message.php b/src/Entity/Message.php index 722e8bd..8f7e26c 100644 --- a/src/Entity/Message.php +++ b/src/Entity/Message.php @@ -21,11 +21,22 @@ */ namespace Xibo\Entity; -class Message +class Message implements \JsonSerializable { public string $channel; public string $key; public string $message; public int $qos; public bool $isWebSocket; + + public function jsonSerialize(): array + { + return [ + 'channel' => $this->channel, + 'key' => $this->key, + 'message' => $this->message, + 'qos' => $this->qos, + 'isWebSocket' => $this->isWebSocket, + ]; + } }