diff --git a/lib/ApiOperations/Request.php b/lib/ApiOperations/Request.php index e300dc13fb..b1fd65b3fe 100644 --- a/lib/ApiOperations/Request.php +++ b/lib/ApiOperations/Request.php @@ -45,6 +45,23 @@ protected function _request($method, $url, $params = [], $options = null) return [$resp->json, $options]; } + /** + * @param string $method HTTP method ('get', 'post', etc.) + * @param string $url URL for the request + * @param callable $readBodyChunk function that will receive chunks of data from a successful request body + * @param array $params list of parameters for the request + * @param null|array|string $options + * + * @throws \Stripe\Exception\ApiErrorException if the request fails + * + * @return array tuple containing (the JSON response, $options) + */ + protected function _requestStreaming($method, $url, $readBodyChunk, $params = [], $options = null) + { + $opts = $this->_opts->merge($options); + static::_staticStreamingRequest($method, $url, $readBodyChunk, $params, $opts); + } + /** * @param string $method HTTP method ('get', 'post', etc.) * @param string $url URL for the request @@ -65,4 +82,21 @@ protected static function _staticRequest($method, $url, $params, $options) return [$response, $opts]; } + + /** + * @param string $method HTTP method ('get', 'post', etc.) + * @param string $url URL for the request + * @param callable $readBodyChunk function that will receive chunks of data from a successful request body + * @param array $params list of parameters for the request + * @param null|array|string $options + * + * @throws \Stripe\Exception\ApiErrorException if the request fails + */ + protected static function _staticStreamingRequest($method, $url, $readBodyChunk, $params, $options) + { + $opts = \Stripe\Util\RequestOptions::parse($options); + $baseUrl = isset($opts->apiBase) ? $opts->apiBase : static::baseUrl(); + $requestor = new \Stripe\ApiRequestor($opts->apiKey, $baseUrl); + $requestor->requestStreaming($method, $url, $readBodyChunk, $params, $opts->headers); + } } diff --git a/lib/ApiRequestor.php b/lib/ApiRequestor.php index 048c462c06..55a6a45b3a 100644 --- a/lib/ApiRequestor.php +++ b/lib/ApiRequestor.php @@ -123,6 +123,28 @@ public function request($method, $url, $params = null, $headers = null) return [$resp, $myApiKey]; } + /** + * @param string $method + * @param string $url + * @param callable $readBodyChunk + * @param null|array $params + * @param null|array $headers + * + * @throws Exception\ApiErrorException + * + * @return array tuple containing (ApiReponse, API key) + */ + public function requestStreaming($method, $url, $readBodyChunk, $params = null, $headers = null) + { + $params = $params ?: []; + $headers = $headers ?: []; + list($rbody, $rcode, $rheaders, $myApiKey) = + $this->_requestRawStreaming($method, $url, $params, $headers, $readBodyChunk); + if ($rcode >= 300) { + $this->_interpretResponse($rbody, $rcode, $rheaders); + } + } + /** * @param string $rbody a JSON string * @param int $rcode @@ -328,18 +350,7 @@ private static function _defaultHeaders($apiKey, $clientInfo = null) ]; } - /** - * @param string $method - * @param string $url - * @param array $params - * @param array $headers - * - * @throws Exception\AuthenticationException - * @throws Exception\ApiConnectionException - * - * @return array - */ - private function _requestRaw($method, $url, $params, $headers) + private function _prepareRequest($method, $url, $params, $headers) { $myApiKey = $this->_apiKey; if (!$myApiKey) { @@ -416,6 +427,24 @@ function ($key) use ($params) { $rawHeaders[] = $header . ': ' . $value; } + return [$absUrl, $rawHeaders, $params, $hasFile, $myApiKey]; + } + + /** + * @param string $method + * @param string $url + * @param array $params + * @param array $headers + * + * @throws Exception\AuthenticationException + * @throws Exception\ApiConnectionException + * + * @return array + */ + private function _requestRaw($method, $url, $params, $headers) + { + list($absUrl, $rawHeaders, $params, $hasFile, $myApiKey) = $this->_prepareRequest($method, $url, $params, $headers); + $requestStartMs = Util\Util::currentTimeMillis(); list($rbody, $rcode, $rheaders) = $this->httpClient()->request( @@ -438,6 +467,45 @@ function ($key) use ($params) { return [$rbody, $rcode, $rheaders, $myApiKey]; } + /** + * @param string $method + * @param string $url + * @param array $params + * @param array $headers + * @param callable $readBodyChunk + * + * @throws Exception\AuthenticationException + * @throws Exception\ApiConnectionException + * + * @return array + */ + private function _requestRawStreaming($method, $url, $params, $headers, $readBodyChunk) + { + list($absUrl, $rawHeaders, $params, $hasFile, $myApiKey) = $this->_prepareRequest($method, $url, $params, $headers); + + $requestStartMs = Util\Util::currentTimeMillis(); + + list($rbody, $rcode, $rheaders) = $this->httpClient()->requestStreaming( + $method, + $absUrl, + $rawHeaders, + $params, + $hasFile, + $readBodyChunk + ); + + if (isset($rheaders['request-id']) + && \is_string($rheaders['request-id']) + && \strlen($rheaders['request-id']) > 0) { + self::$requestTelemetry = new RequestTelemetry( + $rheaders['request-id'], + Util\Util::currentTimeMillis() - $requestStartMs + ); + } + + return [$rbody, $rcode, $rheaders, $myApiKey]; + } + /** * @param resource $resource * diff --git a/lib/HttpClient/CurlClient.php b/lib/HttpClient/CurlClient.php index bae0428972..e239a192bc 100644 --- a/lib/HttpClient/CurlClient.php +++ b/lib/HttpClient/CurlClient.php @@ -193,7 +193,7 @@ public function getConnectTimeout() // END OF USER DEFINED TIMEOUTS - public function request($method, $absUrl, $headers, $params, $hasFile) + private function constructRequest($method, $absUrl, $headers, $params, $hasFile) { $method = \strtolower($method); @@ -275,16 +275,193 @@ public function request($method, $absUrl, $headers, $params, $hasFile) // potential issues (cf. https://github.com/stripe/stripe-php/issues/1045). $opts[\CURLOPT_IPRESOLVE] = \CURL_IPRESOLVE_V4; + return [$opts, $absUrl]; + } + + public function request($method, $absUrl, $headers, $params, $hasFile) + { + list($opts, $absUrl) = $this->constructRequest($method, $absUrl, $headers, $params, $hasFile); + list($rbody, $rcode, $rheaders) = $this->executeRequestWithRetries($opts, $absUrl); return [$rbody, $rcode, $rheaders]; } + public function requestStreaming($method, $absUrl, $headers, $params, $hasFile, $readBodyChunk) + { + list($opts, $absUrl) = $this->constructRequest($method, $absUrl, $headers, $params, $hasFile); + + $opts[\CURLOPT_RETURNTRANSFER] = false; + list($rbody, $rcode, $rheaders) = $this->executeStreamingRequestWithRetries($opts, $absUrl, $readBodyChunk); + + return [$rbody, $rcode, $rheaders]; + } + + /** + * Curl permits sending \CURLOPT_HEADERFUNCTION, which is called with lines + * from the header and \CURLOPT_WRITEFUNCTION, which is called with bytes + * from the body. You usually want to handle the body differently depending + * on what was in the header. + * + * This function makes it easier to specify different callbacks depending + * on the contents of the heeder. After the header has been completely read + * and the body begins to stream, it will call $determineWriteCallback with + * the array of headers. $determineWriteCallback should, based on the + * headers it receives, return a "writeCallback" that describes what to do + * with the incoming HTTP response body. + * + * @param array $opts + * @param callable $determineWriteCallback + * + * @return array + */ + private function useHeadersToDetermineWriteCallback($opts, $determineWriteCallback) + { + $rheaders = new Util\CaseInsensitiveArray(); + $headerCallback = function ($curl, $header_line) use (&$rheaders) { + // Ignore the HTTP request line (HTTP/1.1 200 OK) + if (false === \strpos($header_line, ':')) { + return \strlen($header_line); + } + list($key, $value) = \explode(':', \trim($header_line), 2); + $rheaders[\trim($key)] = \trim($value); + + return \strlen($header_line); + }; + + $writeCallback = null; + $writeCallbackWrapper = function ($curl, $data) use (&$writeCallback, &$rheaders, &$determineWriteCallback) { + if (null === $writeCallback) { + $writeCallback = \call_user_func_array($determineWriteCallback, [$rheaders]); + } + + return \call_user_func_array($writeCallback, [$curl, $data]); + }; + + return [$headerCallback, $writeCallbackWrapper]; + } + + /** + * Like `executeRequestWithRetries` except: + * 1. Does not buffer the body of a successful (status code < 300) + * response into memory -- instead, calls the caller-provided + * $readBodyChunk with each chunk of incoming data. + * 2. Does not retry if a network error occurs while streaming the + * body of a successful response. + * + * @param array $opts cURL options + * @param string $absUrl + * @param callable @readBodyChunk + * @param mixed $readBodyChunk + * + * @return array + */ + public function executeStreamingRequestWithRetries($opts, $absUrl, $readBodyChunk) + { + /** @var bool */ + $shouldRetry = false; + /** @var int */ + $numRetries = 0; + + // Did the last request return statusCode < 300? + /** @var null|bool */ + $succeeded = null; + + // Will contain the bytes of the body of the last request + // if it was not successful and should not be retries + /** @var null|string */ + $rbody = null; + + // Status code of the last request + /** @var null|bool */ + $rcode = null; + + // Array of headers from the last request + /** @var null|array */ + $lastRHeaders = null; + + $determineWriteCallback = function ($rheaders) use ( + &$readBodyChunk, + &$shouldRetry, + &$succeeded, + &$rbody, + &$numRetries, + &$rcode, + &$lastRHeaders + ) { + $lastRHeaders = $rheaders; + $errno = \curl_errno($this->curlHandle); + $rcode = \curl_getinfo($this->curlHandle, \CURLINFO_HTTP_CODE); + + // Send the bytes from the body of a successful request to the caller-provided $readBodyChunk. + if ($rcode < 300) { + $rbody = null; + $succeeded = true; + + return function ($curl, $data) use (&$readBodyChunk) { + // Don't expose the $curl handle to the user, and don't require them to + // return the length of $data. + \call_user_func_array($readBodyChunk, [$data]); + + return \strlen($data); + }; + } + $succeeded = false; + + $shouldRetry = $this->shouldRetry($errno, $rcode, $rheaders, $numRetries); + + // Discard the body from an unsuccessful request that should be retried. + if ($shouldRetry) { + return function ($curl, $data) { + return \strlen($data); + }; + } else { + // Otherwise, buffer the body into $rbody. It will need to be parsed to determine + // which exception to throw to the user. + $rbody = ''; + + return function ($curl, $data) use (&$rbody) { + $rbody .= $data; + + return \strlen($data); + }; + } + }; + + while (true) { + list($headerCallback, $writeCallback) = $this->useHeadersToDetermineWriteCallback($opts, $determineWriteCallback); + $opts[\CURLOPT_HEADERFUNCTION] = $headerCallback; + $opts[\CURLOPT_WRITEFUNCTION] = $writeCallback; + + $shouldRetry = false; + $rbody = null; + $succeeded = null; + $this->resetCurlHandle(); + \curl_setopt_array($this->curlHandle, $opts); + $result = \curl_exec($this->curlHandle); + if ($shouldRetry) { + ++$numRetries; + $sleepSeconds = $this->sleepTime($numRetries, $lastRHeaders); + \usleep((int) ($sleepSeconds * 1000000)); + } else { + break; + } + } + + $errno = \curl_errno($this->curlHandle); + if (0 !== $errno) { + $message = \curl_error($this->curlHandle); + $this->handleCurlError($absUrl, $errno, $message, $numRetries); + } + + return [$rbody, $rcode, $lastRHeaders]; + } + /** * @param array $opts cURL options * @param string $absUrl */ - private function executeRequestWithRetries($opts, $absUrl) + public function executeRequestWithRetries($opts, $absUrl) { $numRetries = 0; diff --git a/tests/Stripe/ApiResourceTest.php b/tests/Stripe/ApiResourceTest.php new file mode 100644 index 0000000000..598ce5eb04 --- /dev/null +++ b/tests/Stripe/ApiResourceTest.php @@ -0,0 +1,43 @@ +instanceUrl(); + list($opts) = $this->_requestStreaming('get', $url, $readBodyChunk, $params, $opts); + + return $this; + } +} + +/** + * @internal + * @coversNothing + */ +final class ApiResourceTest extends \PHPUnit\Framework\TestCase +{ + use TestHelper; + + public function testRequestStreaming() + { + $foo = FooResource::retrieve('foo'); + $body = ''; + $readBodyChunk = function ($chunk) use (&$body) { + $body .= $chunk; + }; + + $foo->pdf($readBodyChunk); + + $parsed = \json_decode($body, true); + static::assertSame($parsed['object'], 'coupon'); + } +} diff --git a/tests/Stripe/HttpClient/CurlClientTest.php b/tests/Stripe/HttpClient/CurlClientTest.php index 4c74efd994..e784522313 100644 --- a/tests/Stripe/HttpClient/CurlClientTest.php +++ b/tests/Stripe/HttpClient/CurlClientTest.php @@ -9,6 +9,7 @@ final class CurlClientTest extends \PHPUnit\Framework\TestCase { use \Stripe\TestHelper; + use \Stripe\TestServer; /** @var \ReflectionProperty */ private $initialNetworkRetryDelayProperty; @@ -352,4 +353,120 @@ public function testSetRequestStatusCallback() \Stripe\ApiRequestor::setHttpClient(null); } } + + /** + * @after + */ + public function tearDownTestServer() + { + $this->stopTestServer(); + } + + // This is a pretty flaky/uncertain way to try and get an + // http server to deliver the body in separate "chunks". + // + // It seems to work but feel free to just skip or delete + // if this flakes. + public function testExecuteRequestWithRetriesCallsWriteFunctionWithChunks() + { + $chunk1 = 'First, bytes'; + $chunk2 = 'more bytes'; + $chunk3 = 'final bytes'; + $serverCode = <<startTestServer($serverCode); + $opts = []; + $opts[\CURLOPT_HTTPGET] = 1; + $opts[\CURLOPT_URL] = $absUrl; + $opts[\CURLOPT_HTTPHEADER] = ['Authorization: Basic c2tfdGVzdF94eXo6']; + $curl = new CurlClient(); + $receivedChunks = []; + $curl->executeStreamingRequestWithRetries($opts, $absUrl, function ($chunk) use (&$receivedChunks) { + $receivedChunks[] = $chunk; + }); + static::assertSame([$chunk1, $chunk2, $chunk3], $receivedChunks); + $this->stopTestServer(); + } + + public function testExecuteStreamingRequestWithRetriesRetries() + { + $serverCode = <<<'EOF' + +{} +EOF; + + $originalNRetries = \Stripe\Stripe::getMaxNetworkRetries(); + \Stripe\Stripe::setMaxNetworkRetries(3); + $absUrl = $this->startTestServer($serverCode); + $opts = []; + $opts[\CURLOPT_HTTPGET] = 1; + $opts[\CURLOPT_URL] = $absUrl; + $opts[\CURLOPT_HTTPHEADER] = ['Authorization: Basic c2tfdGVzdF94eXo6']; + $curl = new CurlClient(); + $receivedChunks = []; + $result = $curl->executeStreamingRequestWithRetries($opts, $absUrl, function ($chunk) use (&$receivedChunks) { + $receivedChunks[] = $chunk; + }); + $nRequests = $this->stopTestServer(); + + static::assertSame([], $receivedChunks); + \Stripe\Stripe::setMaxNetworkRetries($originalNRetries); + + static::assertSame(4, $nRequests); + } + + public function testExecuteStreamingRequestWithRetriesHandlesDisconnect() + { + $serverCode = <<<'EOF' +startTestServer($serverCode); + $opts = []; + $opts[\CURLOPT_HTTPGET] = 1; + $opts[\CURLOPT_URL] = $absUrl; + $opts[\CURLOPT_HTTPHEADER] = ['Authorization: Basic c2tfdGVzdF94eXo6']; + $curl = new CurlClient(); + $receivedChunks = []; + $exception = null; + + try { + $result = $curl->executeStreamingRequestWithRetries($opts, $absUrl, function ($chunk) use (&$receivedChunks) { + $receivedChunks[] = $chunk; + }); + } catch (\Exception $e) { + $exception = $e; + } + + $nRequests = $this->stopTestServer(); + static::assertSame('Stripe\Exception\ApiConnectionException', \get_class($exception)); + + static::assertSame(['12345'], $receivedChunks); + \Stripe\Stripe::setMaxNetworkRetries($originalNRetries); + + static::assertSame(1, $nRequests); + } } diff --git a/tests/TestServer.php b/tests/TestServer.php new file mode 100644 index 0000000000..146a4e2ab1 --- /dev/null +++ b/tests/TestServer.php @@ -0,0 +1,126 @@ +serverProc) { + throw new \Exception('Error: test server was already running'); + } + + $dir = $this->makeTemporaryServerDirectory($code); + + $command = 'php -S localhost:' . $this->serverPort . ' -t ' . $dir; + $pipes = []; + $this->serverProc = \proc_open( + $command, + [2 => ['pipe', 'w']], + $pipes + ); + + $pid = \proc_get_status($this->serverProc)['pid']; + + // echo "Started test server on pid $pid\n"; + + $this->serverStderr = $pipes[2]; + + if (!\is_resource($this->serverProc)) { + throw new \Exception("Error starting test server on pid {$pid}, command failed: {$command}"); + } + + while ($r = \fgets($this->serverStderr)) { + if (\str_contains($r, 'started')) { + break; + } + if (\str_contains($r, 'Failed')) { + throw new \Exception("Error starting test server on pid {$pid}: " . $r . ' Was the port ' . $this->serverPort . ' already taken?'); + } + } + + return 'localhost:' . $this->serverPort; + } + + /** + * Stops the test server and returns the number of requests + * as indicated the logs in its stderr. + */ + public function stopTestServer() + { + $n = 0; + if (!$this->serverProc) { + return; + } + + \stream_set_blocking($this->serverStderr, false); + $lines = \explode(\PHP_EOL, \stream_get_contents($this->serverStderr)); + foreach ($lines as $line) { + if (\str_contains($line, 'Accepted')) { + ++$n; + } + } + while (true) { + $status = \proc_get_status($this->serverProc); + if (!$status['running']) { + break; + } + $pid = $status['pid']; + \exec("pkill -P {$pid}"); + \usleep(100000); + } + // echo "Terminated test server on pid $pid\n"; + \fclose($this->serverStderr); + \proc_close($this->serverProc); + $this->serverProc = null; + $this->serverStderr = null; + + return $n; + } +} diff --git a/tests/bootstrap.php b/tests/bootstrap.php index c2e449dde9..5ab2c37d8d 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -68,3 +68,4 @@ function checkStripeMockReachable() } require_once __DIR__ . '/TestHelper.php'; +require_once __DIR__ . '/TestServer.php';