Skip to content

Commit

Permalink
convert to dict literals
Browse files Browse the repository at this point in the history
switch to bootstrap grid instead of table for tasmota editor
add hide option for sensor only devices, #166
  • Loading branch information
jneilliii committed Jan 22, 2022
1 parent d320b35 commit 4bc9a10
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 153 deletions.
140 changes: 59 additions & 81 deletions octoprint_tasmota/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, interval, function, args=None, kwargs=None, on_reset=None, on
if args is None:
args = []
if kwargs is None:
kwargs = dict()
kwargs = {}

self.interval = interval
self.function = function
Expand Down Expand Up @@ -200,8 +200,8 @@ def on_settings_save(self, data):

if self.powerOffWhenIdle != old_powerOffWhenIdle:
self._plugin_manager.send_plugin_message(self._identifier,
dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout",
timeout_value=self._timeout_value))
{'powerOffWhenIdle': self.powerOffWhenIdle, 'type': "timeout",
'timeout_value': self._timeout_value})

if self.powerOffWhenIdle == True:
self._tasmota_logger.debug("Settings saved, Automatic Power Off Enabled, starting idle timer...")
Expand All @@ -220,7 +220,7 @@ def on_settings_save(self, data):
else:
self._tasmota_logger.setLevel(logging.INFO)

if old_polling_value != new_polling_value or old_polling_timer != new_polling_timer:
if (new_polling_value and not old_polling_value) or old_polling_timer != new_polling_timer:
if self.poll_status:
self.poll_status.cancel()

Expand All @@ -229,7 +229,7 @@ def on_settings_save(self, data):
self.poll_status.start()

def get_settings_version(self):
return 11
return 12

def on_settings_migrate(self, target, current=None):
if current is None or current < 6:
Expand Down Expand Up @@ -273,28 +273,33 @@ def on_settings_migrate(self, target, current=None):
plug["brightness"] = 50
arrSmartplugs_new.append(plug)
self._settings.set(["arrSmartplugs"], arrSmartplugs_new)
if current < 12:
# Add new fields
arrSmartplugs_new = []
for plug in self._settings.get(['arrSmartplugs']):
plug["is_sensor_only"] = False
arrSmartplugs_new.append(plug)
self._settings.set(["arrSmartplugs"], arrSmartplugs_new)

##~~ AssetPlugin mixin

def get_assets(self):
return dict(
js=["js/jquery-ui.min.js", "js/knockout-sortable.1.2.0.js", "js/fontawesome-iconpicker.js",
"js/ko.iconpicker.js", "js/plotly-latest.min.js", "js/knockout-bootstrap.min.js",
"js/ko.observableDictionary.js", "js/tasmota.js"],
css=["css/font-awesome.min.css", "css/font-awesome-v4-shims.min.css", "css/fontawesome-iconpicker.css",
"css/tasmota.css"]
)
return {'js': ["js/jquery-ui.min.js", "js/knockout-sortable.1.2.0.js", "js/fontawesome-iconpicker.js",
"js/ko.iconpicker.js", "js/plotly-latest.min.js", "js/knockout-bootstrap.min.js",
"js/ko.observableDictionary.js", "js/tasmota.js"],
'css': ["css/font-awesome.min.css", "css/font-awesome-v4-shims.min.css",
"css/fontawesome-iconpicker.css",
"css/tasmota.css"]}

##~~ TemplatePlugin mixin

def get_template_configs(self):
return [
dict(type="navbar", custom_bindings=True),
dict(type="settings", custom_bindings=True),
dict(type="tab", custom_bindings=True),
dict(type="sidebar", icon="plug", custom_bindings=True, data_bind="visible: show_sidebar",
template="tasmota_sidebar.jinja2", template_header="tasmota_sidebar_header.jinja2",
styles=["display: none"])
{'type': "navbar", 'custom_bindings': True},
{'type': "settings", 'custom_bindings': True},
{'type': "tab", 'custom_bindings': True},
{'type': "sidebar", 'icon': "plug", 'custom_bindings': True, 'data_bind': "visible: show_sidebar",
'template': "tasmota_sidebar.jinja2", 'template_header': "tasmota_sidebar_header.jinja2"}
]

##~~ ProgressPlugin mixin
Expand Down Expand Up @@ -335,15 +340,15 @@ def on_event(self, event, payload):
# Client Opened Event
if event == Events.CLIENT_OPENED:
self._plugin_manager.send_plugin_message(self._identifier,
dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout",
timeout_value=self._timeout_value))
{'powerOffWhenIdle': self.powerOffWhenIdle, 'type': "timeout",
'timeout_value': self._timeout_value})
return

# Printer Connected Event
if event == Events.CONNECTED:
if self.thermal_runaway_triggered:
self._plugin_manager.send_plugin_message(self._identifier,
dict(thermal_runaway=True, type="connection"))
{'thermal_runaway': True, 'type': "connection"})
self._tasmota_logger.debug("thermal runaway event triggered prior to last connection.")
self.thermal_runaway_triggered = False
if self._autostart_file:
Expand Down Expand Up @@ -380,7 +385,7 @@ def on_event(self, event, payload):
self._tasmota_logger.debug(payload.get("path", None))
if self.thermal_runaway_triggered:
self._plugin_manager.send_plugin_message(self._identifier,
dict(thermal_runaway=True, type="connection"))
{'thermal_runaway': True, 'type': "connection"})
self._tasmota_logger.debug("thermal runaway event triggered prior to last connection.")
self.thermal_runaway_triggered = False
for plug in self._settings.get(["arrSmartplugs"]):
Expand All @@ -400,8 +405,8 @@ def on_event(self, event, payload):
self._reset_idle_timer()
self._timeout_value = None
self._plugin_manager.send_plugin_message(self._identifier,
dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout",
timeout_value=self._timeout_value))
{'powerOffWhenIdle': self.powerOffWhenIdle, 'type': "timeout",
'timeout_value': self._timeout_value})

# Print Cancelled/Done Events
if event == Events.PRINT_DONE and self.print_job_started:
Expand All @@ -420,8 +425,8 @@ def on_event(self, event, payload):
self._tasmota_logger.debug("power total cost: %s" % power_cost)

self._storage_interface = self._file_manager._storage(payload.get("origin", "local"))
self._storage_interface.set_additional_metadata(payload.get("path"), "statistics", dict(
lastPowerCost=dict(_default=float('{:.4f}'.format(power_cost)))), merge=True)
self._storage_interface.set_additional_metadata(payload.get("path"), "statistics", {
'lastPowerCost': {'_default': float('{:.4f}'.format(power_cost))}}, merge=True)

self._autostart_file = None
self.print_job_power = 0.0
Expand Down Expand Up @@ -452,7 +457,7 @@ def turn_on(self, plugip, plugidx):
requests.get("http://{}/cm".format(plugip),
params={"user": plug["username"], "password": plug["password"], "cmnd": backlog_command},
timeout=self._settings.get_int(["request_timeout"]))
response = dict()
response = {}
response["POWER%s" % plug["idx"]] = "ON"
else:
webresponse = requests.get("http://{}/cm".format(plugip),
Expand All @@ -472,8 +477,8 @@ def turn_on(self, plugip, plugidx):
if plug["autoConnect"] and self._printer.is_closed_or_error():
self._logger.info(self._settings.global_get(['serial']))
c = threading.Timer(int(plug["autoConnectDelay"]), self._printer.connect,
kwargs=dict(port=self._settings.global_get(['serial', 'port']),
baudrate=self._settings.global_get(['serial', 'baudrate'])))
kwargs={'port': self._settings.global_get(['serial', 'port']),
'baudrate': self._settings.global_get(['serial', 'baudrate'])})
c.daemon = True
c.start()
if plug["sysCmdOn"]:
Expand All @@ -485,7 +490,8 @@ def turn_on(self, plugip, plugidx):
"Resetting idle timer since plug %s:%s was just turned on." % (plugip, plugidx))
self._waitForHeaters = False
self._reset_idle_timer()
self._plugin_manager.send_plugin_message(self._identifier, dict(currentState="on", ip=plugip, idx=plugidx))
self._plugin_manager.send_plugin_message(self._identifier,
{'currentState': "on", 'ip': plugip, 'idx': plugidx})

def turn_off(self, plugip, plugidx):
plug = self.plug_search(self._settings.get(["arrSmartplugs"]), "ip", plugip, "idx", plugidx)
Expand All @@ -499,7 +505,7 @@ def turn_off(self, plugip, plugidx):
requests.get("http://{}/cm".format(plugip),
params={"user": plug["username"], "password": plug["password"], "cmnd": backlog_command},
timeout=self._settings.get_int(["request_timeout"]))
response = dict()
response = {}
response["POWER%s" % plug["idx"]] = "OFF"
if plug["sysCmdOff"]:
self._tasmota_logger.debug(
Expand All @@ -521,7 +527,7 @@ def turn_off(self, plugip, plugidx):
chk = response["POWER%s" % plug["idx"]]
if chk.upper() in ["OFF", "0"]:
self._plugin_manager.send_plugin_message(self._identifier,
dict(currentState="off", ip=plugip, idx=plugidx))
{'currentState': "off", 'ip': plugip, 'idx': plugidx})
except:
self._tasmota_logger.error('Invalid ip or unknown error connecting to %s.' % plug["ip"], exc_info=True)
response = "Unknown error turning off %s index %s." % (plugip, plugidx)
Expand Down Expand Up @@ -634,15 +640,9 @@ def setSetOption26(self, plugip, username, password):
return response

def get_api_commands(self):
return dict(turnOn=["ip", "idx"],
turnOff=["ip", "idx"],
checkStatus=["ip", "idx"],
getEnergyData=[],
checkSetOption26=["ip", "username", "password"],
setSetOption26=["ip", "username", "password"],
enableAutomaticShutdown=[],
disableAutomaticShutdown=[],
abortAutomaticShutdown=[])
return {'turnOn': ["ip", "idx"], 'turnOff': ["ip", "idx"], 'checkStatus': ["ip", "idx"], 'getEnergyData': [],
'checkSetOption26': ["ip", "username", "password"], 'setSetOption26': ["ip", "username", "password"],
'enableAutomaticShutdown': [], 'disableAutomaticShutdown': [], 'abortAutomaticShutdown': []}

def on_api_command(self, command, data):
self._tasmota_logger.debug(data)
Expand Down Expand Up @@ -721,8 +721,8 @@ def on_api_command(self, command, data):
# eventManager().fire(Events.SETTINGS_UPDATED)
if command == "enableAutomaticShutdown" or command == "disableAutomaticShutdown" or command == "abortAutomaticShutdown":
self._plugin_manager.send_plugin_message(self._identifier,
dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout",
timeout_value=self._timeout_value))
{'powerOffWhenIdle': self.powerOffWhenIdle, 'type': "timeout",
'timeout_value': self._timeout_value})

##~~ Gcode processing hook

Expand Down Expand Up @@ -751,8 +751,8 @@ def gcode_led(self, plugip, led_data):
led_data["LEDBlue"], led_data["LEDWhite"], plug["idx"])},
timeout=self._settings.get_int(["request_timeout"]))
self._plugin_manager.send_plugin_message(self._identifier,
dict(currentState="on", ip=plug["ip"], idx=plug["idx"],
color=led_data))
{'currentState': "on", 'ip': plug["ip"],
'idx': plug["idx"], 'color': led_data})
except Exception as e:
self._logger.debug("Error: {}".format(e))

Expand All @@ -761,7 +761,7 @@ def process_echo(self, comm, line, *args, **kwargs):
return line

plugip = None
workleds = dict(LEDRed=0, LEDBlue=0, LEDGreen=0, LEDWhite=0, LEDBrightness=-1)
workleds = {'LEDRed': 0, 'LEDBlue': 0, 'LEDGreen': 0, 'LEDWhite': 0, 'LEDBrightness': -1}
workval = line.upper().split()
for i in workval:
firstchar = str(i[0].upper())
Expand Down Expand Up @@ -830,12 +830,12 @@ def check_temps(self, parsed_temps):
for k, v in process_items:
if k == "B" and v[0] > int(self._settings.get(["thermal_runaway_max_bed"])):
self._tasmota_logger.debug("Max bed temp reached, shutting off plugs.")
self._plugin_manager.send_plugin_message(self._identifier, dict(thermal_runaway=True, type="bed"))
self._plugin_manager.send_plugin_message(self._identifier, {'thermal_runaway': True, 'type': "bed"})
self.thermal_runaway_triggered = True
if k.startswith("T") and v[0] > int(self._settings.get(["thermal_runaway_max_extruder"])):
self._tasmota_logger.debug("Extruder max temp reached, shutting off plugs.")
self._plugin_manager.send_plugin_message(self._identifier,
dict(thermal_runaway=True, type="extruder"))
{'thermal_runaway': True, 'type': "extruder"})
self.thermal_runaway_triggered = True
if self.thermal_runaway_triggered == True:
for plug in self._settings.get(['arrSmartplugs']):
Expand Down Expand Up @@ -1005,8 +1005,8 @@ def _timer_task(self):

self._timeout_value -= 1
self._plugin_manager.send_plugin_message(self._identifier,
dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout",
timeout_value=self._timeout_value))
{'powerOffWhenIdle': self.powerOffWhenIdle, 'type': "timeout",
'timeout_value': self._timeout_value})
if self._timeout_value <= 0:
if self._abort_timer is not None:
self._abort_timer.cancel()
Expand Down Expand Up @@ -1050,42 +1050,20 @@ def deep_get(self, d, keys, default=None):

def get_additional_permissions(self, *args, **kwargs):
return [
dict(key="CONTROL",
name="Control Devices",
description=gettext("Allows control of configured devices."),
roles=["admin"],
dangerous=True,
default_groups=[ADMIN_GROUP])
{'key': "CONTROL", 'name': "Control Devices",
'description': gettext("Allows control of configured devices."), 'roles': ["admin"], 'dangerous': True,
'default_groups': [ADMIN_GROUP]}
]

##~~ Softwareupdate hook

def get_update_information(self):
return dict(
tasmota=dict(
displayName="Tasmota",
displayVersion=self._plugin_version,

# version check: github repository
type="github_release",
user="jneilliii",
repo="OctoPrint-Tasmota",
current=self._plugin_version,
stable_branch=dict(
name="Stable", branch="master", comittish=["master"]
),
prerelease_branches=[
dict(
name="Release Candidate",
branch="rc",
comittish=["rc", "master"],
)
],

# update method: pip
pip="https://github.com/jneilliii/OctoPrint-Tasmota/archive/{target_version}.zip"
)
)
return {'tasmota': {'displayName': "Tasmota", 'displayVersion': self._plugin_version, 'type': "github_release",
'user': "jneilliii", 'repo': "OctoPrint-Tasmota", 'current': self._plugin_version,
'stable_branch': {'name': "Stable", 'branch': "master", 'comittish': ["master"]},
'prerelease_branches': [
{'name': "Release Candidate", 'branch': "rc", 'comittish': ["rc", "master"]}
], 'pip': "https://github.com/jneilliii/OctoPrint-Tasmota/archive/{target_version}.zip"}}


__plugin_name__ = "Tasmota"
Expand Down
Loading

0 comments on commit 4bc9a10

Please sign in to comment.