From 555b06b00e9091de5340af7a2ca695743e263782 Mon Sep 17 00:00:00 2001 From: Dmitry K Date: Sun, 30 Aug 2020 07:44:35 +0300 Subject: [PATCH] Connection Pool Update * validateConnectionsInterval renamed to validationInterval * Added maxLifeTime parameter * Added resetConnections parameter * Added getStats method * Improved connection create security by adding lock --- README.md | 78 +++++-- src/Pool.php | 470 ++++++++++++++++++++++++++++++++------ src/PoolInterface.php | 46 ++++ src/PoolStats.php | 36 +++ tests/PoolTest.php | 276 +++++++++++++++++++++- tests/Stub/Connection.php | 2 +- tests/Stub/Pool.php | 12 +- 7 files changed, 824 insertions(+), 96 deletions(-) create mode 100644 src/PoolStats.php diff --git a/README.md b/README.md index ca2918a..0c972e4 100644 --- a/README.md +++ b/README.md @@ -7,34 +7,52 @@ Connection Pool based on Swoole with balancer feature implementation * float `maxWaitTime` (default: 5.0) - The maximum number of seconds that the pool will wait (when there are no available connections) for a connection to be returned before throwing an exception. Zero value (0.0) will disable wait timeout. -* float `validateConnectionsInterval` (default: 5.0) - The number of seconds to sleep between runs of the idle connection validation/cleaner timer. +* float `validationInterval` (default: 5.0) - The number of seconds to sleep between runs of the idle connection validation/cleaner timer. This value should not be set under 1 second. Zero value will disable validate connections timer. * int `maxIdleTime` (default: 60) - The minimum amount of time (seconds) a connection may sit idle in the pool before it is eligible for closing. -Zero (0.0) value will disable idle connections freeing. +Zero value will disable idle connections freeing. +* int `maxLifeTime` (default: 0) - The maximum amount of time (seconds) a connection may exist in the pool before it is eligible for closing. +Zero value will disable expired connections freeing. * bool `testOnBorrow` (default: true) - The indication of whether objects will be validated before being borrowed from the pool. If the object fails to validate, it will be dropped from the pool, and we will attempt to borrow another. * bool `testOnReturn` (default: true) - The indication of whether objects will be validated before being returned to the pool +* bool `resetConnections` (default: false) - Reset the connection to its initial state when it is borrowed from the pool -These parameters could be set by public setter methods of Pool instance. All of these parameters can be changed at runtime. ## API -* `init` (visibility: public) - Initialize (start) connection pool -* `close` (visibility: public) - Close (stop) connection pool +* `init` - Initialize (start) connection pool +* `close` - Close (stop) connection pool +* `getStats` - Get connection pool statistics * `pop` (default visibility: `protected`) - Borrow connection from pool. May throw `BorrowTimeoutException` when waiting of free connection is timed out (read `maxWaitTime` parameter doc). May throw `PoolIsClosedException` when connection pool is closed. * `push` (default visibility: `protected`) - Return connection to pool -* `getIdleCount` (visibility: public) - Get idle connections count -* `getTotalCount` (visibility: public) - Get count of all connections allocated by pool -* `setMaxActive` (visibility: public) - read `maxActive` parameter doc -* `setMinActive` (visibility: public) - read `minActive` parameter doc -* `setMaxWaitTime` (visibility: public) - read `maxWaitTime` parameter doc -* `setMaxIdleTime` (visibility: public) - read `maxIdleTime` parameter doc -* `setValidateConnectionsInterval` (visibility: public) - read `validateConnectionsInterval` parameter doc -* `setTestOnBorrow` (visibility: public) - read `testOnBorrow` parameter doc -* `setTestOnReturn` (visibility: public) - read `testOnReturn` parameter doc + +### Getters +* `getIdleCount` - Get idle connections count +* `getTotalCount` - Get count of connections created by the pool +* `getMaxActive` - read `maxActive` parameter doc +* `getMinActive` - read `minActive` parameter doc +* `getMaxWaitTime` - read `maxWaitTime` parameter doc +* `getMaxIdleTime` - read `maxIdleTime` parameter doc +* `getMaxLifeTime` - read `maxLifeTime` parameter doc +* `getValidationInterval` - read `validationInterval` parameter doc +* `getTestOnBorrow` - read `testOnBorrow` parameter doc +* `getTestOnReturn` - read `testOnReturn` parameter doc +* `getResetConnections` - read `resetConnections` parameter doc + +### Setters +* `setMaxActive` - read `maxActive` parameter doc +* `setMinActive` - read `minActive` parameter doc +* `setMaxWaitTime` - read `maxWaitTime` parameter doc +* `setMaxIdleTime` - read `maxIdleTime` parameter doc +* `setMaxLifeTime` - read `maxLifeTime` parameter doc +* `setValidationInterval` - read `validationInterval` parameter doc +* `setTestOnBorrow` - read `testOnBorrow` parameter doc +* `setTestOnReturn` - read `testOnReturn` parameter doc +* `setResetConnections` - read `resetConnections` parameter doc ## Complete example (HTTP Connection Pool) ```php @@ -153,6 +171,10 @@ class HttpConnection implements HttpConnectionInterface return $this->lastUsedAt; } + public function resetSession(): void + { + } + public static function connect(HttpConnectionConfig $config): self { $client = new Client($config->getHost(), $config->getPort(), $config->getSsl()); @@ -291,7 +313,18 @@ run(static function () { printf("Task: %s returned %d status code\n", $result->task, $result->success->statusCode); } - printf("\nResults fetched in %.4f secs\n", round($end - $start, 4)); + printf("\nResults fetched in %.4f secs\n\n", round($end - $start, 4)); + + $stats = $httpPool->getStats(); + + printf("Connections limit = %d\n", $stats->maxActive); + printf("Connections count = %d\n", $stats->totalCount); + printf("Idle connections = %d\n", $stats->idle); + printf("Busy connections = %d\n", $stats->inUse); + + printf("Total wait time for an available connections = %f secs\n", $stats->waitDuration); + printf("Total wait count = %d\n", $stats->waitCount); + printf("Average wait time per one connection = %f secs\n", $stats->waitDuration / $stats->waitCount); $httpPool->close(); }); @@ -300,15 +333,22 @@ run(static function () { Output is: ``` -php some.php -Task: /help returned 404 status code -Task: / returned 301 status code Task: /test returned 404 status code +Task: / returned 301 status code Task: /search returned 301 status code +Task: /help returned 404 status code Task: /query returned 404 status code Task: /images returned 301 status code Task: /mail returned 301 status code Task: /videos returned 404 status code -Results fetched in 0.1697 secs +Results fetched in 0.1483 secs + +Connections limit = 4 +Connections count = 4 +Idle connections = 4 +Busy connections = 0 +Total wait time for an available connections = 0.380008 secs +Total wait count = 4 +Average wait time per one connection = 0.095002 secs ``` diff --git a/src/Pool.php b/src/Pool.php index 42ec455..9a332d8 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -9,7 +9,7 @@ use MakiseCo\Connection\ConnectionConfigInterface; use MakiseCo\Connection\ConnectionInterface; use MakiseCo\Connection\ConnectorInterface; -use MakiseCo\EvPrimitives\Deferred; +use MakiseCo\EvPrimitives\Lock; use MakiseCo\EvPrimitives\Timer; use MakiseCo\Pool\Exception\BorrowTimeoutException; use MakiseCo\Pool\Exception\PoolIsClosedException; @@ -17,6 +17,7 @@ use Swoole\Coroutine; use Throwable; +use function microtime; use function time; use const SWOOLE_CHANNEL_OK; @@ -24,6 +25,41 @@ abstract class Pool implements PoolInterface { + /** + * Connection accepted by pool + */ + public const PUSH_OK = 0; + + /** + * Connection discarded by pool, because pool is not initialized + */ + public const PUSH_POOL_NOT_INITIALIZED = 1; + + /** + * Connection discarded by pool, because passed connection is not part of pool + */ + public const PUSH_CONN_NOT_PART_OF_POOL = 2; + + /** + * Connection discarded by pool, because maximum connection limit has reached + */ + public const PUSH_LIMIT_REACHED = 3; + + /** + * Connection discarded by pool, because connection is dead + */ + public const PUSH_DEAD_CONNECTION = 4; + + /** + * Connection discarded by pool, because pool was closed + */ + public const PUSH_POOL_CLOSED = 5; + + /** + * Connection discarded by pool, because connection max life time has reached + */ + public const PUSH_MAX_LIFE_TIME = 6; + private ConnectionConfigInterface $connectionConfig; private ConnectorInterface $connector; @@ -57,13 +93,18 @@ abstract class Pool implements PoolInterface * This value should not be set under 1 second. * It dictates how often we check for idle, abandoned connections, and how often we validate idle connections */ - private float $validateConnectionsInterval = 5.0; + private float $validationInterval = 5.0; /** * The minimum amount of time (seconds) a connection may sit idle in the pool before it is eligible for closing */ private int $maxIdleTime = 60; + /** + * The maximum amount of time (seconds) a connection may exist in the pool before it is eligible for closing + */ + private int $maxLifeTime = 0; + /** * The indication of whether objects will be validated before being borrowed from the pool. * If the object fails to validate, it will be dropped from the pool, and we will attempt to borrow another. @@ -75,10 +116,15 @@ abstract class Pool implements PoolInterface */ private bool $testOnReturn = true; + /** + * Reset the connection to its initial state when it is borrowed from the pool + */ + private bool $resetConnections = false; + /** * Created connections storage * - * @var SplObjectStorage|ConnectionInterface[] + * @var SplObjectStorage|ConnectionInterface[] */ private SplObjectStorage $connections; @@ -92,12 +138,41 @@ abstract class Pool implements PoolInterface /** * Preventing simultaneous connection creation */ - private ?Deferred $deferred = null; + private Lock $lock; + + private Timer $validationTimer; - private Timer $validateConnectionsTimer; + /** + * Total time waited for available connections + */ + private float $waitDuration = 0.0; + + /** + * Total number of connections waited for + */ + private int $waitCount = 0; + + /** + * Total number of connections closed due to idle time + */ + private int $maxIdleTimeClosedCount = 0; + + /** + * Total number of connections closed due to max connection lifetime limit + */ + private int $maxLifeTimeClosedCount = 0; abstract protected function createDefaultConnector(): ConnectorInterface; + /** + * Reset connection to its initial state + * + * @param ConnectionInterface $connection + */ + protected function resetConnection(ConnectionInterface $connection): void + { + } + public function __construct( ConnectionConfigInterface $connConfig, ?ConnectorInterface $connector = null @@ -107,8 +182,8 @@ public function __construct( $this->connections = new SplObjectStorage(); - $this->validateConnectionsTimer = new Timer( - (int)($this->validateConnectionsInterval * 1000), + $this->validationTimer = new Timer( + (int)($this->validationInterval * 1000), Closure::fromCallable([$this, 'validateConnections']), false ); @@ -128,29 +203,14 @@ public function init(): void $this->isInitialized = true; $this->idle = new Coroutine\Channel($this->maxActive); + $this->lock = new Lock(); // create initial connections if ($this->minActive > 0) { - Coroutine::create(function () { - try { - while ($this->connections->count() < $this->maxActive) { - $connection = $this->connector->connect($this->connectionConfig); - - $this->connections->attach($connection); - - if (!$this->idle->push($connection, 0.001)) { - $this->connections->detach($connection); - - break; - } - } - } catch (Throwable $e) { - // ignore create connection errors - } - }); + Coroutine::create(Closure::fromCallable([$this, 'fillPool'])); } - $this->startValidateConnectionsTimer(); + $this->startValidationTimer(); } public function close(): void @@ -161,7 +221,7 @@ public function close(): void $this->isInitialized = false; - $this->stopValidateConnectionsTimer(); + $this->stopValidationTimer(); // forget all connection instances foreach ($this->connections as $connection) { @@ -173,6 +233,7 @@ public function close(): void while (!$this->idle->isEmpty()) { $connection = $this->idle->pop(); if (false === $connection) { + // connection pool is closed break; } @@ -216,12 +277,41 @@ public function setMaxActive(int $maxActive): void throw new InvalidArgumentException('maxActive should be at least 1'); } + $oldMaxActive = $this->maxActive; $this->maxActive = $maxActive; // minActive cannot be greater than maxActive if ($this->minActive > $this->maxActive) { $this->minActive = $this->maxActive; } + + // resize connection pool + if ($this->isInitialized && $oldMaxActive !== $maxActive) { + $oldIdle = $this->idle; + $this->idle = new Coroutine\Channel($maxActive); + + while (!$oldIdle->isEmpty()) { + /** @var ConnectionInterface|false $idleConnection */ + $idleConnection = $oldIdle->pop(); + if (false === $idleConnection) { + // connection pool is closed + break; + } + + if ($this->idle->isFull()) { + $this->removeConnection($idleConnection); + + continue; + } + + if (!$this->pushConnectionToIdle($idleConnection)) { + // connection pool is closed + continue; + } + } + + $oldIdle->close(); + } } /** @@ -279,6 +369,24 @@ public function setMaxIdleTime(int $maxIdleTime): void $this->maxIdleTime = $maxIdleTime; } + /** + * The maximum amount of time a connection may exist in the pool before it is eligible for closing. + * Zero value is disabling max life time checking + * + * @param int $maxLifeTime seconds + * + * @throws InvalidArgumentException when $maxLifeTime is less than 0 + */ + public function setMaxLifeTime(int $maxLifeTime): void + { + // maxIdleTime cannot be negative + if ($maxLifeTime < 0) { + throw new InvalidArgumentException('maxLifeTime should be a positive value'); + } + + $this->maxLifeTime = $maxLifeTime; + } + /** * Set the number of seconds to sleep between runs of the idle connection validation/cleaner timer. * This value should not be set under 1 second. @@ -286,25 +394,25 @@ public function setMaxIdleTime(int $maxIdleTime): void * * Zero value will disable connections checking. * - * @param float $validateConnectionsInterval seconds with milliseconds precision + * @param float $validationInterval seconds with milliseconds precision * * @throws InvalidArgumentException when $idleCheckInterval is less than 0 */ - public function setValidateConnectionsInterval(float $validateConnectionsInterval): void + public function setValidationInterval(float $validationInterval): void { - if ($validateConnectionsInterval < 0) { + if ($validationInterval < 0) { throw new InvalidArgumentException('validateConnectionsInterval should be a positive value'); } - $this->validateConnectionsInterval = $validateConnectionsInterval; + $this->validationInterval = $validationInterval; - if ($validateConnectionsInterval === 0.0) { + if ($validationInterval === 0.0) { // stop timer on zero idle check interval - if ($this->validateConnectionsTimer->isStarted()) { - $this->validateConnectionsTimer->stop(); + if ($this->validationTimer->isStarted()) { + $this->validationTimer->stop(); } } else { - $this->validateConnectionsTimer->setInterval((int)($validateConnectionsInterval * 1000)); + $this->validationTimer->setInterval((int)($validationInterval * 1000)); } } @@ -329,6 +437,16 @@ public function setTestOnReturn(bool $testOnReturn): void $this->testOnReturn = $testOnReturn; } + /** + * Reset the connection to its initial state when it is borrowed from the pool + * + * @param bool $resetConnections + */ + public function setResetConnections(bool $resetConnections): void + { + $this->resetConnections = $resetConnections; + } + public function getIdleCount(): int { if (!$this->isInitialized) { @@ -343,6 +461,69 @@ public function getTotalCount(): int return $this->connections->count(); } + public function getStats(): PoolStats + { + $stats = new PoolStats(); + + $stats->maxActive = $this->getMaxActive(); + + $stats->totalCount = $this->getTotalCount(); + $stats->idle = $this->getIdleCount(); + $stats->inUse = $stats->totalCount - $stats->idle; + + $stats->waitCount = $this->waitCount; + $stats->waitDuration = $this->waitDuration; + $stats->maxIdleTimeClosed = $this->maxIdleTimeClosedCount; + $stats->maxLifeTimeClosed = $this->maxLifeTimeClosedCount; + + return $stats; + } + + public function getMaxActive(): int + { + return $this->maxActive; + } + + public function getMinActive(): int + { + return $this->minActive; + } + + public function getMaxWaitTime(): float + { + return $this->maxWaitTime; + } + + public function getValidationInterval(): float + { + return $this->validationInterval; + } + + public function getMaxIdleTime(): int + { + return $this->maxIdleTime; + } + + public function getMaxLifeTime(): int + { + return $this->maxLifeTime; + } + + public function getTestOnBorrow(): bool + { + return $this->testOnBorrow; + } + + public function getTestOnReturn(): bool + { + return $this->testOnReturn; + } + + public function getResetConnections(): bool + { + return $this->resetConnections; + } + /** * @throws PoolIsClosedException If the pool has been closed. * @throws BorrowTimeoutException when connection pop timeout reached @@ -354,28 +535,13 @@ protected function pop(): ConnectionInterface } // Prevent simultaneous connection creation. - while ($this->deferred !== null) { - try { - $this->deferred->wait(); - } catch (Throwable $e) { - } + while ($this->lock->isLocked()) { + $this->lock->wait(); } // Max connection count has not been reached, so open another connection. if ($this->idle->isEmpty() && $this->connections->count() < $this->maxActive) { - $this->deferred = new Deferred(); - - try { - $connection = $this->connector->connect($this->connectionConfig); - $this->connections->attach($connection); - } finally { - $deferred = $this->deferred; - $this->deferred = null; - - $deferred->resolve(null); - } - - return $connection; + return $this->createConnection(); } return $this->popConnectionFromIdle(); @@ -389,17 +555,61 @@ protected function pop(): ConnectionInterface */ private function popConnectionFromIdle(): ConnectionInterface { - /** @var ConnectionInterface $connection */ + $waitStart = null; + + if ($this->idle->isEmpty()) { + $waitStart = microtime(true); + + // integer overflow + if ($this->waitCount === PHP_INT_MAX) { + $this->waitCount = 0; + $this->waitDuration = 0.0; + } + + $this->waitCount++; + } + + /** @var ConnectionInterface|false $connection */ $connection = $this->idle->pop($this->maxWaitTime); + if ($waitStart !== null) { + $waitTime = microtime(true) - $waitStart; + $this->waitDuration += $waitTime; + + // float overflow + if ($this->waitDuration === PHP_FLOAT_MAX) { + $this->waitDuration = $waitTime; + $this->waitCount = 1; + } + } + if ($this->idle->errCode === SWOOLE_CHANNEL_OK) { + // connection pool was resized + if ($connection === false) { + return $this->pop(); + } + // remove dead connection if ($this->testOnBorrow && !$connection->isAlive()) { $this->connections->detach($connection); // create new connection instead of dead one - $connection = $this->connector->connect($this->connectionConfig); - $this->connections->attach($connection); + $connection = $this->createConnection(); + + return $connection; + } + + if ($this->isConnectionExpired($connection, -1)) { + $this->removeExpiredConnection($connection); + + // create new connection instead of expired one + $connection = $this->createConnection(); + + return $connection; + } + + if ($this->resetConnections) { + $this->resetConnection($connection); } return $connection; @@ -412,33 +622,81 @@ private function popConnectionFromIdle(): ConnectionInterface throw new PoolIsClosedException('Pool closed before an active connection could be obtained'); } - protected function push(ConnectionInterface $connection): void + /** + * @param ConnectionInterface $connection + * + * @return int push status (read PUSH_* constant docs) + */ + protected function push(ConnectionInterface $connection): int { // discard connection when pool is not initialized if (!$this->isInitialized) { $this->removeConnection($connection); - return; + return self::PUSH_POOL_NOT_INITIALIZED; } - // discard connection when pool reached connections limit - if ($this->connections->count() > $this->maxActive) { + // discard connection that is not part of this pool + if (!$this->connections->contains($connection)) { $this->removeConnection($connection); - return; + return self::PUSH_CONN_NOT_PART_OF_POOL; + } + + // discard connection when pool has reached the maximum connection limit + if ($this->idle->isFull()) { + $this->removeConnection($connection); + + return self::PUSH_LIMIT_REACHED; } // discard dead connections if ($this->testOnReturn && !$connection->isAlive()) { $this->removeConnection($connection); - return; + return self::PUSH_DEAD_CONNECTION; } - $this->idle->push($connection); + // discard connection due to max life time + if ($this->isConnectionExpired($connection, -1)) { + $this->removeExpiredConnection($connection); + + return self::PUSH_MAX_LIFE_TIME; + } + + if (!$this->pushConnectionToIdle($connection)) { + return self::PUSH_POOL_CLOSED; + } + + return self::PUSH_OK; } - protected function removeConnection(ConnectionInterface $connection): void + private function pushConnectionToIdle(ConnectionInterface $connection): bool + { + if (!$this->idle->push($connection)) { + $this->removeConnection($connection); + + return false; + } + + return true; + } + + private function createConnection(): ConnectionInterface + { + $this->lock->lock(); + + try { + $connection = $this->connector->connect($this->connectionConfig); + $this->connections->attach($connection, time()); + } finally { + $this->lock->unlock(); + } + + return $connection; + } + + private function removeConnection(ConnectionInterface $connection): void { $this->connections->detach($connection); @@ -456,7 +714,7 @@ static function (ConnectionInterface $connection): void { } } - protected function validateConnections(): void + private function validateConnections(): void { $now = time(); @@ -465,6 +723,7 @@ protected function validateConnections(): void while (!$this->idle->isEmpty()) { $connection = $this->idle->pop(); + // connection pool is closed if (false === $connection) { return; } @@ -483,26 +742,93 @@ protected function validateConnections(): void && $connectionsCount > $this->minActive && $connection->getLastUsedAt() + $this->maxIdleTime <= $now) { $connectionsCount--; - $this->removeConnection($connection); + $this->removeMaxIdleTimeConnection($connection); continue; } - $this->idle->push($connection); + // remove expired connections + if ($this->isConnectionExpired($connection, $now)) { + $connectionsCount--; + $this->removeExpiredConnection($connection); + + continue; + } + + $this->pushConnectionToIdle($connection); } + + $this->fillPool(); } - protected function startValidateConnectionsTimer(): void + private function fillPool(): void { - if ($this->validateConnectionsInterval > 0) { - $this->validateConnectionsTimer->start(); + while ($this->connections->count() < $this->minActive) { + // connection pool is closed or connection is currently being created by another coroutine + if (!$this->isInitialized || $this->lock->isLocked()) { + break; + } + + try { + $connection = $this->createConnection(); + } catch (Throwable $e) { + // stop on connection errors + return; + } + + if (!$this->pushConnectionToIdle($connection)) { + // connection pool is closed + return; + } } } - protected function stopValidateConnectionsTimer(): void + private function startValidationTimer(): void { - if ($this->validateConnectionsTimer->isStarted()) { - $this->validateConnectionsTimer->stop(); + if ($this->validationInterval > 0) { + $this->validationTimer->start(); } } + + private function stopValidationTimer(): void + { + if ($this->validationTimer->isStarted()) { + $this->validationTimer->stop(); + } + } + + private function removeMaxIdleTimeConnection(ConnectionInterface $connection): void + { + if ($this->maxIdleTimeClosedCount === PHP_INT_MAX) { + $this->maxIdleTimeClosedCount = 0; + } + + $this->maxIdleTimeClosedCount++; + + $this->removeConnection($connection); + } + + private function removeExpiredConnection(ConnectionInterface $connection): void + { + if ($this->maxLifeTimeClosedCount === PHP_INT_MAX) { + $this->maxLifeTimeClosedCount = 0; + } + + $this->maxLifeTimeClosedCount++; + + $this->removeConnection($connection); + } + + private function isConnectionExpired(ConnectionInterface $connection, int $time): bool + { + if ($this->maxLifeTime <= 0) { + return false; + } + + if ($time === -1) { + $time = time(); + } + + return $this->connections[$connection] + $this->maxLifeTime <= $time; + } } diff --git a/src/PoolInterface.php b/src/PoolInterface.php index 08c103b..215f2d0 100644 --- a/src/PoolInterface.php +++ b/src/PoolInterface.php @@ -8,4 +8,50 @@ interface PoolInterface extends TransientResource { + /** + * Initialize (start) connection pool + */ + public function init(): void; + + /** + * Get connection pool statistics + * + * @return PoolStats + */ + public function getStats(): PoolStats; + + /** + * Get count of connections created by the pool + * + * @return int + */ + public function getTotalCount(): int; + + /** + * Get idle connections count + * + * @return int + */ + public function getIdleCount(): int; + + /** + * Get maximum connection count limit + * + * @return int + */ + public function getMaxActive(): int; + + /** + * Get minimum number of established connections that should be kept in the pool at all times + * + * @return int + */ + public function getMinActive(): int; + + /** + * Get minimum amount of time (seconds) a connection may sit idle in the pool before it is eligible for closing + * + * @return int + */ + public function getMaxIdleTime(): int; } diff --git a/src/PoolStats.php b/src/PoolStats.php new file mode 100644 index 0000000..ae66fcb --- /dev/null +++ b/src/PoolStats.php @@ -0,0 +1,36 @@ +pool->setMaxActive(4); $this->pool->setMinActive(2); $this->pool->setMaxWaitTime(0.001); - $this->pool->setValidateConnectionsInterval(0.005); + $this->pool->setValidationInterval(0.005); $this->pool->setMaxIdleTime(60); $this->pool->init(); @@ -98,6 +98,79 @@ public function testIdleConnectionsRemovedAfterTimeout(): void self::assertSame(2, $this->pool->getIdleCount()); self::assertSame(2, $this->pool->getTotalCount()); + + $stats = $this->pool->getStats(); + + self::assertSame(2, $stats->maxIdleTimeClosed); + } + + public function testConnMaxLifeTime(): void + { + $this->pool->setMaxActive(4); + $this->pool->setMinActive(0); + $this->pool->setMaxWaitTime(0.001); + $this->pool->setValidationInterval(0.005); + $this->pool->setMaxIdleTime(600); + $this->pool->setMaxLifeTime(2); + $this->pool->init(); + + $connections = []; + + for ($i = 0; $i < 4; $i++) { + $connection = $this->pool->pop(); + $connections[] = $connection; + } + foreach ($connections as $connection) { + $this->pool->push($connection); + } + + $reflection = new \ReflectionClass($this->pool); + $parent = $reflection->getParentClass(); + /** @var \ReflectionClass|false $parent */ + if (false === $parent) { + self::fail('Cannot get parent class of pool'); + return; + } + + // bypass PhpStorm code analyse bug + $func = static function (\ReflectionClass $class, string $name): \ReflectionProperty { + return $class->getProperty($name); + }; + + $property = $func($parent, 'connections'); + $property->setAccessible(true); + + /** @var \SplObjectStorage|Connection[] $connections */ + $connections = $property->getValue($this->pool); + + // mark connections as expired + foreach ($connections as $connection) { + $connections[$connection] -= 2; + } + + Coroutine::sleep(0.010); + + self::assertSame(0, $this->pool->getIdleCount()); + self::assertSame(0, $this->pool->getTotalCount()); + + $stats = $this->pool->getStats(); + + self::assertSame(4, $stats->maxLifeTimeClosed); + + $connection = $this->pool->pop(); + $connections[$connection] = time() - 2; + $this->pool->push($connection); + + self::assertSame(0, $this->pool->getIdleCount()); + self::assertSame(0, $this->pool->getTotalCount()); + + $connection = $this->pool->pop(); + $this->pool->push($connection); + + $connections[$connection] = time() - 2; + $newConnection = $this->pool->pop(); + + self::assertNotSame($connection->getId(), $newConnection->getId()); } public function testIdleConnectionResolved(): void @@ -192,4 +265,203 @@ public function testBorrowDeadConnection(): void self::assertSame(1, $this->pool->getTotalCount()); self::assertSame(0, $this->pool->getIdleCount()); } + + public function testConnectionReset(): void + { + $this->pool->setResetConnections(true); + $this->pool->init(); + + $this->pool->push($this->pool->pop()); + + self::assertSame(322, $this->pool->pop()->getLastUsedAt()); + } + + public function testGetStats(): void + { + $this->pool->setMaxActive(2); + $this->pool->init(); + + $conn1 = $this->pool->pop(); + + $stats = $this->pool->getStats(); + + self::assertSame(2, $stats->maxActive); + self::assertSame(1, $stats->totalCount); + self::assertSame(1, $stats->inUse); + self::assertSame(0, $stats->idle); + self::assertSame(0, $stats->waitCount); + self::assertSame(0.0, $stats->waitDuration); + + $conn2 = $this->pool->pop(); + + $stats = $this->pool->getStats(); + + self::assertSame(2, $stats->maxActive); + self::assertSame(2, $stats->totalCount); + self::assertSame(2, $stats->inUse); + self::assertSame(0, $stats->idle); + self::assertSame(0, $stats->waitCount); + self::assertSame(0.0, $stats->waitDuration); + + $this->pool->push($conn2); + + $stats = $this->pool->getStats(); + + self::assertSame(2, $stats->maxActive); + self::assertSame(2, $stats->totalCount); + self::assertSame(1, $stats->inUse); + self::assertSame(1, $stats->idle); + self::assertSame(0, $stats->waitCount); + self::assertSame(0.0, $stats->waitDuration); + + $conn2 = $this->pool->pop(); + + $func = function () { + $conn = $this->pool->pop(); + Coroutine::sleep(0.001); + $this->pool->push($conn); + }; + + Coroutine::create($func); + Coroutine::create($func); + + $this->pool->push($conn1); + $this->pool->push($conn2); + + Coroutine::sleep(0.004); + + $stats = $this->pool->getStats(); + + self::assertSame(2, $stats->maxActive); + self::assertSame(2, $stats->totalCount); + self::assertSame(0, $stats->inUse); + self::assertSame(2, $stats->idle); + self::assertGreaterThan(0, $stats->waitDuration); + self::assertSame(2, $stats->waitCount); + self::assertSame(0, $stats->maxIdleTimeClosed); + self::assertSame(0, $stats->maxLifeTimeClosed); + } + + public function testResizeUp(): void + { + $this->pool->setMaxActive(1); + $this->pool->setMinActive(0); + $this->pool->setMaxWaitTime(0); + $this->pool->init(); + + $ch = new Coroutine\Channel(1); + + $connection = $this->pool->pop(); + + Coroutine::create(function () use ($ch) { + try { + $connection = $this->pool->pop(); + $this->pool->push($connection); + + $ch->push($connection); + } catch (Throwable $e) { + $ch->push($e); + } + }); + + $this->pool->setMaxActive(2); + + /** @var Connection $result */ + $result = $ch->pop(0.5); + + self::assertInstanceOf(Connection::class, $result); + self::assertSame(2, $result->getId()); + + $this->pool->push($connection); + + self::assertSame(2, $this->pool->getIdleCount()); + self::assertSame(2, $this->pool->getTotalCount()); + } + + public function testResizeDownWithEmptyIdle(): void + { + $this->pool->setMaxActive(2); + $this->pool->setMinActive(0); + $this->pool->init(); + + $connection1 = $this->pool->pop(); + $connection2 = $this->pool->pop(); + + self::assertSame(0, $this->pool->getIdleCount()); + self::assertSame(2, $this->pool->getTotalCount()); + + $this->pool->setMaxActive(1); + + $this->pool->push($connection1); + $this->pool->push($connection2); + + self::assertSame(1, $this->pool->getIdleCount()); + self::assertSame(1, $this->pool->getTotalCount()); + } + + public function testResizeDownWithFullIdle(): void + { + $initialMaxActive = 4; + + $this->pool->setMaxActive($initialMaxActive); + $this->pool->setMinActive(0); + $this->pool->init(); + + $connections = []; + + for ($i = 0; $i < $initialMaxActive; $i++) { + $connections[] = $this->pool->pop(); + } + foreach ($connections as $connection) { + $this->pool->push($connection); + } + + self::assertSame($initialMaxActive, $this->pool->getIdleCount()); + self::assertSame($initialMaxActive, $this->pool->getTotalCount()); + + $this->pool->setMaxActive(1); + + self::assertSame(1, $this->pool->getIdleCount()); + self::assertSame(1, $this->pool->getTotalCount()); + } + + public function testResizeDownWithActiveWaiting(): void + { + $this->pool->setMaxActive(2); + $this->pool->init(); + + $connection1 = $this->pool->pop(); + $connection2 = $this->pool->pop(); + + self::assertSame(0, $this->pool->getIdleCount()); + self::assertSame(2, $this->pool->getTotalCount()); + + $ch = new Coroutine\Channel(); + Coroutine::create(function () use ($ch) { + try { + $connection = $this->pool->pop(); + $ch->push($connection); + } catch (Throwable $e) { + $ch->push($e); + } + }); + + $this->pool->setMaxActive(1); + + Coroutine::create(function () use ($connection1, $connection2) { + $this->pool->push($connection1); + $this->pool->push($connection2); + }); + + /** @var Connection $result */ + $result = $ch->pop(0.5); + + self::assertInstanceOf(Connection::class, $result); + self::assertSame(1, $result->getId()); + + self::assertSame(Pool::PUSH_LIMIT_REACHED, $this->pool->push($result)); + + self::assertSame(1, $this->pool->getIdleCount()); + self::assertSame(1, $this->pool->getTotalCount()); + } } diff --git a/tests/Stub/Connection.php b/tests/Stub/Connection.php index cddcff0..c5de2e8 100644 --- a/tests/Stub/Connection.php +++ b/tests/Stub/Connection.php @@ -27,7 +27,7 @@ public function getId(): int return $this->id; } - public function close() + public function close(): void { if (!$this->isAlive) { return; diff --git a/tests/Stub/Pool.php b/tests/Stub/Pool.php index 51a3f2f..89bb6ba 100644 --- a/tests/Stub/Pool.php +++ b/tests/Stub/Pool.php @@ -30,13 +30,21 @@ public function pop(): Connection * * @param Connection|ConnectionInterface $connection */ - public function push(ConnectionInterface $connection): void + public function push(ConnectionInterface $connection): int { - parent::push($connection); + return parent::push($connection); } protected function createDefaultConnector(): ConnectorInterface { return new Connector(); } + + /** + * @param Connection $connection + */ + protected function resetConnection(ConnectionInterface $connection): void + { + $connection->setLastUsedAt(322); + } }