From 3cf367215231f685a070ba7223db9dbd8e1fc062 Mon Sep 17 00:00:00 2001 From: twomiao <995200452@qq.com> Date: Thu, 7 Dec 2023 20:58:51 +0800 Subject: [PATCH] Router least connections --- src/Gateway.php | 63 ++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/src/Gateway.php b/src/Gateway.php index 5b03f2c..582d47b 100644 --- a/src/Gateway.php +++ b/src/Gateway.php @@ -43,18 +43,18 @@ class Gateway extends Worker const ROUTER_RANDOM = 'router_random'; /** - * 轮询负载均衡 + * 最小连接数负载均衡模式 * * @var string */ - const ROUTER_ROUND_ROBIN = 'router_round_robin'; + const ROUTER_LEAST_CONNECTIONS = 'router_least_connections'; /** - * 默认负载均衡模式轮询 + * Gateway 默认负载均衡模式 * * @var string $selectLoadBalancingMode */ - public static $selectLoadBalancingMode = self::ROUTER_ROUND_ROBIN; + public static $selectLoadBalancingMode = self::ROUTER_LEAST_CONNECTIONS; /** * 本机 IP @@ -172,12 +172,12 @@ class Gateway extends Worker public $onBusinessWorkerClose = null; /** - * 轮询负载均衡记录 + * 最小连接数负载均衡记录表,用于新上线业务服务器负载足够均衡 * [ ip+businessworker key => 连接记录, ip+businessworker key => 连接记录, .... ] * * @var array */ - protected static $roundRobinRecord = array(); + protected static $leastConnectionsRecord = array(); /** * 保存客户端的所有 connection 对象 @@ -480,46 +480,45 @@ public static function routerRand(array $worker_connections) : string } /** - * 轮询路由,返回 worker connection 标识 + * 返回最少客户端连接数量的业务服务器标识 * 新上线服务器由于客户端连接数过低,会先分配给新服务器 * * @throws \Exception - * @param array $roundRobinRecord + * @param array $leastConnections * @return string */ - protected static function routerRoundRobin(array $roundRobinRecord) : string + protected static function routerLeastConnectionsRecord(array $leastConnections) : string { - if (empty($roundRobinRecord)) + if (empty($leastConnections)) { - throw new \Exception("Round-robin record is empty."); + throw new \Exception("The routing record is empty."); } - // min($roundRobinRecord) 返回连接数最少businessWorker 连接标识 - return array_search(min($roundRobinRecord), $roundRobinRecord, true); + // 返回最少客户端连接数量的业务服务器地址 + return array_search(min($leastConnections), $leastConnections, true); } /** - * @param array $roundRobinRecord * @param array $worker_connections * @param string $selectLoadBalancingMode * @return string * @throws \Exception */ - protected static function businessWorkerAddress(array $roundRobinRecord, array $worker_connections, string $selectLoadBalancingMode) + protected static function businessWorkerAddress(array $worker_connections, string $selectLoadBalancingMode) { switch ($selectLoadBalancingMode) { - case static::ROUTER_ROUND_ROBIN: + case static::ROUTER_LEAST_CONNECTIONS: // 选择连接最少的businessWorker 服务器 - $businessWorkerAddress = static::routerRoundRobin($roundRobinRecord); - // 增加轮询表记录 - static::$roundRobinRecord[$businessWorkerAddress]++; + $businessWorkerAddress = static::routerLeastConnectionsRecord(static::$leastConnectionsRecord); + // 更新轮询表连接数量 + static::$leastConnectionsRecord[$businessWorkerAddress]++; return $businessWorkerAddress; case static::ROUTER_RANDOM: // 随机轮询 return static::routerRand($worker_connections); default: - throw new \Exception("This type of load balancing mode is not supported."); + throw new \Exception("The load balancing mode is not supported."); } } @@ -536,7 +535,7 @@ protected static function businessWorkerAddress(array $roundRobinRecord, array $ public static function routerBind($worker_connections, $client_connection, $cmd, $buffer) { if (!isset($client_connection->businessworker_address) || !isset($worker_connections[$client_connection->businessworker_address])) { - $client_connection->businessworker_address = static::businessWorkerAddress(static::$roundRobinRecord, $worker_connections, static::$selectLoadBalancingMode); + $client_connection->businessworker_address = static::businessWorkerAddress($worker_connections, static::$selectLoadBalancingMode); } return $worker_connections[$client_connection->businessworker_address]; } @@ -551,14 +550,14 @@ public function onClientClose($connection) // 尝试通知 worker,触发 Event::onClose $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection); - // 轮询记录减少 - if(static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN && + // 客户端下线,更新路由表数据 + if(static::$selectLoadBalancingMode === static::ROUTER_LEAST_CONNECTIONS && isset($connection->businessworker_address)) { - // 轮询记录 >0,减少连接数 - if((static::$roundRobinRecord[$connection->businessworker_address]) > 0) + // 客户端连接数 >0,减少连接数 + if((static::$leastConnectionsRecord[$connection->businessworker_address]) > 0) { - static::$roundRobinRecord[$connection->businessworker_address]--; + static::$leastConnectionsRecord[$connection->businessworker_address]--; } } @@ -691,9 +690,9 @@ public function onWorkerMessage($connection, $data) $connection->key = $key; $this->_workerConnections[$key] = $connection; $connection->authorized = true; - // 轮询负载均衡初始化 - if(static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN) { - static::$roundRobinRecord[$key] = 0; + // 新上线业务服务器,初始路由表为0 + if(static::$selectLoadBalancingMode === static::ROUTER_LEAST_CONNECTIONS) { + static::$leastConnectionsRecord[$key] = 0; } if ($this->onBusinessWorkerConnected) { call_user_func($this->onBusinessWorkerConnected, $connection); @@ -1056,10 +1055,10 @@ public function onWorkerMessage($connection, $data) public function onWorkerClose($connection) { if (isset($connection->key)) { - // 删除轮询记录 - if (static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN) + // 业务服务器下线, 清理路由表数据 + if (static::$selectLoadBalancingMode === static::ROUTER_LEAST_CONNECTIONS) { - unset(static::$roundRobinRecord[$connection->key]); + unset(static::$leastConnectionsRecord[$connection->key]); } unset($this->_workerConnections[$connection->key]); if ($this->onBusinessWorkerClose) {