diff --git a/examples/http2Client.php b/examples/http2Client.php index e9d3632..5100a33 100644 --- a/examples/http2Client.php +++ b/examples/http2Client.php @@ -10,11 +10,21 @@ require dirname(__DIR__) . '/vendor/autoload.php'; go(function(){ - $uri = new Uri('https://www.taobao.com/'); + $uri = new Uri('http://www.taobao.com/'); $client = new \Yurun\Util\YurunHttp\Http2\SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme()); $client->connect(); + // 接收服务端主动推送 + $client->setServerPushQueueLength(16); // 接收服务端推送的队列长度 + go(function() use($client){ + do { + $response = $client->recv(); + var_dump($response->body()); + } while($response->success); + }); + + // 客户端请求和响应获取 $httpRequest = new HttpRequest; $count = 10; diff --git a/src/YurunHttp/Http2/IHttp2Client.php b/src/YurunHttp/Http2/IHttp2Client.php index e17327b..38f81ff 100644 --- a/src/YurunHttp/Http2/IHttp2Client.php +++ b/src/YurunHttp/Http2/IHttp2Client.php @@ -7,8 +7,9 @@ interface IHttp2Client * @param string $host * @param int $port * @param bool $ssl + * @param mixed $handler */ - public function __construct($host, $port, $ssl); + public function __construct($host, $port, $ssl, $handler = null); /** * 连接 @@ -36,9 +37,10 @@ public function close(); * 成功返回streamId,失败返回false * * @param \Yurun\Util\YurunHttp\Http\Request $request + * @param bool $dropRecvResponse * @return int|bool */ - public function send($request); + public function send($request, $dropRecvResponse = false); /** * 接收数据 diff --git a/src/YurunHttp/Http2/SwooleClient.php b/src/YurunHttp/Http2/SwooleClient.php index 28cd9f7..2d71159 100644 --- a/src/YurunHttp/Http2/SwooleClient.php +++ b/src/YurunHttp/Http2/SwooleClient.php @@ -49,6 +49,13 @@ class SwooleClient implements IHttp2Client */ private $recvChannels = []; + /** + * 服务端推送数据队列长度 + * + * @var integer + */ + private $serverPushQueueLength = 16; + /** * @param string $host * @param int $port @@ -121,9 +128,10 @@ public function close() * 成功返回streamId,失败返回false * * @param \Yurun\Util\YurunHttp\Http\Request $request + * @param bool $dropRecvResponse 丢弃接收到的响应数据 * @return int|bool */ - public function send($request) + public function send($request, $dropRecvResponse = false) { if('2.0' !== $request->getProtocolVersion()) { @@ -135,12 +143,16 @@ public function send($request) throw new \RuntimeException(sprintf('Current http2 connection instance just support %s://%s:%s, does not support %s', $this->ssl ? 'https' : 'http', $this->host, $this->port, $uri->__toString())); } $this->handler->buildRequest($request, $this->http2Client, $http2Request); - $result = $this->http2Client->send($http2Request); - if(!$result) + $streamId = $this->http2Client->send($http2Request); + if(!$streamId) { $this->close(); } - return $result; + if(!$dropRecvResponse) + { + $this->recvChannels[$streamId] = new Channel(1); + } + return $streamId; } /** @@ -154,12 +166,18 @@ public function recv($streamId = -1, $timeout = null) { if(isset($this->recvChannels[$streamId])) { - throw new \RuntimeException(sprintf('Cannot listen to stream #%s repeatedly', $streamId)); + $channel = $this->recvChannels[$streamId]; + } + else + { + $this->recvChannels[$streamId] = $channel = new Channel(-1 === $streamId ? $this->serverPushQueueLength : 1); } - $this->recvChannels[$streamId] = $channel = new Channel(1); $swooleResponse = $channel->pop($timeout); - unset($this->recvChannels[$streamId]); - $channel->close(); + if(-1 !== $streamId) + { + unset($this->recvChannels[$streamId]); + $channel->close(); + } $response = $this->handler->buildHttp2Response($swooleResponse); return $response; } @@ -196,7 +214,7 @@ private function startRecvCo() return; } $streamId = $swooleResponse->streamId; - if(isset($this->recvChannels[$streamId]) || isset($this->recvChannels[$streamId = -1])) + if(isset($this->recvChannels[$streamId]) || (0 === $streamId % 2 && isset($this->recvChannels[$streamId = -1]))) { $this->recvChannels[$streamId]->push($swooleResponse); } @@ -244,4 +262,28 @@ public function getRecvingCount() return count($this->recvChannels); } + /** + * Get 服务端推送数据队列长度 + * + * @return integer + */ + public function getServerPushQueueLength() + { + return $this->serverPushQueueLength; + } + + /** + * Set 服务端推送数据队列长度 + * + * @param integer $serverPushQueueLength 服务端推送数据队列长度 + * + * @return self + */ + public function setServerPushQueueLength($serverPushQueueLength) + { + $this->serverPushQueueLength = $serverPushQueueLength; + + return $this; + } + } diff --git a/tests/unit/Http2/SwooleHttp2Test.php b/tests/unit/Http2/SwooleHttp2Test.php index eccab3d..133d131 100644 --- a/tests/unit/Http2/SwooleHttp2Test.php +++ b/tests/unit/Http2/SwooleHttp2Test.php @@ -1,6 +1,7 @@ call(function(){ $uri = new Uri($this->http2Host); $client = new SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme()); + go(function() use($client){ + $result = $client->recv(); + $this->assertFalse($result->success); + }); $this->assertTrue($client->connect()); @@ -57,6 +62,8 @@ public function testMuiltCo() $streamId = $client->send($request); $this->assertGreaterThan(0, $streamId); + + Coroutine::sleep(1); $response = $client->recv($streamId, 3); $data = $response->json(true);