Skip to content

Commit

Permalink
Use lvmopstools.AsyncSocketHandler for TCP sources
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Jan 19, 2024
1 parent 2014da0 commit 1de8c52
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 58 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Next version

### ✨ Improved

* Replaced `TCPSource` internals with the use of `lvmopstools.AsyncSocketHandler` which includes retrying and better error handling.


## 1.2.0 - November 24, 2023

### 🚀 Added
Expand Down
45 changes: 27 additions & 18 deletions cerebro/sources/lvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,17 @@ def __init__(

self.bucket = self.bucket or "sensors"

async def _read_internal(self) -> list[dict] | None:
if not self.writer or not self.reader:
return
async def _read_internal(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> list[dict] | None:
"""Queries the TCP server and returns a list of points."""

self.writer.write((f"status {self.address}\n").encode())
await self.writer.drain()
writer.write((f"status {self.address}\n").encode())
await writer.drain()

data = await asyncio.wait_for(self.reader.readline(), timeout=5)
data = await asyncio.wait_for(reader.readline(), timeout=5)

# Not found
if data == b"?\n":
Expand Down Expand Up @@ -131,14 +134,17 @@ def __init__(

self.bucket = self.bucket or "sensors"

async def _read_internal(self) -> list[dict] | None:
if not self.writer or not self.reader:
return
async def _read_internal(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> list[dict] | None:
"""Queries the TCP server and returns a list of points."""

self.writer.write((f"@{self.device_id:d}Q?\\").encode())
await self.writer.drain()
writer.write((f"@{self.device_id:d}Q?\\").encode())
await writer.drain()

data = await asyncio.wait_for(self.reader.readuntil(b"\\"), timeout=5)
data = await asyncio.wait_for(reader.readuntil(b"\\"), timeout=5)
data = data.decode()

m = re.match(
Expand Down Expand Up @@ -201,14 +207,17 @@ def __init__(
self.delay = delay
self.bucket = self.bucket or "sensors"

async def _read_internal(self) -> list[dict] | None:
if not self.writer or not self.reader:
return
async def _read_internal(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> list[dict] | None:
"""Queries the TCP server and returns a list of points."""

self.writer.write(("~*P*~\n").encode())
await self.writer.drain()
writer.write(("~*P*~\n").encode())
await writer.drain()

data = await asyncio.wait_for(self.reader.readline(), timeout=5)
data = await asyncio.wait_for(reader.readline(), timeout=5)
data = data.decode()

m = re.search(r"\s([\-0-9.]+)\slb", data)
Expand Down
60 changes: 21 additions & 39 deletions cerebro/sources/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@

import abc
import asyncio
from contextlib import suppress

from typing import Any, Dict, List, NamedTuple, Optional, Type

import rx
from lvmopstools.socket import AsyncSocketHandler
from rx.disposable.disposable import Disposable
from rx.subject.subject import Subject

from sdsstools.utils import cancel_task

from cerebro import log


Expand Down Expand Up @@ -156,6 +158,8 @@ def __init__(
host: str,
port: int,
delay: Optional[float] = None,
retry: bool = True,
retrier_params: dict = {},
**kwargs,
):
super().__init__(name, **kwargs)
Expand All @@ -168,6 +172,12 @@ def __init__(
self.reader: asyncio.StreamReader | None = None
self.writer: asyncio.StreamWriter | None = None

self.handler = AsyncSocketHandler(
self.host,
self.port,
retry=retry,
retrier_params=retrier_params,
)
self._runner: asyncio.Task | None = None

async def start(self):
Expand All @@ -185,16 +195,17 @@ async def stop(self):
if not self.running:
raise RuntimeError(f"{self.name}: source is not running.")

with suppress(asyncio.CancelledError):
if self._runner:
self._runner.cancel()
await self._runner
self._runner = None
await cancel_task(self._runner)
self._runner = None

super().stop()

@abc.abstractmethod
async def _read_internal(self) -> list[dict] | None:
async def _read_internal(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> list[dict] | None:
"""Queries the TCP server and returns a list of points."""

pass
Expand All @@ -207,41 +218,12 @@ async def read(self, delay=None):
while True:
# Connect to server
try:
if self.writer and self.writer.is_closing():
self.writer.close()
await self.writer.wait_closed()

self.reader, self.writer = await asyncio.open_connection(
self.host,
self.port,
)

except (
OSError,
ConnectionError,
ConnectionResetError,
ConnectionRefusedError,
) as err:
log.warning(f"{self.name}: {err}. Reconnecting in {delay} seconds.")
await asyncio.sleep(delay)
continue

except BaseException as err:
log.warning(f"{self.name}: Stopping after unknown error {err}.")
await self.stop()
return

# Communicate with server
try:
points = await self._read_internal()
if not self.reader.at_eof() and points is not None:
points = await self.handler(self._read_internal)
if points is not None:
self.on_next(DataPoints(data=points, bucket=self.bucket))

except asyncio.TimeoutError:
log.warning(f"{self.name}: timed out waiting for the server to reply.")

except Exception as err:
log.warning(f"{self.name}: {str(err)}")
log.warning(f"{self.name}: Error while reading device: {str(err)}")

finally:
await asyncio.sleep(delay)
Expand Down
37 changes: 36 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ rx = "^3.2.0"
pymysql = "^1.0.2"
peewee = "^3.15.4"
asyncudp = "^0.11.0"
lvmopstools = {git="https://github.com/sdss/lvmopstools", branch="main"}

[tool.poetry.dev-dependencies]
ipython = ">=8.0.0"
Expand Down

0 comments on commit 1de8c52

Please sign in to comment.