diff --git a/examples/http2Client.php b/examples/http2Client.php index e9d3632..5100a33 100644 --- a/examples/http2Client.php +++ b/examples/http2Client.php @@ -10,11 +10,21 @@ require dirname(__DIR__) . '/vendor/autoload.php'; go(function(){ - $uri = new Uri('https://www.taobao.com/'); + $uri = new Uri('http://www.taobao.com/'); $client = new \Yurun\Util\YurunHttp\Http2\SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme()); $client->connect(); + // 接收服务端主动推送 + $client->setServerPushQueueLength(16); // 接收服务端推送的队列长度 + go(function() use($client){ + do { + $response = $client->recv(); + var_dump($response->body()); + } while($response->success); + }); + + // 客户端请求和响应获取 $httpRequest = new HttpRequest; $count = 10; diff --git a/src/YurunHttp/Attributes.php b/src/YurunHttp/Attributes.php index 71a8178..eaa5f88 100644 --- a/src/YurunHttp/Attributes.php +++ b/src/YurunHttp/Attributes.php @@ -163,6 +163,11 @@ abstract class Attributes */ const HTTP2_NOT_RECV = 'http2_not_recv'; + /** + * 启用 Http2 pipeline + */ + const HTTP2_PIPELINE = 'http2_pipeline'; + /** * 重定向计数 */ diff --git a/src/YurunHttp/Handler/Swoole.php b/src/YurunHttp/Handler/Swoole.php index 441fb03..306a3bc 100644 --- a/src/YurunHttp/Handler/Swoole.php +++ b/src/YurunHttp/Handler/Swoole.php @@ -149,6 +149,7 @@ public function buildRequest($request, $connection, &$http2Request) if($isHttp2) { $http2Request->headers = $headers; + $http2Request->pipeline = $request->getAttribute(Attributes::HTTP2_PIPELINE, false); } else { @@ -311,12 +312,7 @@ private function parseCookies(&$request, $connection, $http2Request) $cookies = $this->cookieManager->getRequestCookies($request->getUri()); if($http2Request) { - $cookie = []; - foreach($cookies as $name => $value) - { - $cookie[] = $name . '=' . urlencode($value); - } - $request = $request->withHeader('cookie', implode(',', $cookie)); + $http2Request->cookies = $cookies; } else { diff --git a/src/YurunHttp/Http2/IHttp2Client.php b/src/YurunHttp/Http2/IHttp2Client.php index e17327b..364c609 100644 --- a/src/YurunHttp/Http2/IHttp2Client.php +++ b/src/YurunHttp/Http2/IHttp2Client.php @@ -7,8 +7,9 @@ interface IHttp2Client * @param string $host * @param int $port * @param bool $ssl + * @param mixed $handler */ - public function __construct($host, $port, $ssl); + public function __construct($host, $port, $ssl, $handler = null); /** * 连接 @@ -36,9 +37,29 @@ public function close(); * 成功返回streamId,失败返回false * * @param \Yurun\Util\YurunHttp\Http\Request $request + * @param bool $pipeline 默认send方法在发送请求之后,会结束当前的Http2 Stream,启用PIPELINE后,底层会保持stream流,可以多次调用write方法,向服务器发送数据帧,请参考write方法。 + * @param bool $dropRecvResponse 丢弃接收到的响应数据 * @return int|bool */ - public function send($request); + public function send($request, $pipeline = false, $dropRecvResponse = false); + + /** + * 向一个流写入数据帧 + * + * @param int $streamId + * @param string $data + * @param boolean $end 是否关闭流 + * @return bool + */ + public function write($streamId, $data, $end = false); + + /** + * 关闭一个流 + * + * @param int $streamId + * @return bool + */ + public function end($streamId); /** * 接收数据 diff --git a/src/YurunHttp/Http2/SwooleClient.php b/src/YurunHttp/Http2/SwooleClient.php index 28cd9f7..3c2065f 100644 --- a/src/YurunHttp/Http2/SwooleClient.php +++ b/src/YurunHttp/Http2/SwooleClient.php @@ -3,6 +3,7 @@ use Swoole\Coroutine; use Swoole\Coroutine\Channel; +use Yurun\Util\YurunHttp\Attributes; use Yurun\Util\YurunHttp\Http\Psr7\Uri; class SwooleClient implements IHttp2Client @@ -49,6 +50,13 @@ class SwooleClient implements IHttp2Client */ private $recvChannels = []; + /** + * 服务端推送数据队列长度 + * + * @var integer + */ + private $serverPushQueueLength = 16; + /** * @param string $host * @param int $port @@ -121,9 +129,11 @@ public function close() * 成功返回streamId,失败返回false * * @param \Yurun\Util\YurunHttp\Http\Request $request + * @param bool $pipeline 默认send方法在发送请求之后,会结束当前的Http2 Stream,启用PIPELINE后,底层会保持stream流,可以多次调用write方法,向服务器发送数据帧,请参考write方法。 + * @param bool $dropRecvResponse 丢弃接收到的响应数据 * @return int|bool */ - public function send($request) + public function send($request, $pipeline = false, $dropRecvResponse = false) { if('2.0' !== $request->getProtocolVersion()) { @@ -134,13 +144,42 @@ public function send($request) { throw new \RuntimeException(sprintf('Current http2 connection instance just support %s://%s:%s, does not support %s', $this->ssl ? 'https' : 'http', $this->host, $this->port, $uri->__toString())); } + $request = $request->withAttribute(Attributes::HTTP2_PIPELINE, $pipeline); $this->handler->buildRequest($request, $this->http2Client, $http2Request); - $result = $this->http2Client->send($http2Request); - if(!$result) + $streamId = $this->http2Client->send($http2Request); + if(!$streamId) { $this->close(); } - return $result; + if(!$dropRecvResponse) + { + $this->recvChannels[$streamId] = new Channel(1); + } + return $streamId; + } + + /** + * 向一个流写入数据帧 + * + * @param int $streamId + * @param string $data + * @param boolean $end 是否关闭流 + * @return bool + */ + public function write($streamId, $data, $end = false) + { + return $this->http2Client->write($streamId, $data, $end); + } + + /** + * 关闭一个流 + * + * @param int $streamId + * @return bool + */ + public function end($streamId) + { + return $this->http2Client->write($streamId, '', true); } /** @@ -154,12 +193,18 @@ public function recv($streamId = -1, $timeout = null) { if(isset($this->recvChannels[$streamId])) { - throw new \RuntimeException(sprintf('Cannot listen to stream #%s repeatedly', $streamId)); + $channel = $this->recvChannels[$streamId]; + } + else + { + $this->recvChannels[$streamId] = $channel = new Channel(-1 === $streamId ? $this->serverPushQueueLength : 1); } - $this->recvChannels[$streamId] = $channel = new Channel(1); $swooleResponse = $channel->pop($timeout); - unset($this->recvChannels[$streamId]); - $channel->close(); + if(-1 !== $streamId) + { + unset($this->recvChannels[$streamId]); + $channel->close(); + } $response = $this->handler->buildHttp2Response($swooleResponse); return $response; } @@ -196,7 +241,7 @@ private function startRecvCo() return; } $streamId = $swooleResponse->streamId; - if(isset($this->recvChannels[$streamId]) || isset($this->recvChannels[$streamId = -1])) + if(isset($this->recvChannels[$streamId]) || (0 === $streamId % 2 && isset($this->recvChannels[$streamId = -1]))) { $this->recvChannels[$streamId]->push($swooleResponse); } @@ -244,4 +289,28 @@ public function getRecvingCount() return count($this->recvChannels); } + /** + * Get 服务端推送数据队列长度 + * + * @return integer + */ + public function getServerPushQueueLength() + { + return $this->serverPushQueueLength; + } + + /** + * Set 服务端推送数据队列长度 + * + * @param integer $serverPushQueueLength 服务端推送数据队列长度 + * + * @return self + */ + public function setServerPushQueueLength($serverPushQueueLength) + { + $this->serverPushQueueLength = $serverPushQueueLength; + + return $this; + } + } diff --git a/tests/unit/Http2/SwooleHttp2Test.php b/tests/unit/Http2/SwooleHttp2Test.php index eccab3d..588234a 100644 --- a/tests/unit/Http2/SwooleHttp2Test.php +++ b/tests/unit/Http2/SwooleHttp2Test.php @@ -1,6 +1,7 @@ call(function(){ $uri = new Uri($this->http2Host); $client = new SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme()); + go(function() use($client){ + $result = $client->recv(); + $this->assertFalse($result->success); + }); $this->assertTrue($client->connect()); @@ -57,6 +62,8 @@ public function testMuiltCo() $streamId = $client->send($request); $this->assertGreaterThan(0, $streamId); + + Coroutine::sleep(1); $response = $client->recv($streamId, 3); $data = $response->json(true); @@ -91,4 +98,75 @@ public function testMuiltCo() }); } + public function testPipeline1() + { + $this->call(function(){ + $uri = new Uri($this->http2Host); + $client = new SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme()); + + $this->assertTrue($client->connect()); + + $http = new HttpRequest; + $http->protocolVersion = '2.0'; + $http->timeout = 3000; + + $date = strtotime('2017-03-24 17:12:14'); + $data = json_encode([ + 'date' => $date, + ]); + + $request = $http->buildRequest($this->http2Host, substr($data, 0, 2)); + $streamId = $client->send($request, true); + $this->assertGreaterThan(0, $streamId); + $this->assertTrue($client->write($streamId, substr($data, 2), true)); + // $this->assertTrue($client->end($streamId)); + + $response = $client->recv($streamId); + $data = $response->json(true); + + $this->assertEquals($date, isset($data['date']) ? $data['date'] : null); + $this->assertGreaterThan(1, isset($data['fd']) ? $data['fd'] : null); + $this->assertEquals('yurun', $response->getHeaderLine('trailer')); + // Swoole 4.4.12 BUG,暂时无法获取 + // $this->assertEquals('niubi', $response->getHeaderLine('yurun')); + $client->close(); + }); + } + + public function testPipeline2() + { + $this->call(function(){ + $uri = new Uri($this->http2Host); + $client = new SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme()); + + $this->assertTrue($client->connect()); + + $http = new HttpRequest; + $http->protocolVersion = '2.0'; + $http->timeout = 3000; + + $date = strtotime('2017-03-24 17:12:14'); + $data = json_encode([ + 'date' => $date, + ]); + + $request = $http->buildRequest($this->http2Host, substr($data, 0, 2)); + $streamId = $client->send($request, true); + $this->assertGreaterThan(0, $streamId); + $this->assertTrue($client->write($streamId, substr($data, 2))); + $this->assertTrue($client->end($streamId)); + + $response = $client->recv($streamId); + $data = $response->json(true); + + $this->assertEquals($date, isset($data['date']) ? $data['date'] : null); + $this->assertGreaterThan(1, isset($data['fd']) ? $data['fd'] : null); + $this->assertEquals('yurun', $response->getHeaderLine('trailer')); + // Swoole 4.4.12 BUG,暂时无法获取 + // $this->assertEquals('niubi', $response->getHeaderLine('yurun')); + + $client->close(); + }); + } + } \ No newline at end of file