Skip to content

Commit

Permalink
Event loop improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
adamziel committed Jul 14, 2024
1 parent 248b4ef commit ef41a68
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 209 deletions.
119 changes: 7 additions & 112 deletions http_api.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,125 +11,20 @@
require_once __DIR__ . '/src/WordPress/AsyncHttp/InflateStreamWrapperData.php';

$requests = [
// new Request("https://playground.internal"),
// (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.1'),
// (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.0'),
(new Request("http://127.0.0.1:3000/")) //->set_http_version('1.0'),
new Request("https://playground.internal"),
(new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.1'),
(new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.0'),
(new Request("http://127.0.0.1:3000/"))->set_http_version('1.0'),
];
// list($streams, $headers, $errors) = streams_send_http_requests($requests);
// print_r($streams);
// print_r($errors);

// var_dump(streams_http_response_read_bytes($streams, 1024));
// Enqueuing another request here is instant and won't start the download yet.
$client = new Client();
$queue = $client->enqueue( $requests );
var_dump($client->read_bytes($requests[0], 10, Client::READ_POLL_ANY));
var_dump($client->read_bytes($requests[0], 1024, Client::READ_NON_BLOCKING));
var_dump($client->read_bytes($requests[0], 1024, Client::READ_POLL_ANY));
// var_dump($client->read_bytes($requests[0], 1024));

die();

// @TODO: handle wait_for_all_requested_bytes for more than content-length bytes
var_dump(stream_get_contents($requests[1]->get_response()->body_stream));
// var_dump($client->read_bytes($requests[1], 359, [
// 'mode' => 'poll_once',
// ]));
// var_dump($client->read_bytes($requests[1], 359, [
// 'mode' => 'poll_once',
// ]));
// var_dump($client->read_bytes($requests[1], 359, [
// 'mode' => 'poll_once',
// ]));
// @TODO: poll_once should eventully mark the request as finished
var_dump("----");
var_dump($client->read_bytes($requests[2], 1024, [
'mode' => 'return',
]));
var_dump($client->read_bytes($requests[2], 1024, [
'mode' => 'poll_once',
]));
// var_dump($queue);
// var_dump($queue[0]);
// var_dump($client->read_bytes($requests[0], 1024, [
// 'mode' => 'return',
// ]));
// var_dump(fread($queue[0]->get_body_stream(), 1));
// var_dump(fread($queue[0]->get_body_stream(), 1));
// var_dump(fread($queue[0]->get_body_stream(), 1));
die();
// var_dump($queue[0]);
var_dump($client->read_bytes($requests[0], 186, [
'mode' => 'return',
]));
var_dump($client->read_bytes($requests[0], 186, [
'mode' => 'return',
]));
// var_dump($queue[0]->get_status_code());
// var_dump($queue[0]->get_headers());

// var_dump(stream_get_contents($queue[0]->response->body_stream));
die();
$client = new Client();
$client->set_progress_callback( function ( Request $request, $downloaded, $total ) {
echo "$request->url – Downloaded: $downloaded / $total\n";
// echo "$request->url – Downloaded: $downloaded / $total\n";
} );

$requests = [
new Request("https://anglesharp.azurewebsites.net/Chunked")
// new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
// new Request( "https://downloads.wordpress.org/theme/pendant.zip" ),
];
$queue = $client->enqueue( $requests );
var_dump($queue[0]);
die();
// Enqueuing another request here is instant and won't start the download yet.
//$streams2 = $client->enqueue( [
// new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
//] );

try {
$client->read_bytes($requests[0], 4096);
// var_dump(stream_get_contents($streams1[0]));
} catch (Exception $e) {
echo $e->getMessage();
}
print_r($client);
print_r(stream_context_get_options($streams1[0]));
// Stream a single file, while streaming all the files
// file_put_contents( 'output-round1-0.zip', stream_get_contents( $streams1[0] ) );
//file_put_contents( 'output-round1-1.zip', stream_get_contents( $streams1[1] ) );
die();
// Initiate more HTTPS requests
$streams3 = $client->enqueue( [
new Request( "https://downloads.wordpress.org/plugin/akismet.4.1.12.zip" ),
new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
new Request( "https://downloads.wordpress.org/plugin/hello-dolly.1.7.3.zip" ),
] );

// Download the rest of the files. Foreach() seems like downloading things
// sequentially, but we're actually streaming all the files in parallel.
$streams = array_merge( $streams2, $streams3 );
foreach ( $streams as $k => $stream ) {
file_put_contents( 'output-round2-' . $k . '.zip', stream_get_contents( $stream ) );
}

echo "Done! :)";

// ----------------------------
//
// Previous explorations:

// Non-blocking parallel processing – the fastest method.
//while ( $results = sockets_http_response_read_bytes( $streams, 8096 ) ) {
// foreach ( $results as $k => $chunk ) {
// file_put_contents( 'output' . $k . '.zip', $chunk, FILE_APPEND );
// }
//}

// Blocking sequential processing – the slowest method.
//foreach ( $streams as $k => $stream ) {
// stream_set_blocking( $stream, 1 );
// file_put_contents( 'output' . $k . '.zip', stream_get_contents( $stream ) );
//}
$client->wait_for_headers($requests[3]);
var_dump($requests[3]->get_response()->get_headers());
145 changes: 87 additions & 58 deletions src/WordPress/AsyncHttp/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,13 @@
* **Supports custom request headers and body**
*/
class Client {
protected $concurrency = 2;
protected $requests;
protected $onProgress;
protected $is_processing_queue = false;

const STREAM_SELECT_READ = 1;
const STREAM_SELECT_WRITE = 2;

const READ_NON_BLOCKING = 'READ_NON_BLOCKING';
const READ_POLL_ANY = 'READ_POLL_ANY';
const READ_POLL_ALL = 'READ_POLL_ALL';

/**
* Microsecond is 1 millionth of a second.
Expand All @@ -104,6 +107,11 @@ class Client {
*/
const NONBLOCKING_TIMEOUT_MICROSECONDS = 0.05 * self::MICROSECONDS_TO_SECONDS;

protected $concurrency = 2;
protected $requests;
protected $onProgress;
protected $is_processing_queue = false;

public function __construct() {
$this->requests = [];
$this->onProgress = function () {
Expand Down Expand Up @@ -153,29 +161,6 @@ public function enqueue( $requests ) {
return $enqueued_streams;
}

/**
* Returns the response stream associated with the given Request object.
* Reading from that stream also runs this Client's event loop.
*
* @param Request $request
*
* @return resource
*/
public function get_stream( $request ) {
throw new Exception('Not implemented yet');
// if ( ! isset( $this->requests[ $request ] ) ) {
// $this->enqueue_request( $request );
// }

// if ( $this->queue_needs_processing ) {
// $this->process_queue();
// }

// StreamWrapper::create_resource(
// new StreamData($request, $client)
// )
}

/**
* @param \WordPress\AsyncHttp\Request $request
*/
Expand All @@ -184,10 +169,6 @@ protected function enqueue_request( $request ) {
return $request->get_response();
}


const READ_NON_BLOCKING = 'READ_NON_BLOCKING';
const READ_POLL_ANY = 'READ_POLL_ANY';
const READ_POLL_ALL = 'READ_POLL_ALL';
/**
* Reads $length bytes from the given request while also running
* non-blocking event loop operations.
Expand Down Expand Up @@ -218,21 +199,74 @@ public function read_bytes( $request, $length, $mode = self::READ_NON_BLOCKING )
) {
break;
}
} while ($this->event_loop_pass());
} while ($this->event_loop_tick());

return $buffered;
}

public function event_loop_pass()
public function wait_for_headers( $request )
{
if(count($this->get_concurrent_requests()) === 0) {
if(!in_array($request, $this->requests, true)) {
trigger_error('Request not found in the client', E_USER_WARNING);
return false;
}
echo "event_loop_pass\n";
foreach($this->requests as $request) {
echo "request state: $request->state\n";

while($this->event_loop_tick() && $request->state !== Request::STATE_FAILED) {
if($request->get_response()->get_headers()) {
return true;
}
}
sleep(1);

return false;
}

public function wait_for_response_body_stream( $request )
{
if(!in_array($request, $this->requests, true)) {
trigger_error('Request not found in the client', E_USER_WARNING);
return false;
}

while($this->event_loop_tick() && $request->state !== Request::STATE_FAILED) {
if($request->get_response()->decoded_response_stream) {
return true;
}
}

return false;
}

/**
* Returns the response stream associated with the given Request object.
* Reading from that stream also runs this Client's event loop.
*
* @param Request $request
*
* @return resource|bool
*/
// public function get_response_stream( $request ) {
// if(!in_array($request, $this->requests, true)) {
// trigger_error('Request not found in the client', E_USER_WARNING);
// return false;
// }

// if(
// $request->state !== Request::STATE_RECEIVING_BODY &&
// $request->state !== Request::STATE_FINISHED
// ) {
// trigger_error('Request is not in a state where the response stream is available', E_USER_WARNING);
// return false;
// }

// return $request->get_response()->event_loop_decoded_response_stream;
// }

public function event_loop_tick()
{
if(count($this->get_concurrent_requests()) === 0) {
return false;
}

static::open_nonblocking_http_sockets(
$this->get_concurrent_requests( Request::STATE_ENQUEUED )
);
Expand Down Expand Up @@ -275,16 +309,14 @@ protected function get_concurrent_requests($states=null)
Request::STATE_RECEIVING_BODY,
Request::STATE_RECEIVED,
]);
$available_slots = $this->concurrency - count($processed_requests);
$enqueued_requests = $this->get_requests(Request::STATE_ENQUEUED);
$backfill_enqueued_nb = min(
count($enqueued_requests),
$this->concurrency - count($processed_requests)
);

for($i = 0; $i < $backfill_enqueued_nb; $i++) {
for($i = 0; $i < $available_slots; $i++) {
if(!isset($enqueued_requests[$i])) {
break;
}
$processed_requests[] = $enqueued_requests[$i];
}

if($states !== null) {
$processed_requests = static::filter_requests($processed_requests, $states);
}
Expand Down Expand Up @@ -476,17 +508,13 @@ static private function receive_response_headers( $requests ) {
$response->statusMessage = $parsed['status']['message'];
$response->protocol = $parsed['status']['protocol'];

$content_length = $response->get_header('content-length');
$transfer_encoding = $response->get_header('transfer-encoding');
// If we're expecting a body, let's start receiving it.
if(
$transfer_encoding === 'chunked' ||
($content_length !== null && (int) $content_length > 0)
) {
$request->state = Request::STATE_RECEIVING_BODY;
} else {
// If we're being redirected, we don't need to wait for the body.
if($response->statusCode >= 300 && $response->statusCode < 400) {
$request->state = Request::STATE_RECEIVED;
break;
}

$request->state = Request::STATE_RECEIVING_BODY;
break;
}
}
Expand All @@ -498,6 +526,9 @@ static private function receive_response_headers( $requests ) {
* @param array $requests An array of requests.
*/
private function receive_response_body( $requests ) {
// @TODO: Assume body is received when either
// * Content-Length is reached
// * The last chunk in Transfer-Encoding: chunked is received
foreach (static::stream_select($requests, static::STREAM_SELECT_READ) as $request) {
$response = $request->get_response();
if (!$response->decoded_response_stream) {
Expand Down Expand Up @@ -700,17 +731,15 @@ static private function filter_requests( array $requests, $states ) {
$states = [$states];
}
$results = [];
foreach($requests as $k => $request) {
foreach($requests as $request) {
if(in_array($request->state, $states)) {
$results[$k] = $request;
$results[] = $request;
}
}
return $results;
}


const STREAM_SELECT_READ = 1;
const STREAM_SELECT_WRITE = 2;
static private function stream_select( $requests, $mode ) {
if(empty($requests)) {
return [];
Expand Down
3 changes: 0 additions & 3 deletions src/WordPress/AsyncHttp/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ public function set_error($error)
$this->error = $error;
$this->state = self::STATE_FAILED;

$this->response->error = $error;
$this->response->state = self::STATE_FAILED;

if($this->http_socket) {
fclose($this->http_socket);
$this->http_socket = null;
Expand Down
6 changes: 5 additions & 1 deletion src/WordPress/AsyncHttp/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function get_protocol()
}

public function get_header( $name ) {
if($this->headers === null) {
if(false === $this->get_headers()) {
return false;
}

Expand All @@ -51,6 +51,10 @@ public function get_header( $name ) {

public function get_headers()
{
if(!$this->headers) {
return false;
}

return $this->headers;
}

Expand Down
Loading

0 comments on commit ef41a68

Please sign in to comment.