diff --git a/octoprint_karmen/__init__.py b/octoprint_karmen/__init__.py index a271628..11a2e02 100644 --- a/octoprint_karmen/__init__.py +++ b/octoprint_karmen/__init__.py @@ -1,6 +1,7 @@ # coding=utf-8 from __future__ import absolute_import +import sentry_sdk import flask from websocket import WebSocketBadStatusException, WebSocketTimeoutException from octoprint.settings import settings @@ -8,6 +9,8 @@ from octoprint.util.version import is_octoprint_compatible from .connector import Connector from .utils import SentryWrapper, parse_path_whitelist, get_ip +from .utils.sentry import capture_exception + class KarmenPlugin( octoprint.plugin.SettingsPlugin, @@ -35,22 +38,23 @@ def get_settings_restricted_paths(self): } def get_template_vars(self): - host = 'localhost' if self.host == '::' else self.host - key = self._settings.get(["karmen_key"]) - if (key and len(key) <= 4): - key_redacted = key - else: - key_redacted = (key[:2] + "*" * (len(key) - 4) + key[-2:]) if key else None - return { - "is_octoprint_compatible": self.is_octoprint_compatible, - "ws_server": self._settings.get(["ws_server"]), - "path_whitelist": list(parse_path_whitelist(self._settings.get(["path_whitelist"]))), - "api_port": self.port, - "api_host": host, - "karmen_key_redacted": key_redacted, - "snapshot_url": settings().get(["webcam", "snapshot"]), - "sentry_opt": settings().get(["sentry_opt"]), - } + with capture_exception(): + host = 'localhost' if self.host == '::' else self.host + key = self._settings.get(["karmen_key"]) + if (key and len(key) <= 4): + key_redacted = key + else: + key_redacted = (key[:2] + "*" * (len(key) - 4) + key[-2:]) if key else None + return { + "is_octoprint_compatible": self.is_octoprint_compatible, + "ws_server": self._settings.get(["ws_server"]), + "path_whitelist": list(parse_path_whitelist(self._settings.get(["path_whitelist"]))), + "api_port": self.port, + "api_host": host, + "karmen_key_redacted": key_redacted, + "snapshot_url": settings().get(["webcam", 
"snapshot"]), + "sentry_opt": settings().get(["sentry_opt"]), + } def get_template_configs(self): return [dict(type="settings", custom_bindings=False)] @@ -99,9 +103,10 @@ def get_update_information(self): def on_settings_save(self, data): "update settings and reconnect" - octoprint.plugin.SettingsPlugin.on_settings_save(self, data) - self._logger.info("Settings saved") - self.ws_proxy_reconnect() + with capture_exception(): + octoprint.plugin.SettingsPlugin.on_settings_save(self, data) + self._logger.info("Settings saved") + self.ws_proxy_reconnect() def get_api_commands(self): @@ -109,9 +114,10 @@ def get_api_commands(self): def on_api_get(self, request): 'update status' - if 'update_status' in request.args: - self.send_status_message() - return flask.jsonify(**self.get_status('with_ip_address' in request.args)) + with capture_exception(): + if 'update_status' in request.args: + self.send_status_message() + return flask.jsonify(**self.get_status('with_ip_address' in request.args)) def ws_proxy_reconnect(self): "reload settings and reconnect" @@ -121,11 +127,18 @@ def ws_proxy_reconnect(self): else: self._connector.set_config(self.ws_get_connector_config()) if self._connector: - self._connector.reconnect() + try: + self._connector.reconnect() + except Exception as error: + sentry_sdk.capture_exception(error) + try: + self._connector.disconnect() + finally: + self._connector = None def ws_get_connector_config(self): "return connector or None if settings are not applicable" - self.sentry.init_context() + sentry_sdk.set_user({'id': self.key()}) ws_server_url = self._settings.get(["ws_server"]) key = self._settings.get(["karmen_key"]) @@ -191,26 +204,29 @@ def ws_get_connector(self): self._logger.error(error) self._connector = None else: - self._connector = Connector(self._logger, self.sentry, **connector_config) + self._connector = Connector(self._logger, **connector_config) self._connector.on_state_change = self.send_status_message return self._connector def 
on_startup(self, host, port): - self.is_octoprint_compatible = is_octoprint_compatible(">1.8") - self._connector = None - self.host = host - self.port = port + with capture_exception(): + self.is_octoprint_compatible = is_octoprint_compatible(">1.8") + self._connector = None + self.host = host + self.port = port def on_after_startup(self): - self.send_status_message() - self._logger.info("🍓 Karmen plugin is starting...") - if self.ws_get_connector(): - self._connector.connect() + with capture_exception(): + self.send_status_message() + self._logger.info("🍓 Karmen plugin is starting...") + if self.ws_get_connector(): + self._connector.connect() def on_shutdown(self): - self._logger.info("🍓 Karmen plugin shutdown...") - if self._connector: - self._connector.disconnect() + with capture_exception(): + self._logger.info("🍓 Karmen plugin shutdown...") + if self._connector: + self._connector.disconnect() def key(self): return self._settings.get(["karmen_key"]) diff --git a/octoprint_karmen/connector.py b/octoprint_karmen/connector.py index c18ad47..fc5ddbd 100644 --- a/octoprint_karmen/connector.py +++ b/octoprint_karmen/connector.py @@ -5,9 +5,12 @@ from dataclasses import dataclass from threading import Timer import io +import sentry_sdk import websocket import atexit from .utils.singleton import Singleton +from .utils.sentry import capture_exception +from .utils import wait_till_true from .request_forwarder import ( RequestForwarder, ForwarderMessage, @@ -46,7 +49,7 @@ class Config: class Connector(metaclass=Singleton): - def __init__(self, logger: logging.Logger, sentry, **config): + def __init__(self, logger: logging.Logger, **config): # self.__state = DISCONNECTED self._timeout = 3 @@ -60,7 +63,6 @@ def __init__(self, logger: logging.Logger, sentry, **config): self._reconnection_timer_lock = threading.Lock() self._heartbeat_clock = RepeatedTimer(self._timeout*3, logger, self._on_timer_tick) self.logger = logger - self.sentry = sentry self.config: Optional[Config] = 
None self.set_config(config) @@ -86,7 +88,8 @@ def set_state(self, new_state): self.__state = new_state self.logger.debug('Setting state to %s', self.state) self.state_condition.notify() - self.on_state_change() + if self.on_state_change: + self.on_state_change() def wait_for_state(self, *states, timeout=0.1): self.logger.debug("... waiting for %s state(s) (current: %s)", states, self.state) @@ -123,37 +126,75 @@ def connect(self): self.logger.info(f"Connecting to {self.config.ws_url}") with self.state_condition: self.wait_for_state(DISCONNECTED) - self.logger.debug("... connecting ...") - with self._reconnection_timer_lock: - if self._reconnection_timer: - self._reconnection_timer.cancel() - self._reconnection_timer = None - self.ws = self._get_websocket( - self.config.ws_url, - on_open=self.on_open, - on_message=self.on_message, - on_error=self.on_error, - on_close=self.on_close, - ) - self.request_forwarder = RequestForwarder(self.config.base_uri, self.ws, self.logger, self.config.path_whitelist, self.sentry) - self.logger.debug('... creating ws thread (was %r).', self.ws_thread) - self.ws_thread = threading.Thread( - name='WS Thread', - target=self.ws.run_forever, kwargs={ - "skip_utf8_validation": True, - }, - daemon=True, - ) - self.logger.debug('... starting websocket thread %r.', self.ws_thread) - self.set_state(CONNECTING) - self.ws_thread.start() - self.ws.sock.settimeout(self._timeout) - self.ping_pong = PingPonger(self.ws, self.logger, self.sentry) try: - self.wait_for_state(CONNECTED, timeout=1) - except InvalidStateException: - pass - return self.ws + self.logger.debug("... 
connecting ...") + with self._reconnection_timer_lock: + if self._reconnection_timer: + self._reconnection_timer.cancel() + self._reconnection_timer = None + self.ws = self._get_websocket( + self.config.ws_url, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close, + ) + self.request_forwarder = RequestForwarder(self.config.base_uri, self.ws, self.logger, self.config.path_whitelist) + self.logger.debug('... creating ws thread (was %r).', self.ws_thread) + + def on_error(error): + """try to fix starting error state and recover + + When the code gets here, it means that something is + really wrong. This code tries its best to rescue the internal + state. + + The best solution for the future would be to rewrite the whole + connector to run in a separate thread (rather than starting + just the unreliable websocket in a thread). This would + simplify the design and make it possible to restart the plugin + without restarting OctoPrint. + """ + self.last_error = error + # set to disconnected to prevent waiting for lock on state + try: + acquired = self.state_condition.wait(0) + if self.state == CONNECTING and acquired: + self.set_state(DISCONNECTED) + self.on_error(self.ws, error) + except RuntimeError: + self.on_error(self.ws, error) + self.on_close(self.ws, None, f'Unable to start thred {error}') + + def run_thread(*args, **kwargs): + 'proxy method to help sentry identify exception source' + with capture_exception(reraise=False, on_error=on_error): + return self.ws.run_forever(*args, **kwargs) + + self.ws_thread = threading.Thread( + name='WS Thread', + target=run_thread, + kwargs={ + "skip_utf8_validation": True, + }, + daemon=True, + ) + self.logger.debug('... 
starting websocket thread %r.', self.ws_thread) + self.set_state(CONNECTING) + self.ws_thread.start() + # wait for the websocket's socket to be created + if wait_till_true(lambda: self.ws.sock): + self.ws.sock.settimeout(self._timeout) + self.ping_pong = PingPonger(self.ws, self.logger) + try: # just wait for connection to leave connect() when connected + self.wait_for_state(CONNECTED, timeout=1) + except InvalidStateException: + pass + return self.ws + except Exception as error: + if self.ws: + self.ws.close() + self.on_error(self.ws, error) def _get_websocket(self, *args, **kwargs) -> websocket.WebSocketApp: "test injection point" @@ -195,26 +236,12 @@ def _on_timer_tick(self): def on_message(self, ws, message): "process message" - try: + with capture_exception(reraise=False): data = ForwarderMessage(message) - except Exception as e: - logging.warning(e) - self.sentry.captureException(e) - return - if data.channel == 'ping-pong': - try: + if data.channel == 'ping-pong': self.ping_pong.handle_request(data) - except Exception as e: - logging.warning(e) - self.sentry.captureException(e) - return - else: - try: + else: self.request_forwarder.handle_request(data) - except Exception as e: - logging.error(e) - self.sentry.captureException(e) - def on_error(self, ws, error: Exception): """process error event from underlaying websocket @@ -223,10 +250,11 @@ def on_error(self, ws, error: Exception): """ self.logger.debug('on_error called') self.logger.exception(error) - self.sentry.captureException(error) + sentry_sdk.capture_exception(error) self.last_error = error - with self.state_condition: - self.set_state(DISCONNECTING) + if self.state != DISCONNECTED: + with self.state_condition: + self.set_state(DISCONNECTING) def on_close(self, ws, close_status_code=None, close_msg=None): self.logger.debug('on_close called') @@ -317,11 +345,10 @@ def stop(self): class PingPonger: - def __init__(self, ws, logger, sentry): + def __init__(self, ws, logger): self.logger = logger 
self.ws = ws self.gotPong = True - self.sentry = sentry def handle_request(self, message): if message.event == "pong": @@ -331,33 +358,30 @@ def handle_request(self, message): def ping(self, on_close): if not self.gotPong: self.logger.warning("No pong response received") - self.sentry.captureMessage("No pong received") - try: + with capture_exception(reraise=False): self.logger.warning("closing connection") on_close() return - except Exception as e: - self.logger.error("Unable to reconnect %s", e) - self.sentry.captureException(e) else: - self.logger.debug("Going to ping") - buf = io.BytesIO() - msg = { - "channel": str.encode("ping-pong"), - "event": str.encode("ping"), - "dataType": MessageType.NONE.value, - "data": b"", - } - - BufferMessage.pack(msg, buf) - buf.seek(0) - try: - self.ws.send(buf.read(), websocket.ABNF.OPCODE_BINARY) - except websocket.WebSocketException as error: - self.logger.info(f"Websocket exception {error} during ping. Closing connection.") - on_close() - - self.gotPong = False + with capture_exception(): + self.logger.debug("Going to ping") + buf = io.BytesIO() + msg = { + "channel": str.encode("ping-pong"), + "event": str.encode("ping"), + "dataType": MessageType.NONE.value, + "data": b"", + } + + BufferMessage.pack(msg, buf) + buf.seek(0) + try: + self.ws.send(buf.read(), websocket.ABNF.OPCODE_BINARY) + except websocket.WebSocketException as error: + self.logger.info(f"Websocket exception {error} during ping. 
Closing connection.") + on_close() + + self.gotPong = False class OnCloseTimer: diff --git a/octoprint_karmen/request_forwarder.py b/octoprint_karmen/request_forwarder.py index c914078..097082c 100644 --- a/octoprint_karmen/request_forwarder.py +++ b/octoprint_karmen/request_forwarder.py @@ -44,13 +44,12 @@ def __str__(self): class RequestForwarder: - def __init__(self, base_uri, ws, logger, path_whitelist, sentry): + def __init__(self, base_uri, ws, logger, path_whitelist): self.base_uri = base_uri self.ws = ws self._channels = {} self.logger = logger self.path_whitelist = path_whitelist - self.sentry = sentry def handle_request(self, message): channel_id = message.channel diff --git a/octoprint_karmen/test_connector.py b/octoprint_karmen/test_connector.py index e8d648b..3c9d02d 100644 --- a/octoprint_karmen/test_connector.py +++ b/octoprint_karmen/test_connector.py @@ -120,7 +120,6 @@ def get_websocket(*args, **kwargs): } connector = Connector( logger=logger, - sentry=MagicMock(), **config ) connector.set_config(config) diff --git a/octoprint_karmen/utils/__init__.py b/octoprint_karmen/utils/__init__.py index dd2d60e..8ad1f91 100644 --- a/octoprint_karmen/utils/__init__.py +++ b/octoprint_karmen/utils/__init__.py @@ -1,5 +1,17 @@ +import time from .sentry import SentryWrapper from .get_my_ip import get_ip + def parse_path_whitelist(path_whitelist_settings: str) -> tuple: return tuple(filter(None, path_whitelist_settings.split(';'))) + +def wait_till_true(condition: callable, max_timeout=0.5): + begin = time.monotonic() + result = False + while not result and time.monotonic() - begin <= max_timeout: + result = condition() + if result: + break + time.sleep(0.1) + return result diff --git a/octoprint_karmen/utils/sentry.py b/octoprint_karmen/utils/sentry.py index b479ec0..f92e47c 100644 --- a/octoprint_karmen/utils/sentry.py +++ b/octoprint_karmen/utils/sentry.py @@ -1,18 +1,25 @@ import logging import sentry_sdk +import traceback +from contextlib import 
contextmanager from sentry_sdk.integrations.threading import ThreadingIntegration from sentry_sdk.integrations.logging import LoggingIntegration -import requests + class SentryWrapper: def __init__(self, plugin): + def before_send(event, hint): + if not self.enabled(): + return None if 'exc_info' in hint: exc_type, exc_value, tb = hint['exc_info'] - if isinstance(exc_value, requests.exceptions.RequestException): - event['fingerprint'] = ['database-unavailable'] - return event + # exclude exceptions whose traceback does not contain /octoprint_karmen/ + # in its list of source files + if any(filter(lambda frame: '/octoprint_karmen/' in frame[0], traceback.StackSummary.extract(traceback.walk_tb(tb), limit=1000))): + return event + return None self.plugin = plugin sentry_sdk.init( @@ -33,14 +40,14 @@ def before_send(event, hint): def enabled(self): return self.plugin._settings.get(["sentry_opt"]) != 'out' - def init_context(self): - if self.enabled(): - sentry_sdk.set_user({'id': self.plugin.key()}) - - def captureException(self, *args, **kwargs): - if self.enabled(): - sentry_sdk.capture_exception(*args, **kwargs) - def captureMessage(self, *args, **kwargs): - if self.enabled(): - sentry_sdk.capture_message(*args, **kwargs) +@contextmanager +def capture_exception(reraise=True, on_error=None): + try: + yield + except Exception as error: + if on_error: + on_error(error) + sentry_sdk.capture_exception(error) + if reraise: + raise diff --git a/octoprint_karmen/utils/tests.py b/octoprint_karmen/utils/tests.py index 07c3510..8bd423a 100644 --- a/octoprint_karmen/utils/tests.py +++ b/octoprint_karmen/utils/tests.py @@ -61,7 +61,8 @@ def run_forever(self, *args, **kwargs): event = self._m_received.pop() if event.name.startswith('_m_'): # run ws app method from within loop getattr(self, event.name[3:])(*event.args, **event.kwargs) - elif hasattr(self, f'on_{event.name}'): # pretend an event + elif getattr(self, f'on_{event.name}', None): # pretend an event + print(f'calling 
on_{event.name} {getattr(self, "on_%s" % event.name)}') getattr(self, f'on_{event.name}')(self, *event.args, **event.kwargs) time.sleep(0.01) if not self._m_run: diff --git a/setup.py b/setup.py index 70a075d..7dc9f53 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ plugin_name = "OctoPrint-Karmen" # The plugin's version. Can be overwritten within OctoPrint's internal data via __plugin_version__ in the plugin module -plugin_version = "0.7.0" +plugin_version = "0.8.0" # The plugin's description. Can be overwritten within OctoPrint's internal data via __plugin_description__ in the plugin # module