Skip to content
This repository has been archived by the owner on Sep 7, 2019. It is now read-only.

Codestyle fixes and inject dns server address #79

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/Devristo/Phpws/Client/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,31 @@
use React\SocketClient\Connector as BaseConnector;
use React\EventLoop\LoopInterface;
use React\Dns\Resolver\Resolver;
use React\Promise\When;
use React\Promise;

class Connector extends BaseConnector
{
protected $contextOptions = array();
protected $contextOptions = [];

/**
* @param LoopInterface $loop
* @param Resolver $resolver
* @param array|null $contextOptions
*/
public function __construct(LoopInterface $loop, Resolver $resolver, array $contextOptions = null)
{
parent::__construct($loop, $resolver);

$contextOptions = null === $contextOptions ? array() : $contextOptions;
$contextOptions = null === $contextOptions ? [] : $contextOptions;
$this->contextOptions = $contextOptions;
}

/**
* @param $address
* @param $port
* @param null $hostName
* @return \React\Promise\PromiseInterface|static
*/
public function createSocketForAddress($address, $port, $hostName = null)
{
$url = $this->getSocketUrl($address, $port);
Expand All @@ -36,7 +47,7 @@ public function createSocketForAddress($address, $port, $hostName = null)
$socket = stream_socket_client($url, $errno, $errstr, 0, $flags, $context);

if (!$socket) {
return When::reject(new \RuntimeException(
return Promise\reject(new \RuntimeException(
sprintf("connection to %s:%d failed: %s", $address, $port, $errstr),
$errno
));
Expand All @@ -48,7 +59,7 @@ public function createSocketForAddress($address, $port, $hostName = null)

return $this
->waitForStreamOnce($socket)
->then(array($this, 'checkConnectedSocket'))
->then(array($this, 'handleConnectedSocket'));
->then([$this, 'checkConnectedSocket'])
->then([$this, 'handleConnectedSocket']);
}
}
88 changes: 63 additions & 25 deletions src/Devristo/Phpws/Client/WebSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Devristo\Phpws\Protocol\WebSocketConnection;
use Devristo\Phpws\Reflection\FullAccessWrapper;
use Evenement\EventEmitter;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
use Zend\Http\Request;
Expand Down Expand Up @@ -44,34 +45,55 @@ class WebSocket extends EventEmitter
/**
* @var WebSocketTransport
*/
protected $transport = null;
protected $transport;

protected $headers;
protected $loop;

protected $logger;

protected $isClosing = false;

protected $streamOptions = null;

public function __construct($url, LoopInterface $loop, LoggerInterface $logger, array $streamOptions = null)
{
protected $streamOptions;

/** @var \React\Dns\Resolver\Resolver */
protected $dns;

/**
* @param string $url
* @param LoopInterface $loop
* @param LoggerInterface $logger
* @param array|null $streamOptions
* @param string $dns
* @throws WebSocketInvalidUrlScheme
*/
public function __construct(
$url,
LoopInterface $loop,
LoggerInterface $logger,
array $streamOptions = null,
$dns = '8.8.8.8'
) {
$this->logger = $logger;
$this->loop = $loop;
$this->streamOptions = $streamOptions;
$parts = parse_url($url);

$this->url = $url;

if (in_array($parts['scheme'], array('ws', 'wss')) === false)
if (in_array($parts['scheme'], ['ws', 'wss']) === false) {
throw new WebSocketInvalidUrlScheme();
}

$dnsResolverFactory = new \React\Dns\Resolver\Factory();
$this->dns = $dnsResolverFactory->createCached('8.8.8.8', $loop);
$this->dns = $dnsResolverFactory->createCached($dns, $loop);
}

public function open($timeOut=null)
/**
* @param null $timeOut
* @return \React\Promise\Promise|\React\Promise\PromiseInterface
*/
public function open($timeOut = null)
{
/**
* @var $that self
Expand All @@ -80,7 +102,7 @@ public function open($timeOut=null)

$uri = new Uri($this->url);

$isSecured = 'wss' === $uri->getScheme();
$isSecured = 'wss' === $uri->getScheme();
$defaultPort = $isSecured ? 443 : 80;

$connector = new Connector($this->loop, $this->dns, $this->streamOptions);
Expand All @@ -92,42 +114,45 @@ public function open($timeOut=null)
$deferred = new Deferred();

$connector->create($uri->getHost(), $uri->getPort() ?: $defaultPort)
->then(function (\React\Stream\DuplexStreamInterface $stream) use ($that, $uri, $deferred, $timeOut){
->then(function (\React\Stream\DuplexStreamInterface $stream) use ($that, $uri, $deferred, $timeOut) {

if($timeOut){
$timeOutTimer = $that->loop->addTimer($timeOut, function() use($deferred, $stream, $that){
if ($timeOut) {
$timeOutTimer = $that->loop->addTimer($timeOut, function () use ($deferred, $stream, $that) {
$stream->close();
$that->logger->notice("Timeout occured, closing connection");
$that->emit("error");
$deferred->reject("Timeout occured");
});
} else $timeOutTimer = null;
} else {
$timeOutTimer = null;
}

$transport = new WebSocketTransportHybi($stream);
$transport->setLogger($that->logger);
$that->transport = $transport;
$that->stream = $stream;

$stream->on("close", function() use($that){
$stream->on("close", function () use ($that) {
$that->isClosing = false;
$that->state = WebSocket::STATE_CLOSED;
});

// Give the chance to change request
$transport->on("request", function(Request $handshake) use($that){
$transport->on("request", function (Request $handshake) use ($that) {
$that->emit("request", func_get_args());
});

$transport->on("handshake", function(Handshake $handshake) use($that){
$transport->on("handshake", function (Handshake $handshake) use ($that) {
$that->request = $handshake->getRequest();
$that->response = $handshake->getRequest();

$that->emit("handshake", array($handshake));
$that->emit("handshake", [$handshake]);
});

$transport->on("connect", function() use(&$state, $that, $transport, $timeOutTimer, $deferred){
if($timeOutTimer)
$transport->on("connect", function () use (&$state, $that, $transport, $timeOutTimer, $deferred) {
if ($timeOutTimer) {
$timeOutTimer->cancel();
}

$deferred->resolve($transport);
$that->state = WebSocket::STATE_CONNECTED;
Expand All @@ -136,13 +161,12 @@ public function open($timeOut=null)
});

$transport->on('message', function ($message) use ($that, $transport) {
$that->emit("message", array("message" => $message));
$that->emit("message", ["message" => $message]);
});

$transport->initiateHandshake($uri);
$that->state = WebSocket::STATE_HANDSHAKE_SENT;
}, function($reason) use ($that, $deferred)
{
}, function ($reason) use ($that, $deferred) {
$deferred->reject($reason);
$that->logger->err($reason);
});
Expand All @@ -151,28 +175,41 @@ public function open($timeOut=null)

}

/**
* @param string $string
*/
public function send($string)
{
$this->transport->sendString($string);
}

/**
* @param WebSocketMessageInterface $msg
*/
public function sendMessage(WebSocketMessageInterface $msg)
{
$this->transport->sendMessage($msg);
}

/**
* @param WebSocketFrameInterface $frame
*/
public function sendFrame(WebSocketFrameInterface $frame)
{
$this->transport->sendFrame($frame);
}

/**
* @return void
*/
public function close()
{
if ($this->isClosing)
if ($this->isClosing) {
return;
}

$this->isClosing = true;
$this->sendFrame(WebSocketFrame::create(WebSocketOpcode::CloseFrame));
$this->sendFrame(WebSocketFrame::create(WebSocketOpcode::CLOSE_FRAME));

$this->state = self::STATE_CLOSING;
$stream = $this->stream;
Expand All @@ -183,8 +220,9 @@ public function close()

$loop = $this->loop;
$stream->once("close", function () use ($closeTimer, $loop) {
if ($closeTimer)
if ($closeTimer) {
$loop->cancelTimer($closeTimer);
}
});
}
}
7 changes: 4 additions & 3 deletions src/Devristo/Phpws/Exceptions/WebSocketFrameSizeMismatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

class WebSocketFrameSizeMismatch extends Exception
{

/**
* @param WebSocketFrameInterface $msg
*/
public function __construct(WebSocketFrameInterface $msg)
{
parent::__construct("Frame size mismatches with the expected frame size. Maybe a buggy client.");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@

class WebSocketInvalidChallengeResponse extends Exception
{

public function __construct()
{
parent::__construct("Server send an incorrect response to the clients challenge!");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

class WebSocketInvalidKeyException extends Exception
{

/**
* @param string $key1
* @param string $key2
Expand All @@ -17,5 +16,4 @@ public function __construct($key1, $key2, $l8b)
parent::__construct("Client sent an invalid opening handshake!");
fwrite(STDERR, "Key 1: \t$key1\nKey 2: \t$key2\nL8b: \t$l8b");
}

}
}
4 changes: 1 addition & 3 deletions src/Devristo/Phpws/Exceptions/WebSocketInvalidUrlScheme.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@

class WebSocketInvalidUrlScheme extends Exception
{

public function __construct()
{
parent::__construct("Only 'ws://' urls are supported!");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@

class WebSocketMessageNotFinalised extends Exception
{

public function __construct(WebSocketMessageInterface $msg)
{
parent::__construct("WebSocketMessage is not finalised!");
}

}
}
Loading