Skip to content

Commit

Permalink
Merge pull request #117 from twomiao/master
Browse files Browse the repository at this point in the history
Router least connections
  • Loading branch information
walkor authored Dec 7, 2023
2 parents 3e86fba + 3cf3672 commit 152894c
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions src/Gateway.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ class Gateway extends Worker
const ROUTER_RANDOM = 'router_random';

/**
* 轮询负载均衡
* 最小连接数负载均衡模式
*
* @var string
*/
const ROUTER_ROUND_ROBIN = 'router_round_robin';
const ROUTER_LEAST_CONNECTIONS = 'router_least_connections';

/**
* 默认负载均衡模式轮询
* Gateway 默认负载均衡模式
*
* @var string $selectLoadBalancingMode
*/
public static $selectLoadBalancingMode = self::ROUTER_ROUND_ROBIN;
public static $selectLoadBalancingMode = self::ROUTER_LEAST_CONNECTIONS;

/**
* 本机 IP
Expand Down Expand Up @@ -172,12 +172,12 @@ class Gateway extends Worker
public $onBusinessWorkerClose = null;

/**
* 轮询负载均衡记录
* 最小连接数负载均衡记录表,用于新上线业务服务器负载足够均衡
* [ ip+businessworker key => 连接记录, ip+businessworker key => 连接记录, .... ]
*
* @var array
*/
protected static $roundRobinRecord = array();
protected static $leastConnectionsRecord = array();

/**
* 保存客户端的所有 connection 对象
Expand Down Expand Up @@ -480,46 +480,45 @@ public static function routerRand(array $worker_connections) : string
}

/**
* 轮询路由,返回 worker connection 标识
* 返回最少客户端连接数量的业务服务器标识
* 新上线服务器由于客户端连接数过低,会先分配给新服务器
*
* @throws \Exception
* @param array $roundRobinRecord
* @param array $leastConnections
* @return string
*/
protected static function routerRoundRobin(array $roundRobinRecord) : string
protected static function routerLeastConnectionsRecord(array $leastConnections) : string
{
if (empty($roundRobinRecord))
if (empty($leastConnections))
{
throw new \Exception("Round-robin record is empty.");
throw new \Exception("The routing record is empty.");
}

// min($roundRobinRecord) 返回连接数最少businessWorker 连接标识
return array_search(min($roundRobinRecord), $roundRobinRecord, true);
// 返回最少客户端连接数量的业务服务器地址
return array_search(min($leastConnections), $leastConnections, true);
}

/**
* @param array $roundRobinRecord
* @param array $worker_connections
* @param string $selectLoadBalancingMode
* @return string
* @throws \Exception
*/
protected static function businessWorkerAddress(array $roundRobinRecord, array $worker_connections, string $selectLoadBalancingMode)
protected static function businessWorkerAddress(array $worker_connections, string $selectLoadBalancingMode)
{
switch ($selectLoadBalancingMode)
{
case static::ROUTER_ROUND_ROBIN:
case static::ROUTER_LEAST_CONNECTIONS:
// 选择连接最少的businessWorker 服务器
$businessWorkerAddress = static::routerRoundRobin($roundRobinRecord);
// 增加轮询表记录
static::$roundRobinRecord[$businessWorkerAddress]++;
$businessWorkerAddress = static::routerLeastConnectionsRecord(static::$leastConnectionsRecord);
// 更新轮询表连接数量
static::$leastConnectionsRecord[$businessWorkerAddress]++;
return $businessWorkerAddress;
case static::ROUTER_RANDOM:
// 随机轮询
return static::routerRand($worker_connections);
default:
throw new \Exception("This type of load balancing mode is not supported.");
throw new \Exception("The load balancing mode is not supported.");
}
}

Expand All @@ -536,7 +535,7 @@ protected static function businessWorkerAddress(array $roundRobinRecord, array $
public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
{
if (!isset($client_connection->businessworker_address) || !isset($worker_connections[$client_connection->businessworker_address])) {
$client_connection->businessworker_address = static::businessWorkerAddress(static::$roundRobinRecord, $worker_connections, static::$selectLoadBalancingMode);
$client_connection->businessworker_address = static::businessWorkerAddress($worker_connections, static::$selectLoadBalancingMode);
}
return $worker_connections[$client_connection->businessworker_address];
}
Expand All @@ -551,14 +550,14 @@ public function onClientClose($connection)
// 尝试通知 worker,触发 Event::onClose
$this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);

// 轮询记录减少
if(static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN &&
// 客户端下线,更新路由表数据
if(static::$selectLoadBalancingMode === static::ROUTER_LEAST_CONNECTIONS &&
isset($connection->businessworker_address))
{
// 轮询记录 >0,减少连接数
if((static::$roundRobinRecord[$connection->businessworker_address]) > 0)
// 客户端连接数 >0,减少连接数
if((static::$leastConnectionsRecord[$connection->businessworker_address]) > 0)
{
static::$roundRobinRecord[$connection->businessworker_address]--;
static::$leastConnectionsRecord[$connection->businessworker_address]--;
}
}

Expand Down Expand Up @@ -691,9 +690,9 @@ public function onWorkerMessage($connection, $data)
$connection->key = $key;
$this->_workerConnections[$key] = $connection;
$connection->authorized = true;
// 轮询负载均衡初始化
if(static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN) {
static::$roundRobinRecord[$key] = 0;
// 新上线业务服务器,初始路由表为0
if(static::$selectLoadBalancingMode === static::ROUTER_LEAST_CONNECTIONS) {
static::$leastConnectionsRecord[$key] = 0;
}
if ($this->onBusinessWorkerConnected) {
call_user_func($this->onBusinessWorkerConnected, $connection);
Expand Down Expand Up @@ -1056,10 +1055,10 @@ public function onWorkerMessage($connection, $data)
public function onWorkerClose($connection)
{
if (isset($connection->key)) {
// 删除轮询记录
if (static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN)
// 业务服务器下线, 清理路由表数据
if (static::$selectLoadBalancingMode === static::ROUTER_LEAST_CONNECTIONS)
{
unset(static::$roundRobinRecord[$connection->key]);
unset(static::$leastConnectionsRecord[$connection->key]);
}
unset($this->_workerConnections[$connection->key]);
if ($this->onBusinessWorkerClose) {
Expand Down

0 comments on commit 152894c

Please sign in to comment.