Skip to content

Commit

Permalink
refactor message processing
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Feb 21, 2024
1 parent 0847898 commit 049974b
Showing 1 changed file with 53 additions and 57 deletions.
110 changes: 53 additions & 57 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,21 @@ public function connect(): self
$dsn = "$config->host:$config->port";
$flags = STREAM_CLIENT_CONNECT;
$this->context = stream_context_create();
$this->socket = @stream_socket_client($dsn, $errorCode, $errorMessage, $config->timeout, $flags, $this->context);
$this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context);

if ($errorCode || !$this->socket) {
throw new Exception($errorMessage ?: "Connection error", $errorCode);
if ($error || !$this->socket) {
throw new Exception($errorMessage ?: "Connection error", $error);
}

$this->setTimeout($config->timeout);

// Process server info
$this->process($config->timeout);

$this->connect = new Connect($config->getOptions());

if ($this->name) {
$this->connect->name = $this->name;
}

$this->info = $this->process($config->timeout);
if (isset($this->info->nonce) && $this->authenticator) {
$this->connect->sig = $this->authenticator->sign($this->info->nonce);
$this->connect->nkey = $this->authenticator->getPublicKey();
Expand Down Expand Up @@ -294,66 +293,63 @@ public function process(null|int|float $timeout = 0, bool $reply = true, bool $c
throw $exception;
}

switch (get_class($message)) {
case Info::class:
$this->logger?->debug('receive ' . $line);
$this->handleInfoMessage($message);
return $this->info = $message;

case Msg::class:
$payload = '';
if (!($message instanceof Msg)) {
break;
}
if ($message->length) {
$iteration = 0;
while (strlen($payload) < $message->length) {
$payloadLine = $this->readLine($message->length, '', false);
if (!$payloadLine) {
if ($iteration > 16) {
$exception = new LogicException("No payload for message $message->sid");
$this->processSocketException($exception);
break;
}
$this->configuration->delay($iteration++);
continue;
}
if (strlen($payloadLine) != $message->length) {
$this->logger?->debug(
'got ' . strlen($payloadLine) . '/' . $message->length . ': ' . $payloadLine
);
}
$payload .= $payloadLine;
}
}
$message->parse($payload);
$this->logger?->debug('receive ' . $line . $payload);
if (!array_key_exists($message->sid, $this->handlers)) {
if ($this->skipInvalidMessages) {
return;
$payload = '';
if ($message instanceof Msg && $message->length) {
$iteration = 0;
while (strlen($payload) < $message->length) {
$payloadLine = $this->readLine($message->length, '', false);
if (!$payloadLine) {
if ($iteration > 16) {
$this->processSocketException(
new LogicException("No payload for message $message->sid")
);
break;
}
throw new LogicException("No handler for message $message->sid");
$this->configuration->delay($iteration++);
continue;
}
$result = $this->handlers[$message->sid]($message->payload, $message->replyTo);
if ($reply && $message->replyTo) {
$this->publish($message->replyTo, $result);
if (strlen($payloadLine) != $message->length) {
$this->logger?->debug(
'got ' . strlen($payloadLine) . '/' . $message->length . ': ' . $payloadLine
);
}
return $result;
$payload .= $payloadLine;
}

$message->parse($payload);
}

$this->logger?->debug('receive ' . $line . $payload);
return $this->onMessage($message, $reply);
}

/**
* @throws Exception
*/
private function handleInfoMessage(Info $info): void
protected function onMessage(Prototype $message, bool $reply)
{
if (isset($info->tls_verify) && $info->tls_verify) {
$this->enableTls(true);
} elseif (isset($info->tls_required) && $info->tls_required) {
$this->enableTls(false);
if ($message instanceof Info) {
if (isset($message->tls_verify) && $message->tls_verify) {
$this->enableTls(true);
} elseif (isset($message->tls_required) && $message->tls_required) {
$this->enableTls(false);
}
return $message;
}

if ($message instanceof Msg) {
if (!array_key_exists($message->sid, $this->handlers)) {
if ($this->skipInvalidMessages) {
return null;
}
throw new LogicException("No handler for message $message->sid");
}
$result = $this->handlers[$message->sid]($message->payload, $message->replyTo);
if ($reply && $message->replyTo) {
$this->publish($message->replyTo, $result);
}
return $result;
}
}

return null;
}

/**
*
Expand Down

0 comments on commit 049974b

Please sign in to comment.