diff --git a/chunked_encoding_server.js b/chunked_encoding_server.js index da185eb8..c7b99caa 100644 --- a/chunked_encoding_server.js +++ b/chunked_encoding_server.js @@ -10,6 +10,12 @@ const server = http.createServer((req, res) => { const acceptEncoding = req.headers['accept-encoding']; const useGzip = acceptEncoding && acceptEncoding.includes('gzip'); + if (req.headers['please-redirect']) { + res.writeHead(301, { Location: req.url }); + res.end(); + return; + } + // Set headers for chunked transfer encoding if HTTP/1.1 if (isHttp11) { res.setHeader('Transfer-Encoding', 'chunked'); diff --git a/http_api.php b/http_api.php index b20fe4e0..8aaccd29 100644 --- a/http_api.php +++ b/http_api.php @@ -4,38 +4,50 @@ use WordPress\AsyncHttp\Request; require __DIR__ . '/vendor/autoload.php'; -require_once __DIR__ . '/src/WordPress/Streams/StreamWrapperInterface.php'; -require_once __DIR__ . '/src/WordPress/Streams/VanillaStreamWrapper.php'; -require_once __DIR__ . '/src/WordPress/Streams/StreamPeekerWrapper.php'; -require_once __DIR__ . '/src/WordPress/AsyncHttp/InflateStreamWrapper.php'; -require_once __DIR__ . '/src/WordPress/AsyncHttp/InflateStreamWrapperData.php'; $requests = [ new Request("https://playground.internal"), - (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.1'), - (new Request("https://anglesharp.azurewebsites.net/Chunked"))->set_http_version('1.0'), - (new Request("http://127.0.0.1:3000/"))->set_http_version('1.0'), + new Request("https://anglesharp.azurewebsites.net/Chunked", [ + 'http_version' => '1.1' + ]), + new Request("https://anglesharp.azurewebsites.net/Chunked", [ + 'http_version' => '1.0' + ]), + new Request("http://127.0.0.1:3000/", [ + 'http_version' => '1.0', + 'headers' => [ + 'please-redirect' => 'yes' + ] + ]), ]; // var_dump(streams_http_response_read_bytes($streams, 1024)); // Enqueuing another request here is instant and won't start the download yet. -$client = new Client(); -$client->set_progress_callback( function ( Request $request, $downloaded, $total ) { - // echo "$request->url – Downloaded: $downloaded / $total\n"; -} ); +$client = new Client([ + 'concurrency' => 2, + 'on_progress' => function ( Request $request, $downloaded, $total ) { + echo "$request->url – Downloaded: $downloaded / $total\n"; + }, + 'max_redirects' => 0 +]); $client->enqueue( $requests ); - +echo $client->read_bytes($requests[3], 100, Client::READ_POLL_ANY)."\n\n"; while(true) { - $request = $client->next_response_chunk(); + $request = $client->await_response_bytes(); if(false === $request) { break; } echo "GOT DATA CHUNK ON REQUEST $request->id:\n"; - echo $request->get_response()->consume_buffer(1024); + echo $client->read_bytes($request, 1024); echo "----------------\n\n"; } +foreach($client->get_failed_requests() as $failed_request) { + echo "Failed request to " . $failed_request->url . " – " . $failed_request->error . "\n"; +} + + // $client->wait_for_headers($requests[3]); // var_dump($requests[3]->get_response()->get_headers()); diff --git a/src/WordPress/AsyncHttp/Client.php b/src/WordPress/AsyncHttp/Client.php index e3a99967..3557faca 100644 --- a/src/WordPress/AsyncHttp/Client.php +++ b/src/WordPress/AsyncHttp/Client.php @@ -2,10 +2,11 @@ namespace WordPress\AsyncHttp; -use Exception; -use WordPress\Util\Map; -use WordPress\Streams\VanillaStreamWrapperData; -use WordPress\AsyncHttp\CountReadBytesStreamWrapper; +use WordPress\AsyncHttp\StreamWrapper\ChunkedEncodingWrapper; +use WordPress\AsyncHttp\StreamWrapper\CountReadBytesWrapper; +use WordPress\AsyncHttp\StreamWrapper\EventLoopWrapper; +use WordPress\AsyncHttp\StreamWrapper\InflateStreamWrapper; +use WordPress\Streams\StreamWrapperData; /** * An asynchronous HTTP client library designed for WordPress. Main features: @@ -97,7 +98,7 @@ class Client { /** * Microsecond is 1 millionth of a second. - * + * * @var int */ const MICROSECONDS_TO_SECONDS = 1000000; @@ -107,35 +108,19 @@ class Client { */ const NONBLOCKING_TIMEOUT_MICROSECONDS = 0.05 * self::MICROSECONDS_TO_SECONDS; - protected $concurrency = 2; + protected $concurrency; + protected $max_redirects = 3; + protected $requests; - protected $onProgress; - protected $is_processing_queue = false; + protected $on_progress; + protected $connections = array(); - public function __construct() { - $this->requests = []; - $this->onProgress = function () { + public function __construct( $options = [] ) { + $this->concurrency = $options['concurrency'] ?? 2; + $this->requests = []; + $this->on_progress = $options['on_progress'] ?? function () { }; - } - - /** - * Sets the limit of concurrent connections this client will open. - * - * @param int $concurrency - */ - public function set_concurrency_limit( $concurrency ) { - $this->concurrency = $concurrency; - } - - /** - * Sets the callback called when response bytes are received on any of the enqueued - * requests. - * - * @param callable $onProgress A function of three arguments: - * Request $request, int $downloaded, int $total. - */ - public function set_progress_callback( $onProgress ) { - $this->onProgress = $onProgress; + $this->max_redirects = $options['max_redirects'] ?? 3; } /** @@ -144,147 +129,123 @@ public function set_progress_callback( $onProgress ) { * an internal queue. Network transmission is delayed until one of the returned * streams is read from. * - * @param Request|Request[] $requests The HTTP request(s) to enqueue. Can be a single request or an array of requests. + * @param Request|Request[] $requests The HTTP request(s) to enqueue. Can be a single request or an array of requests. */ public function enqueue( $requests ) { if ( ! is_array( $requests ) ) { - $this->requests[] = $requests; + $this->requests[] = $requests; + $this->connections[ $requests->id ] = new Connection( $requests ); + return; } foreach ( $requests as $request ) { - $this->requests[] = $request; + $this->requests[] = $request; + $this->connections[ $request->id ] = new Connection( $request ); } } + public function await_response_bytes() { + do { + foreach ( $this->requests as $request ) { + $request = $request->latest_redirect(); + $connection = $this->connections[ $request->id ]; + if ( + $request->state !== Request::STATE_FAILED && + $request->response && + strlen( $connection->response_buffer ) > 0 + ) { + return $request; + } + } + } while ( $this->event_loop_tick() ); + + return false; + } + /** * Reads $length bytes from the given request while also running * non-blocking event loop operations. * - * @param Request $request + * @param Request $request * @param $length + * + * @return string */ - public function read_bytes( $request, $length, $mode = self::READ_NON_BLOCKING ) - { - $response = $request->get_response(); - $buffered = ''; + public function read_bytes( Request $request, $length, $mode = self::READ_NON_BLOCKING ) { + $buffered = ''; do { - $next_chunk = ''; + $request = $request->latest_redirect(); + $connection = $this->connections[ $request->id ]; if ( $request->state === Request::STATE_RECEIVING_BODY || $request->state === Request::STATE_FINISHED ) { - $next_chunk = $response->consume_buffer($length - strlen($buffered)); - $buffered .= $next_chunk; + $buffered .= $connection->consume_buffer( $length - strlen( $buffered ) ); } if ( - ($mode === self::READ_NON_BLOCKING) || - ($mode === self::READ_POLL_ANY && strlen($buffered)) || - ($mode === self::READ_POLL_ALL && strlen($buffered) >= $length) || + ( $mode === self::READ_NON_BLOCKING ) || + ( $mode === self::READ_POLL_ANY && strlen( $buffered ) ) || + ( $mode === self::READ_POLL_ALL && strlen( $buffered ) >= $length ) || // End of data - ($request->state === Request::STATE_FINISHED && feof($response->raw_response_stream)) + ( $request->state === Request::STATE_FINISHED && ( + !is_resource($this->connections[ $request->id ]->http_socket) || + feof( $this->connections[ $request->id ]->http_socket ) + ) ) ) { break; } - } while ($this->event_loop_tick()); + } while ( $this->event_loop_tick() ); return $buffered; } - public function next_response_chunk() - { - do { - foreach($this->requests as $request) { - if ( strlen($request->get_response()->buffer) > 0 ) { - return $request; - } - } - } while($this->event_loop_tick()); - - return false; - } - - public function wait_for_headers( $request ) - { - if(!in_array($request, $this->requests, true)) { - trigger_error('Request not found in the client', E_USER_WARNING); - return false; - } - - do { - if ($request->get_response()->get_headers()) { - return true; - } - } while ($this->event_loop_tick() && $request->state !== Request::STATE_FAILED); - - return false; - } - - public function wait_for_response_body_stream( $request ) - { - if(!in_array($request, $this->requests, true)) { - trigger_error('Request not found in the client', E_USER_WARNING); - return false; - } - - do { - if ($request->get_response()->decoded_response_stream) { - return true; - } - } while ($this->event_loop_tick() && $request->state !== Request::STATE_FAILED); - - return false; - } - /** * Returns the response stream associated with the given Request object. * Reading from that stream also runs this Client's event loop. * - * @param Request $request + * @param Request $request * * @return resource|bool */ - // public function get_response_stream( $request ) { - // if(!in_array($request, $this->requests, true)) { - // trigger_error('Request not found in the client', E_USER_WARNING); - // return false; - // } - - // if( - // $request->state !== Request::STATE_RECEIVING_BODY && - // $request->state !== Request::STATE_FINISHED - // ) { - // trigger_error('Request is not in a state where the response stream is available', E_USER_WARNING); - // return false; - // } - - // return $request->get_response()->event_loop_decoded_response_stream; - // } - - public function event_loop_tick() - { - if(count($this->get_concurrent_requests()) === 0) { + public function await_response_stream( Request $request ) { + do { + $request = $request->latest_redirect(); + $connection = $this->connections[ $request->id ]; + if ( $connection->event_loop_decoded_response_stream ) { + return $connection->event_loop_decoded_response_stream; + } + // If a request finished without opening a decoded response stream, it either failed + // or it's a redirect. Let's return false. + if ( $request->state === Request::STATE_FAILED || $request->state === Request::STATE_FINISHED ) { + return false; + } + } while ( $this->event_loop_tick() ); + } + + public function event_loop_tick() { + if ( count( $this->get_concurrent_requests() ) === 0 ) { return false; } - - static::open_nonblocking_http_sockets( + + $this->open_nonblocking_http_sockets( $this->get_concurrent_requests( Request::STATE_ENQUEUED ) ); - static::enable_crypto( + $this->enable_crypto( $this->get_concurrent_requests( Request::STATE_WILL_ENABLE_CRYPTO ) ); - static::send_request_headers( + $this->send_request_headers( $this->get_concurrent_requests( Request::STATE_WILL_SEND_HEADERS ) ); - static::send_request_body( + $this->send_request_body( $this->get_concurrent_requests( Request::STATE_WILL_SEND_BODY ) ); - static::receive_response_headers( + $this->receive_response_headers( $this->get_concurrent_requests( Request::STATE_RECEIVING_HEADERS ) ); @@ -296,12 +257,36 @@ public function event_loop_tick() $this->get_concurrent_requests( Request::STATE_RECEIVED ) ); + $this->cleanup_finished_and_consumed_requests( + $this->get_requests( Request::STATE_FINISHED ) + ); + return true; } - protected function get_concurrent_requests($states=null) - { - $processed_requests = $this->get_requests([ + protected function mark_finished( Request $request ) { + $request->state = Request::STATE_FINISHED; + $this->close_connection( $request ); + } + + protected function set_error( Request $request, $error ) { + $request->error = $error; + $request->state = Request::STATE_FAILED; + + $this->close_connection( $request ); + } + + private function close_connection( Request $request ) { + $socket = $this->connections[$request->id]->http_socket; + if ( $socket && is_resource( $socket ) ) { + @fclose( $socket ); + // No need to close all the dependent stream wrappers – they are + // invalidated when the root resource is closed. + } + } + + protected function get_concurrent_requests( $states = null ) { + $processed_requests = $this->get_requests( [ Request::STATE_WILL_ENABLE_CRYPTO, Request::STATE_WILL_SEND_HEADERS, Request::STATE_WILL_SEND_BODY, @@ -309,89 +294,91 @@ protected function get_concurrent_requests($states=null) Request::STATE_RECEIVING_HEADERS, Request::STATE_RECEIVING_BODY, Request::STATE_RECEIVED, - ]); - $available_slots = $this->concurrency - count($processed_requests); - $enqueued_requests = $this->get_requests(Request::STATE_ENQUEUED); - for($i = 0; $i < $available_slots; $i++) { - if(!isset($enqueued_requests[$i])) { + ] ); + $available_slots = $this->concurrency - count( $processed_requests ); + $enqueued_requests = $this->get_requests( Request::STATE_ENQUEUED ); + for ( $i = 0; $i < $available_slots; $i ++ ) { + if ( ! isset( $enqueued_requests[ $i ] ) ) { break; } - $processed_requests[] = $enqueued_requests[$i]; + $processed_requests[] = $enqueued_requests[ $i ]; } - if($states !== null) { - $processed_requests = static::filter_requests($processed_requests, $states); + if ( $states !== null ) { + $processed_requests = static::filter_requests( $processed_requests, $states ); } return $processed_requests; } - private function get_requests($states) { - if(!is_array($states)) { - $states = [$states]; + public function get_failed_requests() { + return $this->get_requests( Request::STATE_FAILED ); + } + + private function get_requests( $states ) { + if ( ! is_array( $states ) ) { + $states = [ $states ]; } - return static::filter_requests($this->requests, $states); + + return static::filter_requests( $this->requests, $states ); } /** * Handle transfer encodings. - * + * * @param Request[] $requests An array of HTTP requests. */ - private function decode_and_monitor_response_body_stream(Request $request) { - $wrapped_stream = CountReadBytesStreamWrapper::wrap( - $request->http_socket, - function ($downloaded) use ($request) { - $response = $request->get_response(); - $total = $response->get_header('content-length') ?: null; - if($total !== null) { + private function decode_and_monitor_response_body_stream( Request $request ) { + $wrapped_stream = CountReadBytesWrapper::wrap( + $this->connections[ $request->id ]->http_socket, + function ( $downloaded ) use ( $request ) { + $total = $request->response->get_header( 'content-length' ) ?: null; + if ( $total !== null ) { $total = (int) $total; } - $onProgress = $this->onProgress; - $onProgress($request, $downloaded, $total); + $onProgress = $this->on_progress; + $onProgress( $request, $downloaded, $total ); } ); - $response = $request->get_response(); - $transfer_encodings = array(); - $transfer_encoding = $response->get_header('transfer-encoding'); - if($transfer_encoding) { - $transfer_encodings = array_map('trim', explode(',', $transfer_encoding)); + $transfer_encoding = $request->response->get_header( 'transfer-encoding' ); + if ( $transfer_encoding ) { + $transfer_encodings = array_map( 'trim', explode( ',', $transfer_encoding ) ); } - $content_encoding = $response->get_header('content-encoding'); - if($content_encoding && !in_array($content_encoding, $transfer_encodings)) { + $content_encoding = $request->response->get_header( 'content-encoding' ); + if ( $content_encoding && ! in_array( $content_encoding, $transfer_encodings ) ) { $transfer_encodings[] = $content_encoding; } - foreach($transfer_encodings as $transfer_encoding) { - switch($transfer_encoding) { + foreach ( $transfer_encodings as $transfer_encoding ) { + switch ( $transfer_encoding ) { case 'chunked': /** * Wrap the stream in a chunked encoding decoder. - * There was an attempt to use stream filters, but unfortunately + * There was an attempt to use stream filters, but unfortunately * they are incompatible with stream_select(). */ - $wrapped_stream = ChunkedEncodingStreamWrapper::create_resource(new VanillaStreamWrapperData( - $wrapped_stream - )); + $wrapped_stream = ChunkedEncodingWrapper::wrap( $wrapped_stream ); break; case 'gzip': case 'deflate': - $wrapped_stream = InflateStreamWrapper::create_resource(new InflateStreamWrapperData( + $wrapped_stream = InflateStreamWrapper::wrap( $wrapped_stream, $transfer_encoding === 'gzip' ? ZLIB_ENCODING_GZIP : ZLIB_ENCODING_RAW - )); + ); break; case 'identity': // No-op break; default: - $request->set_error(new HttpError( 'Unsupported transfer encoding received from the server: ' . $transfer_encoding )); + $this->set_error( $request, + new HttpError( 'Unsupported transfer encoding received from the server: ' . $transfer_encoding ) ); break; } } + return $wrapped_stream; } @@ -402,18 +389,17 @@ function ($downloaded) use ($request) { * * @param Request[] $requests An array of HTTP requests. */ - static private function enable_crypto(array $requests) - { - foreach ( static::stream_select($requests, static::STREAM_SELECT_WRITE) as $request ) { + private function enable_crypto( array $requests ) { + foreach ( $this->stream_select( $requests, static::STREAM_SELECT_WRITE ) as $request ) { $enabled_crypto = stream_socket_enable_crypto( - $request->http_socket, + $this->connections[ $request->id ]->http_socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT ); - if (false === $enabled_crypto) { - $request->set_error(new HttpError('Failed to enable crypto: ' . error_get_last()['message'])); + if ( false === $enabled_crypto ) { + $this->set_error( $request, new HttpError( 'Failed to enable crypto: ' . error_get_last()['message'] ) ); continue; - } elseif (0 === $enabled_crypto) { + } elseif ( 0 === $enabled_crypto ) { // The SSL handshake isn't finished yet, let's skip it // for now and try again on the next event loop pass. continue; @@ -428,47 +414,45 @@ static private function enable_crypto(array $requests) * * @param Request[] $requests An array of HTTP requests. */ - static private function send_request_headers(array $requests) - { - foreach ( static::stream_select($requests, static::STREAM_SELECT_WRITE) as $request ) { + private function send_request_headers( array $requests ) { + foreach ( $this->stream_select( $requests, static::STREAM_SELECT_WRITE ) as $request ) { $header_bytes = static::prepare_request_headers( $request ); - if(false === fwrite( $request->http_socket, $header_bytes )) { - $request->set_error(new HttpError( 'Failed to write request bytes.' )); + if ( false === @fwrite( $this->connections[ $request->id ]->http_socket, $header_bytes ) ) { + $this->set_error( $request, new HttpError( 'Failed to write request bytes – ' . error_get_last()['message'] ) ); continue; } - if ($request->upload_body_stream) { + if ( $request->upload_body_stream ) { $request->state = Request::STATE_WILL_SEND_BODY; } else { $request->state = Request::STATE_RECEIVING_HEADERS; } } } - + /** * Sends HTTP request body. * * @param Request[] $requests An array of HTTP requests. */ - static private function send_request_body(array $requests) - { - foreach ( static::stream_select($requests, self::STREAM_SELECT_WRITE) as $request ) { + private function send_request_body( array $requests ) { + foreach ( $this->stream_select( $requests, self::STREAM_SELECT_WRITE ) as $request ) { $chunk = fread( $request->upload_body_stream, 8192 ); if ( false === $chunk ) { - $request->set_error(new HttpError( 'Failed to read from the request body stream' )); + $this->set_error( $request, new HttpError( 'Failed to read from the request body stream' ) ); continue; } - if(false === fwrite( $request->http_socket, $chunk )) { - $request->set_error(new HttpError( 'Failed to write request bytes.' )); + if ( false === fwrite( $this->connections[ $request->id ]->http_socket, $chunk ) ) { + $this->set_error( $request, new HttpError( 'Failed to write request bytes.' ) ); continue; } - if('' === $chunk || feof($request->upload_body_stream)) { - fclose($request->upload_body_stream); + if ( '' === $chunk || feof( $request->upload_body_stream ) ) { + fclose( $request->upload_body_stream ); $request->upload_body_stream = null; - $request->state = Request::STATE_RECEIVING_HEADERS; + $request->state = Request::STATE_RECEIVING_HEADERS; } } } @@ -478,43 +462,48 @@ static private function send_request_body(array $requests) * * @param array $requests An array of requests. */ - static private function receive_response_headers( $requests ) { - foreach (static::stream_select($requests, static::STREAM_SELECT_READ) as $request) { - $response = $request->get_response(); + private function receive_response_headers( $requests ) { + foreach ( $this->stream_select( $requests, static::STREAM_SELECT_READ ) as $request ) { + if ( ! $request->response ) { + $request->response = new Response( $request ); + } + $connection = $this->connections[ $request->id ]; + $response = $request->response; - while (true) { + while ( true ) { // @TODO: Use a larger chunk size here and then scan for \r\n\r\n. // 1 seems slow and overly conservative. - $header_byte = fread($response->raw_response_stream, 1); - if (false === $header_byte || '' === $header_byte) { + $header_byte = fread( $this->connections[ $request->id ]->http_socket, 1 ); + if ( false === $header_byte || '' === $header_byte ) { break; } - $response->buffer .= $header_byte; + $connection->response_buffer .= $header_byte; + $buffer_size = strlen( $connection->response_buffer ); if ( - strlen($response->buffer) < 4 || - $response->buffer[strlen($response->buffer) - 4] !== "\r" || - $response->buffer[strlen($response->buffer) - 3] !== "\n" || - $response->buffer[strlen($response->buffer) - 2] !== "\r" || - $response->buffer[strlen($response->buffer) - 1] !== "\n" + $buffer_size < 4 || + $connection->response_buffer[ $buffer_size - 4 ] !== "\r" || + $connection->response_buffer[ $buffer_size - 3 ] !== "\n" || + $connection->response_buffer[ $buffer_size - 2 ] !== "\r" || + $connection->response_buffer[ $buffer_size - 1 ] !== "\n" ) { continue; } - $parsed = static::parse_http_headers($response->buffer); - $response->buffer = ''; + $parsed = static::parse_http_headers( $connection->response_buffer ); + $connection->response_buffer = ''; - $response->headers = $parsed['headers']; - $response->statusCode = $parsed['status']['code']; - $response->statusMessage = $parsed['status']['message']; - $response->protocol = $parsed['status']['protocol']; + $response->headers = $parsed['headers']; + $response->status_code = $parsed['status']['code']; + $response->status_message = $parsed['status']['message']; + $response->protocol = $parsed['status']['protocol']; // If we're being redirected, we don't need to wait for the body. - if($response->statusCode >= 300 && $response->statusCode < 400) { + if ( $response->status_code >= 300 && $response->status_code < 400 ) { $request->state = Request::STATE_RECEIVED; break; } - + $request->state = Request::STATE_RECEIVING_BODY; break; } @@ -531,54 +520,101 @@ private function receive_response_body( $requests ) { // * Content-Length is reached // * The last chunk in Transfer-Encoding: chunked is received // * The connection is closed - foreach (static::stream_select($requests, static::STREAM_SELECT_READ) as $request) { - $response = $request->get_response(); - if (!$response->decoded_response_stream) { - $response->decoded_response_stream = $this->decode_and_monitor_response_body_stream($request); - $response->event_loop_decoded_response_stream = StreamWrapper::create_resource( - new StreamData($request, $this) + foreach ( $this->stream_select( $requests, static::STREAM_SELECT_READ ) as $request ) { + $response = $request->response; + if ( ! $this->connections[ $request->id ]->decoded_response_stream ) { + $this->connections[ $request->id ]->decoded_response_stream = $this->decode_and_monitor_response_body_stream( $request ); + $this->connections[ $request->id ]->event_loop_decoded_response_stream = EventLoopWrapper::wrap( + $request, + $this->connections[ $request->id ]->http_socket, + $this ); } - while (true) { - if(feof($response->decoded_response_stream)) { + while ( true ) { + if ( feof( $this->connections[ $request->id ]->decoded_response_stream ) ) { $request->state = Request::STATE_RECEIVED; break; } - $body_bytes = fread($response->decoded_response_stream, 1024); - if (false === $body_bytes || '' === $body_bytes) { + $body_bytes = fread( $this->connections[ $request->id ]->decoded_response_stream, 1024 ); + if ( false === $body_bytes || '' === $body_bytes ) { break; } - $response->buffer .= $body_bytes; + $this->connections[ $request->id ]->response_buffer .= $body_bytes; } } } /** - * @TODO: Limit to n redirects. - * - * @param array $requests An array of requests. + * @param array $requests An array of requests. */ private function handle_redirects( $requests ) { - foreach($requests as $request) { - $response = $request->get_response(); - $code = $response->get_status_code(); - if(!($code >= 300 && $code < 400)) { - $request->state = Request::STATE_FINISHED; + foreach ( $requests as $request ) { + $response = $request->response; + $code = $response->status_code; + $this->mark_finished( $request ); + if ( ! ( $code >= 300 && $code < 400 ) ) { + continue; + } + + $location = $response->get_header( 'location' ); + if ( $location === null ) { continue; } + - $location = $response->get_header('location'); - if($location === null) { - $request->state = Request::STATE_FINISHED; + $redirects_so_far = 0; + $cause = $request; + while($cause->redirected_from) { + ++$redirects_so_far; + $cause = $cause->redirected_from; + } + + if($redirects_so_far >= $this->max_redirects) { + $this->set_error($request, new HttpError('Too many redirects')); + continue; + } + + $redirect_url = $location; + if(strpos($redirect_url, 'http://') !== 0 && strpos($redirect_url, 'https://') !== 0) { + $current_url_parts = parse_url($request->url); + $redirect_url = $current_url_parts['scheme'] . '://' . $current_url_parts['host']; + if($current_url_parts['port']){ + $redirect_url .= ':' . $current_url_parts['port']; + } + if(!str_starts_with($location, '/')) { + $redirect_url .= '/'; + } + $redirect_url .= $location; + } + + if (!filter_var($redirect_url, FILTER_VALIDATE_URL)) { + $this->set_error($request, new HttpError('Invalid redirect URL')); continue; } - $request->state = Request::STATE_FINISHED; - $redirect = (new Request($location))->set_redirected_from($request); - $this->requests[] = $redirect; + $this->enqueue(new Request($redirect_url, ['redirected_from' => $request])); + } + } + + private function cleanup_finished_and_consumed_requests( $requests ) { + foreach ( $requests as $request ) { + // Interestingly, relying on foreach-provided $k => $request unsets + // the wrong request. Is it a case of a non-sparse array being re-indexed + // when iterating and unsetting? + $request_key = array_search($request, $this->requests, true); + if ( ! isset( $this->connections[ $request->id ] ) ) { + // unset( $this->requests[ $request_key ] ); + continue; + } + + $connection = $this->connections[ $request->id ]; + if ( ! $connection || ! $connection->response_buffer ) { + // unset( $this->requests[ $request_key ] ); + // unset( $this->connections[ $request->id ] ); + } } } @@ -589,7 +625,7 @@ private function handle_redirects( $requests ) { * * @return array An array containing the parsed status and headers. */ - static private function parse_http_headers( string $headers ) { + private function parse_http_headers( string $headers ) { $lines = explode( "\r\n", $headers ); $status = array_shift( $lines ); $status = explode( ' ', $status ); @@ -634,27 +670,28 @@ static private function parse_http_headers( string $headers ) { * * @return bool Whether the stream was opened successfully. */ - static private function open_nonblocking_http_sockets($requests) { - foreach ($requests as $request) { - $url = $request->url; - $parts = parse_url($url); + private function open_nonblocking_http_sockets( $requests ) { + foreach ( $requests as $request ) { + $url = $request->url; + $parts = parse_url( $url ); $scheme = $parts['scheme']; - if (!in_array($scheme, array('http', 'https'))) { - $request->set_error(new HttpError('stream_http_open_nonblocking: Invalid scheme in URL ' . $url . ' – only http:// and https:// URLs are supported')); + if ( ! in_array( $scheme, array( 'http', 'https' ) ) ) { + $this->set_error( $request, + new HttpError( 'stream_http_open_nonblocking: Invalid scheme in URL ' . $url . ' – only http:// and https:// URLs are supported' ) ); continue; } $is_ssl = $scheme === 'https'; - $port = $parts['port'] ?? ($scheme === 'https' ? 443 : 80); - $host = $parts['host']; + $port = $parts['port'] ?? ( $scheme === 'https' ? 443 : 80 ); + $host = $parts['host']; // Create stream context $context = stream_context_create( array( 'socket' => array( - 'isSsl' => $is_ssl, + 'isSsl' => $is_ssl, 'originalUrl' => $url, - 'socketUrl' => 'tcp://' . $host . ':' . $port, + 'socketUrl' => 'tcp://' . $host . ':' . $port, ), ) ); @@ -667,11 +704,13 @@ static private function open_nonblocking_http_sockets($requests) { STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT, $context ); - if ($stream === false) { - $request->set_error(new HttpError("stream_http_open_nonblocking: stream_socket_client() was unable to open a stream to $url. $errno: $errstr")); + + if ( $stream === false ) { + $this->set_error( $request, + new HttpError( "stream_http_open_nonblocking: stream_socket_client() was unable to open a stream to $url. $errno: $errstr" ) ); continue; } - + if ( PHP_VERSION_ID >= 72000 ) { // In PHP <= 7.1 and later, making the socket non-blocking before the // SSL handshake makes the stream_socket_enable_crypto() call always return @@ -680,15 +719,14 @@ static private function open_nonblocking_http_sockets($requests) { stream_set_blocking( $stream, 0 ); } - $request->http_socket = $stream; - $request->get_response()->raw_response_stream = $stream; - if($is_ssl) { + $this->connections[ $request->id ]->http_socket = $stream; + if ( $is_ssl ) { $request->state = Request::STATE_WILL_ENABLE_CRYPTO; } else { $request->state = Request::STATE_WILL_SEND_HEADERS; } } - + return true; } @@ -696,24 +734,25 @@ static private function open_nonblocking_http_sockets($requests) { * Prepares an HTTP request string for a given URL. * * @param Request $request The Request to prepare the HTTP headers for. + * * @return string The prepared HTTP request string. */ - private static function prepare_request_headers( Request $request ) { + static private function prepare_request_headers( Request $request ) { $url = $request->url; $parts = parse_url( $url ); $host = $parts['host']; - $path = (isset($parts['path']) ? $parts['path'] : '/') . ( isset( $parts['query'] ) ? '?' . $parts['query'] : '' ); + $path = ( isset( $parts['path'] ) ? $parts['path'] : '/' ) . ( isset( $parts['query'] ) ? '?' . $parts['query'] : '' ); $headers = [ - "Host" => $host, - "User-Agent" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36", - "Accept" => "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", + "Host" => $host, + "User-Agent" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36", + "Accept" => "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", "Accept-Encoding" => "gzip", "Accept-Language" => "en-US,en;q=0.9", - "Connection" => "close", + "Connection" => "close", ]; - foreach($request->headers as $k => $v) { - $headers[$k] = $v; + foreach ( $request->headers as $k => $v ) { + $headers[ $k ] = $v; } @@ -728,33 +767,34 @@ private static function prepare_request_headers( Request $request ) { return implode( "\r\n", $request_parts ) . "\r\n\r\n"; } - static private function filter_requests( array $requests, $states ) { - if(!is_array($states)) { - $states = [$states]; + private function filter_requests( array $requests, $states ) { + if ( ! is_array( $states ) ) { + $states = [ $states ]; } $results = []; - foreach($requests as $request) { - if(in_array($request->state, $states)) { + foreach ( $requests as $request ) { + if ( in_array( $request->state, $states ) ) { $results[] = $request; } } + return $results; } - static private function stream_select( $requests, $mode ) { - if(empty($requests)) { + private function stream_select( $requests, $mode ) { + if ( empty( $requests ) ) { return []; } - $read = []; + $read = []; $write = []; foreach ( $requests as $k => $request ) { - if($mode & static::STREAM_SELECT_READ) { - $read[ $k ] = $request->http_socket; + if ( $mode & static::STREAM_SELECT_READ ) { + $read[ $k ] = $this->connections[ $request->id ]->http_socket; } - if($mode & static::STREAM_SELECT_WRITE) { - $write[ $k ] = $request->http_socket; + if ( $mode & static::STREAM_SELECT_WRITE ) { + $write[ $k ] = $this->connections[ $request->id ]->http_socket; } } $except = null; @@ -763,24 +803,26 @@ static private function stream_select( $requests, $mode ) { $ready = @stream_select( $read, $write, $except, 0, static::NONBLOCKING_TIMEOUT_MICROSECONDS ); if ( $ready === false ) { foreach ( $requests as $request ) { - $request->set_error(new HttpError( 'Error: ' . error_get_last()['message'] )); + $this->set_error( $request, new HttpError( 'Error: ' . error_get_last()['message'] ) ); } + return []; } elseif ( $ready <= 0 ) { // @TODO allow at most X stream_select attempts per request // foreach ( $unprocessed_requests as $request ) { - // $request->set_error(new HttpError( 'stream_select timed out' )); + // $this->>set_error($request, new HttpError( 'stream_select timed out' )); // } return []; } $selected_requests = []; - foreach (array_keys($read) as $k) { + foreach ( array_keys( $read ) as $k ) { $selected_requests[ $k ] = $requests[ $k ]; } - foreach (array_keys($write) as $k) { + foreach ( array_keys( $write ) as $k ) { $selected_requests[ $k ] = $requests[ $k ]; } + return $selected_requests; } diff --git a/src/WordPress/AsyncHttp/Connection.php b/src/WordPress/AsyncHttp/Connection.php new file mode 100644 index 00000000..0ff5b853 --- /dev/null +++ b/src/WordPress/AsyncHttp/Connection.php @@ -0,0 +1,24 @@ +request = $request; + } + + public function consume_buffer($length) + { + $buffer = substr($this->response_buffer, 0, $length); + $this->response_buffer = substr($this->response_buffer, $length); + return $buffer; + } + +} diff --git a/src/WordPress/AsyncHttp/CountReadBytesStreamWrapper.php b/src/WordPress/AsyncHttp/CountReadBytesStreamWrapper.php deleted file mode 100644 index 57781898..00000000 --- a/src/WordPress/AsyncHttp/CountReadBytesStreamWrapper.php +++ /dev/null @@ -1,35 +0,0 @@ -message = $message; } + + public function __toString( ) { + return $this->message; + } } diff --git a/src/WordPress/AsyncHttp/InflateStreamWrapperData.php b/src/WordPress/AsyncHttp/InflateStreamWrapperData.php deleted file mode 100644 index b7a92970..00000000 --- a/src/WordPress/AsyncHttp/InflateStreamWrapperData.php +++ /dev/null @@ -1,13 +0,0 @@ -fp = $fp; - $this->encoding = $encoding; - } -} diff --git a/src/WordPress/AsyncHttp/InternalRequestState.php b/src/WordPress/AsyncHttp/InternalRequestState.php deleted file mode 100644 index fb486130..00000000 --- a/src/WordPress/AsyncHttp/InternalRequestState.php +++ /dev/null @@ -1,40 +0,0 @@ -request = $request; - $this->response = new Response( $this ); - } - - public function is_finished() { - return $this->state === self::STATE_FINISHED; - } - - public function get_header( $name ) { - if($this->headers === null) { - return false; - } - - return $this->headers[ strtolower($name) ] ?? null; - } -} diff --git a/src/WordPress/AsyncHttp/Request.php b/src/WordPress/AsyncHttp/Request.php index d27c4f20..b53e4784 100644 --- a/src/WordPress/AsyncHttp/Request.php +++ b/src/WordPress/AsyncHttp/Request.php @@ -19,7 +19,7 @@ class Request { public $id; - public $state = self::STATE_ENQUEUED; + public $state = self::STATE_ENQUEUED; public $url; public $is_ssl; @@ -28,94 +28,44 @@ class Request { public $http_version; public $upload_body_stream; public $redirected_from; - public $http_socket; + public $redirected_to; public $error; - protected $response; + public $response; /** * @param string $url */ - public function __construct( string $url, $method='GET', $headers=[], $body_stream=null, $http_version='1.1' ) { + public function __construct( string $url, $request_info = array() ) { + $request_info = array_merge([ + 'http_version' => '1.1', + 'method' => 'GET', + 'headers' => [], + 'body_stream' => null, + 'redirected_from' => null, + ], $request_info); + $this->id = ++self::$last_id; $this->url = $url; $this->is_ssl = strpos( $url, 'https://' ) === 0; - $this->method = $method; - $this->headers = $headers; - $this->upload_body_stream = $body_stream; - $this->http_version = $http_version; - $this->response = new Response( $this ); - } - - public function set_method(string $method) - { - if($this->state === self::STATE_ENQUEUED) { - $this->method = $method; - } else { - trigger_error('Cannot change method after the request has been sent', E_USER_WARNING); - } - return $this; - } - - public function set_headers(array $headers) - { - if($this->state === self::STATE_ENQUEUED) { - $this->headers = $headers; - } else { - trigger_error('Cannot change headers after the request has been sent', E_USER_WARNING); + $this->method = $request_info['method']; + $this->headers = $request_info['headers']; + $this->upload_body_stream = $request_info['body_stream']; + $this->http_version = $request_info['http_version']; + $this->redirected_from = $request_info['redirected_from']; + if($this->redirected_from) { + $this->redirected_from->redirected_to = $this; } - return $this; } - public function set_http_version(string $http_version) + public function latest_redirect() { - if($this->state === self::STATE_ENQUEUED) { - $this->http_version = $http_version; - } else { - trigger_error('Cannot change http_version after the request has been sent', E_USER_WARNING); + $request = $this; + while ($request->redirected_to) { + $request = $request->redirected_to; } - - return $this; - } - - public function set_upload_body_stream($upload_body_stream) - { - if($this->state === self::STATE_ENQUEUED) { - $this->upload_body_stream = $upload_body_stream; - } else { - trigger_error('Cannot change upload_body_stream after the request has been sent', E_USER_WARNING); - } - - return $this; - } - - public function set_redirected_from($request) - { - if($this->redirected_from === null) { - $this->redirected_from = $request; - } else { - trigger_error('Cannot change redirected_from after it was already set', E_USER_WARNING); - } - return $this; - } - - public function set_error($error) - { - $this->error = $error; - $this->state = self::STATE_FAILED; - - if($this->http_socket) { - fclose($this->http_socket); - $this->http_socket = null; - } - } - - /** - * @return Response - */ - public function get_response() { - return $this->response; + return $request; } } diff --git a/src/WordPress/AsyncHttp/Response.php b/src/WordPress/AsyncHttp/Response.php index 1ce7914b..ee58adeb 100644 --- a/src/WordPress/AsyncHttp/Response.php +++ b/src/WordPress/AsyncHttp/Response.php @@ -4,43 +4,18 @@ class Response { - public $protocol; - public $statusCode; - public $statusMessage; + public $status_code; + public $status_message; public $headers = []; - - public $raw_response_stream; - public $decoded_response_stream; - public $event_loop_decoded_response_stream; + public $request; public $buffer = ''; - private $request; public function __construct(Request $request) { $this->request = $request; } - public function get_request() - { - return $this->request; - } - - public function get_status_code() - { - return $this->statusCode; - } - - public function get_status_message() - { - return $this->statusMessage; - } - - public function get_protocol() - { - return $this->protocol; - } - public function get_header( $name ) { if(false === $this->get_headers()) { return false; @@ -58,11 +33,5 @@ public function get_headers() return $this->headers; } - public function consume_buffer($length) - { - $buffer = substr($this->buffer, 0, $length); - $this->buffer = substr($this->buffer, $length); - return $buffer; - } } diff --git a/src/WordPress/AsyncHttp/StreamData.php b/src/WordPress/AsyncHttp/StreamData.php deleted file mode 100644 index a580b657..00000000 --- a/src/WordPress/AsyncHttp/StreamData.php +++ /dev/null @@ -1,17 +0,0 @@ -request = $request; - $this->client = $group; - } -} diff --git a/src/WordPress/AsyncHttp/StreamWrapper.php b/src/WordPress/AsyncHttp/StreamWrapper.php deleted file mode 100644 index 022164ad..00000000 --- a/src/WordPress/AsyncHttp/StreamWrapper.php +++ /dev/null @@ -1,78 +0,0 @@ -stream ) { - $this->stream = $this->wrapper_data->request->http_socket; - } - } - - public function stream_open( $path, $mode, $options, &$opened_path ) { - if ( ! parent::stream_open( $path, $mode, $options, $opened_path ) ) { - return false; - } - - if ( ! $this->wrapper_data->client ) { - return false; - } - $this->client = $this->wrapper_data->client; - - return true; - } - - /** - * @param int $cast_as - */ - public function stream_cast( $cast_as ) { - $this->initialize(); - - return parent::stream_cast( $cast_as ); - } - - public function stream_read( $count ) { - $this->initialize(); - - $this->client->event_loop_tick(); - return $this->client->read_bytes( $this->wrapper_data->request, $count ); - } - - public function stream_write( $data ) { - $this->initialize(); - - return parent::stream_write( $data ); - } - - public function stream_tell() { - $this->initialize(); - - return parent::stream_tell(); - } - - public function stream_close() { - $this->initialize(); - - return parent::stream_close(); - } - - public function stream_eof() { - $this->initialize(); - - return parent::stream_eof(); - } - - public function stream_seek( $offset, $whence ) { - $this->initialize(); - - return parent::stream_seek( $offset, $whence ); - } -} diff --git a/src/WordPress/AsyncHttp/ChunkedEncodingStreamWrapper.php b/src/WordPress/AsyncHttp/StreamWrapper/ChunkedEncodingWrapper.php similarity index 87% rename from src/WordPress/AsyncHttp/ChunkedEncodingStreamWrapper.php rename to src/WordPress/AsyncHttp/StreamWrapper/ChunkedEncodingWrapper.php index 172e69da..6bde2c79 100644 --- a/src/WordPress/AsyncHttp/ChunkedEncodingStreamWrapper.php +++ b/src/WordPress/AsyncHttp/StreamWrapper/ChunkedEncodingWrapper.php @@ -1,10 +1,10 @@ $response_stream + ] ); + } + + protected function do_initialize() { + $this->stream = $this->wrapper_data['response_stream']; + } + /** * Assumptions: - * + * * * $count is the maximum number of **decoded bytes** to return. To decode $count * bytes, we may need to read more than $count bytes from the underlying stream. * * We can call parent::stream_read() without blocking. If the underlying stream * has no more data to read, it will return an empty string. - * + * * @param mixed $count * @return bool|string */ @@ -47,7 +58,7 @@ private function decode_chunks() { if(self::SCAN_FINAL_CHUNK === $this->state) { return ''; } - + $at = 0; $chunks = []; while($at < strlen($this->raw_buffer)) { @@ -57,7 +68,7 @@ private function decode_chunks() { if($chunk_bytes_nb === 0 || strlen($this->raw_buffer) < $chunk_bytes_nb + 2 ) { break; } - + // Check if we received chunk extension and skip over it if yes. if($this->raw_buffer[$chunk_bytes_nb] === ";") { ++$at; @@ -81,8 +92,8 @@ private function decode_chunks() { $this->state = self::SCAN_CHUNK_DATA; } } else if ( $this->state === self::SCAN_CHUNK_DATA ) { - $bytes_to_read = min( - $this->chunk_remaining_bytes, + $bytes_to_read = min( + $this->chunk_remaining_bytes, strlen($this->raw_buffer) - $at ); $data = substr( $this->raw_buffer, $at, $bytes_to_read ); diff --git a/src/WordPress/AsyncHttp/StreamWrapper/CountReadBytesWrapper.php b/src/WordPress/AsyncHttp/StreamWrapper/CountReadBytesWrapper.php new file mode 100644 index 00000000..3b4282ce --- /dev/null +++ b/src/WordPress/AsyncHttp/StreamWrapper/CountReadBytesWrapper.php @@ -0,0 +1,28 @@ + $request, + 'http_socket' => $http_socket, + 'client' => $client + ] ); + } + + protected function do_initialize() { + $this->stream = $this->wrapper_data['http_socket']; + $this->client = $this->wrapper_data['client']; + $this->request = $this->wrapper_data['request']; + } + + public function stream_read( $count ) { + $this->client->event_loop_tick(); + return $this->client->read_bytes( $this->request, $count ); + } +} diff --git a/src/WordPress/AsyncHttp/InflateStreamWrapper.php b/src/WordPress/AsyncHttp/StreamWrapper/InflateStreamWrapper.php similarity index 56% rename from src/WordPress/AsyncHttp/InflateStreamWrapper.php rename to src/WordPress/AsyncHttp/StreamWrapper/InflateStreamWrapper.php index ef4d58ee..66ace74e 100644 --- a/src/WordPress/AsyncHttp/InflateStreamWrapper.php +++ b/src/WordPress/AsyncHttp/StreamWrapper/InflateStreamWrapper.php @@ -1,44 +1,35 @@ $response_stream, + 'encoding' => $encoding, + ] ); } - protected function init() - { - if($this->initialized) { - return; - } - $this->initialized = true; - - if(!($this->wrapper_data instanceof InflateStreamWrapperData)) { - throw new \Exception('InflateStreamWrapper requires an instance of InflateStreamWrapperData'); - } - - $this->inflate_handle = inflate_init($this->wrapper_data->encoding); + protected function do_initialize() { + $this->stream = $this->wrapper_data['response_stream']; + $this->inflate_handle = inflate_init($this->wrapper_data['encoding']); if(false === $this->inflate_handle) { throw new \Exception('Failed to initialize inflate handle'); } } public function stream_read( $count ) { - $this->init(); - $bytes = parent::stream_read( $count ); if($bytes === false) { return false; diff --git a/src/WordPress/Streams/StreamPeekerData.php b/src/WordPress/Streams/StreamPeekerData.php deleted file mode 100644 index 08a51f18..00000000 --- a/src/WordPress/Streams/StreamPeekerData.php +++ /dev/null @@ -1,16 +0,0 @@ -fp = $fp; - $this->onChunk = $onChunk; - $this->onClose = $onClose; - parent::__construct( $fp ); - } -} diff --git a/src/WordPress/Streams/StreamPeekerWrapper.php b/src/WordPress/Streams/StreamPeekerWrapper.php index 2b83aca2..b4efd555 100644 --- a/src/WordPress/Streams/StreamPeekerWrapper.php +++ b/src/WordPress/Streams/StreamPeekerWrapper.php @@ -2,42 +2,26 @@ namespace WordPress\Streams; -class StreamPeekerWrapper extends VanillaStreamWrapper { - protected $onChunk; - protected $onClose; +class StreamPeekerWrapper extends StreamWrapper { + protected $on_data; + protected $on_close; protected $position; const SCHEME = 'peek'; - // Opens the stream - public function stream_open( $path, $mode, $options, &$opened_path ) { - parent::stream_open( $path, $mode, $options, $opened_path ); - - if ( isset( $this->wrapper_data->fp ) ) { - $this->stream = $this->wrapper_data->fp; - } else { - return false; - } - - if ( isset( $this->wrapper_data->onChunk ) && is_callable( $this->wrapper_data->onChunk ) ) { - $this->onChunk = $this->wrapper_data->onChunk; - } else { - // Default onChunk function if none provided - $this->onChunk = function ( $data ) { - }; - } - - if ( isset( $this->wrapper_data->onClose ) && is_callable( $this->wrapper_data->onClose ) ) { - $this->onClose = $this->wrapper_data->onClose; - } else { - // Default onClose function if none provided - $this->onClose = function () { - }; - } + public static function wrap( $response_stream, $on_data, $on_close=null ) { + return parent::create_resource( [ + 'stream' => $response_stream, + 'on_data' => $on_data, + 'on_close' => $on_close + ] ); + } + protected function do_initialize() { + $this->stream = $this->wrapper_data['stream']; + $this->on_data = $this->wrapper_data['on_data'] ?? function ( $data ) {}; + $this->on_close = $this->wrapper_data['on_close'] ?? function () {}; $this->position = 0; - - return true; } // Reads from the stream @@ -45,7 +29,7 @@ public function stream_read( $count ) { $ret = fread( $this->stream, $count ); $this->position += strlen( $ret ); - $onChunk = $this->onChunk; + $onChunk = $this->on_data; $onChunk( $ret ); return $ret; @@ -61,8 +45,11 @@ public function stream_write( $data ) { // Closes the stream public function stream_close() { - fclose( $this->stream ); - $onClose = $this->onClose; + if(is_resource($this->stream)) { + fclose( $this->stream ); + } + $this->stream = null; + $onClose = $this->on_close; $onClose(); } diff --git a/src/WordPress/Streams/VanillaStreamWrapper.php b/src/WordPress/Streams/StreamWrapper.php similarity index 84% rename from src/WordPress/Streams/VanillaStreamWrapper.php rename to src/WordPress/Streams/StreamWrapper.php index 9afe9467..531b2877 100644 --- a/src/WordPress/Streams/VanillaStreamWrapper.php +++ b/src/WordPress/Streams/StreamWrapper.php @@ -2,19 +2,17 @@ namespace WordPress\Streams; -class VanillaStreamWrapper implements StreamWrapperInterface { +class StreamWrapper implements StreamWrapperInterface { protected $stream; protected $context; protected $wrapper_data; + protected $initialized = false; const SCHEME = 'vanilla'; - /** - * @param \WordPress\Streams\VanillaStreamWrapperData $data - */ - public static function create_resource( $data ) { + protected static function create_resource( $data ) { static::register(); $context = stream_context_create( @@ -42,6 +40,17 @@ public static function unregister() { stream_wrapper_unregister( 'async' ); } + protected function initialize() { + if ( $this->initialized ) { + return; + } + $this->initialized = true; + $this->do_initialize(); + } + + protected function do_initialize() { + } + /** * @param int $option @@ -49,6 +58,7 @@ public static function unregister() { * @param int|null $arg2 */ public function stream_set_option( $option, $arg1, $arg2 = null ): bool { + $this->initialize(); if ( \STREAM_OPTION_BLOCKING === $option ) { return stream_set_blocking( $this->stream, (bool) $arg1 ); } elseif ( \STREAM_OPTION_READ_TIMEOUT === $option ) { @@ -61,15 +71,12 @@ public function stream_set_option( $option, $arg1, $arg2 = null ): bool { public function stream_open( $path, $mode, $options, &$opened_path ) { $contextOptions = stream_context_get_options( $this->context ); - if ( ! isset( $contextOptions[ static::SCHEME ]['wrapper_data'] ) || ! is_object( $contextOptions[ static::SCHEME ]['wrapper_data'] ) ) { + if ( ! isset( $contextOptions[ static::SCHEME ]['wrapper_data'] ) ) { return false; } $this->wrapper_data = $contextOptions[ static::SCHEME ]['wrapper_data']; - - if ( $this->wrapper_data->fp ) { - $this->stream = $this->wrapper_data->fp; - } + $this->initialize(); return true; } @@ -78,10 +85,12 @@ public function stream_open( $path, $mode, $options, &$opened_path ) { * @param int $cast_as */ public function stream_cast( $cast_as ) { + $this->initialize(); return $this->stream; } public function stream_read( $count ) { + $this->initialize(); if ( ! $this->stream ) { return false; } @@ -90,6 +99,7 @@ public function stream_read( $count ) { } public function stream_write( $data ) { + $this->initialize(); if ( ! $this->stream ) { return false; } @@ -98,6 +108,7 @@ public function stream_write( $data ) { } public function stream_tell() { + $this->initialize(); if ( ! $this->stream ) { return false; } @@ -106,6 +117,7 @@ public function stream_tell() { } public function stream_close() { + $this->initialize(); if ( ! $this->stream ) { return false; } @@ -118,6 +130,7 @@ public function stream_close() { } public function stream_eof() { + $this->initialize(); if ( ! $this->stream ) { return false; } @@ -130,6 +143,7 @@ public function stream_eof() { } public function stream_seek( $offset, $whence ) { + $this->initialize(); if ( ! $this->stream ) { return false; } @@ -138,9 +152,10 @@ public function stream_seek( $offset, $whence ) { } public function stream_stat() { + $this->initialize(); return array(); } - + /* * This stream_close call could be initiated not by the developer, * but by the PHP internal request shutdown handler (written in C). diff --git a/src/WordPress/Streams/VanillaStreamWrapperData.php b/src/WordPress/Streams/VanillaStreamWrapperData.php deleted file mode 100644 index 31cdf459..00000000 --- a/src/WordPress/Streams/VanillaStreamWrapperData.php +++ /dev/null @@ -1,11 +0,0 @@ -fp = $fp; - } -}