Skip to content

Commit

Permalink
修复接收服务端主动推送数据时,可能会接收到客户端创建流的响应问题
Browse files Browse the repository at this point in the history
  • Loading branch information
Yurunsoft committed Dec 2, 2019
1 parent ea2d246 commit 1cf00a0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 12 deletions.
12 changes: 11 additions & 1 deletion examples/http2Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@
require dirname(__DIR__) . '/vendor/autoload.php';

go(function(){
$uri = new Uri('https://www.taobao.com/');
$uri = new Uri('http://www.taobao.com/');

$client = new \Yurun\Util\YurunHttp\Http2\SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme());
$client->connect();

// 接收服务端主动推送
$client->setServerPushQueueLength(16); // 接收服务端推送的队列长度
go(function() use($client){
do {
$response = $client->recv();
var_dump($response->body());
} while($response->success);
});

// 客户端请求和响应获取
$httpRequest = new HttpRequest;

$count = 10;
Expand Down
6 changes: 4 additions & 2 deletions src/YurunHttp/Http2/IHttp2Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ interface IHttp2Client
* @param string $host
* @param int $port
* @param bool $ssl
* @param mixed $handler
*/
public function __construct($host, $port, $ssl);
public function __construct($host, $port, $ssl, $handler = null);

/**
* 连接
Expand Down Expand Up @@ -36,9 +37,10 @@ public function close();
* 成功返回streamId,失败返回false
*
* @param \Yurun\Util\YurunHttp\Http\Request $request
* @param bool $dropRecvResponse
* @return int|bool
*/
public function send($request);
public function send($request, $dropRecvResponse = false);

/**
* 接收数据
Expand Down
60 changes: 51 additions & 9 deletions src/YurunHttp/Http2/SwooleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ class SwooleClient implements IHttp2Client
*/
private $recvChannels = [];

/**
* 服务端推送数据队列长度
*
* @var integer
*/
private $serverPushQueueLength = 16;

/**
* @param string $host
* @param int $port
Expand Down Expand Up @@ -121,9 +128,10 @@ public function close()
* 成功返回streamId,失败返回false
*
* @param \Yurun\Util\YurunHttp\Http\Request $request
* @param bool $dropRecvResponse 丢弃接收到的响应数据
* @return int|bool
*/
public function send($request)
public function send($request, $dropRecvResponse = false)
{
if('2.0' !== $request->getProtocolVersion())
{
Expand All @@ -135,12 +143,16 @@ public function send($request)
throw new \RuntimeException(sprintf('Current http2 connection instance just support %s://%s:%s, does not support %s', $this->ssl ? 'https' : 'http', $this->host, $this->port, $uri->__toString()));
}
$this->handler->buildRequest($request, $this->http2Client, $http2Request);
$result = $this->http2Client->send($http2Request);
if(!$result)
$streamId = $this->http2Client->send($http2Request);
if(!$streamId)
{
$this->close();
}
return $result;
if(!$dropRecvResponse)
{
$this->recvChannels[$streamId] = new Channel(1);
}
return $streamId;
}

/**
Expand All @@ -154,12 +166,18 @@ public function recv($streamId = -1, $timeout = null)
{
if(isset($this->recvChannels[$streamId]))
{
throw new \RuntimeException(sprintf('Cannot listen to stream #%s repeatedly', $streamId));
$channel = $this->recvChannels[$streamId];
}
else
{
$this->recvChannels[$streamId] = $channel = new Channel(-1 === $streamId ? $this->serverPushQueueLength : 1);
}
$this->recvChannels[$streamId] = $channel = new Channel(1);
$swooleResponse = $channel->pop($timeout);
unset($this->recvChannels[$streamId]);
$channel->close();
if(-1 !== $streamId)
{
unset($this->recvChannels[$streamId]);
$channel->close();
}
$response = $this->handler->buildHttp2Response($swooleResponse);
return $response;
}
Expand Down Expand Up @@ -196,7 +214,7 @@ private function startRecvCo()
return;
}
$streamId = $swooleResponse->streamId;
if(isset($this->recvChannels[$streamId]) || isset($this->recvChannels[$streamId = -1]))
if(isset($this->recvChannels[$streamId]) || (0 === $streamId % 2 && isset($this->recvChannels[$streamId = -1])))
{
$this->recvChannels[$streamId]->push($swooleResponse);
}
Expand Down Expand Up @@ -244,4 +262,28 @@ public function getRecvingCount()
return count($this->recvChannels);
}

/**
* Get 服务端推送数据队列长度
*
* @return integer
*/
public function getServerPushQueueLength()
{
return $this->serverPushQueueLength;
}

/**
* Set 服务端推送数据队列长度
*
* @param integer $serverPushQueueLength 服务端推送数据队列长度
*
* @return self
*/
public function setServerPushQueueLength($serverPushQueueLength)
{
$this->serverPushQueueLength = $serverPushQueueLength;

return $this;
}

}
7 changes: 7 additions & 0 deletions tests/unit/Http2/SwooleHttp2Test.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Yurun\Util\YurunHttp\Test\Http2;

use Swoole\Coroutine;
use Yurun\Util\HttpRequest;
use Swoole\Coroutine\Channel;
use Yurun\Util\YurunHttp\Http2\SwooleClient;
Expand Down Expand Up @@ -46,6 +47,10 @@ public function testMuiltCo()
$this->call(function(){
$uri = new Uri($this->http2Host);
$client = new SwooleClient($uri->getHost(), Uri::getServerPort($uri), 'https' === $uri->getScheme());
go(function() use($client){
$result = $client->recv();
$this->assertFalse($result->success);
});

$this->assertTrue($client->connect());

Expand All @@ -57,6 +62,8 @@ public function testMuiltCo()

$streamId = $client->send($request);
$this->assertGreaterThan(0, $streamId);

Coroutine::sleep(1);

$response = $client->recv($streamId, 3);
$data = $response->json(true);
Expand Down

0 comments on commit 1cf00a0

Please sign in to comment.